# mcp_protocol.py (2.5 kB)
"""standard mcp json-rpc protocol implementation."""
from __future__ import annotations

import json
import sys
from dataclasses import asdict, dataclass
from typing import Any, Callable, Dict, List, Optional
@dataclass
class MCPRequest:
    """One JSON-RPC request message as exchanged over MCP.

    Every field has a default, so an instance can be built directly from
    the keyword arguments of a decoded JSON object.
    """

    # Protocol version tag; defaults to "2.0".
    jsonrpc: str = "2.0"
    # Correlation id supplied by the caller; None when not given.
    id: int | str | None = None
    # Name of the method to invoke; empty string when not given.
    method: str = ""
    # Arguments for the method; None when not given.
    params: dict[str, Any] | None = None
@dataclass
class MCPResponse:
    """One JSON-RPC response message as exchanged over MCP.

    Holds either a ``result`` or an ``error`` payload together with the id
    of the request being answered. All fields are optional at construction.
    """

    # Protocol version tag; defaults to "2.0".
    jsonrpc: str = "2.0"
    # Id copied from the request being answered; None when unknown.
    id: int | str | None = None
    # Value produced by the handler; None when unset.
    result: Any | None = None
    # Error object (a plain dict); None when unset.
    error: dict[str, Any] | None = None
@dataclass
class MCPError:
    """Error object placed in the ``error`` member of a response.

    ``code`` and ``message`` are required; ``data`` is optional extra detail.
    """

    # Numeric error code.
    code: int
    # Human-readable description of the failure.
    message: str
    # Optional structured detail about the failure; None when absent.
    data: dict[str, Any] | None = None
class MCPServer:
    """MCP server speaking JSON-RPC 2.0 over a line-delimited stdio transport.

    Handlers are registered per method name and are called with the
    request's ``params`` (an empty dict when the request carries none).
    Whatever a handler returns becomes the ``result`` of the response; an
    exception raised by a handler becomes a JSON-RPC error object.
    """

    def __init__(self):
        """Create a server with an empty method-name -> handler registry."""
        self.handlers: Dict[str, Callable[[Dict[str, Any]], Any]] = {}

    def register(self, method: str, handler: Callable[[Dict[str, Any]], Any]):
        """Register ``handler`` for ``method``, replacing any earlier one."""
        self.handlers[method] = handler

    def handle_request(self, req: MCPRequest) -> MCPResponse:
        """Dispatch one request to its handler and build the response.

        Args:
            req: The decoded request.

        Returns:
            A response echoing ``req.id``. It carries ``result`` on success,
            error ``-32601`` when no handler is registered for ``req.method``
            and error ``-32603`` (with the exception type name in ``data``)
            when the handler raises.
        """
        if req.method not in self.handlers:
            return self._error_response(
                req.id, -32601, f"method not found: {req.method}"
            )
        try:
            result = self.handlers[req.method](req.params or {})
        except Exception as exc:
            # Dispatch boundary: a failing handler must produce an error
            # response instead of taking the whole server down.
            return self._error_response(
                req.id, -32603, str(exc), {"type": type(exc).__name__}
            )
        return MCPResponse(id=req.id, result=result)

    def run_stdio(self):
        """Serve requests from stdin until EOF, one JSON message per line.

        Each response is written to stdout as a single line and flushed
        immediately so the peer sees it without buffering delay. Blank
        lines and notifications produce no output.
        """
        for line in sys.stdin:
            reply = self._process_line(line)
            if reply is not None:
                print(reply, flush=True)

    def _process_line(self, line: str) -> Optional[str]:
        """Return the JSON text answering one input line, or None for no reply."""
        text = line.strip()
        if not text:
            # Blank separator lines are not messages; do not answer them.
            return None

        try:
            raw = json.loads(text)
        except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
            return self._encode(
                self._error_response(None, -32700, f"parse error: {exc}")
            )

        if not isinstance(raw, dict):
            return self._encode(
                self._error_response(
                    None, -32600, "invalid request: expected a JSON object"
                )
            )

        # Only echo an id back when it has a usable scalar type; bool is an
        # int subclass in Python but is not a meaningful request id.
        req_id = raw.get("id")
        if isinstance(req_id, bool) or not isinstance(req_id, (int, str)):
            req_id = None

        try:
            req = MCPRequest(**raw)
        except TypeError as exc:
            # Valid JSON, but with members that are not part of a request.
            return self._encode(
                self._error_response(req_id, -32600, f"invalid request: {exc}")
            )
        if not isinstance(req.method, str):
            return self._encode(
                self._error_response(
                    req_id, -32600, "invalid request: method must be a string"
                )
            )

        resp = self.handle_request(req)
        if "id" not in raw:
            # A message without an id is a notification: the handler still
            # runs, but JSON-RPC 2.0 forbids sending any response to it.
            return None

        try:
            return self._encode(resp)
        except Exception as exc:
            # The handler returned something that cannot be serialized.
            # Report it against the request id and keep the loop alive.
            return self._encode(
                self._error_response(
                    req_id,
                    -32603,
                    f"result is not JSON serializable: {exc}",
                    {"type": type(exc).__name__},
                )
            )

    @staticmethod
    def _error_response(
        req_id: Optional[int | str],
        code: int,
        message: str,
        data: Optional[Dict[str, Any]] = None,
    ) -> MCPResponse:
        """Build a response whose ``error`` member is a plain-dict MCPError."""
        return MCPResponse(
            id=req_id,
            error=asdict(MCPError(code=code, message=message, data=data)),
        )

    @staticmethod
    def _encode(resp: MCPResponse) -> str:
        """Serialize a response, keeping exactly one of ``result``/``error``."""
        payload = asdict(resp)
        # JSON-RPC 2.0 requires a response to contain either "result" or
        # "error", never both, so drop whichever member does not apply.
        payload.pop("result" if resp.error is not None else "error")
        return json.dumps(payload)