mem0 Memory System

#!/usr/bin/env python3
"""
mem0 MCP Server

This is the main server file for the mem0 MCP server.
"""

import argparse
import logging
import os
import socket

from dotenv import load_dotenv
import uvicorn

# Colors for terminal output
BOLD = '\033[1m'
GREEN = '\033[0;32m'
YELLOW = '\033[0;33m'
BLUE = '\033[0;34m'
PURPLE = '\033[0;35m'
CYAN = '\033[0;36m'
WHITE = '\033[0;37m'
RED = '\033[0;31m'
NC = '\033[0m'  # No Color

# Highest valid TCP port; probing anything above it makes the socket
# layer raise OverflowError instead of returning an error code.
MAX_PORT = 65535

# ASCII Art Banner
# NOTE(review): spacing inside each row is reproduced as found; confirm the
# art's alignment against the original asset.
BANNER = f"""
{BOLD}# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓{NC}
{BOLD}# ┃ ┃{NC}
{BOLD}# ┃ {GREEN}███╗ ███╗███████╗███╗ ███╗ ██████╗ ███╗ ███╗ ██████╗██████╗{NC} {BOLD}┃{NC}
{BOLD}# ┃ {GREEN}████╗ ████║██╔════╝████╗ ████║██╔═══██╗ ████╗ ████║██╔════╝██╔══██╗{NC} {BOLD}┃{NC}
{BOLD}# ┃ {GREEN}██╔████╔██║█████╗ ██╔████╔██║██║ ██║ ██╔████╔██║██║ ██████╔╝{NC} {BOLD}┃{NC}
{BOLD}# ┃ {GREEN}██║╚██╔╝██║██╔══╝ ██║╚██╔╝██║██║ ██║ ██║╚██╔╝██║██║ ██╔═══╝{NC} {BOLD}┃{NC}
{BOLD}# ┃ {GREEN}██║ ╚═╝ ██║███████╗██║ ╚═╝ ██║╚██████╔╝ ██║ ╚═╝ ██║╚██████╗██║{NC} {BOLD}┃{NC}
{BOLD}# ┃ {GREEN}╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═════╝╚═╝{NC} {BOLD}┃{NC}
{BOLD}# ┃ ┃{NC}
{BOLD}# ┃ {GREEN}✨ SERVER MAIN ✨{NC} {BOLD}┃{NC}
{BOLD}# ┃ {GREEN}Made with ❤️ by Pink Pixel{NC} {BOLD}┃{NC}
{BOLD}# ┃ ┃{NC}
{BOLD}# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛{NC}
"""

# Load environment variables from .env file
load_dotenv()


def _is_port_in_use(host, port):
    """
    Report whether a TCP connection to host:port succeeds.

    Args:
        host: Host name or IPv4 address to probe
        port: Port number to probe

    Returns:
        True if the connection attempt succeeded (something is listening),
        False otherwise
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # connect_ex returns 0 on success and an errno value on failure
        return sock.connect_ex((host, port)) == 0


def find_available_port(start_port, max_attempts=100, host='localhost'):
    """
    Find an available port starting from start_port

    Args:
        start_port: Port to start checking from
        max_attempts: Maximum number of ports to check
        host: Host to probe each candidate port on (default: localhost)

    Returns:
        Available port number or None if no ports are available
    """
    # Never probe past the last valid TCP port: out-of-range ports raise
    # OverflowError in the socket layer rather than reporting "unavailable".
    stop_port = min(start_port + max_attempts, MAX_PORT + 1)
    for port in range(start_port, stop_port):
        if not _is_port_in_use(host, port):
            return port
    return None


def main():
    """
    Main function to start the server

    Parses the command-line options, optionally falls back to the next free
    port when the requested one is taken, exports the chosen settings through
    MEM0_* environment variables and runs the "api:app" application with
    uvicorn.
    """
    parser = argparse.ArgumentParser(description="mem0 MCP Server")
    parser.add_argument(
        "--host",
        type=str,
        default="0.0.0.0",
        help="Host to bind the server to (default: 0.0.0.0)"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8000,
        help="Port to bind the server to (default: 8000)"
    )
    parser.add_argument(
        "--provider",
        type=str,
        default=os.environ.get("MEM0_PROVIDER", "openai"),
        help="LLM provider to use (default: from .env or openai)"
    )
    parser.add_argument(
        "--embedding-provider",
        type=str,
        default=os.environ.get("MEM0_EMBEDDING_PROVIDER", "openai"),
        help="Embedding provider to use (default: from .env or openai)"
    )
    parser.add_argument(
        "--data-dir",
        type=str,
        default=os.environ.get("MEM0_DATA_DIR", "~/mem0_memories"),
        help="Directory to store memory data (default: from .env or ~/mem0_memories)"
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Run in debug mode"
    )
    parser.add_argument(
        "--no-auto-port",
        action="store_true",
        help="Disable automatic port finding if specified port is in use"
    )

    args = parser.parse_args()

    # Reject impossible ports up front instead of failing later with an
    # OverflowError traceback from the socket layer.
    if not 0 <= args.port <= MAX_PORT:
        parser.error(f"--port must be between 0 and {MAX_PORT}")

    # Configure logging
    log_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )

    # Find an available port if the specified port is in use
    port = args.port
    if not args.no_auto_port:
        # The wildcard address cannot be connected to, so probe loopback
        # instead; otherwise probe the address the server will bind to.
        probe_host = "localhost" if args.host == "0.0.0.0" else args.host
        if _is_port_in_use(probe_host, port):
            # Search on the same host that was just probed so the
            # replacement port is free where the server will listen.
            available_port = find_available_port(port + 1, host=probe_host)
            if available_port is not None:
                print(f"{YELLOW}Port {port} is already in use. Using port {available_port} instead.{NC}")
                port = available_port
            else:
                print(f"{YELLOW}Error: Could not find an available port. Please specify a different port.{NC}")
                return

    # Expand the data directory path if it contains a tilde
    data_dir = os.path.expanduser(args.data_dir)

    # Print server information with colored text
    print(BANNER)
    print(f"{BOLD}{GREEN}Starting server on {BOLD}http://{args.host}:{port}{NC}")
    print(f"{CYAN}Using provider: {BOLD}{args.provider}{NC}")
    print(f"{CYAN}Using embedding provider: {BOLD}{args.embedding_provider}{NC}")
    print(f"{CYAN}Data directory: {BOLD}{data_dir}{NC}")

    # Create data directory if it doesn't exist
    os.makedirs(data_dir, exist_ok=True)

    # Set environment variables for the server
    os.environ["MEM0_PROVIDER"] = args.provider
    os.environ["MEM0_EMBEDDING_PROVIDER"] = args.embedding_provider
    os.environ["MEM0_DATA_DIR"] = data_dir

    # Start the server
    uvicorn.run(
        "api:app",
        host=args.host,
        port=port,
        log_level="debug" if args.debug else "info",
        reload=args.debug
    )


if __name__ == "__main__":
    main()