"""HTTP server for MCP using Server API with streamable HTTP transport."""
import logging
from mcp.server import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.types import Resource, Tool, TextContent
from src.resources import (
list_parties,
list_documents,
list_projects,
list_tags,
list_compliance_profiles,
list_party_types,
list_custom_fields,
list_reports,
list_webhooks,
list_branding,
read_resource,
)
from src.tools import get_tools, call_tool
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
# Create MCP server instance (same as stdio server)
server = Server("trustlayer-mcp")
@server.list_resources()
async def handle_list_resources() -> list[Resource]:
"""List all available resources."""
all_resources = []
# List each resource type independently, so errors in one don't block others
resource_functions = [
("parties", list_parties),
("documents", list_documents),
("projects", list_projects),
("tags", list_tags),
("compliance_profiles", list_compliance_profiles),
("party_types", list_party_types),
("custom_fields", list_custom_fields),
("reports", list_reports),
("webhooks", list_webhooks),
("branding", list_branding),
]
for name, func in resource_functions:
try:
resources = await func()
all_resources.extend(resources)
logger.info(f"Loaded {len(resources)} {name} resources")
except Exception as e:
logger.warning(f"Error loading {name} resources: {e}")
# Continue with other resources even if one fails
logger.info(f"Total resources loaded: {len(all_resources)}")
return all_resources
@server.read_resource()
async def handle_read_resource(uri: str) -> str:
"""Read a resource by URI."""
try:
return await read_resource(uri)
except Exception as e:
logger.error(f"Error reading resource {uri}: {e}")
raise
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
"""List all available tools."""
try:
tools = get_tools()
logger.info(f"Returning {len(tools)} tools to client")
return tools
except Exception as e:
logger.error(f"Error listing tools: {e}", exc_info=True)
return []
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Call a tool by name with arguments."""
try:
return await call_tool(name, arguments)
except Exception as e:
logger.error(f"Error calling tool {name}: {e}")
raise
# Create streamable HTTP ASGI app from the server
# Import here to avoid circular dependencies
from mcp.server.fastmcp.server import StreamableHTTPASGIApp
import contextlib
from src.config import api_token_context
class AuthHeaderWrapper:
"""Wrapper for StreamableHTTPASGIApp to extract Authorization header from scope."""
def __init__(self, app: StreamableHTTPASGIApp):
self.app = app
async def __call__(self, scope, receive, send):
# Extract Authorization header from ASGI scope
if scope["type"] == "http":
# Headers in ASGI are list of (name, value) tuples, both are bytes
headers = scope.get("headers", [])
auth_header = None
for name, value in headers:
# Headers are bytes in ASGI, decode for comparison
if name.lower() == b"authorization":
auth_header = value.decode("utf-8") if isinstance(value, bytes) else value
break
# Store in context variable for use in handlers
if auth_header:
api_token_context.set(auth_header)
else:
api_token_context.set(None)
# Call the wrapped app
await self.app(scope, receive, send)
session_manager = StreamableHTTPSessionManager(server)
streamable_http_app_base = StreamableHTTPASGIApp(session_manager)
streamable_http_app = AuthHeaderWrapper(streamable_http_app_base)
# Add custom health check endpoints
from starlette.routing import Route
from starlette.responses import JSONResponse
from starlette.applications import Starlette
async def root(request):
"""Health check endpoint."""
return JSONResponse({
"status": "ok",
"service": "TrustLayer MCP Server",
"version": "0.1.0",
"mcp_endpoint": "/mcp",
"transport": "streamable-http"
})
async def health(request):
"""Health check endpoint."""
return JSONResponse({"status": "healthy"})
@contextlib.asynccontextmanager
async def lifespan(app: Starlette):
"""Lifespan context manager to initialize and cleanup session manager."""
async with session_manager.run():
yield
# Create Starlette app with MCP endpoint and health checks
app = Starlette(
routes=[
Route("/", root, methods=["GET"]),
Route("/health", health, methods=["GET"]),
Route("/mcp", streamable_http_app, methods=["GET", "POST"]),
],
lifespan=lifespan,
)