# simplified_api_server.py (18.4 kB)
"""
MemOS简化API服务器
基于现有依赖的轻量级API实现,无需额外安装FastAPI
"""
import json
import time
from datetime import datetime
from typing import Dict, Any, List, Optional
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import threading
from mvp_memory import create_mvp_memory_manager
class MemOSAPIHandler(BaseHTTPRequestHandler):
    """HTTP request handler exposing a MemOS memory manager as a JSON API.

    GET endpoints:  ``/`` (redirects to ``/docs``), ``/docs``, ``/health``,
    ``/status``, ``/metrics``, ``/capacity/report``.
    POST endpoints: ``/memories``, ``/memories/search``,
    ``/memories/feedback``, ``/system/optimize``.

    Every JSON response is an envelope with ``success`` and ``timestamp``
    plus either ``message``/``data`` (success) or
    ``error_code``/``error_message`` (failure).
    """

    def __init__(self, *args, mvp_manager=None, **kwargs):
        """Attach ``mvp_manager`` and hand the request to the base handler.

        Args:
            *args: Positional arguments forwarded to
                ``BaseHTTPRequestHandler`` (request, client address, server).
            mvp_manager: Object providing the memory operations used by the
                ``_handle_*`` methods.
            **kwargs: Keyword arguments forwarded to the base class.
        """
        # The base-class constructor services the request before returning,
        # so the manager has to be set before calling it.
        self.mvp_manager = mvp_manager
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Route a GET request by path; unknown paths get a 404 envelope."""
        try:
            path = urlparse(self.path).path
            if path == "/":
                self._send_redirect("/docs")
            elif path == "/docs":
                self._send_api_docs()
            elif path == "/health":
                self._handle_health_check()
            elif path == "/status":
                self._handle_system_status()
            elif path == "/metrics":
                self._handle_performance_metrics()
            elif path == "/capacity/report":
                self._handle_capacity_report()
            else:
                self._send_error(404, "Not Found", "端点不存在")
        except Exception as e:
            self._send_error(500, "Internal Server Error", str(e))

    def do_POST(self):
        """Read the JSON body and route a POST request by path.

        Malformed requests are answered with 400 rather than 500: an invalid
        or negative ``Content-Length``, a body that is not valid UTF-8 JSON,
        or (for the ``/memories*`` endpoints) a JSON value that is not an
        object. An empty body is treated as ``{}``.
        """
        try:
            path = urlparse(self.path).path

            # Validate the declared body size before reading. A negative value
            # would make ``rfile.read`` read until EOF and block the handler.
            try:
                content_length = int(self.headers.get('Content-Length', 0))
            except (TypeError, ValueError):
                content_length = -1
            if content_length < 0:
                self._send_error(400, "Bad Request", "无效的Content-Length")
                return

            post_data = self.rfile.read(content_length)
            try:
                request_data = json.loads(post_data.decode('utf-8')) if post_data else {}
            except (json.JSONDecodeError, UnicodeDecodeError):
                self._send_error(400, "Bad Request", "无效的JSON格式")
                return

            body_handlers = {
                "/memories": self._handle_create_memory,
                "/memories/search": self._handle_search_memories,
                "/memories/feedback": self._handle_memory_feedback,
            }
            body_handler = body_handlers.get(path)
            if body_handler is not None:
                # These handlers index into the payload by key, so anything
                # other than a JSON object is a client error.
                if not isinstance(request_data, dict):
                    self._send_error(400, "Bad Request", "请求体必须是JSON对象")
                    return
                body_handler(request_data)
            elif path == "/system/optimize":
                self._handle_system_optimization()
            else:
                self._send_error(404, "Not Found", "端点不存在")
        except Exception as e:
            self._send_error(500, "Internal Server Error", str(e))

    def do_OPTIONS(self):
        """Answer a CORS preflight request with 200 and the CORS headers."""
        # The status line must be queued before any header; otherwise the
        # headers are emitted ahead of it and the response is malformed.
        self.send_response(200)
        self._send_cors_headers()
        self.end_headers()

    def _send_cors_headers(self):
        """Queue the permissive CORS headers (call after ``send_response``)."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type, Accept')

    def _send_json_response(self, data: Dict[str, Any], status_code: int = 200):
        """Serialize ``data`` as pretty-printed UTF-8 JSON and send it.

        Args:
            data: JSON-serializable payload.
            status_code: HTTP status code for the response.

        Raises:
            TypeError: If ``data`` is not JSON-serializable (raised before
                anything is written to the client).
        """
        # Encode once so Content-Length and the written body always agree.
        payload = json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8')
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(payload)))
        self._send_cors_headers()
        self.end_headers()
        self.wfile.write(payload)

    def _send_success(self, message: str, data: Any):
        """Send the standard 200 success envelope around ``data``."""
        self._send_json_response({
            "success": True,
            "message": message,
            "data": data,
            "timestamp": datetime.now().isoformat()
        })

    def _send_error(self, status_code: int, error_type: str, message: str):
        """Send the standard error envelope.

        Args:
            status_code: HTTP status code.
            error_type: Human-readable error name; upper-cased with spaces
                replaced by underscores to form ``error_code``.
            message: Detail text placed in ``error_message``.
        """
        error_response = {
            "success": False,
            "error_code": error_type.upper().replace(" ", "_"),
            "error_message": message,
            "timestamp": datetime.now().isoformat()
        }
        self._send_json_response(error_response, status_code)

    def _send_redirect(self, location: str):
        """Send a 307 redirect to ``location``."""
        self.send_response(307)
        self.send_header('Location', location)
        self.end_headers()

    def _send_api_docs(self):
        """Send the static HTML page documenting the API endpoints."""
        docs_html = """
<!DOCTYPE html>
<html>
<head>
<title>MemOS API文档</title>
<meta charset="utf-8">
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.endpoint { margin: 20px 0; padding: 15px; border: 1px solid #ddd; border-radius: 5px; }
.method { font-weight: bold; color: #007bff; }
.path { font-family: monospace; background: #f8f9fa; padding: 2px 5px; }
pre { background: #f8f9fa; padding: 10px; border-radius: 3px; overflow-x: auto; }
</style>
</head>
<body>
<h1>MemOS官方API文档</h1>
<p>基于MemOS官方规范的标准化REST API</p>
<h2>基础端点</h2>
<div class="endpoint">
<div><span class="method">GET</span> <span class="path">/health</span></div>
<p>健康检查 - 检查API服务和MemOS系统的健康状态</p>
</div>
<div class="endpoint">
<div><span class="method">GET</span> <span class="path">/status</span></div>
<p>获取系统状态 - 获取MemOS系统的详细状态信息</p>
</div>
<div class="endpoint">
<div><span class="method">GET</span> <span class="path">/metrics</span></div>
<p>获取性能指标 - 获取系统性能指标和监控数据</p>
</div>
<h2>记忆管理</h2>
<div class="endpoint">
<div><span class="method">POST</span> <span class="path">/memories</span></div>
<p>创建记忆 - 添加新的记忆到系统中</p>
<pre>
{
"content": "记忆内容",
"tags": ["标签1", "标签2"],
"metadata": {}
}
</pre>
</div>
<div class="endpoint">
<div><span class="method">POST</span> <span class="path">/memories/search</span></div>
<p>搜索记忆 - 根据查询内容搜索相关记忆</p>
<pre>
{
"query": "搜索查询",
"top_k": 5,
"use_reranker": true,
"use_feedback_boost": true
}
</pre>
</div>
<div class="endpoint">
<div><span class="method">POST</span> <span class="path">/memories/feedback</span></div>
<p>提供记忆反馈 - 为记忆提供👍/👎反馈</p>
<pre>
{
"memory_id": "记忆ID",
"feedback_type": "thumbs_up"
}
</pre>
</div>
<h2>容量管理</h2>
<div class="endpoint">
<div><span class="method">GET</span> <span class="path">/capacity/report</span></div>
<p>获取容量报告 - 获取详细的容量管理报告</p>
</div>
<div class="endpoint">
<div><span class="method">POST</span> <span class="path">/system/optimize</span></div>
<p>手动触发系统优化 - 手动触发系统优化,包括内存压缩和性能调优</p>
</div>
<h2>使用示例</h2>
<pre>
# 健康检查
curl http://localhost:8000/health
# 创建记忆
curl -X POST http://localhost:8000/memories \\
-H "Content-Type: application/json" \\
-d '{"content": "这是一个测试记忆", "tags": ["测试"]}'
# 搜索记忆
curl -X POST http://localhost:8000/memories/search \\
-H "Content-Type: application/json" \\
-d '{"query": "测试", "top_k": 5}'
</pre>
</body>
</html>
"""
        page = docs_html.encode('utf-8')
        self.send_response(200)
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(len(page)))
        self.end_headers()
        self.wfile.write(page)

    def _handle_health_check(self):
        """Report API health plus the manager's connection test and mode.

        Any exception raised by the manager is reported as 503.
        """
        try:
            connection_ok = self.mvp_manager.test_connection()
            status_info = self.mvp_manager.get_status_info()
            health_data = {
                "api_status": "healthy",
                "memos_connection": "ok" if connection_ok else "error",
                "system_mode": status_info.get("mode", "unknown"),
                "capacity_management": status_info.get("capacity_management", {}).get("enabled", False)
            }
            self._send_success("系统健康状态正常", health_data)
        except Exception as e:
            self._send_error(503, "Service Unavailable", f"服务不可用: {str(e)}")

    def _handle_system_status(self):
        """Return the manager's ``get_status_info()`` result as ``data``."""
        try:
            status_info = self.mvp_manager.get_status_info()
            self._send_success("系统状态获取成功", status_info)
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"获取系统状态失败: {str(e)}")

    def _handle_performance_metrics(self):
        """Return the manager's performance metrics.

        A metrics result containing an ``"error"`` key is reported as 503
        with that value as the error message.
        """
        try:
            metrics_data = self.mvp_manager.get_performance_metrics()
            if "error" in metrics_data:
                self._send_error(503, "Service Unavailable", metrics_data["error"])
                return
            self._send_success("性能指标获取成功", metrics_data)
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"获取性能指标失败: {str(e)}")

    def _handle_create_memory(self, request_data: Dict[str, Any]):
        """Store a new memory from ``request_data``.

        Args:
            request_data: Parsed JSON body. ``content`` is required;
                ``tags`` defaults to ``[]`` and ``metadata`` to ``{}``.
        """
        try:
            if "content" not in request_data:
                self._send_error(400, "Bad Request", "缺少必需字段: content")
                return
            content = request_data["content"]
            tags = request_data.get("tags", [])
            metadata = request_data.get("metadata", {})

            # Time only the manager call so the client sees storage latency.
            start_time = time.time()
            success = self.mvp_manager.remember(content=content, tags=tags, metadata=metadata)
            if not success:
                self._send_error(500, "Internal Server Error", "记忆创建失败")
                return
            duration = time.time() - start_time

            self._send_success("记忆创建成功", {
                # Echo at most the first 50 characters of the content.
                "content_preview": content[:50] + "..." if len(content) > 50 else content,
                "tags": tags,
                "processing_time": round(duration, 3)
            })
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"创建记忆失败: {str(e)}")

    def _handle_search_memories(self, request_data: Dict[str, Any]):
        """Search memories and return the formatted hits.

        Args:
            request_data: Parsed JSON body. ``query`` is required; ``top_k``
                defaults to 5, ``use_reranker`` and ``use_feedback_boost``
                default to ``True``.
        """
        try:
            if "query" not in request_data:
                self._send_error(400, "Bad Request", "缺少必需字段: query")
                return
            query = request_data["query"]
            top_k = request_data.get("top_k", 5)
            use_reranker = request_data.get("use_reranker", True)
            use_feedback_boost = request_data.get("use_feedback_boost", True)

            start_time = time.time()
            results = self.mvp_manager.recall(
                query=query,
                top_k=top_k,
                use_reranker=use_reranker,
                use_feedback_boost=use_feedback_boost
            )

            formatted_results = []
            for result in results:
                # A missing or null ``metadata`` must not abort the search.
                metadata = result.get("metadata") or {}
                formatted_results.append({
                    "memory": {
                        "id": str(result.get("id", "")),
                        "content": result.get("content", ""),
                        "tags": result.get("tags", []),
                        "metadata": metadata,
                        "created_at": metadata.get("timestamp", ""),
                        "usage_score": metadata.get("usage_score")
                    },
                    "score": result.get("score", 0.0),
                    "feedback_boost": result.get("feedback_boost")
                })
            duration = time.time() - start_time

            self._send_success(f"搜索完成,找到 {len(formatted_results)} 条相关记忆", {
                "query": query,
                "results": formatted_results,
                "total_count": len(formatted_results),
                "processing_time": round(duration, 3)
            })
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"搜索记忆失败: {str(e)}")

    def _handle_memory_feedback(self, request_data: Dict[str, Any]):
        """Record thumbs-up/down feedback for a memory.

        Args:
            request_data: Parsed JSON body; ``memory_id`` and
                ``feedback_type`` are both required.
        """
        try:
            if "memory_id" not in request_data or "feedback_type" not in request_data:
                self._send_error(400, "Bad Request", "缺少必需字段: memory_id, feedback_type")
                return
            memory_id = request_data["memory_id"]
            feedback_type = request_data["feedback_type"]

            success = self.mvp_manager.provide_feedback(memory_id=memory_id, feedback_type=feedback_type)
            if not success:
                self._send_error(404, "Not Found", "记忆ID不存在或反馈失败")
                return

            # Anything other than an explicit thumbs-up is shown as thumbs-down.
            feedback_emoji = "👍" if feedback_type in ["thumbs_up", "👍"] else "👎"
            self._send_success(f"反馈已保存 {feedback_emoji}", {
                "memory_id": memory_id,
                "feedback_type": feedback_type,
                "feedback_emoji": feedback_emoji
            })
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"提供记忆反馈失败: {str(e)}")

    def _handle_capacity_report(self):
        """Return the manager's capacity report.

        A report containing an ``"error"`` key is reported as 503 with that
        value as the error message.
        """
        try:
            capacity_report = self.mvp_manager.get_capacity_report()
            if "error" in capacity_report:
                self._send_error(503, "Service Unavailable", capacity_report["error"])
                return
            self._send_success("容量报告获取成功", capacity_report)
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"获取容量报告失败: {str(e)}")

    def _handle_system_optimization(self):
        """Trigger a manual optimization and return the metrics read afterwards."""
        try:
            success = self.mvp_manager.trigger_manual_optimization()
            if not success:
                self._send_error(500, "Internal Server Error", "系统优化失败")
                return
            metrics = self.mvp_manager.get_performance_metrics()
            self._send_success("系统优化完成", {
                "optimization_applied": True,
                "current_metrics": metrics
            })
        except Exception as e:
            self._send_error(500, "Internal Server Error", f"系统优化失败: {str(e)}")
def create_api_handler(mvp_manager):
    """Build a handler factory with ``mvp_manager`` pre-bound.

    The returned callable accepts whatever the HTTP server passes when it
    instantiates a request handler and forwards it to ``MemOSAPIHandler``,
    adding ``mvp_manager`` as a keyword argument.
    """
    return lambda *args, **kwargs: MemOSAPIHandler(
        *args, mvp_manager=mvp_manager, **kwargs
    )
def start_api_server(host: str = "localhost", port: int = 8000):
    """Start the MemOS API server and block until interrupted.

    Creates the memory manager, binds an ``HTTPServer`` to ``(host, port)``
    and serves requests until Ctrl+C is pressed.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.

    Raises:
        OSError: If the address cannot be bound (e.g. port already in use).
    """
    print("🚀 启动MemOS简化API服务器...")
    print(f"📖 API文档: http://{host}:{port}/docs")
    print(f"🔍 健康检查: http://{host}:{port}/health")

    # Create the shared memory manager once; every request handler reuses it.
    mvp_manager = create_mvp_memory_manager(use_official_config=True)
    handler_class = create_api_handler(mvp_manager)

    # The context manager calls server_close() on exit, so the listening
    # socket is released on Ctrl+C and on unexpected errors alike.
    # shutdown() is not needed: it only signals a serve_forever() loop
    # running in another thread, and by then this loop has already exited.
    with HTTPServer((host, port), handler_class) as server:
        print(f"✅ 服务器已启动在 http://{host}:{port}")
        print("按 Ctrl+C 停止服务器")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("\n⏹️ 服务器已停止")
# Script entry point: start the server with the default host and port.
if __name__ == "__main__":
    start_api_server()