Skip to main content
Glama

MemOS-MCP

by qinshu1109
Apache 2.0
3
  • Linux
  • Apple
simplified_api_server.py (18.4 kB)
"""
Simplified MemOS API server.

A lightweight HTTP API built only on dependencies that are already available
(the standard-library ``http.server``), so FastAPI does not need to be
installed.
"""

import json
import time
from datetime import datetime
from typing import Dict, Any, List, Optional
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import threading

from mvp_memory import create_mvp_memory_manager


class MemOSAPIHandler(BaseHTTPRequestHandler):
    """Request handler that exposes the MemOS endpoints over plain HTTP.

    Every JSON response uses the same envelope: ``success``, ``message`` (or
    ``error_code``/``error_message`` on failure), ``data`` and ``timestamp``.
    All real work is delegated to ``self.mvp_manager``.
    """

    def __init__(self, *args, mvp_manager=None, **kwargs):
        """Attach the memory manager, then let the base class serve the request.

        Args:
            *args: Positional arguments forwarded to ``BaseHTTPRequestHandler``.
            mvp_manager: Object providing the memory operations used by the
                ``_handle_*`` methods.
            **kwargs: Keyword arguments forwarded to ``BaseHTTPRequestHandler``.
        """
        # The base-class constructor processes the request immediately, so the
        # manager has to be set before calling it.
        self.mvp_manager = mvp_manager
        super().__init__(*args, **kwargs)

    # ------------------------------------------------------------------
    # HTTP verb entry points
    # ------------------------------------------------------------------

    def do_GET(self) -> None:
        """Route GET requests; unknown paths get 404, unexpected errors 500."""
        try:
            path = urlparse(self.path).path

            if path == "/":
                self._send_redirect("/docs")
            elif path == "/docs":
                self._send_api_docs()
            elif path == "/health":
                self._handle_health_check()
            elif path == "/status":
                self._handle_system_status()
            elif path == "/metrics":
                self._handle_performance_metrics()
            elif path == "/capacity/report":
                self._handle_capacity_report()
            else:
                self._send_error(404, "Not Found", "端点不存在")
        except Exception as e:
            self._send_error(500, "Internal Server Error", str(e))

    def do_POST(self) -> None:
        """Parse the JSON body and route POST requests.

        The body is parsed before routing, so a malformed body yields 400 even
        for an unknown path. Endpoints that read fields from the body also
        require it to be a JSON object.
        """
        try:
            path = urlparse(self.path).path

            try:
                request_data = self._read_json_body()
            except ValueError as exc:
                # Client-side problem (bad length / encoding / JSON): 400, not 500.
                self._send_error(400, "Bad Request", str(exc))
                return

            body_handlers = {
                "/memories": self._handle_create_memory,
                "/memories/search": self._handle_search_memories,
                "/memories/feedback": self._handle_memory_feedback,
            }

            if path in body_handlers:
                # Valid JSON that is not an object (null, list, number, ...)
                # cannot carry the required fields.
                if not isinstance(request_data, dict):
                    self._send_error(400, "Bad Request", "请求体必须是JSON对象")
                    return
                body_handlers[path](request_data)
            elif path == "/system/optimize":
                self._handle_system_optimization()
            else:
                self._send_error(404, "Not Found", "端点不存在")
        except Exception as e:
            self._send_error(500, "Internal Server Error", str(e))

    def do_OPTIONS(self) -> None:
        """Answer CORS preflight requests with an empty 200 response."""
        # The status line must be buffered before any header; sending the CORS
        # headers first produces a malformed response.
        self.send_response(200)
        self._send_cors_headers()
        self.send_header('Content-Length', '0')
        self.end_headers()

    # ------------------------------------------------------------------
    # Request / response helpers
    # ------------------------------------------------------------------

    def _read_json_body(self) -> Any:
        """Read and decode the request body.

        Returns:
            The decoded JSON value, or ``{}`` when the body is empty.

        Raises:
            ValueError: If ``Content-Length`` is not a non-negative integer or
                the body is not valid UTF-8 encoded JSON. The exception message
                is the text to show to the client.
        """
        raw_length = self.headers.get('Content-Length', 0)
        try:
            content_length = int(raw_length)
        except (TypeError, ValueError):
            raise ValueError("无效的Content-Length") from None
        if content_length < 0:
            # A negative size would make rfile.read() wait for EOF.
            raise ValueError("无效的Content-Length")

        post_data = self.rfile.read(content_length)
        if not post_data:
            return {}

        try:
            return json.loads(post_data.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError):
            raise ValueError("无效的JSON格式") from None

    def _send_cors_headers(self) -> None:
        """Buffer the CORS headers; call only after ``send_response``."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type, Accept')

    def _send_json_response(self, data: Dict[str, Any], status_code: int = 200) -> None:
        """Serialize ``data`` as UTF-8 JSON and send it with CORS headers.

        Args:
            data: JSON-serializable payload.
            status_code: HTTP status code of the response.
        """
        # Serialize before touching the socket so a serialization failure can
        # still be reported by the caller as a clean error response.
        payload = json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8')

        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(payload)))
        self._send_cors_headers()
        self.end_headers()
        self.wfile.write(payload)

    def _send_error(self, status_code: int, error_type: str, message: str) -> None:
        """Send the standard JSON error envelope.

        Args:
            status_code: HTTP status code.
            error_type: Human-readable error name; upper-cased with spaces
                replaced by underscores to form ``error_code``.
            message: Detail text placed in ``error_message``.
        """
        error_response = {
            "success": False,
            "error_code": error_type.upper().replace(" ", "_"),
            "error_message": message,
            "timestamp": datetime.now().isoformat()
        }
        self._send_json_response(error_response, status_code)

    def _send_redirect(self, location: str) -> None:
        """Send a 307 redirect to ``location``."""
        self.send_response(307)
        self.send_header('Location', location)
        self.end_headers()

    def _send_api_docs(self) -> None:
        """Send the static HTML page documenting the API."""
        docs_html = """
<!DOCTYPE html>
<html>
<head>
    <title>MemOS API文档</title>
    <meta charset="utf-8">
    <style>
        body { font-family: Arial, sans-serif; margin: 40px; }
        .endpoint { margin: 20px 0; padding: 15px; border: 1px solid #ddd; border-radius: 5px; }
        .method { font-weight: bold; color: #007bff; }
        .path { font-family: monospace; background: #f8f9fa; padding: 2px 5px; }
        pre { background: #f8f9fa; padding: 10px; border-radius: 3px; overflow-x: auto; }
    </style>
</head>
<body>
    <h1>MemOS官方API文档</h1>
    <p>基于MemOS官方规范的标准化REST API</p>

    <h2>基础端点</h2>
    <div class="endpoint">
        <div><span class="method">GET</span> <span class="path">/health</span></div>
        <p>健康检查 - 检查API服务和MemOS系统的健康状态</p>
    </div>
    <div class="endpoint">
        <div><span class="method">GET</span> <span class="path">/status</span></div>
        <p>获取系统状态 - 获取MemOS系统的详细状态信息</p>
    </div>
    <div class="endpoint">
        <div><span class="method">GET</span> <span class="path">/metrics</span></div>
        <p>获取性能指标 - 获取系统性能指标和监控数据</p>
    </div>

    <h2>记忆管理</h2>
    <div class="endpoint">
        <div><span class="method">POST</span> <span class="path">/memories</span></div>
        <p>创建记忆 - 添加新的记忆到系统中</p>
        <pre>
{
    "content": "记忆内容",
    "tags": ["标签1", "标签2"],
    "metadata": {}
}
        </pre>
    </div>
    <div class="endpoint">
        <div><span class="method">POST</span> <span class="path">/memories/search</span></div>
        <p>搜索记忆 - 根据查询内容搜索相关记忆</p>
        <pre>
{
    "query": "搜索查询",
    "top_k": 5,
    "use_reranker": true,
    "use_feedback_boost": true
}
        </pre>
    </div>
    <div class="endpoint">
        <div><span class="method">POST</span> <span class="path">/memories/feedback</span></div>
        <p>提供记忆反馈 - 为记忆提供👍/👎反馈</p>
        <pre>
{
    "memory_id": "记忆ID",
    "feedback_type": "thumbs_up"
}
        </pre>
    </div>

    <h2>容量管理</h2>
    <div class="endpoint">
        <div><span class="method">GET</span> <span class="path">/capacity/report</span></div>
        <p>获取容量报告 - 获取详细的容量管理报告</p>
    </div>
    <div class="endpoint">
        <div><span class="method">POST</span> <span class="path">/system/optimize</span></div>
        <p>手动触发系统优化 - 手动触发系统优化,包括内存压缩和性能调优</p>
    </div>

    <h2>使用示例</h2>
    <pre>
# 健康检查
curl http://localhost:8000/health

# 创建记忆
curl -X POST http://localhost:8000/memories \\
  -H "Content-Type: application/json" \\
  -d '{"content": "这是一个测试记忆", "tags": ["测试"]}'

# 搜索记忆
curl -X POST http://localhost:8000/memories/search \\
  -H "Content-Type: application/json" \\
  -d '{"query": "测试", "top_k": 5}'
    </pre>
</body>
</html>
        """
        body = docs_html.encode('utf-8')

        self.send_response(200)
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    # ------------------------------------------------------------------
    # Endpoint handlers
    # ------------------------------------------------------------------

    def _handle_health_check(self) -> None:
        """GET /health: report API and manager connectivity (503 on failure)."""
        try:
            connection_ok = self.mvp_manager.test_connection()
            status_info = self.mvp_manager.get_status_info()

            health_data = {
                "api_status": "healthy",
                "memos_connection": "ok" if connection_ok else "error",
                "system_mode": status_info.get("mode", "unknown"),
                "capacity_management": status_info.get("capacity_management", {}).get("enabled", False)
            }
            response = {
                "success": True,
                "message": "系统健康状态正常",
                "data": health_data,
                "timestamp": datetime.now().isoformat()
            }
            self._send_json_response(response)
        except Exception as e:
            self._send_error(503, "Service Unavailable", f"服务不可用: {str(e)}")

    def _handle_system_status(self) -> None:
        """GET /status: return the manager's status info unchanged."""
        try:
            status_info = self.mvp_manager.get_status_info()
            response = {
                "success": True,
                "message": "系统状态获取成功",
                "data": status_info,
                "timestamp": datetime.now().isoformat()
            }
            self._send_json_response(response)
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"获取系统状态失败: {str(e)}")

    def _handle_performance_metrics(self) -> None:
        """GET /metrics: return performance metrics.

        Responds 503 when the manager's result contains an ``"error"`` key.
        """
        try:
            metrics_data = self.mvp_manager.get_performance_metrics()
            if "error" in metrics_data:
                self._send_error(503, "Service Unavailable", metrics_data["error"])
                return

            response = {
                "success": True,
                "message": "性能指标获取成功",
                "data": metrics_data,
                "timestamp": datetime.now().isoformat()
            }
            self._send_json_response(response)
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"获取性能指标失败: {str(e)}")

    def _handle_create_memory(self, request_data: Dict[str, Any]) -> None:
        """POST /memories: store a new memory.

        Args:
            request_data: Decoded request body; ``content`` is required,
                ``tags`` and ``metadata`` are optional.
        """
        try:
            # Validate required fields.
            if "content" not in request_data:
                self._send_error(400, "Bad Request", "缺少必需字段: content")
                return

            content = request_data["content"]
            tags = request_data.get("tags", [])
            metadata = request_data.get("metadata", {})

            start_time = time.time()
            success = self.mvp_manager.remember(content=content, tags=tags, metadata=metadata)
            if not success:
                self._send_error(500, "Internal Server Error", "记忆创建失败")
                return
            duration = time.time() - start_time

            # Only strings are truncated. Slicing other types used to raise
            # here, after the memory had already been stored, producing a
            # misleading 500 for a successful write.
            if isinstance(content, str) and len(content) > 50:
                content_preview = content[:50] + "..."
            else:
                content_preview = content

            response = {
                "success": True,
                "message": "记忆创建成功",
                "data": {
                    "content_preview": content_preview,
                    "tags": tags,
                    "processing_time": round(duration, 3)
                },
                "timestamp": datetime.now().isoformat()
            }
            self._send_json_response(response)
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"创建记忆失败: {str(e)}")

    def _handle_search_memories(self, request_data: Dict[str, Any]) -> None:
        """POST /memories/search: search memories and return formatted hits.

        Args:
            request_data: Decoded request body; ``query`` is required,
                ``top_k`` (default 5), ``use_reranker`` and
                ``use_feedback_boost`` (both default ``True``) are optional.
        """
        try:
            # Validate required fields.
            if "query" not in request_data:
                self._send_error(400, "Bad Request", "缺少必需字段: query")
                return

            query = request_data["query"]
            top_k = request_data.get("top_k", 5)
            use_reranker = request_data.get("use_reranker", True)
            use_feedback_boost = request_data.get("use_feedback_boost", True)

            start_time = time.time()
            results = self.mvp_manager.recall(
                query=query,
                top_k=top_k,
                use_reranker=use_reranker,
                use_feedback_boost=use_feedback_boost
            )

            formatted_results = []
            # Tolerate a manager that returns None instead of an empty list.
            for result in results or []:
                # Looked up once; a missing/None metadata is treated as empty.
                metadata = result.get("metadata") or {}
                formatted_results.append({
                    "memory": {
                        "id": str(result.get("id", "")),
                        "content": result.get("content", ""),
                        "tags": result.get("tags", []),
                        "metadata": metadata,
                        "created_at": metadata.get("timestamp", ""),
                        "usage_score": metadata.get("usage_score")
                    },
                    "score": result.get("score", 0.0),
                    "feedback_boost": result.get("feedback_boost")
                })

            duration = time.time() - start_time

            response = {
                "success": True,
                "message": f"搜索完成,找到 {len(formatted_results)} 条相关记忆",
                "data": {
                    "query": query,
                    "results": formatted_results,
                    "total_count": len(formatted_results),
                    "processing_time": round(duration, 3)
                },
                "timestamp": datetime.now().isoformat()
            }
            self._send_json_response(response)
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"搜索记忆失败: {str(e)}")

    def _handle_memory_feedback(self, request_data: Dict[str, Any]) -> None:
        """POST /memories/feedback: record feedback for one memory.

        Args:
            request_data: Decoded request body; ``memory_id`` and
                ``feedback_type`` are both required.
        """
        try:
            # Validate required fields.
            if "memory_id" not in request_data or "feedback_type" not in request_data:
                self._send_error(400, "Bad Request", "缺少必需字段: memory_id, feedback_type")
                return

            memory_id = request_data["memory_id"]
            feedback_type = request_data["feedback_type"]

            success = self.mvp_manager.provide_feedback(memory_id=memory_id, feedback_type=feedback_type)
            if not success:
                self._send_error(404, "Not Found", "记忆ID不存在或反馈失败")
                return

            # Anything other than an explicit thumbs-up is shown as thumbs-down.
            feedback_emoji = "👍" if feedback_type in ["thumbs_up", "👍"] else "👎"

            response = {
                "success": True,
                "message": f"反馈已保存 {feedback_emoji}",
                "data": {
                    "memory_id": memory_id,
                    "feedback_type": feedback_type,
                    "feedback_emoji": feedback_emoji
                },
                "timestamp": datetime.now().isoformat()
            }
            self._send_json_response(response)
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"提供记忆反馈失败: {str(e)}")

    def _handle_capacity_report(self) -> None:
        """GET /capacity/report: return the capacity report.

        Responds 503 when the manager's result contains an ``"error"`` key.
        """
        try:
            capacity_report = self.mvp_manager.get_capacity_report()
            if "error" in capacity_report:
                self._send_error(503, "Service Unavailable", capacity_report["error"])
                return

            response = {
                "success": True,
                "message": "容量报告获取成功",
                "data": capacity_report,
                "timestamp": datetime.now().isoformat()
            }
            self._send_json_response(response)
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"获取容量报告失败: {str(e)}")

    def _handle_system_optimization(self) -> None:
        """POST /system/optimize: trigger an optimization and return fresh metrics."""
        try:
            success = self.mvp_manager.trigger_manual_optimization()
            if not success:
                self._send_error(500, "Internal Server Error", "系统优化失败")
                return

            # Metrics are fetched after the optimization call has returned.
            metrics = self.mvp_manager.get_performance_metrics()

            response = {
                "success": True,
                "message": "系统优化完成",
                "data": {
                    "optimization_applied": True,
                    "current_metrics": metrics
                },
                "timestamp": datetime.now().isoformat()
            }
            self._send_json_response(response)
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"系统优化失败: {str(e)}")


def create_api_handler(mvp_manager):
    """Return a handler factory bound to ``mvp_manager``.

    ``HTTPServer`` instantiates its handler with fixed positional arguments,
    so the manager is injected through this closure.

    Args:
        mvp_manager: Memory manager passed to every ``MemOSAPIHandler``.

    Returns:
        A callable accepting the same arguments as ``MemOSAPIHandler``.
    """
    def handler(*args, **kwargs):
        return MemOSAPIHandler(*args, mvp_manager=mvp_manager, **kwargs)
    return handler


def start_api_server(host: str = "localhost", port: int = 8000):
    """Create the memory manager and serve the API until interrupted.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
    """
    print("🚀 启动MemOS简化API服务器...")
    print(f"📖 API文档: http://{host}:{port}/docs")
    print(f"🔍 健康检查: http://{host}:{port}/health")

    # Initialize the MVP manager.
    mvp_manager = create_mvp_memory_manager(use_official_config=True)

    # Create the HTTP server.
    handler_class = create_api_handler(mvp_manager)
    server = HTTPServer((host, port), handler_class)

    print(f"✅ 服务器已启动在 http://{host}:{port}")
    print("按 Ctrl+C 停止服务器")

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\n⏹️ 服务器已停止")
    finally:
        # serve_forever() has already returned here, so shutdown() (meant to
        # be called from another thread) is unnecessary; what is needed is
        # releasing the listening socket.
        server.server_close()


if __name__ == "__main__":
    start_api_server()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.