"""MCP server implementation for OpManager - Credential-free mode.
This module provides the core MCP server that handles tool registration
and execution for OpManager operations.
Example:
>>> from opmanager_mcp.config import load_config
>>> from opmanager_mcp.server import OpManagerMCPServer
>>>
>>> config = load_config()
>>> server = OpManagerMCPServer(config)
>>> await server.initialize()
>>> print(f"Server ready with {len(server.tools)} tools")
"""
from __future__ import annotations
import json
from typing import Any
import mcp.types as types
from mcp.server.lowlevel import Server
from .api_client import OpManagerAPIClient
from .config import Config
from .exceptions import (
InvalidToolArgumentsError,
OpenAPILoadError,
OpManagerAPIError,
ToolNotFoundError,
)
from .logging_config import get_logger
from .tool_generator import ToolGenerator, load_openapi_spec
logger = get_logger(__name__)
# Argument names that describe *how to reach* an OpManager instance rather
# than what to ask it. They are consumed by the server itself and must never
# be forwarded to the OpManager API as request parameters.
CONNECTION_PARAMS: frozenset[str] = frozenset(
    ("host", "apiKey", "api_key", "port", "use_ssl", "verify_ssl")
)
def detect_protocol_and_port(
    port: int | None = None,
    use_ssl: bool | None = None,
    config_defaults: tuple[int, bool] = (8060, False)
) -> tuple[int, bool, str]:
    """Resolve the effective port, SSL flag and URL scheme for a connection.

    Resolution rules, in order:

    * ``use_ssl`` given: it is used as-is. A missing ``port`` becomes 8061
      (SSL on) or 8060 (SSL off). A warning is logged when the explicit flag
      contradicts the conventional meaning of port 8060/8061.
    * only ``port`` given: SSL is inferred from the OpManager convention
      (8061 means HTTPS, anything else means HTTP).
    * neither given: ``config_defaults`` is used verbatim.

    Args:
        port: Explicitly provided port number, or None.
        use_ssl: Explicitly provided SSL preference, or None.
        config_defaults: Tuple of (default_port, default_use_ssl).

    Returns:
        Tuple of (port, use_ssl, protocol_string) where the protocol string
        is ``"https"`` or ``"http"``.
    """
    fallback_port, fallback_ssl = config_defaults

    if use_ssl is None:
        if port is None:
            # Nothing explicit at all: take the configured defaults.
            resolved_port, resolved_ssl = fallback_port, fallback_ssl
        else:
            # OpManager convention: 8060 = HTTP, 8061 = HTTPS
            resolved_port = port
            resolved_ssl = port == 8061
            logger.debug(
                f"Auto-detected SSL setting based on port: use_ssl={resolved_ssl} for port {port}"
            )
    else:
        resolved_ssl = use_ssl
        if port is not None:
            resolved_port = port
        elif resolved_ssl:
            resolved_port = 8061
        else:
            resolved_port = 8060

        # Flag the two classic mismatches between flag and well-known port.
        if use_ssl is False and port == 8061:
            logger.warning(
                f"SSL explicitly disabled (use_ssl=False) but using HTTPS port {port}. "
                f"This may cause connection failures. Consider setting use_ssl=True or using port 8060."
            )
        elif use_ssl is True and port == 8060:
            logger.warning(
                f"SSL explicitly enabled (use_ssl=True) but using HTTP port {port}. "
                f"This may cause connection failures. Consider setting use_ssl=False or using port 8061."
            )

    scheme = "https" if resolved_ssl else "http"
    return resolved_port, resolved_ssl, scheme
class OpManagerMCPServer:
    """OpManager MCP Server with credential-free initialization.

    This server loads tool definitions from an OpenAPI spec at startup
    but doesn't connect to any OpManager system. Authentication
    happens per-request when tools are called.

    Attributes:
        config: Server configuration.
        server: Underlying MCP server instance.
        tools: List of generated tool definitions.
        tool_generator: Generator built from the OpenAPI spec, or None until
            :meth:`initialize` has run.
    """

    # Argument keys whose values must never appear in log output.
    _SENSITIVE_ARG_KEYS: frozenset[str] = frozenset({"apiKey", "api_key"})
    _REDACTED = "***REDACTED***"

    def __init__(self, config: Config) -> None:
        """Initialize OpManager MCP Server.

        Args:
            config: Server configuration object.
        """
        self.config = config
        self.server = Server("opmanager-mcp-server")
        self.tools: list[dict[str, Any]] = []
        self.tool_generator: ToolGenerator | None = None
        self._initialized = False
        # Register handlers
        self._setup_handlers()

    # ------------------------------------------------------------------
    # Argument normalization helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _coerce_port(value: Any) -> int | None:
        """Normalize a port argument to ``int`` (or ``None`` when absent).

        Tool arguments may arrive loosely typed (e.g. ``"8061"`` as a string),
        which would otherwise break both the range check and the
        port-based SSL auto-detection.

        Args:
            value: Raw port value from the tool arguments or configuration.

        Returns:
            The port as an int, or None if ``value`` is None.

        Raises:
            ValueError: If ``value`` cannot be interpreted as an integer.
        """
        if value is None:
            return None
        # bool is a subclass of int; True/False are never meaningful ports.
        if isinstance(value, bool):
            raise ValueError(f"Port must be an integer, got {value!r}")
        if isinstance(value, int):
            return value
        if isinstance(value, float):
            if value.is_integer():
                return int(value)
            raise ValueError(f"Port must be an integer, got {value!r}")
        if isinstance(value, str):
            try:
                return int(value.strip())
            except ValueError:
                raise ValueError(f"Port must be an integer, got {value!r}") from None
        raise ValueError(f"Port must be an integer, got {value!r}")

    @staticmethod
    def _coerce_bool(value: Any, param: str = "value") -> bool | None:
        """Normalize a boolean-like argument to ``bool`` (or ``None``).

        ``bool("false")`` is ``True`` in Python, so string flags must be
        parsed explicitly rather than cast.

        Args:
            value: Raw flag value (bool, int, str or None).
            param: Parameter name used in the error message.

        Returns:
            The parsed boolean, or None if ``value`` is None.

        Raises:
            ValueError: If ``value`` is not a recognizable boolean.
        """
        if value is None or isinstance(value, bool):
            return value
        if isinstance(value, int):
            return bool(value)
        if isinstance(value, str):
            text = value.strip().lower()
            if text in {"true", "1", "yes", "on"}:
                return True
            if text in {"false", "0", "no", "off"}:
                return False
        raise ValueError(f"{param} must be a boolean, got {value!r}")

    @classmethod
    def _redact_arguments(cls, arguments: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of ``arguments`` that is safe to log.

        Values stored under credential keys are replaced by a placeholder;
        nested dicts (e.g. ``queryParams``) are redacted recursively.

        Args:
            arguments: Raw tool arguments.

        Returns:
            A new dict with credential values masked.
        """
        safe: dict[str, Any] = {}
        for key, value in arguments.items():
            if key in cls._SENSITIVE_ARG_KEYS:
                safe[key] = cls._REDACTED
            elif isinstance(value, dict):
                safe[key] = cls._redact_arguments(value)
            else:
                safe[key] = value
        return safe

    @staticmethod
    def _text_result(payload: Any, *, is_error: bool) -> types.CallToolResult:
        """Wrap a JSON-serializable payload in a single-text-item tool result.

        Args:
            payload: Object to serialize with ``json.dumps(indent=2)``.
            is_error: Value for the result's ``isError`` flag.

        Returns:
            CallToolResult carrying the serialized payload.
        """
        return types.CallToolResult(
            content=[
                types.TextContent(
                    type="text",
                    text=json.dumps(payload, indent=2),
                )
            ],
            isError=is_error,
        )

    def _validate_connection_params(
        self, host: str, port: int, use_ssl: bool, protocol: str
    ) -> str:
        """Validate connection parameters and return full URL.

        Args:
            host: Host address.
            port: Port number.
            use_ssl: SSL usage flag (only logged).
            protocol: Protocol string used as the URL scheme.

        Returns:
            Validated full URL of the form ``{protocol}://{host}:{port}``.

        Raises:
            ValueError: If the host is empty/not a string or the port is not
                an integer in the range 1-65535.
        """
        if not isinstance(host, str):
            raise ValueError(f"Host must be a string, got {type(host).__name__}")
        if not host.strip():
            raise ValueError("Host cannot be empty")
        # Guard the type first so a bad port yields ValueError, not TypeError.
        if isinstance(port, bool) or not isinstance(port, int):
            raise ValueError(f"Port {port!r} must be an integer")
        if not (1 <= port <= 65535):
            raise ValueError(f"Port {port} is out of valid range (1-65535)")
        full_url = f"{protocol}://{host}:{port}"
        logger.debug(
            f"Validated connection: {full_url}",
            extra={"host": host, "port": port, "use_ssl": use_ssl, "protocol": protocol}
        )
        return full_url

    @property
    def is_initialized(self) -> bool:
        """Check if the server has been initialized.

        Returns:
            True if initialized, False otherwise.
        """
        return self._initialized

    async def initialize(self) -> None:
        """Initialize server by loading OpenAPI spec and generating tools.

        This method loads the OpenAPI specification and generates MCP tools.
        It does NOT connect to OpManager - authentication happens per-request.
        Calling it again after a successful run is a no-op.

        Raises:
            OpenAPILoadError: If the spec path is not configured or the
                OpenAPI spec cannot be loaded.
        """
        if self._initialized:
            logger.debug("Server already initialized, skipping")
            return
        logger.info("Initializing MCP server (credential-free mode)")
        # Load OpenAPI spec from local file
        spec_path = self.config.opmanager.local_spec_path
        if not spec_path:
            raise OpenAPILoadError(
                "unknown",
                message="LOCAL_OPENAPI_SPEC_PATH not configured",
            )
        logger.info(f"Loading OpenAPI spec from {spec_path}")
        try:
            spec = load_openapi_spec(spec_path)
        except Exception as e:
            raise OpenAPILoadError(spec_path, e) from e
        # Generate tools (only for configured HTTP methods)
        logger.info("Generating MCP tools from OpenAPI spec")
        allowed_methods = self.config.server.allowed_http_methods
        self.tool_generator = ToolGenerator(spec, allowed_methods=allowed_methods)
        self.tools = self.tool_generator.generate_tools()
        self._initialized = True
        logger.info(
            "MCP server initialized successfully",
            extra={
                "mode": "credential-free",
                "tool_count": len(self.tools),
            },
        )

    def _setup_handlers(self) -> None:
        """Set up MCP protocol handlers (tool listing and tool invocation)."""

        @self.server.list_tools()
        async def handle_list_tools() -> list[types.Tool]:
            """List all available tools.

            Returns:
                List of MCP tool definitions.
            """
            logger.debug(f"Listing {len(self.tools)} tools")
            return [
                types.Tool(
                    name=tool["name"],
                    description=tool["description"],
                    inputSchema=tool["inputSchema"],
                )
                for tool in self.tools
            ]

        @self.server.call_tool()
        async def handle_call_tool(
            name: str,
            arguments: dict[str, Any] | None,
        ) -> types.CallToolResult:
            """Execute a tool with per-request credentials.

            Per MCP spec, tool execution errors are returned with isError=True
            rather than raised as exceptions. This allows the LLM to understand
            the error and potentially self-correct.

            Args:
                name: Tool name.
                arguments: Tool arguments including host, apiKey.

            Returns:
                CallToolResult with content and isError flag.

            Note:
                Protocol errors (invalid params, unknown tool) are still raised
                as exceptions per MCP spec. Tool execution errors return isError=True.
            """
            return await self._execute_tool(name, arguments)

    def _resolve_connection(
        self, name: str, arguments: dict[str, Any]
    ) -> tuple[int, bool, str, bool]:
        """Derive (port, use_ssl, protocol, verify_ssl) for one tool call.

        Args:
            name: Tool name (used for error reporting only).
            arguments: Raw tool arguments.

        Returns:
            Tuple of (port, use_ssl, protocol, verify_ssl).

        Raises:
            InvalidToolArgumentsError: If port/use_ssl/verify_ssl cannot be
                interpreted.
        """
        opm = self.config.opmanager
        try:
            # The configured port is the default when the caller omits one, so
            # SSL auto-detection below is driven by that port as well.
            provided_port = self._coerce_port(arguments.get("port", opm.port))
            provided_use_ssl = self._coerce_bool(arguments.get("use_ssl"), "use_ssl")
            verify_ssl = self._coerce_bool(arguments.get("verify_ssl"), "verify_ssl")
        except ValueError as e:
            raise InvalidToolArgumentsError(
                name,
                missing_args=[],
                message=f"Invalid connection parameters: {e}",
            ) from e

        # Dynamic protocol and port detection
        port, use_ssl, protocol = detect_protocol_and_port(
            port=provided_port,
            use_ssl=provided_use_ssl,
            config_defaults=(opm.port, opm.use_https)
        )
        # An absent or explicit-null verify_ssl must not silently disable
        # certificate verification; fall back to the configured policy.
        if verify_ssl is None:
            verify_ssl = bool(opm.tls_verify)
        return port, bool(use_ssl), protocol, verify_ssl

    async def _execute_tool(
        self,
        name: str,
        arguments: dict[str, Any] | None,
    ) -> types.CallToolResult:
        """Execute a tool with the given arguments.

        Per MCP spec, this method distinguishes between:
        - Protocol errors (invalid params, unknown tool): Raised as exceptions
        - Tool execution errors (API failures): Returned with isError=True

        Args:
            name: Tool name.
            arguments: Tool arguments (connection parameters plus API parameters).

        Returns:
            CallToolResult with content and isError flag.

        Raises:
            InvalidToolArgumentsError: If required arguments are missing or the
                connection parameters are invalid (protocol error).
            ToolNotFoundError: If the tool doesn't exist or its API path cannot
                be determined (protocol error).
        """
        if not arguments:
            raise InvalidToolArgumentsError(
                name,
                missing_args=["host", "apiKey"],
            )
        # Log received arguments for debugging; credentials are masked so the
        # API key never reaches log storage.
        logger.debug(
            f"Tool call: {name}",
            extra={
                "arguments": self._redact_arguments(arguments),
                "arg_types": {k: type(v).__name__ for k, v in arguments.items()},
            },
        )
        # Extract and validate credentials
        host = arguments.get("host")
        api_key = arguments.get("apiKey") or arguments.get("api_key")
        port, use_ssl, protocol, verify_ssl = self._resolve_connection(name, arguments)

        # Validate connection parameters and build full URL
        full_url = ""
        if host:  # Only validate if host is provided
            try:
                full_url = self._validate_connection_params(host, port, use_ssl, protocol)
            except ValueError as e:
                raise InvalidToolArgumentsError(
                    name,
                    missing_args=[],
                    message=f"Invalid connection parameters: {e}",
                ) from e

        missing_creds = []
        if not host:
            missing_creds.append("host")
        if not api_key:
            missing_creds.append("apiKey")
        if missing_creds:
            raise InvalidToolArgumentsError(
                name,
                missing_args=missing_creds,
                message="Missing required credentials",
            )

        # Find the tool definition
        tool = next((t for t in self.tools if t["name"] == name), None)
        if not tool:
            raise ToolNotFoundError(name)
        # Get API path for tool
        path = tool.get("_path") or self._get_path_for_tool(name)
        if not path:
            raise ToolNotFoundError(
                name,
                message=f"Could not determine API path for tool: {name}",
            )
        # Get HTTP method for tool
        method = tool.get("_method", "get").upper()
        # Build API parameters using whitelist from tool schema
        api_params = self._build_api_params(arguments, tool)
        logger.info(
            f"Executing tool: {name}",
            extra={
                "host": host,
                "port": port,
                "protocol": protocol,
                "full_url": full_url,
                "path": path,
                "method": method,
                "param_count": len(api_params),
                "use_ssl": use_ssl,
                "verify_ssl": verify_ssl,
            },
        )
        try:
            # Create API client with per-request credentials and dynamic URL
            logger.debug(
                f"Initializing API client: {full_url}",
                extra={"use_ssl": use_ssl, "verify_ssl": verify_ssl},
            )
            async with OpManagerAPIClient(
                host=str(host),
                api_key=str(api_key),
                port=int(port),
                use_https=bool(use_ssl),
                tls_verify=bool(verify_ssl),
                # NOTE(review): presumably request_timeout is in milliseconds
                # and the client expects seconds - confirm against Config.
                timeout=self.config.server.request_timeout // 1000,
                max_retries=self.config.server.max_retries,
            ) as client:
                # Log the constructed base URL for verification
                logger.debug(f"API client base URL: {client.base_url}")
                # Execute the API call
                result = await client.execute_operation(
                    path=path,
                    method=method,
                    params=api_params if api_params else None,
                )
                logger.info(
                    f"Successfully executed tool: {name}",
                    extra={"response_type": type(result).__name__},
                )
                # Return success result per MCP spec
                return self._text_result(result, is_error=False)
        except OpManagerAPIError as e:
            # Per MCP spec: Tool execution errors return isError=True
            logger.error(
                f"API error executing tool {name}: {e}",
                extra={"tool": name, "error_type": type(e).__name__},
            )
            error_details: dict[str, Any] = {
                "error": type(e).__name__,
                "message": str(e),
                "tool": name,
            }
            status_code = getattr(e, "status_code", None)
            if status_code:
                error_details["status_code"] = status_code
            details = getattr(e, "details", None)
            if details:
                error_details["details"] = details
            return self._text_result(error_details, is_error=True)
        except Exception as e:
            # Per MCP spec: Unexpected errors also return isError=True.
            # logger.exception keeps the traceback, which is otherwise lost
            # because the exception is converted into a result here.
            logger.exception(
                f"Unexpected error executing tool {name}: {e}",
                extra={"tool": name, "error_type": type(e).__name__},
            )
            unexpected_error_details: dict[str, Any] = {
                "error": type(e).__name__,
                "message": str(e),
                "tool": name,
            }
            return self._text_result(unexpected_error_details, is_error=True)

    def _build_api_params(
        self, arguments: dict[str, Any], tool: dict[str, Any]
    ) -> dict[str, Any]:
        """Build API parameters from tool arguments using whitelist approach.

        Only includes parameters that are defined in the tool's ``inputSchema``
        properties, excluding connection parameters, and drops ``None`` values.
        Anything else the caller sent (e.g. client metadata) is filtered out.
        Entries inside a ``queryParams`` dict are subject to the same
        whitelist and override same-named top-level arguments.

        Args:
            arguments: Raw tool arguments.
            tool: Tool definition with inputSchema.

        Returns:
            Cleaned parameters for the API call.
        """
        # Get the valid parameter names from the tool's schema
        schema = tool.get("inputSchema", {})
        properties = schema.get("properties", {})
        valid_params = set(properties.keys()) - CONNECTION_PARAMS

        # Only include parameters that are defined in the schema and not connection params
        api_params: dict[str, Any] = {
            key: value
            for key, value in arguments.items()
            if key in valid_params and value is not None
        }
        # Handle queryParams as a special case for additional parameters
        query_params = arguments.get("queryParams", {})
        if isinstance(query_params, dict):
            for key, value in query_params.items():
                if key in valid_params and value is not None:
                    api_params[key] = value
        return api_params

    def _get_path_for_tool(self, tool_name: str) -> str | None:
        """Get API path for a tool by delegating to the tool generator.

        Args:
            tool_name: Name of the tool.

        Returns:
            API path, or None if no generator exists yet or it has no match.
        """
        if not self.tool_generator:
            return None
        return self.tool_generator.get_path_for_tool(tool_name)