Skip to main content
Glama
SNYCFIRE-CORE

Letta MCP Server Railway Edition

server.py44.6 kB
#!/usr/bin/env python3
"""
Letta MCP Server - Main server implementation using FastMCP
"""

import os
import json
import asyncio
import logging
from typing import Any, Dict, List, Optional, Union
from datetime import datetime

import httpx
from fastmcp import FastMCP

from .config import LettaConfig, load_config
from .models import AgentInfo, MemoryBlock, ToolInfo, Message, StreamChunk
from .exceptions import LettaMCPError, APIError, ConfigurationError
from .utils import (
    parse_message_response,
    format_memory_blocks,
    validate_agent_id,
    extract_assistant_message,
    create_retry_client
)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class LettaMCPServer:
    """Main Letta MCP Server implementation.

    Wraps a FastMCP server and an HTTP client for the Letta REST API, and
    registers a suite of MCP tools (agent management, conversations, memory,
    tool management, utilities) plus read-only MCP resources.
    """

    def __init__(self, config: Optional[LettaConfig] = None):
        """Initialize the Letta MCP Server.

        Args:
            config: Optional pre-built configuration; when None, configuration
                is loaded from the environment/config file via load_config().
        """
        self.config = config or load_config()
        self.validate_config()

        # Initialize FastMCP server
        self.mcp = FastMCP(
            "Letta MCP Server Railway",
            instructions="""
Universal MCP server providing seamless integration between any AI client and Letta.ai agents.
Supports all MCP-compatible clients:
- Claude Desktop, GitHub Copilot, Cursor, Replit, Sourcegraph Cody, OpenAI ChatGPT, and more
Key features:
- Stateful agent conversations (no need to send history)
- Memory block management (human, persona, custom)
- Tool orchestration and workflow rules
- Real-time streaming responses
- Cross-agent communication
Remember: Letta agents maintain their own conversation history across all clients!
            """
        )

        # Create HTTP client with retry logic
        self.client = create_retry_client(
            base_url=self.config.base_url,
            headers={
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json"
            },
            timeout=self.config.timeout,
            max_retries=self.config.max_retries
        )

        # Register all tools
        self._register_tools()

        # Register resources
        self._register_resources()

    def validate_config(self):
        """Validate the configuration.

        Raises:
            ConfigurationError: if no API key is set while targeting the
                hosted Letta Cloud endpoint (self-hosted servers may be
                keyless, so only the cloud URL is checked).
        """
        if not self.config.api_key and self.config.base_url == "https://api.letta.com":
            raise ConfigurationError(
                "LETTA_API_KEY is required for Letta Cloud. "
                "Set it in environment or config file."
            )

    def _register_tools(self):
        """Register all MCP tools, grouped by functional area."""
        # Agent Management
        self._register_agent_tools()
        # Conversation Tools
        self._register_conversation_tools()
        # Memory Management
        self._register_memory_tools()
        # Tool Management
        self._register_tool_management()
        # Utility Tools
        self._register_utility_tools()

    def _register_agent_tools(self):
        """Register agent management tools (list/create/get/update/delete)."""

        @self.mcp.tool()
        async def letta_list_agents(
            filter: Optional[str] = None,  # NOTE: shadows the builtin `filter`; kept for tool-schema compatibility
            limit: int = 50,
            offset: int = 0
        ) -> Dict[str, Any]:
            """
            List all available Letta agents with pagination support.

            Args:
                filter: Optional text to filter agent names
                limit: Number of agents to return (max 100)
                offset: Offset for pagination

            Returns:
                List of agent summaries (id, name, description, tool_count,
                created_at, model) to stay within token limits. Use
                letta_get_agent for full details.
            """
            try:
                # Server-side pagination is capped at 100 per request.
                params = {"limit": min(limit, 100), "offset": offset}
                response = await self.client.get("/v1/agents", params=params)
                response.raise_for_status()

                agents = response.json()

                # Apply filter if provided (client-side, case-insensitive,
                # matched against both name and description)
                if filter:
                    filter_lower = filter.lower()
                    agents = [
                        a for a in agents
                        if filter_lower in a.get("name", "").lower()
                        or filter_lower in a.get("description", "").lower()
                    ]

                # Return summarized agents to stay under token limit
                summarized_agents = []
                for agent in agents:
                    summarized_agents.append({
                        "id": agent.get("id"),
                        "name": agent.get("name"),
                        "description": agent.get("description"),
                        "tool_count": len(agent.get("tools", [])),
                        "created_at": agent.get("created_at"),
                        "model": agent.get("model", "")
                    })

                return {
                    "success": True,
                    "count": len(summarized_agents),
                    # Prefer the server-reported total; fall back to the
                    # (post-filter) local count when the header is absent.
                    "total": response.headers.get("X-Total-Count", len(agents)),
                    "agents": summarized_agents
                }

            except httpx.HTTPError as e:
                logger.error(f"HTTP error listing agents: {e}")
                return {"success": False, "error": str(e)}
            except Exception as e:
                logger.error(f"Error listing agents: {e}")
                return {"success": False, "error": str(e)}

        @self.mcp.tool()
        async def letta_create_agent(
            name: str,
            description: Optional[str] = None,
            human_memory: Optional[str] = None,
            persona_memory: Optional[str] = None,
            custom_blocks: Optional[List[Dict[str, str]]] = None,
            model: Optional[str] = None,
            tools: Optional[List[str]] = None
        ) -> Dict[str, Any]:
            """
            Create a new Letta agent with memory blocks and tools.

            Args:
                name: Name for the new agent
                description: Description of the agent's purpose
                human_memory: Information about the human user
                persona_memory: Agent's persona and behavior
                custom_blocks: Additional memory blocks with label, value, description
                model: LLM model to use (default: from config)
                tools: List of tool names to attach

            Returns:
                Created agent details
            """
            try:
                # Build memory blocks; "human" and "persona" always exist and
                # fall back to generic defaults when the caller omits them.
                memory_blocks = []

                if human_memory:
                    memory_blocks.append({
                        "label": "human",
                        "value": human_memory
                    })
                else:
                    memory_blocks.append({
                        "label": "human",
                        "value": "The user is interested in AI development."
                    })

                if persona_memory:
                    memory_blocks.append({
                        "label": "persona",
                        "value": persona_memory
                    })
                else:
                    memory_blocks.append({
                        "label": "persona",
                        "value": f"I am {name}, an AI assistant. I am helpful, professional, and knowledgeable."
                    })

                # Add custom blocks (silently skipping malformed entries that
                # lack the required label/value keys)
                if custom_blocks:
                    for block in custom_blocks:
                        if "label" in block and "value" in block:
                            memory_blocks.append(block)

                # Create agent
                payload = {
                    "name": name,
                    "description": description or f"AI assistant named {name}",
                    "memory_blocks": memory_blocks,
                    "model": model or self.config.default_model,
                    "embedding": self.config.default_embedding,
                    "tools": tools or ["web_search", "run_code"]
                }

                response = await self.client.post("/v1/agents", json=payload)
                response.raise_for_status()

                agent = response.json()

                return {
                    "success": True,
                    "agent": agent,
                    "message": f"Successfully created agent '{name}' with ID: {agent['id']}"
                }

            except httpx.HTTPError as e:
                logger.error(f"HTTP error creating agent: {e}")
                return {"success": False, "error": str(e)}
            except Exception as e:
                logger.error(f"Error creating agent: {e}")
                return {"success": False, "error": str(e)}

        @self.mcp.tool()
        async def letta_get_agent(agent_id: str) -> Dict[str, Any]:
            """Get detailed information about a specific Letta agent."""
            try:
                validate_agent_id(agent_id)

                response = await self.client.get(f"/v1/agents/{agent_id}")
                response.raise_for_status()

                agent = response.json()

                # Format the response nicely
                return {
                    "success": True,
                    "agent": {
                        "id": agent["id"],
                        "name": agent.get("name", "Unknown"),
                        "description": agent.get("description", ""),
                        "model": agent.get("model", ""),
                        "created_at": agent.get("created_at", ""),
                        "last_modified": agent.get("last_modified", ""),
                        "tools": agent.get("tools", []),
                        # Only the count of memory blocks is returned here, to
                        # keep the payload small; use letta_get_memory for content.
                        "memory_blocks": len(agent.get("memory_blocks", [])),
                        "message_count": agent.get("message_count", 0)
                    }
                }

            except Exception as e:
                logger.error(f"Error getting agent info: {e}")
                return {"success": False, "error": str(e)}

        @self.mcp.tool()
        async def letta_update_agent(
            agent_id: str,
            name: Optional[str] = None,
            description: Optional[str] = None,
            model: Optional[str] = None
        ) -> Dict[str, Any]:
            """Update an existing agent's configuration (partial update via PATCH)."""
            try:
                validate_agent_id(agent_id)

                # Build update payload containing only explicitly-set fields
                updates = {}
                if name is not None:
                    updates["name"] = name
                if description is not None:
                    updates["description"] = description
                if model is not None:
                    updates["model"] = model

                if not updates:
                    return {
                        "success": False,
                        "error": "No updates provided"
                    }

                response = await self.client.patch(
                    f"/v1/agents/{agent_id}",
                    json=updates
                )
                response.raise_for_status()

                return {
                    "success": True,
                    "message": f"Successfully updated agent {agent_id}",
                    "updates": updates
                }

            except Exception as e:
                logger.error(f"Error updating agent: {e}")
                return {"success": False, "error": str(e)}

        @self.mcp.tool()
        async def letta_delete_agent(
            agent_id: str,
            confirm: bool = False
        ) -> Dict[str, Any]:
            """
            Delete a Letta agent (requires confirmation).

            Args:
                agent_id: ID of the agent to delete
                confirm: Must be True to confirm deletion

            Returns:
                Deletion status
            """
            try:
                # Guard against accidental destructive calls from LLM clients
                if not confirm:
                    return {
                        "success": False,
                        "error": "Deletion requires confirm=True to prevent accidents"
                    }

                validate_agent_id(agent_id)

                response = await self.client.delete(f"/v1/agents/{agent_id}")
                response.raise_for_status()

                return {
                    "success": True,
                    "message": f"Successfully deleted agent {agent_id}"
                }

            except Exception as e:
                logger.error(f"Error deleting agent: {e}")
                return {"success": False, "error": str(e)}

    def _register_conversation_tools(self):
        """Register conversation management tools (send/history/export)."""

        @self.mcp.tool()
        async def letta_send_message(
            agent_id: str,
            message: str,
            stream: bool = False
        ) -> Dict[str, Any]:
            """
            Send a message to a Letta agent and get the response.

            Args:
                agent_id: ID of the agent to message
                message: Message content to send
                stream: Whether to stream the response

            Returns:
                Agent's response with tool calls and reasoning
            """
            try:
                validate_agent_id(agent_id)

                if stream:
                    return await self._stream_message(agent_id, message)

                response = await self.client.post(
                    f"/v1/agents/{agent_id}/messages",
                    json={
                        "messages": [{"role": "user", "content": message}],
                        "stream_steps": False,
                        "stream_tokens": False
                    }
                )
                response.raise_for_status()

                result = response.json()

                # Parse the response into assistant text / tool calls / reasoning
                parsed = parse_message_response(result)

                return {
                    "success": True,
                    "agent_id": agent_id,
                    "message": message,
                    "response": parsed["assistant_message"],
                    "tool_calls": parsed["tool_calls"],
                    "reasoning": parsed["reasoning"],
                    "full_response": result
                }

            except Exception as e:
                logger.error(f"Error sending message: {e}")
                return {
                    "success": False,
                    "error": str(e),
                    "agent_id": agent_id
                }

        # NOTE(review): this helper is declared with a `self` parameter yet
        # appears nested inside _register_conversation_tools in the original
        # (indentation was lost in transit). As a nested function it is NOT an
        # attribute of the instance, so the `self._stream_message(...)` call
        # above would raise AttributeError; it was presumably intended as a
        # class-level method. Confirm against the upstream repository.
        async def _stream_message(self, agent_id: str, message: str) -> Dict[str, Any]:
            """Stream a message response from an agent via SSE and aggregate it."""
            try:
                chunks = []

                async with self.client.stream(
                    "POST",
                    f"/v1/agents/{agent_id}/messages",
                    json={
                        "messages": [{"role": "user", "content": message}],
                        "stream_steps": True,
                        "stream_tokens": True
                    }
                ) as response:
                    response.raise_for_status()

                    async for line in response.aiter_lines():
                        # Server-sent events: payload lines are prefixed "data: "
                        if line.startswith("data: "):
                            try:
                                chunk_data = json.loads(line[6:])
                                chunks.append(chunk_data)

                                # Yield progress for UI
                                if chunk_data.get("messageType") == "assistant_message":
                                    logger.info(f"Streaming: {chunk_data.get('content', '')}")
                            except json.JSONDecodeError:
                                # Skip keep-alives / malformed fragments
                                continue

                # Combine chunks into final response
                final_content = ""
                tool_calls = []
                reasoning = []

                for chunk in chunks:
                    msg_type = chunk.get("messageType")
                    if msg_type == "assistant_message":
                        final_content += chunk.get("content", "")
                    elif msg_type == "tool_call_message":
                        tool_calls.append({
                            "tool": chunk.get("toolCall", {}).get("name"),
                            "args": chunk.get("toolCall", {}).get("arguments")
                        })
                    elif msg_type == "reasoning_message":
                        reasoning.append(chunk.get("reasoning", ""))

                return {
                    "success": True,
                    "agent_id": agent_id,
                    "message": message,
                    "response": final_content,
                    "tool_calls": tool_calls,
                    "reasoning": reasoning,
                    "streamed": True
                }

            except Exception as e:
                logger.error(f"Error streaming message: {e}")
                return {"success": False, "error": str(e)}

        @self.mcp.tool()
        async def letta_get_conversation_history(
            agent_id: str,
            limit: int = 10,
            before: Optional[str] = None
        ) -> Dict[str, Any]:
            """
            Get recent conversation history for a Letta agent.

            Args:
                agent_id: ID of the agent
                limit: Number of messages to retrieve (default: 10 to stay within token limits)
                before: Get messages before this timestamp

            Returns:
                Conversation history with messages. Limited to prevent token overflow.
            """
            try:
                validate_agent_id(agent_id)

                params = {"limit": min(limit, 100)}
                if before:
                    params["before"] = before

                response = await self.client.get(
                    f"/v1/agents/{agent_id}/messages",
                    params=params
                )
                response.raise_for_status()

                messages = response.json()

                # Format messages for readability
                formatted_messages = []
                for msg in messages:
                    formatted = {
                        "timestamp": msg.get("created_at"),
                        "type": msg.get("message_type"),
                        "role": msg.get("role")
                    }

                    # Add content based on type
                    if msg.get("message_type") == "assistant_message":
                        formatted["content"] = msg.get("content")
                    elif msg.get("message_type") == "tool_call_message":
                        formatted["tool"] = msg.get("tool_call", {}).get("name")
                        formatted["args"] = msg.get("tool_call", {}).get("arguments")
                    elif msg.get("message_type") == "tool_return_message":
                        formatted["result"] = msg.get("tool_return")

                    formatted_messages.append(formatted)

                return {
                    "success": True,
                    "agent_id": agent_id,
                    "message_count": len(formatted_messages),
                    "messages": formatted_messages
                }

            except Exception as e:
                logger.error(f"Error getting conversation history: {e}")
                return {"success": False, "error": str(e)}

        @self.mcp.tool()
        async def letta_export_conversation(
            agent_id: str,
            format: str = "markdown",  # NOTE: shadows the builtin `format`; kept for tool-schema compatibility
            include_tools: bool = False
        ) -> Dict[str, Any]:
            """
            Export a conversation in various formats.

            Args:
                agent_id: ID of the agent
                format: Export format (markdown, json, text)
                include_tools: Whether to include tool calls

            Returns:
                Exported conversation content (limited to prevent token overflow).
                WARNING: Large conversations may be truncated to stay within token limits.
            """
            try:
                # Get conversation history (limited to prevent token overflow);
                # calls the sibling closure defined in this same registration scope.
                history_result = await letta_get_conversation_history(
                    agent_id,
                    limit=100  # Reduced from 1000 to stay within token limits
                )

                if not history_result["success"]:
                    return history_result

                messages = history_result["messages"]

                if format == "markdown":
                    content = f"# Conversation with Agent {agent_id}\n\n"
                    content += f"*Exported on {datetime.now().isoformat()}*\n\n"

                    # History is returned newest-first; reverse for chronological order
                    for msg in reversed(messages):
                        timestamp = msg.get("timestamp", "")
                        if msg["type"] == "user_message":
                            content += f"## User ({timestamp})\n{msg.get('content', '')}\n\n"
                        elif msg["type"] == "assistant_message":
                            content += f"## Assistant ({timestamp})\n{msg.get('content', '')}\n\n"
                        elif include_tools and msg["type"] == "tool_call_message":
                            content += f"### Tool Call: {msg.get('tool')}\n"
                            content += f"```json\n{json.dumps(msg.get('args', {}), indent=2)}\n```\n\n"

                elif format == "json":
                    content = json.dumps(messages, indent=2)

                elif format == "text":
                    content = ""
                    for msg in reversed(messages):
                        if msg["type"] in ["user_message", "assistant_message"]:
                            role = "User" if msg["type"] == "user_message" else "Assistant"
                            content += f"{role}: {msg.get('content', '')}\n\n"

                else:
                    return {
                        "success": False,
                        "error": f"Unknown format: {format}. Use markdown, json, or text."
                    }

                return {
                    "success": True,
                    "agent_id": agent_id,
                    "format": format,
                    "content": content,
                    "message_count": len(messages)
                }

            except Exception as e:
                logger.error(f"Error exporting conversation: {e}")
                return {"success": False, "error": str(e)}

    def _register_memory_tools(self):
        """Register memory management tools (get/update/create/search)."""

        @self.mcp.tool()
        async def letta_get_memory(agent_id: str) -> Dict[str, Any]:
            """Get all memory blocks for a Letta agent."""
            try:
                validate_agent_id(agent_id)

                response = await self.client.get(f"/v1/agents/{agent_id}/memory-blocks")
                response.raise_for_status()

                memory_blocks = response.json()

                # Format memory blocks
                formatted = format_memory_blocks(memory_blocks)

                return {
                    "success": True,
                    "agent_id": agent_id,
                    "memory_blocks": formatted,
                    "raw_blocks": memory_blocks
                }

            except Exception as e:
                logger.error(f"Error getting memory: {e}")
                return {"success": False, "error": str(e)}

        @self.mcp.tool()
        async def letta_update_memory(
            agent_id: str,
            block_label: str,
            value: str
        ) -> Dict[str, Any]:
            """
            Update a memory block for a Letta agent.

            Args:
                agent_id: ID of the agent
                block_label: Label of the memory block (e.g., 'human', 'persona')
                value: New value for the memory block

            Returns:
                Update status
            """
            try:
                validate_agent_id(agent_id)

                # First get all memory blocks to find the right one
                # (the update endpoint is keyed by block ID, not label)
                response = await self.client.get(f"/v1/agents/{agent_id}/memory-blocks")
                response.raise_for_status()

                memory_blocks = response.json()

                # Find the block with matching label
                block_id = None
                for block in memory_blocks:
                    if block.get("label") == block_label:
                        block_id = block.get("id")
                        break

                if not block_id:
                    return {
                        "success": False,
                        "error": f"Memory block '{block_label}' not found. Available blocks: {[b.get('label') for b in memory_blocks]}"
                    }

                # Update the block
                response = await self.client.patch(
                    f"/v1/agents/{agent_id}/memory-blocks/{block_id}",
                    json={"value": value}
                )
                response.raise_for_status()

                updated_block = response.json()

                return {
                    "success": True,
                    "agent_id": agent_id,
                    "block_label": block_label,
                    "block_id": block_id,
                    "updated_value": value,
                    "message": f"Successfully updated '{block_label}' memory block"
                }

            except Exception as e:
                logger.error(f"Error updating memory: {e}")
                return {"success": False, "error": str(e)}

        @self.mcp.tool()
        async def letta_create_memory_block(
            agent_id: str,
            label: str,
            value: str,
            description: str
        ) -> Dict[str, Any]:
            """
            Create a new custom memory block for an agent.

            Args:
                agent_id: ID of the agent
                label: Label for the new block
                value: Initial value
                description: Description of what this block stores

            Returns:
                Created memory block details
            """
            try:
                validate_agent_id(agent_id)

                # Create the memory block
                response = await self.client.post(
                    f"/v1/agents/{agent_id}/memory-blocks",
                    json={
                        "label": label,
                        "value": value,
                        "description": description
                    }
                )
                response.raise_for_status()

                block = response.json()

                return {
                    "success": True,
                    "agent_id": agent_id,
                    "block": block,
                    "message": f"Successfully created memory block '{label}'"
                }

            except Exception as e:
                logger.error(f"Error creating memory block: {e}")
                return {"success": False, "error": str(e)}

        @self.mcp.tool()
        async def letta_search_memory(
            agent_id: str,
            query: str,
            limit: int = 10
        ) -> Dict[str, Any]:
            """
            Search through agent's conversation memory.

            Args:
                agent_id: ID of the agent
                query: Search query
                limit: Maximum results to return

            Returns:
                Matching messages from memory
            """
            try:
                validate_agent_id(agent_id)

                # Search through messages
                response = await self.client.get(
                    f"/v1/agents/{agent_id}/messages/search",
                    params={
                        "query": query,
                        "limit": limit
                    }
                )
                response.raise_for_status()

                results = response.json()

                return {
                    "success": True,
                    "agent_id": agent_id,
                    "query": query,
                    "result_count": len(results),
                    "results": results
                }

            except Exception as e:
                # If search endpoint doesn't exist, fall back to manual search.
                # NOTE(review): string-matching "404" against the exception text
                # is fragile; prefer checking e.response.status_code on
                # httpx.HTTPStatusError. Also, `letta_get_conversation_history`
                # is a closure defined inside _register_conversation_tools, not
                # in this method's scope — as written this fallback would raise
                # NameError. Confirm against the upstream repository.
                if "404" in str(e):
                    history = await letta_get_conversation_history(agent_id, limit=1000)
                    if history["success"]:
                        # Manual search through messages (case-insensitive substring)
                        query_lower = query.lower()
                        matches = []

                        for msg in history["messages"]:
                            content = msg.get("content", "").lower()
                            if query_lower in content:
                                matches.append(msg)
                                if len(matches) >= limit:
                                    break

                        return {
                            "success": True,
                            "agent_id": agent_id,
                            "query": query,
                            "result_count": len(matches),
                            "results": matches,
                            "method": "manual_search"
                        }

                logger.error(f"Error searching memory: {e}")
                return {"success": False, "error": str(e)}

    def _register_tool_management(self):
        """Register tool management functionality (list/inspect/attach/detach)."""

        @self.mcp.tool()
        async def letta_list_tools(
            filter: Optional[str] = None,  # NOTE: shadows the builtin `filter`; kept for tool-schema compatibility
            tags: Optional[List[str]] = None
        ) -> Dict[str, Any]:
            """
            List all available tools in the Letta system.

            Args:
                filter: Text filter for tool names/descriptions
                tags: Filter by tool tags

            Returns:
                List of tool summaries (id, name, description, tags) to stay
                within token limits. Descriptions are truncated to 200 characters.
            """
            try:
                response = await self.client.get("/v1/tools")
                response.raise_for_status()

                tools = response.json()

                # Apply filters (client-side)
                if filter:
                    filter_lower = filter.lower()
                    tools = [
                        t for t in tools
                        if filter_lower in t.get("name", "").lower()
                        or filter_lower in t.get("description", "").lower()
                    ]

                if tags:
                    tools = [
                        t for t in tools
                        if any(tag in t.get("tags", []) for tag in tags)
                    ]

                # Group by tags (a tool appears under every tag it carries;
                # untagged tools fall under "other")
                tools_by_tag = {}
                for tool in tools:
                    for tag in tool.get("tags", ["other"]):
                        if tag not in tools_by_tag:
                            tools_by_tag[tag] = []
                        tools_by_tag[tag].append({
                            "name": tool.get("name"),
                            "description": tool.get("description"),
                            "id": tool.get("id")
                        })

                # Return tool summaries to stay under token limit
                tool_summaries = []
                for tool in tools:
                    tool_summaries.append({
                        "id": tool.get("id"),
                        "name": tool.get("name"),
                        "description": tool.get("description", "")[:200],  # Truncate long descriptions
                        "tags": tool.get("tags", [])
                    })

                return {
                    "success": True,
                    "total_tools": len(tools),
                    "tools_by_tag": tools_by_tag,
                    "tools": tool_summaries
                }

            except Exception as e:
                logger.error(f"Error listing tools: {e}")
                return {"success": False, "error": str(e)}

        @self.mcp.tool()
        async def letta_get_agent_tools(agent_id: str) -> Dict[str, Any]:
            """Get the tools attached to a specific agent."""
            try:
                validate_agent_id(agent_id)

                response = await self.client.get(f"/v1/agents/{agent_id}")
                response.raise_for_status()

                agent = response.json()
                tools = agent.get("tools", [])

                return {
                    "success": True,
                    "agent_id": agent_id,
                    "agent_name": agent.get("name"),
                    "tool_count": len(tools),
                    "tools": tools
                }

            except Exception as e:
                logger.error(f"Error getting agent tools: {e}")
                return {"success": False, "error": str(e)}

        @self.mcp.tool()
        async def letta_attach_tool(
            agent_id: str,
            tool_name: str
        ) -> Dict[str, Any]:
            """
            Attach a tool to an agent.

            Args:
                agent_id: ID of the agent
                tool_name: Name of the tool to attach

            Returns:
                Attachment status
            """
            try:
                validate_agent_id(agent_id)

                # Get current tools (read-modify-write; not atomic — concurrent
                # updates to the same agent could race)
                agent_response = await self.client.get(f"/v1/agents/{agent_id}")
                agent_response.raise_for_status()

                agent = agent_response.json()
                current_tools = agent.get("tools", [])

                # Check if already attached (idempotent success)
                if tool_name in current_tools:
                    return {
                        "success": True,
                        "message": f"Tool '{tool_name}' is already attached to agent",
                        "tools": current_tools
                    }

                # Add the new tool
                updated_tools = current_tools + [tool_name]

                # Update agent
                response = await self.client.patch(
                    f"/v1/agents/{agent_id}",
                    json={"tools": updated_tools}
                )
                response.raise_for_status()

                return {
                    "success": True,
                    "agent_id": agent_id,
                    "message": f"Successfully attached tool '{tool_name}'",
                    "tools": updated_tools
                }

            except Exception as e:
                logger.error(f"Error attaching tool: {e}")
                return {"success": False, "error": str(e)}

        @self.mcp.tool()
        async def letta_detach_tool(
            agent_id: str,
            tool_name: str
        ) -> Dict[str, Any]:
            """
            Detach a tool from an agent.

            Args:
                agent_id: ID of the agent
                tool_name: Name of the tool to detach

            Returns:
                Detachment status
            """
            try:
                validate_agent_id(agent_id)

                # Get current tools (read-modify-write, same caveat as attach)
                agent_response = await self.client.get(f"/v1/agents/{agent_id}")
                agent_response.raise_for_status()

                agent = agent_response.json()
                current_tools = agent.get("tools", [])

                # Check if attached
                if tool_name not in current_tools:
                    return {
                        "success": False,
                        "error": f"Tool '{tool_name}' is not attached to this agent",
                        "tools": current_tools
                    }

                # Remove the tool
                updated_tools = [t for t in current_tools if t != tool_name]

                # Update agent
                response = await self.client.patch(
                    f"/v1/agents/{agent_id}",
                    json={"tools": updated_tools}
                )
                response.raise_for_status()

                return {
                    "success": True,
                    "agent_id": agent_id,
                    "message": f"Successfully detached tool '{tool_name}'",
                    "tools": updated_tools
                }

            except Exception as e:
                logger.error(f"Error detaching tool: {e}")
                return {"success": False, "error": str(e)}

    def _register_utility_tools(self):
        """Register utility and helper tools (health check, usage stats)."""

        @self.mcp.tool()
        async def letta_health_check() -> Dict[str, Any]:
            """Check the health of the Letta API connection."""
            try:
                # Try to list agents with limit=1 as a health check
                response = await self.client.get("/v1/agents", params={"limit": 1})
                response.raise_for_status()

                return {
                    "success": True,
                    "status": "healthy",
                    "base_url": self.config.base_url,
                    "api_version": response.headers.get("X-API-Version", "unknown"),
                    "timestamp": datetime.now().isoformat()
                }

            except Exception as e:
                logger.error(f"Health check failed: {e}")
                return {
                    "success": False,
                    "status": "unhealthy",
                    "error": str(e),
                    "base_url": self.config.base_url,
                    "timestamp": datetime.now().isoformat()
                }

        @self.mcp.tool()
        async def letta_get_usage_stats(
            agent_id: Optional[str] = None,
            period: str = "day"
        ) -> Dict[str, Any]:
            """
            Get usage statistics for agents.

            Args:
                agent_id: Specific agent ID or None for all agents
                period: Time period (day, week, month)

            Returns:
                Usage statistics
            """
            try:
                params = {"period": period}
                if agent_id:
                    params["agent_id"] = agent_id

                response = await self.client.get("/v1/stats/usage", params=params)
                response.raise_for_status()

                stats = response.json()

                return {
                    "success": True,
                    "period": period,
                    "agent_id": agent_id,
                    "stats": stats
                }

            except Exception as e:
                # If stats endpoint doesn't exist, return mock data.
                # NOTE(review): "404" substring matching is fragile — see
                # letta_search_memory for the same pattern.
                if "404" in str(e):
                    return {
                        "success": True,
                        "period": period,
                        "agent_id": agent_id,
                        "stats": {
                            "message_count": "N/A",
                            "tool_calls": "N/A",
                            "tokens_used": "N/A"
                        },
                        "note": "Usage stats endpoint not available"
                    }

                logger.error(f"Error getting usage stats: {e}")
                return {"success": False, "error": str(e)}

    def _register_resources(self):
        """Register MCP resources for data access.

        NOTE(review): each closure below references a tool function
        (letta_list_agents, letta_list_tools, letta_get_agent,
        letta_get_memory, letta_health_check) that is defined inside a
        DIFFERENT _register_* method's scope, not here — as written these
        resource reads would raise NameError at request time. Confirm the
        actual nesting against the upstream repository.
        """

        @self.mcp.resource("letta://agents")
        async def get_agents_resource() -> str:
            """List of all Letta agents"""
            result = await letta_list_agents()
            return json.dumps(result, indent=2)

        @self.mcp.resource("letta://tools")
        async def get_tools_resource() -> str:
            """List of all available tools"""
            result = await letta_list_tools()
            return json.dumps(result, indent=2)

        @self.mcp.resource("letta://agent/{agent_id}")
        async def get_agent_resource(agent_id: str) -> str:
            """Get detailed information about a specific agent"""
            result = await letta_get_agent(agent_id)
            return json.dumps(result, indent=2)

        @self.mcp.resource("letta://agent/{agent_id}/memory")
        async def get_agent_memory_resource(agent_id: str) -> str:
            """Get memory blocks for a specific agent"""
            result = await letta_get_memory(agent_id)
            return json.dumps(result, indent=2)

        @self.mcp.resource("letta://health")
        async def get_health_resource() -> str:
            """Health status of the Letta connection"""
            result = await letta_health_check()
            return json.dumps(result, indent=2)

    def run(self, transport: str = "streamable-http"):
        """Run the MCP server with Railway-optimized transport.

        Args:
            transport: FastMCP transport name (Railway deployments use
                "streamable-http").
        """
        # Imported lazily so the package version is resolved at run time
        from ._version import __version__

        logger.info(f"Starting Letta MCP Server Railway v{__version__}")
        logger.info(f"Connected to: {self.config.base_url}")
        logger.info(f"Transport: {transport}")
        logger.info(f"Railway deployment mode enabled")

        self.mcp.run(transport=transport)


def create_server(config: Optional[LettaConfig] = None) -> LettaMCPServer:
    """Create a new Letta MCP Server Railway instance"""
    return LettaMCPServer(config)


def run_server(config: Optional[LettaConfig] = None, transport: str = "streamable-http"):
    """Create and run a Letta MCP Server Railway"""
    server = create_server(config)
    server.run(transport)


def main():
    """Entry point for the letta-mcp-server-railway command"""
    run_server()


if __name__ == "__main__":
    # CRITICAL: Use streamable-http for Railway deployment
    import os
    server = create_server()
    # Bind to all interfaces on the Railway-provided PORT (default 8000)
    server.mcp.run(
        transport="streamable-http",
        host="0.0.0.0",
        port=int(os.getenv("PORT", 8000))
    )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SNYCFIRE-CORE/letta-mcp-server-railway'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.