server.py•28.8 kB
"""
GPT OpenMemory MCP Server
=========================
대화 메모리 관리 및 지속성 도구
원본: @peakmojo/mcp-openmemory (Node.js)
GPT Desktop용 FastAPI 포팅
"""
import os
import json
import logging
import sqlite3
import aiosqlite
from pathlib import Path
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ============================================================================
# Memory Service
# ============================================================================
class MemoryService:
"""대화 메모리 관리 서비스"""
def __init__(self, db_path: str):
self.db_path = db_path
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
async def initialize(self):
"""데이터베이스 초기화"""
async with aiosqlite.connect(self.db_path) as db:
# 메모리 테이블
await db.execute('''
CREATE TABLE IF NOT EXISTS memories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
speaker TEXT NOT NULL,
message TEXT NOT NULL,
context TEXT NOT NULL,
timestamp INTEGER NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# 메모리 추상화 테이블
await db.execute('''
CREATE TABLE IF NOT EXISTS memory_abstracts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
abstract TEXT NOT NULL,
last_processed_timestamp INTEGER,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# 인덱스 생성
await db.execute('CREATE INDEX IF NOT EXISTS idx_memories_context ON memories(context)')
await db.execute('CREATE INDEX IF NOT EXISTS idx_memories_timestamp ON memories(timestamp)')
await db.commit()
logger.info(f"Database initialized: {self.db_path}")
async def save_memory(self, speaker: str, message: str, context: str) -> Dict:
"""대화 메모리 저장"""
timestamp = int(datetime.now().timestamp() * 1000)
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
'INSERT INTO memories (speaker, message, context, timestamp) VALUES (?, ?, ?, ?)',
(speaker, message, context, timestamp)
)
await db.commit()
memory_id = cursor.lastrowid
return {
"success": True,
"id": memory_id,
"timestamp": timestamp,
"message": f"Memory saved for context '{context}'"
}
async def get_latest_memory_abstract(self) -> Dict:
"""최신 메모리 추상화 조회"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
'SELECT * FROM memory_abstracts ORDER BY id DESC LIMIT 1'
)
row = await cursor.fetchone()
if row:
return {
"abstract": row["abstract"],
"last_processed_timestamp": row["last_processed_timestamp"],
"created_at": row["created_at"]
}
return {
"abstract": None,
"message": "No memory abstract found. Create one using update_memory_abstract."
}
async def update_memory_abstract(self, abstract: str, last_processed_timestamp: Optional[int] = None) -> Dict:
"""메모리 추상화 업데이트"""
if last_processed_timestamp is None:
last_processed_timestamp = int(datetime.now().timestamp() * 1000)
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
'INSERT INTO memory_abstracts (abstract, last_processed_timestamp) VALUES (?, ?)',
(abstract, last_processed_timestamp)
)
await db.commit()
abstract_id = cursor.lastrowid
return {
"success": True,
"id": abstract_id,
"last_processed_timestamp": last_processed_timestamp,
"message": "Memory abstract updated successfully"
}
async def get_recent_memories(self, max_days: int = 3) -> Dict:
"""최근 메모리 조회"""
cutoff_timestamp = int((datetime.now() - timedelta(days=max_days)).timestamp() * 1000)
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
'SELECT * FROM memories WHERE timestamp >= ? ORDER BY timestamp DESC',
(cutoff_timestamp,)
)
rows = await cursor.fetchall()
memories = []
for row in rows:
memories.append({
"id": row["id"],
"speaker": row["speaker"],
"message": row["message"],
"context": row["context"],
"timestamp": row["timestamp"],
"created_at": row["created_at"]
})
return {
"memories": memories,
"count": len(memories),
"max_days": max_days,
"cutoff_timestamp": cutoff_timestamp
}
async def get_stats(self) -> Dict:
"""메모리 통계"""
async with aiosqlite.connect(self.db_path) as db:
# 총 메모리 수
cursor = await db.execute('SELECT COUNT(*) FROM memories')
total_memories = (await cursor.fetchone())[0]
# 컨텍스트별 통계
cursor = await db.execute(
'SELECT context, COUNT(*) as count FROM memories GROUP BY context ORDER BY count DESC'
)
context_stats = await cursor.fetchall()
# 스피커별 통계
cursor = await db.execute(
'SELECT speaker, COUNT(*) as count FROM memories GROUP BY speaker ORDER BY count DESC'
)
speaker_stats = await cursor.fetchall()
# 추상화 수
cursor = await db.execute('SELECT COUNT(*) FROM memory_abstracts')
total_abstracts = (await cursor.fetchone())[0]
return {
"total_memories": total_memories,
"total_abstracts": total_abstracts,
"contexts": [{"context": c[0], "count": c[1]} for c in context_stats],
"speakers": [{"speaker": s[0], "count": s[1]} for s in speaker_stats]
}
async def search_memories(self, query: str, context: Optional[str] = None, limit: int = 20) -> Dict:
"""메모리 검색"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
if context:
cursor = await db.execute(
'''SELECT * FROM memories
WHERE context = ? AND message LIKE ?
ORDER BY timestamp DESC LIMIT ?''',
(context, f'%{query}%', limit)
)
else:
cursor = await db.execute(
'''SELECT * FROM memories
WHERE message LIKE ?
ORDER BY timestamp DESC LIMIT ?''',
(f'%{query}%', limit)
)
rows = await cursor.fetchall()
memories = []
for row in rows:
memories.append({
"id": row["id"],
"speaker": row["speaker"],
"message": row["message"],
"context": row["context"],
"timestamp": row["timestamp"]
})
return {
"query": query,
"context": context,
"results": memories,
"count": len(memories)
}
async def export_to_markdown(self, output_path: str, context: Optional[str] = None,
max_days: Optional[int] = None, title: Optional[str] = None) -> Dict:
"""메모리를 마크다운 파일로 내보내기"""
import aiofiles
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
query = 'SELECT * FROM memories'
params = []
conditions = []
if context:
conditions.append('context = ?')
params.append(context)
if max_days:
cutoff = int((datetime.now() - timedelta(days=max_days)).timestamp() * 1000)
conditions.append('timestamp >= ?')
params.append(cutoff)
if conditions:
query += ' WHERE ' + ' AND '.join(conditions)
query += ' ORDER BY timestamp ASC'
cursor = await db.execute(query, params)
rows = await cursor.fetchall()
if not rows:
return {"error": "No memories found to export"}
# 마크다운 생성
md_title = title or f"GPT Conversation Log - {datetime.now().strftime('%Y-%m-%d')}"
md_content = f"# {md_title}\n\n"
md_content += f"**Exported**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
md_content += f"**Total Messages**: {len(rows)}\n\n"
md_content += "---\n\n"
current_date = None
for row in rows:
msg_time = datetime.fromtimestamp(row["timestamp"] / 1000)
msg_date = msg_time.strftime('%Y-%m-%d')
# 날짜 구분
if msg_date != current_date:
current_date = msg_date
md_content += f"\n## {msg_date}\n\n"
# 메시지 포맷
time_str = msg_time.strftime('%H:%M:%S')
speaker = row["speaker"].upper()
context_tag = f" `[{row['context']}]`" if row["context"] else ""
if speaker == "USER":
md_content += f"### 👤 User ({time_str}){context_tag}\n\n"
elif speaker == "AGENT":
md_content += f"### 🤖 GPT ({time_str}){context_tag}\n\n"
else:
md_content += f"### 📌 {speaker} ({time_str}){context_tag}\n\n"
md_content += f"{row['message']}\n\n"
# 파일 저장
try:
# 디렉토리 생성
output_dir = os.path.dirname(output_path)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
async with aiofiles.open(output_path, 'w', encoding='utf-8') as f:
await f.write(md_content)
return {
"success": True,
"path": os.path.abspath(output_path),
"messages_exported": len(rows),
"file_size": len(md_content.encode('utf-8'))
}
except Exception as e:
return {"error": f"Failed to write file: {str(e)}"}
async def save_document(self, content: str, file_path: str, title: Optional[str] = None) -> Dict:
"""문서를 로컬 파일로 저장"""
import aiofiles
try:
# 디렉토리 생성
output_dir = os.path.dirname(file_path)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
# 마크다운 헤더 추가 (선택)
if title and file_path.endswith('.md'):
full_content = f"# {title}\n\n"
full_content += f"*Saved: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n\n"
full_content += "---\n\n"
full_content += content
else:
full_content = content
async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
await f.write(full_content)
return {
"success": True,
"path": os.path.abspath(file_path),
"file_size": len(full_content.encode('utf-8')),
"title": title
}
except Exception as e:
return {"error": f"Failed to save document: {str(e)}"}
async def list_exported_files(self, directory: str) -> Dict:
"""내보낸 파일 목록 조회"""
try:
abs_dir = os.path.abspath(directory)
if not os.path.exists(abs_dir):
return {"error": f"Directory not found: {directory}"}
files = []
for f in os.listdir(abs_dir):
if f.endswith('.md'):
file_path = os.path.join(abs_dir, f)
stat = os.stat(file_path)
files.append({
"name": f,
"path": file_path,
"size": stat.st_size,
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
})
files.sort(key=lambda x: x["modified"], reverse=True)
return {
"directory": abs_dir,
"files": files,
"count": len(files)
}
except Exception as e:
return {"error": str(e)}
# ============================================================================
# MCP Server Configuration
# ============================================================================
SERVER_INFO = {
"name": "gpt-openmemory-mcp",
"version": "1.0.0",
"description": "Memory management tool for GPT Desktop"
}
TOOLS = [
{
"name": "save_memory",
"description": """대화 메모리를 저장합니다.
중요한 대화 내용을 저장하여 나중에 참조할 수 있습니다.
컨텍스트를 지정하여 관련 메모리를 그룹화합니다.
사용 예시:
- 프로젝트 관련 대화 저장
- 사용자 선호도 기록
- 중요한 결정사항 보존
""",
"inputSchema": {
"type": "object",
"properties": {
"speaker": {
"type": "string",
"description": "발화자 (agent, user, system)"
},
"message": {
"type": "string",
"description": "저장할 메시지 내용"
},
"context": {
"type": "string",
"description": "컨텍스트 (프로젝트명, 주제 등)"
}
},
"required": ["speaker", "message", "context"]
}
},
{
"name": "recall_memory_abstract",
"description": """저장된 메모리 추상화(요약)를 조회합니다.
이전 대화의 요약된 컨텍스트를 불러옵니다.
새 대화 시작 시 이전 맥락을 파악하는데 유용합니다.
""",
"inputSchema": {
"type": "object",
"properties": {
"force_refresh": {
"type": "boolean",
"description": "캐시 무시 여부 (기본: false)",
"default": False
}
}
}
},
{
"name": "update_memory_abstract",
"description": """메모리 추상화(요약)를 업데이트합니다.
최근 대화 내용을 분석하여 새로운 요약을 저장합니다.
워크플로우:
1. recall_memory_abstract로 현재 요약 조회
2. get_recent_memories로 최근 대화 조회
3. 내용을 종합하여 새 요약 작성
4. update_memory_abstract로 저장
""",
"inputSchema": {
"type": "object",
"properties": {
"abstract": {
"type": "string",
"description": "저장할 메모리 추상화 내용"
},
"last_processed_timestamp": {
"type": "integer",
"description": "마지막 처리된 메시지 타임스탬프"
}
},
"required": ["abstract"]
}
},
{
"name": "get_recent_memories",
"description": """최근 대화 메모리를 조회합니다.
지정된 기간 내의 원본 대화 메시지들을 반환합니다.
메모리 추상화 업데이트 시 사용합니다.
""",
"inputSchema": {
"type": "object",
"properties": {
"max_days": {
"type": "integer",
"description": "조회 기간 (일, 기본: 3)",
"default": 3
},
"force_refresh": {
"type": "boolean",
"description": "캐시 무시 여부",
"default": False
}
}
}
},
{
"name": "search_memories",
"description": """메모리를 검색합니다.
키워드로 저장된 메모리를 검색합니다.
컨텍스트 필터링도 가능합니다.
""",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "검색 키워드"
},
"context": {
"type": "string",
"description": "컨텍스트 필터 (선택)"
},
"limit": {
"type": "integer",
"description": "최대 결과 수 (기본: 20)",
"default": 20
}
},
"required": ["query"]
}
},
{
"name": "get_memory_stats",
"description": """메모리 통계를 조회합니다.
저장된 메모리의 통계 정보를 반환합니다.
""",
"inputSchema": {
"type": "object",
"properties": {}
}
},
{
"name": "export_to_markdown",
"description": """저장된 대화를 마크다운 파일로 내보냅니다.
대화 내용을 날짜별로 정리하여 .md 파일로 저장합니다.
원하는 폴더에 저장할 수 있습니다.
사용 예시:
- 전체 대화 내보내기
- 특정 컨텍스트만 내보내기
- 최근 N일간의 대화만 내보내기
""",
"inputSchema": {
"type": "object",
"properties": {
"output_path": {
"type": "string",
"description": "저장할 파일 경로 (예: C:/Users/me/Documents/chat-log.md)"
},
"context": {
"type": "string",
"description": "특정 컨텍스트만 내보내기 (선택)"
},
"max_days": {
"type": "integer",
"description": "최근 N일간만 내보내기 (선택)"
},
"title": {
"type": "string",
"description": "문서 제목 (선택)"
}
},
"required": ["output_path"]
}
},
{
"name": "save_document",
"description": """문서를 로컬 파일로 저장합니다.
GPT가 생성한 콘텐츠를 원하는 폴더에 저장합니다.
마크다운, 텍스트 등 다양한 형식을 지원합니다.
사용 예시:
- 대화 요약 저장
- 생성된 코드 저장
- 메모나 노트 저장
""",
"inputSchema": {
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "저장할 내용"
},
"file_path": {
"type": "string",
"description": "저장할 파일 경로 (예: C:/Users/me/Notes/summary.md)"
},
"title": {
"type": "string",
"description": "문서 제목 (마크다운 파일에 헤더로 추가)"
}
},
"required": ["content", "file_path"]
}
},
{
"name": "list_exported_files",
"description": """내보낸 마크다운 파일 목록을 조회합니다.
지정된 폴더의 .md 파일들을 나열합니다.
""",
"inputSchema": {
"type": "object",
"properties": {
"directory": {
"type": "string",
"description": "조회할 폴더 경로"
}
},
"required": ["directory"]
}
}
]
# 전역 메모리 서비스
memory_service: Optional[MemoryService] = None
async def handle_tool_call(name: str, arguments: dict) -> dict:
"""도구 호출 처리"""
global memory_service
if not memory_service:
return {"content": [{"type": "text", "text": "Error: Memory service not initialized"}], "isError": True}
if name == "save_memory":
result = await memory_service.save_memory(
arguments["speaker"],
arguments["message"],
arguments["context"]
)
return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]}
elif name == "recall_memory_abstract":
result = await memory_service.get_latest_memory_abstract()
return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]}
elif name == "update_memory_abstract":
result = await memory_service.update_memory_abstract(
arguments["abstract"],
arguments.get("last_processed_timestamp")
)
return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]}
elif name == "get_recent_memories":
result = await memory_service.get_recent_memories(
arguments.get("max_days", 3)
)
return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]}
elif name == "search_memories":
result = await memory_service.search_memories(
arguments["query"],
arguments.get("context"),
arguments.get("limit", 20)
)
return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]}
elif name == "get_memory_stats":
result = await memory_service.get_stats()
output = "## Memory Statistics\n\n"
output += f"- **Total Memories**: {result['total_memories']}\n"
output += f"- **Total Abstracts**: {result['total_abstracts']}\n\n"
if result['contexts']:
output += "### Contexts\n"
for ctx in result['contexts'][:10]:
output += f"- {ctx['context']}: {ctx['count']}\n"
if result['speakers']:
output += "\n### Speakers\n"
for spk in result['speakers']:
output += f"- {spk['speaker']}: {spk['count']}\n"
return {"content": [{"type": "text", "text": output}]}
elif name == "export_to_markdown":
result = await memory_service.export_to_markdown(
arguments["output_path"],
arguments.get("context"),
arguments.get("max_days"),
arguments.get("title")
)
if "error" in result:
return {"content": [{"type": "text", "text": result["error"]}], "isError": True}
output = f"## Markdown Export Complete\n\n"
output += f"- **Path**: `{result['path']}`\n"
output += f"- **Messages Exported**: {result['messages_exported']}\n"
output += f"- **File Size**: {result['file_size']:,} bytes\n"
return {"content": [{"type": "text", "text": output}]}
elif name == "save_document":
result = await memory_service.save_document(
arguments["content"],
arguments["file_path"],
arguments.get("title")
)
if "error" in result:
return {"content": [{"type": "text", "text": result["error"]}], "isError": True}
output = f"## Document Saved\n\n"
output += f"- **Path**: `{result['path']}`\n"
output += f"- **Size**: {result['file_size']:,} bytes\n"
if result.get("title"):
output += f"- **Title**: {result['title']}\n"
return {"content": [{"type": "text", "text": output}]}
elif name == "list_exported_files":
result = await memory_service.list_exported_files(arguments["directory"])
if "error" in result:
return {"content": [{"type": "text", "text": result["error"]}], "isError": True}
output = f"## Markdown Files in `{result['directory']}`\n\n"
if result["files"]:
for f in result["files"]:
output += f"- **{f['name']}** ({f['size']:,} bytes) - {f['modified']}\n"
else:
output += "No markdown files found.\n"
output += f"\n**Total**: {result['count']} files"
return {"content": [{"type": "text", "text": output}]}
else:
return {"content": [{"type": "text", "text": f"Unknown tool: {name}"}], "isError": True}
# ============================================================================
# FastAPI Application
# ============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
global memory_service
logger.info("=" * 50)
logger.info("GPT OpenMemory MCP Server Starting")
# 데이터베이스 경로 설정
db_path = os.environ.get(
"MEMORY_DB_PATH",
os.path.expanduser("~/Documents/gpt-openmemory-mcp/memory.sqlite")
)
memory_service = MemoryService(db_path)
await memory_service.initialize()
logger.info(f"Database: {db_path}")
logger.info("=" * 50)
yield
logger.info("Server shutting down")
app = FastAPI(
title="GPT OpenMemory MCP",
description="Memory management tool for GPT Desktop",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def root():
return {"status": "running", "server": SERVER_INFO}
@app.get("/health")
async def health():
global memory_service
if memory_service:
stats = await memory_service.get_stats()
return {"status": "healthy", "memories": stats["total_memories"]}
return {"status": "unhealthy", "error": "Memory service not initialized"}
@app.post("/mcp")
async def mcp_endpoint(request: Request):
try:
body = await request.json()
if body.get("method") == "initialize":
return JSONResponse({
"jsonrpc": "2.0",
"result": {
"protocolVersion": "2024-11-05",
"serverInfo": SERVER_INFO,
"capabilities": {"tools": {}}
},
"id": body.get("id")
})
elif body.get("method") == "tools/list":
return JSONResponse({
"jsonrpc": "2.0",
"result": {"tools": TOOLS},
"id": body.get("id")
})
elif body.get("method") == "tools/call":
params = body.get("params", {})
result = await handle_tool_call(params.get("name"), params.get("arguments", {}))
return JSONResponse({
"jsonrpc": "2.0",
"result": result,
"id": body.get("id")
})
else:
return JSONResponse({
"jsonrpc": "2.0",
"error": {"code": -32601, "message": "Method not found"},
"id": body.get("id")
})
except Exception as e:
logger.error(f"Error: {e}")
return JSONResponse({
"jsonrpc": "2.0",
"error": {"code": -32603, "message": str(e)},
"id": None
}, status_code=400)
def main():
print("\n" + "=" * 50)
print(" GPT OpenMemory MCP Server")
print("=" * 50)
print(" URL: http://127.0.0.1:8769")
print(" ngrok: ngrok http 8769")
print("=" * 50 + "\n")
uvicorn.run(app, host="127.0.0.1", port=8769, log_level="info")
if __name__ == "__main__":
main()