mem0 Memory System
#!/usr/bin/env python3
"""
mem0 MCP Server
This is the main server file for the mem0 MCP server.
"""
import os
import argparse
import logging
from dotenv import load_dotenv
import uvicorn
import socket
# Colors for terminal output
BOLD = '\033[1m'
GREEN = '\033[0;32m'
YELLOW = '\033[0;33m'
BLUE = '\033[0;34m'
PURPLE = '\033[0;35m'
CYAN = '\033[0;36m'
WHITE = '\033[0;37m'
RED = '\033[0;31m'
NC = '\033[0m' # No Color
# ASCII Art Banner
BANNER = f"""
{BOLD}# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓{NC}
{BOLD}# ┃ ┃{NC}
{BOLD}# ┃ {GREEN}███╗ ███╗███████╗███╗ ███╗ ██████╗ ███╗ ███╗ ██████╗██████╗{NC} {BOLD}┃{NC}
{BOLD}# ┃ {GREEN}████╗ ████║██╔════╝████╗ ████║██╔═══██╗ ████╗ ████║██╔════╝██╔══██╗{NC} {BOLD}┃{NC}
{BOLD}# ┃ {GREEN}██╔████╔██║█████╗ ██╔████╔██║██║ ██║ ██╔████╔██║██║ ██████╔╝{NC} {BOLD}┃{NC}
{BOLD}# ┃ {GREEN}██║╚██╔╝██║██╔══╝ ██║╚██╔╝██║██║ ██║ ██║╚██╔╝██║██║ ██╔═══╝{NC} {BOLD}┃{NC}
{BOLD}# ┃ {GREEN}██║ ╚═╝ ██║███████╗██║ ╚═╝ ██║╚██████╔╝ ██║ ╚═╝ ██║╚██████╗██║{NC} {BOLD}┃{NC}
{BOLD}# ┃ {GREEN}╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═════╝╚═╝{NC} {BOLD}┃{NC}
{BOLD}# ┃ ┃{NC}
{BOLD}# ┃ {GREEN}✨ SERVER MAIN ✨{NC} {BOLD}┃{NC}
{BOLD}# ┃ {GREEN}Made with ❤️ by Pink Pixel{NC} {BOLD}┃{NC}
{BOLD}# ┃ ┃{NC}
{BOLD}# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛{NC}
"""
# Load environment variables from .env file
load_dotenv()
def find_available_port(start_port, max_attempts=100):
"""
Find an available port starting from start_port
Args:
start_port: Port to start checking from
max_attempts: Maximum number of ports to check
Returns:
Available port number or None if no ports are available
"""
for port in range(start_port, start_port + max_attempts):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
result = sock.connect_ex(('localhost', port))
if result != 0: # Port is available
return port
return None
def main():
"""Main function to start the server"""
parser = argparse.ArgumentParser(description="mem0 MCP Server")
parser.add_argument(
"--host",
type=str,
default="0.0.0.0",
help="Host to bind the server to (default: 0.0.0.0)"
)
parser.add_argument(
"--port",
type=int,
default=8000,
help="Port to bind the server to (default: 8000)"
)
parser.add_argument(
"--provider",
type=str,
default=os.environ.get("MEM0_PROVIDER", "openai"),
help="LLM provider to use (default: from .env or openai)"
)
parser.add_argument(
"--embedding-provider",
type=str,
default=os.environ.get("MEM0_EMBEDDING_PROVIDER", "openai"),
help="Embedding provider to use (default: from .env or openai)"
)
parser.add_argument(
"--data-dir",
type=str,
default=os.environ.get("MEM0_DATA_DIR", "~/mem0_memories"),
help="Directory to store memory data (default: from .env or ~/mem0_memories)"
)
parser.add_argument(
"--debug",
action="store_true",
help="Run in debug mode"
)
parser.add_argument(
"--no-auto-port",
action="store_true",
help="Disable automatic port finding if specified port is in use"
)
args = parser.parse_args()
# Configure logging
log_level = logging.DEBUG if args.debug else logging.INFO
logging.basicConfig(
level=log_level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Find an available port if the specified port is in use
port = args.port
if not args.no_auto_port:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
result = sock.connect_ex((args.host if args.host != "0.0.0.0" else "localhost", port))
if result == 0: # Port is in use
available_port = find_available_port(port + 1)
if available_port:
print(f"{YELLOW}Port {port} is already in use. Using port {available_port} instead.{NC}")
port = available_port
else:
print(f"{YELLOW}Error: Could not find an available port. Please specify a different port.{NC}")
return
# Expand the data directory path if it contains a tilde
data_dir = os.path.expanduser(args.data_dir)
# Print server information with colored text
print(BANNER)
print(f"{BOLD}{GREEN}Starting server on {BOLD}http://{args.host}:{port}{NC}")
print(f"{CYAN}Using provider: {BOLD}{args.provider}{NC}")
print(f"{CYAN}Using embedding provider: {BOLD}{args.embedding_provider}{NC}")
print(f"{CYAN}Data directory: {BOLD}{data_dir}{NC}")
# Create data directory if it doesn't exist
os.makedirs(data_dir, exist_ok=True)
# Set environment variables for the server
os.environ["MEM0_PROVIDER"] = args.provider
os.environ["MEM0_EMBEDDING_PROVIDER"] = args.embedding_provider
os.environ["MEM0_DATA_DIR"] = data_dir
# Start the server
uvicorn.run(
"api:app",
host=args.host,
port=port,
log_level="debug" if args.debug else "info",
reload=args.debug
)
if __name__ == "__main__":
main()