base_transport.py•2.76 kB
"""
传输层抽象基类
定义MCP传输层的统一接口,支持不同传输方式
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, Callable
import logging
logger = logging.getLogger(__name__)
class BaseTransport(ABC):
"""传输层抽象基类"""
def __init__(self):
self.message_handler: Optional[Callable] = None
self.running = False
def set_message_handler(self, handler: Callable[[Dict[str, Any]], Dict[str, Any]]):
"""
设置消息处理器
Args:
handler: 消息处理函数,接收请求字典,返回响应字典
"""
self.message_handler = handler
@abstractmethod
def start(self) -> None:
"""
启动传输层
开始监听和处理消息
"""
pass
@abstractmethod
def stop(self) -> None:
"""
停止传输层
清理资源并停止监听
"""
pass
@abstractmethod
def send_message(self, message: Dict[str, Any]) -> None:
"""
发送消息
Args:
message: 要发送的消息字典
"""
pass
@abstractmethod
def receive_message(self) -> Optional[Dict[str, Any]]:
"""
接收消息
Returns:
接收到的消息字典,如果没有消息返回None
"""
pass
def get_transport_name(self) -> str:
"""
返回传输层名称
Returns:
传输层名称,如 "stdio"、"http"、"websocket"
"""
return self.__class__.__name__.replace('Transport', '').lower()
def handle_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""
处理接收到的消息
Args:
message: 接收到的消息
Returns:
处理结果消息
"""
if self.message_handler is None:
logger.error("Message handler not set")
return {
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": "Internal error: message handler not configured"
},
"id": message.get("id")
}
try:
return self.message_handler(message)
except Exception as e:
logger.exception("Error handling message")
return {
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
},
"id": message.get("id")
}