official_api_server.py•24.8 kB
"""
MemOS官方API服务器
基于MemOS官方API规范的标准化FastAPI实现
集成容量管理、性能监控和反馈机制
"""
import logging
import os
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import uvicorn
from fastapi import FastAPI, HTTPException, Request, Query, Path as PathParam
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.openapi.utils import get_openapi
from fastapi.responses import JSONResponse, RedirectResponse
from pydantic import BaseModel, Field, validator

# 导入MemOS组件
from mvp_memory import create_mvp_memory_manager
from capacity_manager import MemoryType
# Logging setup: configure the root logger once at import time and expose a
# module-level logger for the rest of this file.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
# ===== 标准化响应模型 =====
class APIResponse(BaseModel):
    """Standard success envelope returned by the API endpoints.

    ``timestamp`` defaults to the local-time ISO-8601 moment the envelope is
    created; ``request_id`` is optional and left as ``None`` unless supplied.
    """
    # Whether the operation succeeded.
    success: bool = Field(..., description="操作是否成功")
    # Human-readable outcome message.
    message: str = Field(..., description="响应消息")
    # Endpoint-specific payload (any JSON-serialisable value).
    data: Optional[Any] = Field(default=None, description="响应数据")
    timestamp: str = Field(
        description="响应时间戳",
        default_factory=lambda: datetime.isoformat(datetime.now()),
    )
    request_id: Optional[str] = Field(default=None, description="请求ID")
class ErrorResponse(BaseModel):
    """Standard error envelope produced by the global exception handlers.

    ``success`` defaults to ``False``; ``timestamp`` defaults to the local-time
    ISO-8601 moment the envelope is created.
    """
    success: bool = Field(default=False, description="操作失败")
    # Machine-readable code such as "HTTP_404" or "INTERNAL_ERROR".
    error_code: str = Field(..., description="错误代码")
    error_message: str = Field(..., description="错误消息")
    # Optional free-form context (request path, method, exception type, ...).
    details: Optional[Dict[str, Any]] = Field(default=None, description="错误详情")
    timestamp: str = Field(
        description="错误时间戳",
        default_factory=lambda: datetime.isoformat(datetime.now()),
    )
# ===== 请求模型 =====
class MemoryCreateRequest(BaseModel):
    """Request body for creating a memory (``POST /memories``).

    ``content`` must be 1-10000 characters; at most 10 tags are accepted.
    """
    content: str = Field(..., description="记忆内容", min_length=1, max_length=10000)
    # default_factory is the idiomatic way to declare mutable defaults: each
    # instance explicitly gets its own container instead of relying on the
    # library copying a shared literal.
    tags: Optional[List[str]] = Field(default_factory=list, description="标签列表")
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="元数据")
    mem_cube_id: Optional[str] = Field(None, description="记忆立方体ID")

    @validator('tags')
    def validate_tags(cls, v):
        """Reject requests carrying more than 10 tags.

        Raises:
            ValueError: if more than 10 tags are supplied.
        """
        if v and len(v) > 10:
            raise ValueError("标签数量不能超过10个")
        return v
class MemorySearchRequest(BaseModel):
    """Request body for searching memories (``POST /memories/search``)."""
    query: str = Field(..., min_length=1, max_length=1000, description="搜索查询")
    # Number of hits to return; bounded to the range 1..50.
    top_k: int = Field(default=5, ge=1, le=50, description="返回结果数量")
    use_reranker: bool = Field(default=True, description="是否使用重排器")
    use_feedback_boost: bool = Field(default=True, description="是否使用反馈加权")
    # Optional restriction of the search scope to specific memory cubes.
    mem_cube_ids: Optional[List[str]] = Field(default=None, description="搜索范围的记忆立方体ID列表")
class MemoryFeedbackRequest(BaseModel):
    """Request body for rating a memory (``POST /memories/feedback``).

    ``feedback_type`` accepts exactly one of: thumbs_up, 👍, thumbs_down, 👎.
    """
    memory_id: str = Field(..., description="记忆ID")
    feedback_type: str = Field(
        ...,
        pattern="^(thumbs_up|👍|thumbs_down|👎)$",
        description="反馈类型",
    )
class MemCubeCreateRequest(BaseModel):
    """Request body for creating a memory cube.

    ``name`` must be 1-100 characters; ``description`` is optional (max 500).
    """
    name: str = Field(..., description="记忆立方体名称", min_length=1, max_length=100)
    description: Optional[str] = Field(None, description="描述", max_length=500)
    # Use a factory for the mutable default so each instance has its own dict.
    config: Optional[Dict[str, Any]] = Field(default_factory=dict, description="配置参数")
# ===== 响应数据模型 =====
class MemoryInfo(BaseModel):
    """A stored memory as exposed by the API (embedded in search results)."""
    id: str = Field(..., description="记忆ID")
    content: str = Field(..., description="记忆内容")
    # default_factory is the idiomatic way to declare mutable defaults and
    # makes per-instance containers explicit.
    tags: List[str] = Field(default_factory=list, description="标签")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="元数据")
    created_at: str = Field(..., description="创建时间")
    updated_at: Optional[str] = Field(None, description="更新时间")
    usage_score: Optional[float] = Field(None, description="使用评分")
class SearchResult(BaseModel):
    """One search hit: the memory, its similarity score and optional boost."""
    memory: MemoryInfo = Field(..., description="记忆信息")
    score: float = Field(..., description="相似度分数")
    # Feedback weighting factor, when the backend reports one.
    feedback_boost: Optional[float] = Field(default=None, description="反馈加权系数")
class SystemStatus(BaseModel):
    """Snapshot of the MemOS runtime configuration returned by ``/status``."""
    # Run mode, embedding model and reranker names, plus overall status.
    mode: str = Field(..., description="运行模式")
    model: str = Field(..., description="嵌入模型")
    reranker: str = Field(..., description="重排器")
    status: str = Field(..., description="运行状态")
    # Whether the manager runs with the official configuration.
    official_config: bool = Field(..., description="是否使用官方配置")
    capacity_management: Dict[str, Any] = Field(..., description="容量管理信息")
class PerformanceMetrics(BaseModel):
    """Performance and resource metrics returned by ``/metrics``.

    Units follow the field descriptions: seconds for timings, percent for CPU,
    megabytes for memory and disk usage.
    """
    # Average latencies in seconds.
    memory_add_avg_time: float = Field(..., description="记忆添加平均时间(秒)")
    memory_search_avg_time: float = Field(..., description="记忆检索平均时间(秒)")
    memory_total_count: int = Field(..., description="总记忆数量")
    system_health: str = Field(..., description="系统健康状态")
    # Resource usage: CPU in %, memory and disk in MB.
    cpu_usage: float = Field(..., description="CPU使用率(%)")
    memory_usage_mb: float = Field(..., description="内存使用量(MB)")
    disk_usage_mb: float = Field(..., description="磁盘使用量(MB)")
# ===== FastAPI应用初始化 =====
def create_official_api_app() -> Tuple[FastAPI, Any]:
    """Create the MemOS official API application and its memory manager.

    Configures CORS, a request-ID middleware and a timing middleware, then
    registers every endpoint as a closure over a single MVP memory manager
    built with ``create_mvp_memory_manager(use_official_config=True)``.

    Global exception handlers are NOT installed here; call
    ``setup_exception_handlers(app)`` on the returned app.

    Returns:
        ``(app, mvp_manager)``: the configured ``FastAPI`` instance and the
        manager its endpoints delegate to.

    NOTE(review): the manager's methods are called synchronously from
    ``async def`` endpoints. If they block (model inference, network I/O),
    they stall the event loop. Confirm this, and consider
    ``run_in_threadpool`` once the manager is known to be thread-safe.
    """
    app = FastAPI(
        title="MemOS Official API",
        description="基于MemOS官方规范的标准化REST API,支持智能记忆管理、容量监控和性能优化",
        version="1.0.0",
        docs_url="/docs",
        redoc_url="/redoc",
        openapi_url="/openapi.json"
    )

    # NOTE(review): a wildcard origin combined with allow_credentials=True lets
    # any site issue credentialed cross-origin requests. Restrict
    # allow_origins before exposing this server beyond a trusted network.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # One manager instance shared by every endpoint below.
    mvp_manager = create_mvp_memory_manager(use_official_config=True)

    @app.middleware("http")
    async def add_request_id(request: Request, call_next):
        """Attach a request ID to ``request.state`` and the X-Request-ID header."""
        # A millisecond timestamp alone collides when two requests arrive in
        # the same millisecond; the random suffix keeps IDs unique.
        request_id = f"req_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
        request.state.request_id = request_id
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response

    @app.middleware("http")
    async def performance_monitoring(request: Request, call_next):
        """Measure wall-clock handling time; expose it as X-Process-Time and log it."""
        # perf_counter is monotonic, so wall-clock adjustments cannot skew it.
        start_time = time.perf_counter()
        response = await call_next(request)
        process_time = time.perf_counter() - start_time
        response.headers["X-Process-Time"] = str(process_time)
        logger.info("API请求 %s %s 耗时: %.3fs", request.method, request.url.path, process_time)
        return response

    # ===== API endpoints =====

    @app.get("/", include_in_schema=False)
    async def root():
        """Redirect the root path to the interactive API docs."""
        return RedirectResponse(url="/docs")

    @app.get("/health",
             summary="健康检查",
             description="检查API服务和MemOS系统的健康状态",
             response_model=APIResponse)
    async def health_check(request: Request):
        """Report API liveness plus the MemOS connection and capacity flags.

        Still answers ``success=True`` when ``test_connection()`` returns a
        falsy value; that failure only shows as ``memos_connection == "error"``.
        Responds 503 if probing the manager raises.
        """
        try:
            connection_ok = mvp_manager.test_connection()
            status_info = mvp_manager.get_status_info()
            # Guard against the backend reporting capacity_management as None.
            capacity_info = status_info.get("capacity_management") or {}
            health_data = {
                "api_status": "healthy",
                "memos_connection": "ok" if connection_ok else "error",
                "system_mode": status_info.get("mode", "unknown"),
                "capacity_management": capacity_info.get("enabled", False)
            }
            return APIResponse(
                success=True,
                message="系统健康状态正常",
                data=health_data,
                request_id=request.state.request_id
            )
        except Exception as e:
            logger.error("健康检查失败: %s", e)
            raise HTTPException(status_code=503, detail=f"服务不可用: {str(e)}") from e

    @app.get("/status",
             summary="获取系统状态",
             description="获取MemOS系统的详细状态信息",
             response_model=APIResponse)
    async def get_system_status(request: Request):
        """Return the manager's status info normalised through ``SystemStatus``."""
        try:
            status_info = mvp_manager.get_status_info()
            system_status = SystemStatus(
                mode=status_info.get("mode", "unknown"),
                model=status_info.get("model", "unknown"),
                reranker=status_info.get("reranker", "unknown"),
                status=status_info.get("status", "unknown"),
                official_config=status_info.get("official_config", False),
                capacity_management=status_info.get("capacity_management") or {}
            )
            return APIResponse(
                success=True,
                message="系统状态获取成功",
                data=system_status.dict(),
                request_id=request.state.request_id
            )
        except Exception as e:
            logger.error("获取系统状态失败: %s", e)
            raise HTTPException(status_code=500, detail=f"获取系统状态失败: {str(e)}") from e

    @app.get("/metrics",
             summary="获取性能指标",
             description="获取系统性能指标和监控数据",
             response_model=APIResponse)
    async def get_performance_metrics(request: Request):
        """Return performance metrics; 503 if the manager reports an error."""
        try:
            metrics_data = mvp_manager.get_performance_metrics()
            if "error" in metrics_data:
                raise HTTPException(status_code=503, detail=metrics_data["error"])
            metrics = PerformanceMetrics(**metrics_data)
            return APIResponse(
                success=True,
                message="性能指标获取成功",
                data=metrics.dict(),
                request_id=request.state.request_id
            )
        except HTTPException:
            # Deliberate HTTP errors (the 503 above) must keep their status code.
            raise
        except Exception as e:
            logger.error("获取性能指标失败: %s", e)
            raise HTTPException(status_code=500, detail=f"获取性能指标失败: {str(e)}") from e

    # ===== Memory management endpoints =====

    @app.post("/memories",
              summary="创建记忆",
              description="添加新的记忆到系统中",
              response_model=APIResponse)
    async def create_memory(request: Request, memory_request: MemoryCreateRequest):
        """Store a new memory and return a short preview plus processing time.

        NOTE(review): ``memory_request.mem_cube_id`` is accepted but not
        forwarded to the manager. Confirm whether ``remember`` supports it.
        """
        try:
            start_time = time.perf_counter()
            success = mvp_manager.remember(
                content=memory_request.content,
                tags=memory_request.tags,
                metadata=memory_request.metadata
            )
            if not success:
                raise HTTPException(status_code=500, detail="记忆创建失败")
            duration = time.perf_counter() - start_time

            content = memory_request.content
            # Preview: first 50 characters, with an ellipsis only when truncated.
            preview = content[:50] + "..." if len(content) > 50 else content
            return APIResponse(
                success=True,
                message="记忆创建成功",
                data={
                    "content_preview": preview,
                    "tags": memory_request.tags,
                    "processing_time": round(duration, 3)
                },
                request_id=request.state.request_id
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.error("创建记忆失败: %s", e)
            raise HTTPException(status_code=500, detail=f"创建记忆失败: {str(e)}") from e

    @app.post("/memories/search",
              summary="搜索记忆",
              description="根据查询内容搜索相关记忆",
              response_model=APIResponse)
    async def search_memories(request: Request, search_request: MemorySearchRequest):
        """Search memories and return formatted hits plus processing time.

        NOTE(review): ``search_request.mem_cube_ids`` is accepted but not
        forwarded to ``recall``. Confirm whether the manager supports scoping.
        """
        try:
            start_time = time.perf_counter()
            results = mvp_manager.recall(
                query=search_request.query,
                top_k=search_request.top_k,
                use_reranker=search_request.use_reranker,
                use_feedback_boost=search_request.use_feedback_boost
            )

            formatted_results = []
            for result in results or []:
                # Tolerate None/missing fields so one sparse record cannot
                # fail the whole search with a validation error.
                metadata = result.get("metadata") or {}
                timestamp = metadata.get("timestamp")
                memory_info = MemoryInfo(
                    id=str(result.get("id", "")),
                    content=result.get("content") or "",
                    tags=result.get("tags") or [],
                    metadata=metadata,
                    created_at="" if timestamp is None else str(timestamp),
                    usage_score=metadata.get("usage_score")
                )
                search_result = SearchResult(
                    memory=memory_info,
                    score=result.get("score") or 0.0,
                    feedback_boost=result.get("feedback_boost")
                )
                formatted_results.append(search_result.dict())

            duration = time.perf_counter() - start_time
            return APIResponse(
                success=True,
                message=f"搜索完成,找到 {len(formatted_results)} 条相关记忆",
                data={
                    "query": search_request.query,
                    "results": formatted_results,
                    "total_count": len(formatted_results),
                    "processing_time": round(duration, 3)
                },
                request_id=request.state.request_id
            )
        except Exception as e:
            logger.error("搜索记忆失败: %s", e)
            raise HTTPException(status_code=500, detail=f"搜索记忆失败: {str(e)}") from e

    @app.post("/memories/feedback",
              summary="提供记忆反馈",
              description="为记忆提供👍/👎反馈,影响后续检索排序",
              response_model=APIResponse)
    async def provide_memory_feedback(request: Request, feedback_request: MemoryFeedbackRequest):
        """Record 👍/👎 feedback for a memory; 404 if the manager rejects it."""
        try:
            success = mvp_manager.provide_feedback(
                memory_id=feedback_request.memory_id,
                feedback_type=feedback_request.feedback_type
            )
            if not success:
                raise HTTPException(status_code=404, detail="记忆ID不存在或反馈失败")

            is_positive = feedback_request.feedback_type in ["thumbs_up", "👍"]
            feedback_emoji = "👍" if is_positive else "👎"
            return APIResponse(
                success=True,
                message=f"反馈已保存 {feedback_emoji}",
                data={
                    "memory_id": feedback_request.memory_id,
                    "feedback_type": feedback_request.feedback_type,
                    "feedback_emoji": feedback_emoji
                },
                request_id=request.state.request_id
            )
        except HTTPException:
            # Keep the 404 instead of re-wrapping it as a 500.
            raise
        except Exception as e:
            logger.error("提供记忆反馈失败: %s", e)
            raise HTTPException(status_code=500, detail=f"提供记忆反馈失败: {str(e)}") from e

    @app.get("/memories/stats",
             summary="获取记忆统计",
             description="获取记忆系统的统计信息",
             response_model=APIResponse)
    async def get_memory_stats(request: Request):
        """Combine feedback statistics with capacity information."""
        try:
            feedback_stats = mvp_manager.get_feedback_stats()
            capacity_report = mvp_manager.get_capacity_report()
            stats_data = {
                "feedback_stats": feedback_stats,
                "capacity_info": capacity_report.get("memory_stats", {}),
                "system_health": capacity_report.get("system_health", "unknown")
            }
            return APIResponse(
                success=True,
                message="记忆统计获取成功",
                data=stats_data,
                request_id=request.state.request_id
            )
        except Exception as e:
            logger.error("获取记忆统计失败: %s", e)
            raise HTTPException(status_code=500, detail=f"获取记忆统计失败: {str(e)}") from e

    # ===== Capacity management and system optimisation endpoints =====

    @app.get("/capacity/report",
             summary="获取容量报告",
             description="获取详细的容量管理报告",
             response_model=APIResponse)
    async def get_capacity_report(request: Request):
        """Return the capacity report; 503 if the manager reports an error."""
        try:
            capacity_report = mvp_manager.get_capacity_report()
            if "error" in capacity_report:
                raise HTTPException(status_code=503, detail=capacity_report["error"])
            return APIResponse(
                success=True,
                message="容量报告获取成功",
                data=capacity_report,
                request_id=request.state.request_id
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.error("获取容量报告失败: %s", e)
            raise HTTPException(status_code=500, detail=f"获取容量报告失败: {str(e)}") from e

    @app.post("/system/optimize",
              summary="手动触发系统优化",
              description="手动触发系统优化,包括内存压缩和性能调优",
              response_model=APIResponse)
    async def trigger_system_optimization(request: Request):
        """Trigger a manual optimisation and return the post-optimisation metrics."""
        try:
            success = mvp_manager.trigger_manual_optimization()
            if not success:
                raise HTTPException(status_code=500, detail="系统优化失败")
            metrics = mvp_manager.get_performance_metrics()
            return APIResponse(
                success=True,
                message="系统优化完成",
                data={
                    "optimization_applied": True,
                    "current_metrics": metrics
                },
                request_id=request.state.request_id
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.error("系统优化失败: %s", e)
            raise HTTPException(status_code=500, detail=f"系统优化失败: {str(e)}") from e

    # ===== Compatibility endpoints (existing MCP server) =====

    @app.get("/mcp/tools",
             summary="获取MCP工具列表",
             description="获取可用的MCP工具列表(兼容性接口)",
             response_model=APIResponse)
    async def get_mcp_tools(request: Request):
        """Return the static list of MCP tool descriptors."""
        try:
            tools = [
                {
                    "name": "query_memos_context",
                    "description": "查询MemOS记忆数据库并获取LLM组织的上下文",
                    "parameters": {
                        "query": {"type": "string", "description": "要查询的问题或关键词"},
                        "max_memories": {"type": "integer", "description": "最大返回记忆数量", "default": 5}
                    }
                },
                {
                    "name": "add_memos_memory",
                    "description": "向MemOS添加新的记忆",
                    "parameters": {
                        "content": {"type": "string", "description": "记忆内容"},
                        "tags": {"type": "array", "description": "记忆标签", "items": {"type": "string"}}
                    }
                },
                {
                    "name": "provide_memory_feedback",
                    "description": "为记忆提供👍/👎反馈",
                    "parameters": {
                        "memory_id": {"type": "string", "description": "记忆ID"},
                        "feedback_type": {"type": "string", "description": "反馈类型", "enum": ["thumbs_up", "👍", "thumbs_down", "👎"]}
                    }
                },
                {
                    "name": "get_feedback_stats",
                    "description": "获取记忆反馈统计信息",
                    "parameters": {}
                }
            ]
            return APIResponse(
                success=True,
                message="MCP工具列表获取成功",
                data={"tools": tools},
                request_id=request.state.request_id
            )
        except Exception as e:
            logger.error("获取MCP工具列表失败: %s", e)
            raise HTTPException(status_code=500, detail=f"获取MCP工具列表失败: {str(e)}") from e

    # ===== Dashboard endpoints =====

    @app.get("/dashboard/topic-drift",
             summary="获取主题漂移统计",
             description="获取主题漂移检测的统计信息",
             response_model=APIResponse)
    async def get_topic_drift_stats(request: Request):
        """Return topic-drift statistics, or fixed defaults when no detector exists."""
        try:
            detector = getattr(mvp_manager, "topic_drift_detector", None)
            if detector:
                stats = detector.get_statistics()
                return APIResponse(
                    success=True,
                    message="主题漂移统计获取成功",
                    data=stats,
                    request_id=request.state.request_id
                )
            # No detector configured: answer with placeholder statistics.
            default_stats = {
                "total_queries": 0,
                "drift_count": 0,
                "drift_rate": "0.0%",
                "window_size": 5,
                "current_window_length": 0,
                "drift_threshold": 0.5,
                "min_similarity": 0.3
            }
            return APIResponse(
                success=True,
                message="主题漂移统计获取成功(默认数据)",
                data=default_stats,
                request_id=request.state.request_id
            )
        except Exception as e:
            logger.error("获取主题漂移统计失败: %s", e)
            raise HTTPException(status_code=500, detail=f"获取主题漂移统计失败: {str(e)}") from e

    return app, mvp_manager
# ===== 全局异常处理 =====
def setup_exception_handlers(app: FastAPI) -> None:
    """Register global exception handlers that render ``ErrorResponse`` bodies.

    Handlers installed on *app*:
      * ``HTTPException``: responds with the exception's status code and
        ``error_code="HTTP_<status>"``.
      * ``ValueError``: responds 400 with ``error_code="VALIDATION_ERROR"``.
      * ``Exception``: responds 500 with ``error_code="INTERNAL_ERROR"``. The
        message is generic; only the exception type name is exposed.

    Args:
        app: The FastAPI application to register the handlers on.
    """

    @app.exception_handler(HTTPException)
    async def http_exception_handler(request: Request, exc: HTTPException):
        """Render an ``HTTPException`` as an ``ErrorResponse``, keeping its headers."""
        # ``detail`` may be any JSON-serialisable value (dict, list, ...), while
        # ``ErrorResponse.error_message`` is a ``str``. Coerce it so that building
        # the error body cannot itself raise inside the handler.
        detail = exc.detail if isinstance(exc.detail, str) else str(exc.detail)
        return JSONResponse(
            status_code=exc.status_code,
            content=ErrorResponse(
                error_code=f"HTTP_{exc.status_code}",
                error_message=detail,
                details={"path": str(request.url.path), "method": request.method}
            ).dict(),
            # Preserve headers attached to the exception (e.g. WWW-Authenticate).
            headers=getattr(exc, "headers", None),
        )

    @app.exception_handler(ValueError)
    async def value_error_handler(request: Request, exc: ValueError):
        """Map an uncaught ``ValueError`` to a 400 response."""
        return JSONResponse(
            status_code=400,
            content=ErrorResponse(
                error_code="VALIDATION_ERROR",
                error_message=str(exc),
                details={"path": str(request.url.path)}
            ).dict()
        )

    @app.exception_handler(Exception)
    async def general_exception_handler(request: Request, exc: Exception):
        """Log the traceback and return an opaque 500 response."""
        # Pass the exception explicitly instead of relying on an active
        # exception context being present when the handler runs.
        logger.error("未处理的异常:", exc_info=exc)
        return JSONResponse(
            status_code=500,
            content=ErrorResponse(
                error_code="INTERNAL_ERROR",
                error_message="内部服务器错误",
                details={"path": str(request.url.path), "error_type": type(exc).__name__}
            ).dict()
        )
if __name__ == "__main__":
    # Build the app and its manager, then attach the global error handlers
    # (create_official_api_app does not register them itself).
    app, manager = create_official_api_app()
    setup_exception_handlers(app)

    # Host/port default to the previous hard-coded values and can be
    # overridden through the environment without editing this file.
    host = os.getenv("MEMOS_API_HOST", "0.0.0.0")
    port = int(os.getenv("MEMOS_API_PORT", "8000"))

    print("🚀 启动MemOS官方API服务器...")
    print(f"📖 API文档: http://localhost:{port}/docs")
    print(f"🔍 ReDoc文档: http://localhost:{port}/redoc")
    uvicorn.run(
        app,
        host=host,
        port=port,
        log_level="info",
        access_log=True
    )