"""
HTTP-based MCP Server for Athena Analytics.
Security architecture:
- Runs in isolated Docker container
- Exposes SSE endpoint for MCP protocol
- Bearer token authentication
- No credentials exposed to Claude container
"""
import logging
import os
import sys
from typing import Any
import uvicorn
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Route
try:
from mcp import types
from mcp.server import Server
from mcp.server.sse import SseServerTransport
except ImportError:
logging.error('MCP SDK not found. Install with: pip install mcp')
sys.exit(1)
from config import MCP_ROOT
# Load environment variables
from lib.confutils import load_env # noqa: E402
load_env(
env_path=MCP_ROOT / '.env',
local_env_path=MCP_ROOT / '.env.local',
)
from athena_tools.registry import ( # noqa: E402
TOOL_REGISTRY,
TOOL_REGISTRY_DICT,
)
from lib import jsonutils # noqa: E402
from lib import loggerutils # noqa: E402
from logging_setup import setup_mcp_logging # noqa: E402
setup_mcp_logging(MCP_ROOT.parent / 'tmp')
logger = loggerutils.getLogger('analytics')
# Bearer token from environment (required)
AUTH_TOKEN = os.environ.get('MCP_AUTH_TOKEN')
if not AUTH_TOKEN:
logger.error('MCP_AUTH_TOKEN environment variable not set')
sys.exit(1)
server = Server('athena-analytics')
@server.list_tools()
async def list_tools() -> list[types.Tool]:
"""List all available MCP tools."""
return [
types.Tool(
name=tool.name,
description=tool.description,
inputSchema=tool.input_schema,
)
for tool in TOOL_REGISTRY
]
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[types.TextContent]:
"""
Handle tool calls from MCP client (Claude).
Routes tool calls to appropriate handler functions from tools.py.
Catches exceptions and returns error messages in MCP format.
Args:
name: Tool name (e.g. 'list_templates')
arguments: Tool arguments (validated against inputSchema)
Returns:
List of TextContent with tool results or error messages
"""
logger.info('Tool called', tool=name, arguments=arguments)
try:
tool_func = TOOL_REGISTRY_DICT.get(name)
if not tool_func:
return [
types.TextContent(
type='text',
text=(
f'Error: Unknown tool \'{name}\'. '
f'Available tools: {list(TOOL_REGISTRY_DICT.keys())}'
),
)
]
result = tool_func(**arguments)
result_json = jsonutils.dumps(result, indent=2, default=str)
return [types.TextContent(type='text', text=result_json)]
except PermissionError as e:
logger.error(
f'Permission denied in tool \'{name}\'',
data={'tool': name, 'arguments': arguments, 'error': str(e)},
exc_info=True,
)
return [
types.TextContent(
type='text',
text=f'Permission denied: {e}\n\nThis operation is forbidden for security reasons.',
)
]
except ValueError as e:
logger.error(
f'Invalid input in tool \'{name}\'',
data={'tool': name, 'arguments': arguments, 'error': str(e)},
exc_info=True,
)
return [
types.TextContent(
type='text',
text=f'Invalid input: {e}\n\nCheck your parameters and try again.',
)
]
except Exception as e:
logger.error(
f'Unexpected error in tool \'{name}\'',
data={'tool': name, 'arguments': arguments, 'error': str(e)},
exc_info=True,
)
return [
types.TextContent(
type='text',
text=f'Error executing tool: {e}\n\nCheck server logs for details.',
)
]
def authenticate(request: Request) -> bool:
"""
Verify Bearer token authentication.
Args:
request: Starlette request object
Returns:
True if authenticated, False otherwise
"""
auth_header = request.headers.get('Authorization', '')
expected = f'Bearer {AUTH_TOKEN}'
if auth_header != expected:
logger.warning(
'Authentication failed',
data={
'remote_addr': request.client.host if request.client else 'unknown',
'path': request.url.path,
},
)
return False
return True
async def handle_sse(request: Request) -> Response:
"""
Handle SSE connection for MCP protocol.
This endpoint handles the Server-Sent Events transport for MCP.
Claude Code CLI connects here and sends MCP messages.
Args:
request: Starlette request with SSE connection
Returns:
SSE response stream
"""
# Authentication check
if not authenticate(request):
return Response('Unauthorized', status_code=401)
logger.info(
'SSE connection established',
data={
'remote_addr': request.client.host if request.client else 'unknown',
'user_agent': request.headers.get('user-agent', 'unknown'),
},
)
# Create SSE transport
sse = SseServerTransport('/messages')
try:
async with sse.connect_sse(
request.scope,
request.receive,
request._send,
) as streams:
await server.run(
streams[0],
streams[1],
server.create_initialization_options(),
)
except Exception:
logger.error('Error in SSE handler', exc_info=True)
raise
return Response()
async def handle_messages(request: Request) -> Response:
"""
Handle POST requests to /messages endpoint.
This is used by SSE transport for client-to-server messages.
Args:
request: Starlette request with message payload
Returns:
Response indicating message was received
"""
# Authentication check
if not authenticate(request):
return Response('Unauthorized', status_code=401)
# SSE transport handles the message internally
return Response()
async def handle_health(request: Request) -> Response:
"""Health check endpoint."""
return Response(
jsonutils.dumps({'status': 'healthy', 'service': 'mcp-athena-analytics'}),
media_type='application/json',
)
# CORS configuration from environment
ALLOWED_ORIGINS = os.environ.get('MCP_ALLOWED_ORIGINS', 'http://localhost:*').split(',')
# Starlette app with CORS middleware
app = Starlette(
debug=False,
routes=[
Route('/sse', handle_sse),
Route('/messages', handle_messages, methods=['POST']),
Route('/health', handle_health),
],
middleware=[
Middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS,
allow_methods=['GET', 'POST'],
allow_headers=['Authorization', 'Content-Type'],
)
],
)
if __name__ == '__main__':
host = os.environ.get('MCP_HOST', '0.0.0.0') # nosec B104
port = int(os.environ.get('MCP_PORT', '8000'))
logger.info(
'Starting HTTP-based MCP Athena Analytics server',
data={
'mcp_root': str(MCP_ROOT),
'transport': 'SSE',
'host': host,
'port': port,
},
)
uvicorn.run(app, host=host, port=port, log_level='info')