# server.py (3.44 kB)
from __future__ import annotations
import json
from concurrent import futures
from typing import Any
import grpc
from adapters import load_adapters
from config import get_settings
import time
from utils import write_audit
# Generated protos (present after `make proto`)
from generated import mcp_pb2, mcp_pb2_grpc # type: ignore # noqa: E402
class MCPServicer(mcp_pb2_grpc.ModelControlPlaneServicer):
    """Implements the ModelControlPlane gRPC service.

    All RPCs are served from the adapters returned by ``load_adapters()`` at
    construction time. ``self.adapters`` is used as a mapping of adapter name
    to adapter object. Each adapter is expected to expose:

    * ``methods``: a mapping of fully-qualified method name to callable;
    * ``call(fq_name, *args, ttl=..., **kwargs)``: invoke one of those methods.
    """

    def __init__(self) -> None:
        # Adapters are loaded once; every RPC shares this mapping.
        self.adapters = load_adapters()

    # ------------------------------------------------------------------
    # RPCs
    # ------------------------------------------------------------------
    def ListMethods(self, request: mcp_pb2.ListMethodsRequest, context: grpc.ServicerContext):  # noqa: N802
        """Return every method exposed by every loaded adapter.

        Each entry carries the method's fully-qualified name and its docstring
        (empty string when the callable has none). ``request`` and ``context``
        are not read.
        """
        resp = mcp_pb2.ListMethodsResponse()
        for adapter in self.adapters.values():
            for fq_name, fn in adapter.methods.items():
                resp.methods.add(fq_name=fq_name, doc=fn.__doc__ or "")
        return resp

    def Invoke(self, request: mcp_pb2.InvokeRequest, context: grpc.ServicerContext):  # noqa: N802
        """Call a single adapter method and return its JSON-encoded result.

        ``request.fq_name`` selects the method; its first dotted segment names
        the adapter. ``request.json_args`` is an optional JSON object of the
        form ``{"args": [...], "kwargs": {...}}`` (both keys optional). An
        optional ``cache-ttl`` invocation-metadata entry is parsed as an
        integer (default 0) and forwarded to the adapter as ``ttl``.

        The result is serialized with ``json.dumps(..., default=str)``, so
        values JSON cannot represent natively are stringified.

        Aborts the RPC (``context.abort`` raises) with:

        * INVALID_ARGUMENT: ``json_args`` is not valid JSON or not an object,
          ``args`` is not an array, ``kwargs`` is not an object, ``kwargs``
          contains ``ttl``, or ``cache-ttl`` is not an integer;
        * NOT_FOUND: no adapter is loaded under the ``fq_name`` prefix;
        * INTERNAL: the adapter call raised.
        """
        fq_name = request.fq_name

        # Client-supplied input is validated up front so bad requests surface
        # as INVALID_ARGUMENT rather than as an uncaught exception, which gRPC
        # would report to the client as an opaque UNKNOWN error.
        try:
            payload = json.loads(request.json_args or "{}")
        except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, f"json_args is not valid JSON: {exc}")
        if not isinstance(payload, dict):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "json_args must be a JSON object")
        args = payload.get("args", [])
        kwargs = payload.get("kwargs", {})
        if not isinstance(args, list):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "'args' must be a JSON array")
        if not isinstance(kwargs, dict):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "'kwargs' must be a JSON object")
        if "ttl" in kwargs:
            # ttl is always passed explicitly to adapter.call below; a second
            # value via **kwargs would make the call raise TypeError
            # ("got multiple values for keyword argument 'ttl'").
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "'ttl' is reserved; pass it via the cache-ttl metadata key",
            )

        # Pick the cache TTL from gRPC metadata if provided (on duplicate keys
        # the last value wins).
        meta = dict(context.invocation_metadata())
        raw_ttl = meta.get("cache-ttl", "0")
        try:
            ttl = int(raw_ttl)
        except (TypeError, ValueError):
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"cache-ttl metadata must be an integer, got {raw_ttl!r}",
            )

        # Find the adapter by prefix: the text before the first ".".
        adapter_name = fq_name.split(".", 1)[0]
        adapter = self.adapters.get(adapter_name)
        if adapter is None:
            context.abort(grpc.StatusCode.NOT_FOUND, f"Adapter '{adapter_name}' not loaded")
        try:
            result = adapter.call(fq_name, *args, ttl=ttl, **kwargs)
        except Exception as exc:  # pragma: no cover
            context.abort(grpc.StatusCode.INTERNAL, str(exc))
        return mcp_pb2.InvokeResponse(json_result=json.dumps(result, default=str))

    # ------------------------------------------------------------------
    # HealthCheck rpc
    # ------------------------------------------------------------------
    def HealthCheck(self, request: mcp_pb2.HealthRequest, context: grpc.ServicerContext):  # noqa: N802
        """Report a per-adapter status and write an audit record.

        An adapter is reported ``ok`` when its ``methods`` attribute can be read
        and sized. Any exception raised while doing so is reported as
        ``ok=False`` with the exception text as ``detail``, instead of failing
        the whole RPC. An audit event with the adapter count is written on
        every call.
        """
        resp = mcp_pb2.HealthResponse()
        for name, adapter in self.adapters.items():
            status = resp.adapters.add(name=name)
            try:
                _ = len(adapter.methods)
                status.ok = True
                status.detail = "ok"
            except Exception as exc:  # noqa: broad-except
                status.ok = False
                status.detail = str(exc)
        write_audit({"event": "healthcheck", "adapters": len(self.adapters)})
        return resp
# ----------------------------------------------------------------------
# Entrypoint
# ----------------------------------------------------------------------
def serve(port: int = 50051, *, max_workers: int = 10) -> None:  # pragma: no cover
    """Start the MCP gRPC server and block until it terminates.

    Args:
        port: TCP port to listen on, on all interfaces, without TLS. ``0``
            asks the OS for a free port; the port actually bound is reported.
        max_workers: Number of threads in the pool that services RPCs.

    Raises:
        RuntimeError: If the listening port could not be bound.
    """
    settings = get_settings()
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    mcp_pb2_grpc.add_ModelControlPlaneServicer_to_server(MCPServicer(), server)
    # add_insecure_port returns the port actually bound. Older grpc releases
    # return 0 on failure instead of raising, so check it explicitly. Report
    # the bound port rather than the requested one so port=0 logs a usable value.
    bound_port = server.add_insecure_port(f"[::]:{port}")
    if not bound_port:
        raise RuntimeError(f"Failed to bind MCP server to port {port}")
    server.start()
    print(f"MCP server running on {bound_port} using model {settings.openai_model}")
    server.wait_for_termination()
if __name__ == "__main__":  # pragma: no cover
    # Script entrypoint: start the gRPC server using serve()'s defaults.
    serve()