"""
Transport configuration and handling for MCP server.
"""
import os
import sys
import logging
import json
from typing import Dict, Any, Optional
# Module-level logger shared by every transport helper in this file. The dotted
# name makes it a descendant of the 'looker-mcp' logger in the logging hierarchy.
logger = logging.getLogger('looker-mcp.server.transports')
def get_transport_mode() -> str:
    """Determine which transport the MCP server should use.

    Sources are consulted in priority order:

    1. The ``MCP_MODE`` environment variable.
    2. The first ``--transport <mode>`` or ``--transport=<mode>`` option found
       in ``sys.argv``.
    3. The default, ``"stdio"``.

    Values are compared case-insensitively with surrounding whitespace
    removed. A value that is present but not a supported mode is reported with
    a warning and then ignored, so the next source is consulted.

    Returns:
        Either ``"stdio"`` or ``"sse"``.
    """
    valid_modes = ("stdio", "sse")

    # 1. Environment variable takes precedence over the command line.
    raw_env_mode = os.environ.get("MCP_MODE", "")
    env_mode = raw_env_mode.strip().lower()
    if env_mode in valid_modes:
        logger.info(f"Using transport mode from environment: {env_mode}")
        return env_mode
    if env_mode:
        logger.warning(f"Ignoring unsupported MCP_MODE value: {raw_env_mode!r}")

    # 2. Command line: only the first --transport occurrence is considered.
    raw_cli_mode = None
    argv = sys.argv
    for index, arg in enumerate(argv):
        if arg == "--transport":
            # An empty string marks "option given without a value".
            raw_cli_mode = argv[index + 1] if index + 1 < len(argv) else ""
            break
        if arg.startswith("--transport="):
            raw_cli_mode = arg.partition("=")[2]
            break

    if raw_cli_mode is not None:
        cli_mode = raw_cli_mode.strip().lower()
        if cli_mode in valid_modes:
            logger.info(f"Using transport mode from CLI: {cli_mode}")
            return cli_mode
        logger.warning(f"Ignoring unsupported --transport value: {raw_cli_mode!r}")

    # 3. Nothing usable was supplied.
    logger.info("Using default transport mode: stdio")
    return "stdio"
async def run_stdio_mode(mcp_server):
    """Serve ``mcp_server`` over stdio.

    This is a thin entry point: it logs the start-up and then hands control to
    ``run_cursor_stdio_mode``, which performs the actual stdin/stdout exchange.
    """
    logger.info("Starting MCP server in stdio mode (Cursor compatibility)")

    # Selecting stdio always means the Cursor-compatible loop; this function
    # has no separate plain-stdio path.
    await run_cursor_stdio_mode(mcp_server)
def _cursor_tool_entry(tool_def):
    """Convert one tool definition into its entry for the ``mcp/register`` message."""
    # parameter_schema may be missing or None; both mean "no declared parameters".
    schema = getattr(tool_def, "parameter_schema", None) or {}
    return {
        "name": tool_def.name,
        "description": tool_def.description,
        "parameters": {
            "type": "object",
            "properties": schema.get("properties", {}),
            "required": schema.get("required", []),
        },
    }


def _send_json(message):
    """Write ``message`` to stdout as a single JSON line and flush immediately."""
    print(json.dumps(message), flush=True)


def _send_error(request_id, code, message):
    """Send a JSON-RPC 2.0 error response for ``request_id``."""
    _send_json({
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    })


def _find_registered_tool(mcp_server, tool_name):
    """Return the tool named ``tool_name`` from ``tools`` (or ``_tools``), else None."""
    if hasattr(mcp_server, "tools"):
        registered = mcp_server.tools
    elif hasattr(mcp_server, "_tools"):
        registered = mcp_server._tools
    else:
        return None
    return next((t for t in registered if t.name == tool_name), None)


async def _handle_cursor_request(mcp_server, request):
    """Validate one decoded request and, if it is a tool call, run it and reply."""
    import inspect  # local import: only needed to detect awaitable tool results

    # Requests that are not well-formed "mcp/tools" calls are logged and
    # dropped without a reply.
    if request.get("jsonrpc") != "2.0":
        logger.warning("Invalid jsonrpc version in request")
        return
    if "method" not in request:
        logger.warning("No method specified in request")
        return
    if request["method"] != "mcp/tools":
        logger.warning(f"Unsupported method: {request['method']}")
        return
    if "id" not in request:
        logger.warning("No request ID provided")
        return
    request_id = request["id"]

    # From here on the request has an id, so every outcome produces a reply.
    params = request.get("params")
    if params is None:
        params = {}
    if not isinstance(params, dict):
        _send_error(request_id, -32602, "Invalid params: expected an object")
        return

    tool_name = params.get("name")
    if not tool_name:
        _send_error(request_id, -32602, "Invalid tool name")
        return

    arguments = params.get("arguments")
    if arguments is None:
        arguments = {}
    if not isinstance(arguments, dict):
        _send_error(request_id, -32602, "Invalid arguments: expected an object")
        return

    logger.info(f"Tool call: {tool_name} with args: {json.dumps(arguments)}")
    try:
        tool = _find_registered_tool(mcp_server, tool_name)
        if tool is None:
            _send_error(request_id, -32601, f"Tool '{tool_name}' not found")
            return

        # Tool functions may be plain or async; only await when needed.
        result = tool.function(**arguments)
        if inspect.isawaitable(result):
            result = await result

        _send_json({"jsonrpc": "2.0", "id": request_id, "result": result})
    except Exception as e:
        # Boundary handler: a failing tool (or an unserializable result) must
        # produce an error reply rather than terminate the server loop.
        logger.error(f"Error calling tool {tool_name}: {str(e)}")
        _send_error(request_id, -32603, f"Internal error: {str(e)}")


async def run_cursor_stdio_mode(mcp_server):
    """Run the Cursor-compatible stdio loop for ``mcp_server``.

    The function first writes a single ``mcp/register`` JSON-RPC message to
    stdout listing every tool returned by ``mcp_server.list_tools()``. It then
    reads stdin line by line until end of input, treating each non-empty line
    that starts with ``{`` as a JSON-RPC 2.0 request. Only ``mcp/tools``
    requests that carry an ``id`` are handled; their ``params`` supply the tool
    ``name`` and its keyword ``arguments``.

    Replies are written to stdout, one JSON object per line:

    * ``result``: the tool's return value.
    * error ``-32602``: missing tool name, or malformed ``params``/``arguments``.
    * error ``-32601``: no registered tool with that name.
    * error ``-32603``: the tool raised, or its result could not be serialized.

    Args:
        mcp_server: Object providing an awaitable ``list_tools()`` and a
            ``tools`` (or ``_tools``) iterable whose items expose ``name`` and
            a callable ``function``.
    """
    logger.info("Starting Cursor stdio compatibility mode")

    tool_definitions = await mcp_server.list_tools()
    tools = [_cursor_tool_entry(tool_def) for tool_def in tool_definitions]

    _send_json({
        "jsonrpc": "2.0",
        "method": "mcp/register",
        "params": {"tools": tools},
    })
    logger.debug(f"Sent tools registration message with {len(tools)} tools")

    # NOTE: iterating sys.stdin is a blocking read; while waiting for the next
    # line nothing else scheduled on the event loop can run.
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        # Cheap pre-filter so stray non-JSON output is skipped with a warning.
        if not line.startswith('{'):
            logger.warning(f"Received non-JSON input line, skipping: {line[:100]}")
            continue
        try:
            request = json.loads(line)
            logger.debug(f"Received request: {json.dumps(request)}")
            await _handle_cursor_request(mcp_server, request)
        except Exception as e:
            # Keep serving subsequent lines even if one request is unparsable.
            logger.error(f"Error processing request: {str(e)}")
async def configure_sse_mode(mcp_server):
    """Serve ``mcp_server`` over SSE with uvicorn until the server stops.

    Environment variables:
        MCP_PORT: TCP port to listen on; must be an integer in 0-65535
            (default ``8000``).
        MCP_HOST: Address to bind (default ``0.0.0.0``, i.e. all interfaces).

    Every failure is logged and terminates the process with exit status 1:
    an invalid ``MCP_PORT``, a failed import of the SSE transport or uvicorn,
    or any other exception raised while building or running the server.

    Args:
        mcp_server: The server object passed to the SSE transport's ``get_app``.
    """
    logger.info("Configuring MCP server for SSE mode")

    # Validate the port up front so a typo gives a clear message instead of an
    # uncaught ValueError traceback.
    raw_port = os.environ.get("MCP_PORT", "8000")
    try:
        port = int(raw_port)
    except ValueError:
        logger.error(f"Invalid MCP_PORT value {raw_port!r}: expected an integer")
        sys.exit(1)
    if not 0 <= port <= 65535:
        logger.error(f"Invalid MCP_PORT value {raw_port!r}: must be between 0 and 65535")
        sys.exit(1)

    # Binding to 0.0.0.0 exposes the server on every interface; MCP_HOST lets
    # deployments restrict that (e.g. to 127.0.0.1) without a code change.
    host = os.environ.get("MCP_HOST", "0.0.0.0")

    logger.info(f"Starting SSE server on port {port}")
    try:
        # Imported lazily so stdio-only installs do not need these packages.
        from mcp.server.transport.sse import get_app
        import uvicorn

        app = get_app(mcp_server)

        logger.info(f"Starting uvicorn server on port {port}")
        config = uvicorn.Config(app, host=host, port=port, log_level="info")
        server = uvicorn.Server(config)
        await server.serve()
    except ImportError as e:
        logger.error(
            "Failed to import SSE transport module. "
            f"Make sure mcp[sse] is installed. ({e})"
        )
        sys.exit(1)
    except Exception as e:
        logger.error(f"Error starting SSE server: {str(e)}")
        sys.exit(1)