main.py•9.51 kB
"""SelfMemory MCP Server
Implements an MCP (Model Context Protocol) server that provides memory operations
for SelfMemory using simple Bearer token authentication.
Features:
- Simple Bearer token authentication with SelfMemory API keys
- Per-request client creation for proper user isolation
- Graceful error handling when core server is unavailable
- Tools: add_memory and search_memories
- Streamable HTTP transport for production deployment
"""
import logging
import os
import sys
from pathlib import Path
from typing import Any
from dotenv import load_dotenv
from mcp.server.fastmcp import Context, FastMCP
from selfmemory import SelfMemoryClient
# Ensure project root is in sys.path (two levels up from this file)
PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.append(str(PROJECT_ROOT))
# Add selfmemory-mcp to path for telemetry imports
sys.path.insert(0, str(Path(__file__).parent))
load_dotenv() # Load environment variables from .env
# Import telemetry after adding to path
from telemetry import init_logging, init_telemetry # noqa: E402
# Initialize logging based on environment (console for dev, file for prod)
init_logging()
logger = logging.getLogger(__name__)
# Initialize OpenTelemetry if enabled (optional)
init_telemetry()
# Configuration
CORE_SERVER_HOST = os.getenv("SELFMEMORY_API_HOST", "http://localhost:8081")
MCP_SERVER_PORT = int(os.getenv("MCP_SERVER_PORT", "5055"))
MCP_SERVER_HOST = os.getenv("MCP_SERVER_HOST", "0.0.0.0")
# Initialize MCP server without OAuth (simple Bearer token approach)
mcp = FastMCP(
name="SelfMemory",
instructions="Memory management server for SelfMemory - store and search personal memories with metadata",
stateless_http=True,
json_response=True,
port=MCP_SERVER_PORT,
host=MCP_SERVER_HOST,
)
logger.info(f"SelfMemory MCP Server initialized - Core server: {CORE_SERVER_HOST}")
def _extract_memory_contents(search_result: dict[str, Any]) -> list[str]:
"""Extract only content strings from search results for LLM consumption.
Args:
search_result: Full search result dictionary from client.search()
Returns:
List of memory content strings, empty list if no results
"""
if "results" not in search_result or not search_result["results"]:
return []
return [memory.get("content", "") for memory in search_result["results"]]
def _generate_memory_confirmation(content: str) -> str:
"""Generate a personalized confirmation message for stored memory.
Args:
content: The memory content that was stored
Returns:
Personalized confirmation message string
"""
return "I learnt more about you with this!"
def validate_and_get_client(ctx: Context) -> SelfMemoryClient:
"""
Validate request and create authenticated SelfMemoryClient.
Supports both dashboard session auth and direct API key auth.
Args:
ctx: FastMCP Context containing request information
Returns:
SelfMemoryClient: Client authenticated with the user's token
Raises:
ValueError: If authentication fails
"""
try:
# Extract headers from the HTTP request
request = ctx.request_context.request
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise ValueError("No valid authorization header found")
token = auth_header.replace("Bearer ", "")
# Create and validate client - this will raise ValueError if token is invalid
client = SelfMemoryClient(api_key=token, host=CORE_SERVER_HOST)
logger.info(
f"✅ MCP: API key authenticated for user: {client.user_info.get('user_id', 'unknown')}"
)
return client
except AttributeError as e:
logger.error(f"Context structure error: {e}")
raise ValueError("Request context not available") from e
except ValueError:
# Re-raise ValueError as-is (these are our custom auth errors)
raise
except Exception as e:
logger.error(f"Authentication error: {e}")
raise ValueError("Authentication failed") from e
@mcp.tool()
async def add_memory(
content: str, ctx: Context, tags: str = "", people: str = "", category: str = ""
) -> str:
"""
Store new memories with metadata.
Args:
content: The memory content to store
tags: Optional comma-separated tags (e.g., "work,meeting,important")
people: Optional comma-separated people mentioned (e.g., "Alice,Bob")
category: Optional topic category (e.g., "work", "personal", "learning")
Returns:
Personalized confirmation message string
Examples:
- add_memory("Had a great meeting about the new project", tags="work,meeting", people="Sarah,Mike") -> "I learnt more about you with this!"
- add_memory("Learned about Python decorators today", category="learning") -> "I learnt more about you with this!"
- add_memory("Birthday party this weekend", tags="personal,social", people="Emma") -> "I learnt more about you with this!"
"""
try:
logger.info(f"Adding memory: {content[:50]}...")
# Validate token and get authenticated client
client = validate_and_get_client(ctx)
# Format data in the correct selfmemory format that the core server expects
memory_data = {
"messages": [{"role": "user", "content": content}],
"metadata": {
"tags": tags,
"people_mentioned": people,
"topic_category": category,
},
}
# Use the client's underlying httpx client to send the correct format
response = client.client.post("/api/memories", json=memory_data)
response.raise_for_status()
# Close the client connection
client.close()
logger.info("Memory added successfully")
# Generate personalized confirmation message
return _generate_memory_confirmation(content)
except ValueError as e:
error_msg = f"Authentication error: {str(e)}"
logger.error(error_msg)
return f"Authentication failed: {str(e)}"
except Exception as e:
error_msg = f"Failed to add memory: {str(e)}"
logger.error(error_msg)
return f"Failed to store memory: {str(e)}"
@mcp.tool()
async def search_memories(
query: str,
ctx: Context,
limit: int = 10,
tags: list[str] | None = None,
people: list[str] | None = None,
category: str | None = None,
threshold: float | None = None,
) -> list[str]:
"""
Search memories using semantic search with optional filters.
Args:
query: The search query (e.g., "meeting notes", "python learning", "weekend plans")
limit: Maximum number of results to return (default: 10, max: 50)
tags: Optional list of tags to filter by (e.g., ["work", "important"])
people: Optional list of people to filter by (e.g., ["Alice", "Bob"])
category: Optional category filter (e.g., "work", "personal")
threshold: Optional minimum similarity score (0.0 to 1.0)
Returns:
List of memory content strings for LLM consumption
Examples:
- search_memories("project meeting") -> ["Had a meeting about the new project", ...]
- search_memories("Python", tags=["learning"], limit=5) -> ["Learned Python decorators", ...]
- search_memories("birthday", people=["Emma"], category="personal") -> ["Emma's birthday party", ...]
"""
try:
logger.info(f"Searching memories: '{query}'")
# Validate limit
if limit > 50:
limit = 50
elif limit < 1:
limit = 1
# Validate token and get authenticated client
client = validate_and_get_client(ctx)
# Use SelfMemoryClient properly (no circular dependency)
result = client.search(
query=query,
limit=limit,
tags=tags,
people_mentioned=people,
topic_category=category,
threshold=threshold,
)
# Close the client connection
client.close()
results_count = len(result.get("results", []))
logger.info(f"Search completed: {results_count} results found")
# Extract only content strings for LLM consumption
return _extract_memory_contents(result)
except ValueError as e:
error_msg = f"Authentication error: {str(e)}"
logger.error(error_msg)
return []
except Exception as e:
error_msg = f"Search failed: {str(e)}"
logger.error(error_msg)
return []
def main():
"""Main entry point for the SelfMemory MCP server."""
logger.info("=" * 60)
logger.info("🚀 Starting SelfMemory MCP Server")
logger.info("=" * 60)
logger.info(f"📡 Core Server: {CORE_SERVER_HOST}")
logger.info(f"🌐 MCP Server: http://{MCP_SERVER_HOST}:{MCP_SERVER_PORT}")
logger.info("🔒 Authentication: Bearer Token")
logger.info("🛠️ Tools: add_memory, search_memories")
logger.info("=" * 60)
try:
# Run server with streamable HTTP transport
mcp.run(transport="streamable-http")
except KeyboardInterrupt:
logger.info("Server stopped by user")
except Exception as e:
logger.error(f"Server error: {e}")
raise
if __name__ == "__main__":
main()