resource_router.py•5.11 kB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
MCP 资源管理路由器
处理资源相关的 MCP 请求
"""
import os
import json
import base64
import logging
from typing import Dict, Any, Optional, List
from .base_router import MCPBaseRouter
logger = logging.getLogger("resource_router")
class ResourceRouter(MCPBaseRouter):
"""资源管理路由器"""
def __init__(self):
"""初始化路由器"""
super().__init__()
self.resources = {}
self._register_methods()
def _register_methods(self):
"""注册方法处理器"""
self.register_method("resources/list", self.handle_list_resources)
self.register_method("resources/get", self.handle_get_resource)
self.register_method("resources/search", self.handle_search_resources)
self.register_method("resources/subscribe", self.handle_subscribe_resources)
def handle_list_resources(self, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""处理资源列表请求"""
logger.info(f"处理资源列表请求,参数: {params}")
# 示例资源列表
resources = [
{
"id": "resource1",
"name": "示例代码文件",
"type": "code",
"path": "/examples/example.py",
"metadata": {
"language": "python",
"size": 1024
}
},
{
"id": "resource2",
"name": "示例文档",
"type": "document",
"path": "/docs/example.md",
"metadata": {
"format": "markdown",
"size": 2048
}
}
]
return {
"resources": resources
}
def handle_get_resource(self, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""处理获取资源请求"""
if not params or "id" not in params:
raise ValueError("缺少资源 ID 参数")
resource_id = params["id"]
logger.info(f"处理获取资源请求,资源 ID: {resource_id}")
# 示例资源内容
if resource_id == "resource1":
content = """
def hello_world():
print("Hello, MCP!")
if __name__ == "__main__":
hello_world()
"""
return {
"id": "resource1",
"name": "示例代码文件",
"type": "code",
"content": content,
"metadata": {
"language": "python",
"size": len(content)
}
}
elif resource_id == "resource2":
content = """
# 示例文档
这是一个示例 Markdown 文档,用于演示 MCP 资源管理功能。
## 特性
- 支持多种资源类型
- 动态资源加载
- 资源生命周期管理
"""
return {
"id": "resource2",
"name": "示例文档",
"type": "document",
"content": content,
"metadata": {
"format": "markdown",
"size": len(content)
}
}
else:
raise ValueError(f"资源 ID '{resource_id}' 不存在")
async def handle_search_resources(self, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""处理搜索资源请求(异步示例)"""
if not params or "query" not in params:
raise ValueError("缺少搜索查询参数")
query = params["query"]
logger.info(f"处理搜索资源请求,查询: {query}")
# 示例搜索结果
results = []
if "python" in query.lower():
results.append({
"id": "resource1",
"name": "示例代码文件",
"type": "code",
"path": "/examples/example.py",
"relevance": 0.95
})
if "文档" in query or "document" in query.lower():
results.append({
"id": "resource2",
"name": "示例文档",
"type": "document",
"path": "/docs/example.md",
"relevance": 0.85
})
return {
"query": query,
"results": results
}
def handle_subscribe_resources(self, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""处理订阅资源变更请求"""
if not params or "resourceIds" not in params:
raise ValueError("缺少资源 ID 列表参数")
resource_ids = params["resourceIds"]
logger.info(f"处理订阅资源变更请求,资源 ID: {resource_ids}")
# 在实际应用中,这里会设置订阅,并在资源变更时发送通知
return {
"success": True,
"subscribed": resource_ids
}