"""
CKAN Open Data MCP Server
This is the main entry point for the CKAN Open Data MCP server.
It provides access to CKAN-powered open data portals through
the Model Context Protocol.
"""
from __future__ import annotations
import asyncio
import os
from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
import uvicorn
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from starlette.applications import Starlette
from starlette.middleware.cors import CORSMiddleware
from starlette.routing import Mount
from starlette.types import ASGIApp, Receive, Scope, Send
from .ckan_tools import register_ckan_tools
from .config_selection import CKAN_CONFIG_SELECTION, CkanConfigCatalog
from .session_state import SessionConfigStore
class CKANMCPServer:
    """MCP server exposing CKAN tools over stdio or Streamable HTTP.

    The instance owns the low-level MCP ``Server``, a ``SessionConfigStore``
    bound to it, and a ``CkanConfigCatalog`` built from
    ``CKAN_CONFIG_SELECTION``. Tool registration is deferred to :meth:`init`,
    which is idempotent and is invoked automatically by both transports.
    """

    def __init__(self) -> None:
        """Create the MCP server and supporting state; no tools are registered yet."""
        self.server = Server("ckan-mcp", "1.0.0")
        self.session_store = SessionConfigStore(self.server)
        self.config_catalog = CkanConfigCatalog(CKAN_CONFIG_SELECTION)
        # Built on first use by create_streamable_http_app() and cached afterwards.
        self._http_app: ASGIApp | None = None
        self._session_manager: StreamableHTTPSessionManager | None = None
        self._initialized = False

    async def init(self) -> None:
        """Register the CKAN tools on the MCP server.

        Safe to call repeatedly: registration only runs on the first call.
        """
        if self._initialized:
            return
        await register_ckan_tools(self.server, self.session_store, self.config_catalog)
        # In the future, add more tool groups here (e.g., open511_tools)
        self._initialized = True

    def create_streamable_http_app(
        self,
        *,
        mount_path: str = "/mcp",
        allow_origins: Sequence[str] | None = None,
        json_response: bool = False,
        stateless: bool = False,
    ) -> ASGIApp:
        """Create a Starlette app that serves the Streamable HTTP transport.

        The app is built once per server instance. Later calls return the
        cached app and ignore their arguments (apart from the ``mount_path``
        format check, which always runs).

        Args:
            mount_path: URL path the MCP endpoint is mounted at; must start
                with ``/``.
            allow_origins: Origins allowed by CORS. ``None`` or an empty
                sequence falls back to ``["*"]``.
            json_response: Forwarded to ``StreamableHTTPSessionManager``.
            stateless: Forwarded to ``StreamableHTTPSessionManager``.

        Returns:
            The Starlette application wrapped in ``CORSMiddleware``.

        Raises:
            ValueError: If ``mount_path`` does not start with ``/``.
        """
        if not mount_path.startswith("/"):
            raise ValueError("mount_path must start with '/'")
        if self._http_app is not None:
            return self._http_app

        session_manager = StreamableHTTPSessionManager(
            app=self.server,
            json_response=json_response,
            stateless=stateless,
        )
        self._session_manager = session_manager

        async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:
            await session_manager.handle_request(scope, receive, send)

        @asynccontextmanager
        async def lifespan(app: Starlette) -> AsyncIterator[None]:
            """Manage MCP session lifecycle."""
            # init() is idempotent, so this covers apps created via a factory
            # that never called it explicitly.
            await self.init()
            async with session_manager.run():
                yield

        starlette_app = Starlette(
            routes=[Mount(mount_path, app=handle_streamable_http)],
            lifespan=lifespan,
        )
        # Expose the server and session manager on the app state.
        starlette_app.state.ckan_server = self
        starlette_app.state.session_manager = session_manager

        # Ensure browser clients can read MCP headers when using HTTP transport
        cors_app = CORSMiddleware(
            starlette_app,
            allow_origins=list(allow_origins or ["*"]),
            allow_methods=["GET", "POST", "DELETE"],
            allow_headers=["*"],
            expose_headers=["Mcp-Session-Id"],
        )
        self._http_app = cors_app
        return cors_app

    async def run_stdio(self) -> None:
        """Run the server using stdio transport.

        Tools are registered first if :meth:`init` has not been called yet, so
        the stdio transport behaves like the HTTP transport in this respect.
        """
        await self.init()
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream, write_stream, self.server.create_initialization_options()
            )

    async def run_streamable_http(
        self,
        *,
        host: str = "0.0.0.0",
        port: int = 8000,
        mount_path: str = "/mcp",
        allow_origins: Sequence[str] | None = None,
        json_response: bool = False,
        stateless: bool = False,
        log_level: str = "info",
    ) -> None:
        """Serve the MCP server over the Streamable HTTP transport.

        Builds (or reuses) the ASGI app from :meth:`create_streamable_http_app`
        and serves it with uvicorn until the uvicorn server exits.

        Args:
            host: Interface uvicorn binds to.
            port: TCP port uvicorn listens on.
            mount_path: Forwarded to :meth:`create_streamable_http_app`.
            allow_origins: Forwarded to :meth:`create_streamable_http_app`.
            json_response: Forwarded to :meth:`create_streamable_http_app`.
            stateless: Forwarded to :meth:`create_streamable_http_app`.
            log_level: uvicorn log level.

        Raises:
            ValueError: If ``mount_path`` does not start with ``/``.
        """
        app = self.create_streamable_http_app(
            mount_path=mount_path,
            allow_origins=allow_origins,
            json_response=json_response,
            stateless=stateless,
        )
        config = uvicorn.Config(
            app,
            host=host,
            port=port,
            log_level=log_level,
        )
        server = uvicorn.Server(config)
        await server.serve()
def create_default_server() -> CKANMCPServer:
    """Build and return a fresh ``CKANMCPServer``.

    The returned server has not registered any tools yet.
    """
    default_server = CKANMCPServer()
    return default_server
def create_http_app() -> ASGIApp:
    """Build the Streamable HTTP ASGI app from environment settings.

    Application factory for ASGI servers such as uvicorn/gunicorn. Reads:

    - ``CKAN_MCP_HTTP_ALLOW_ORIGINS``: comma-separated CORS origins
      (default ``*``); blank entries are dropped.
    - ``CKAN_MCP_HTTP_JSON_RESPONSE``: enabled by ``1``, ``true``, ``yes`` or
      ``on`` (case-insensitive); anything else disables it.
    - ``CKAN_MCP_HTTP_PATH``: mount path of the MCP endpoint (default ``/mcp``).
    """
    raw_origins = os.getenv("CKAN_MCP_HTTP_ALLOW_ORIGINS", "*")
    trimmed = (piece.strip() for piece in raw_origins.split(","))
    origins = [piece for piece in trimmed if piece]

    enabled_values = ("1", "true", "yes", "on")
    json_flag = os.getenv("CKAN_MCP_HTTP_JSON_RESPONSE", "false").lower()

    return create_default_server().create_streamable_http_app(
        mount_path=os.getenv("CKAN_MCP_HTTP_PATH", "/mcp"),
        allow_origins=origins,
        json_response=json_flag in enabled_values,
    )
async def main() -> None:
    """Run the server on the transport selected by ``CKAN_MCP_MODE``.

    ``CKAN_MCP_MODE=http`` (case-insensitive) serves the Streamable HTTP
    transport; any other value, or no value, runs the stdio transport.

    HTTP mode reads ``CKAN_MCP_HOST``, ``CKAN_MCP_PORT``,
    ``CKAN_MCP_HTTP_PATH``, ``CKAN_MCP_HTTP_ALLOW_ORIGINS`` (comma-separated),
    ``CKAN_MCP_HTTP_JSON_RESPONSE`` (``1``/``true``/``yes``/``on``) and
    ``CKAN_MCP_HTTP_LOG_LEVEL``.

    Raises:
        ValueError: If ``CKAN_MCP_PORT`` is not a valid integer.
    """
    # Check if we should run as stdio or web server
    if os.getenv("CKAN_MCP_MODE", "stdio").lower() != "http":
        server = create_default_server()
        await server.init()
        await server.run_stdio()
        return

    # Parse every HTTP setting before building the server so that a bad
    # configuration fails fast, before any tool registration work is done.
    port_env = os.getenv("CKAN_MCP_PORT", "8000")
    try:
        port = int(port_env)
    except ValueError as err:
        raise ValueError(
            f"CKAN_MCP_PORT must be an integer, got {port_env!r}"
        ) from err

    host = os.getenv("CKAN_MCP_HOST", "0.0.0.0")
    mount_path = os.getenv("CKAN_MCP_HTTP_PATH", "/mcp")
    allow_origins_env = os.getenv("CKAN_MCP_HTTP_ALLOW_ORIGINS", "*")
    allow_origins = [
        origin.strip() for origin in allow_origins_env.split(",") if origin.strip()
    ]
    json_response = os.getenv("CKAN_MCP_HTTP_JSON_RESPONSE", "false").lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    log_level = os.getenv("CKAN_MCP_HTTP_LOG_LEVEL", "info")

    server = create_default_server()
    await server.init()
    await server.run_streamable_http(
        host=host,
        port=port,
        mount_path=mount_path,
        allow_origins=allow_origins,
        json_response=json_response,
        log_level=log_level,
    )
def cli() -> None:
    """Synchronous console-script entry point.

    Drives the async ``main`` coroutine to completion on a new event loop.
    """
    entry_coroutine = main()
    asyncio.run(entry_coroutine)
# Script execution goes through the same synchronous wrapper as the console script.
if __name__ == "__main__":
    cli()