# base.py
"""
Base transport interface for Katamari MCP.
"""
import asyncio
import inspect
import logging
from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator, Callable, Dict, Optional

from pydantic import BaseModel
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class TransportMessage(BaseModel):
    """Message structure for transport communication.

    The same model is used for requests and responses. Only ``id`` and
    ``method`` are required; every other field defaults to ``None``.
    """

    # Message identifier; BaseTransport.handle_message copies it from the
    # incoming message onto the response it returns.
    id: str
    # Name used to look up the registered handler; also copied onto responses.
    method: str
    # Arguments handed to the handler; handle_message substitutes an empty
    # dict when this is None.
    params: Optional[Dict[str, Any]] = None
    # Handler return value; populated by handle_message on success.
    result: Optional[Any] = None
    # Error payload; handle_message fills it with "code" and "message" keys
    # when the handler raises or no handler is registered.
    error: Optional[Dict[str, Any]] = None
    # handle_message sets this from the asyncio event loop's clock
    # (``loop.time()``), which is monotonic rather than wall-clock time.
    timestamp: Optional[float] = None
class BaseTransport(ABC):
    """Abstract base class for all transport implementations.

    Subclasses must implement ``start``, ``stop``, ``send_message`` and
    ``receive_messages``. The base class itself provides a registry that maps
    method names to handlers, plus ``handle_message``, which dispatches an
    incoming message to its handler and always returns a response message
    (it never raises for handler failures).

    Attributes:
        config: Transport configuration; an empty dict when none was given.
        is_running: Initialised to ``False``; this base class never changes
            it, so updating it is left to subclasses.
        message_handlers: Maps a method name to its registered handler.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Set up empty transport state.

        Args:
            config: Optional settings; ``None`` (or any other falsy value) is
                replaced by a fresh empty dict.
        """
        self.config: Dict[str, Any] = config or {}
        self.is_running: bool = False
        self.message_handlers: Dict[str, Callable[..., Any]] = {}
        self._connection_id: Optional[str] = None

    @abstractmethod
    async def start(self) -> None:
        """Start the transport server."""

    @abstractmethod
    async def stop(self) -> None:
        """Stop the transport server."""

    @abstractmethod
    async def send_message(self, message: TransportMessage) -> None:
        """Send a message through the transport."""

    @abstractmethod
    async def receive_messages(self) -> AsyncGenerator[TransportMessage, None]:
        """Receive messages from the transport.

        NOTE(review): the return annotation suggests implementations are
        async generators (``async def`` containing ``yield``) -- confirm
        against the concrete transports.
        """

    def register_handler(
        self, method: str, handler: Callable[[Dict[str, Any]], Any]
    ) -> None:
        """Register a message handler for a specific method.

        Registering a second handler under the same ``method`` replaces the
        previous one.

        Args:
            method: Method name that incoming messages are matched against.
            handler: Callable invoked with the message's params dict. It may
                be a coroutine function or a plain function.
        """
        self.message_handlers[method] = handler

    async def handle_message(self, message: TransportMessage) -> TransportMessage:
        """Dispatch an incoming message to its handler and build the response.

        Args:
            message: The incoming message; ``message.method`` selects the
                handler and ``message.params`` (or ``{}`` when it is ``None``)
                is passed to it.

        Returns:
            A response carrying the request's ``id`` and ``method`` and one of:
            ``result`` set to the handler's return value; ``error`` with code
            ``-1`` and the exception text when the handler raised; or
            ``error`` with code ``-32601`` ("Method not found", the JSON-RPC
            2.0 code) when no handler is registered.
        """
        handler = self.message_handlers.get(message.method)
        if not handler:
            return self._build_response(
                message,
                error={"code": -32601, "message": "Method not found"},
            )

        try:
            result = handler(message.params or {})
            # Accept plain (synchronous) handlers as well as coroutine
            # functions: only await when the call produced an awaitable.
            if inspect.isawaitable(result):
                result = await result
        except Exception as exc:
            # Transport boundary: report the failure to the peer instead of
            # propagating it. logger.exception keeps the traceback.
            logger.exception("Handler error for %s: %s", message.method, exc)
            return self._build_response(
                message,
                error={"code": -1, "message": str(exc)},
            )

        return self._build_response(message, result=result)

    def _build_response(
        self, request: TransportMessage, **fields: Any
    ) -> TransportMessage:
        """Build a response to ``request`` stamped with the event loop time.

        Must be called while an event loop is running. Only the fields given
        in ``fields`` (``result`` or ``error``) are passed to the model, so
        the other one stays unset rather than being explicitly ``None``.
        """
        return TransportMessage(
            id=request.id,
            method=request.method,
            # loop.time() is the loop's monotonic clock, not wall-clock time.
            timestamp=asyncio.get_running_loop().time(),
            **fields,
        )

    @property
    def connection_id(self) -> Optional[str]:
        """Get the connection identifier (``None`` until one is assigned)."""
        return self._connection_id

    @connection_id.setter
    def connection_id(self, value: Optional[str]) -> None:
        """Set the connection identifier."""
        self._connection_id = value