========================================
Section: Configuration Files
========================================

========================================
File: examples/clients/simple-chatbot/pyproject.toml
========================================

[project]
name = "mcp-simple-chatbot"
version = "0.1.0"
description = "A simple CLI chatbot using the Model Context Protocol (MCP)"
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "Edoardo Cilia" }]
keywords = ["mcp", "llm", "chatbot", "cli"]
license = { text = "MIT" }
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
]
dependencies = [
    "python-dotenv>=1.0.0",
    "requests>=2.31.0",
    "mcp>=1.0.0",
    "uvicorn>=0.32.1",
]

[project.scripts]
mcp-simple-chatbot = "mcp_simple_chatbot.client:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["mcp_simple_chatbot"]

[tool.pyright]
include = ["mcp_simple_chatbot"]
venvPath = "."
venv = ".venv"

[tool.ruff.lint]
select = ["E", "F", "I"]
ignore = []

[tool.ruff]
line-length = 88
target-version = "py310"

[tool.uv]
dev-dependencies = ["pyright>=1.1.379", "pytest>=8.3.3", "ruff>=0.6.9"]

========================================
File: examples/servers/simple-prompt/pyproject.toml
========================================

[project]
name = "mcp-simple-prompt"
version = "0.1.0"
description = "A simple MCP server exposing a customizable prompt"
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "Anthropic, PBC." }]
maintainers = [
    { name = "David Soria Parra", email = "davidsp@anthropic.com" },
    { name = "Justin Spahr-Summers", email = "justin@anthropic.com" },
]
keywords = ["mcp", "llm", "automation", "web", "fetch"]
license = { text = "MIT" }
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
]
dependencies = ["anyio>=4.5", "click>=8.1.0", "httpx>=0.27", "mcp"]

[project.scripts]
mcp-simple-prompt = "mcp_simple_prompt.server:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["mcp_simple_prompt"]

[tool.pyright]
include = ["mcp_simple_prompt"]
venvPath = "."
venv = ".venv"

[tool.ruff.lint]
select = ["E", "F", "I"]
ignore = []

[tool.ruff]
line-length = 88
target-version = "py310"

[tool.uv]
dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]

========================================
File: examples/servers/simple-resource/pyproject.toml
========================================

[project]
name = "mcp-simple-resource"
version = "0.1.0"
description = "A simple MCP server exposing sample text resources"
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "Anthropic, PBC." }]
maintainers = [
    { name = "David Soria Parra", email = "davidsp@anthropic.com" },
    { name = "Justin Spahr-Summers", email = "justin@anthropic.com" },
]
keywords = ["mcp", "llm", "automation", "web", "fetch"]
license = { text = "MIT" }
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
]
dependencies = ["anyio>=4.5", "click>=8.1.0", "httpx>=0.27", "mcp"]

[project.scripts]
mcp-simple-resource = "mcp_simple_resource.server:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["mcp_simple_resource"]

[tool.pyright]
include = ["mcp_simple_resource"]
venvPath = "."
venv = ".venv"

[tool.ruff.lint]
select = ["E", "F", "I"]
ignore = []

[tool.ruff]
line-length = 88
target-version = "py310"

[tool.uv]
dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]

========================================
File: examples/servers/simple-tool/pyproject.toml
========================================

[project]
name = "mcp-simple-tool"
version = "0.1.0"
description = "A simple MCP server exposing a website fetching tool"
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "Anthropic, PBC." }]
maintainers = [
    { name = "David Soria Parra", email = "davidsp@anthropic.com" },
    { name = "Justin Spahr-Summers", email = "justin@anthropic.com" },
]
keywords = ["mcp", "llm", "automation", "web", "fetch"]
license = { text = "MIT" }
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
]
dependencies = ["anyio>=4.5", "click>=8.1.0", "httpx>=0.27", "mcp"]

[project.scripts]
mcp-simple-tool = "mcp_simple_tool.server:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["mcp_simple_tool"]

[tool.pyright]
include = ["mcp_simple_tool"]
venvPath = "."
venv = ".venv"

[tool.ruff.lint]
select = ["E", "F", "I"]
ignore = []

[tool.ruff]
line-length = 88
target-version = "py310"

[tool.uv]
dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]

========================================
File: examples/clients/simple-chatbot/mcp_simple_chatbot/servers_config.json
========================================

{
  "mcpServers": {
    "sqlite": {
      "command": "uvx",
      "args": ["mcp-server-sqlite", "--db-path", "./test.db"]
    },
    "puppeteer": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-puppeteer"]
    }
  }
}

========================================
Section: Documentation
========================================

========================================
File: examples/clients/simple-chatbot/README.MD
========================================

# MCP Simple Chatbot

This example demonstrates how to integrate the Model Context Protocol (MCP) into a simple CLI chatbot. The implementation showcases MCP's flexibility by supporting multiple tools through MCP servers and is compatible with any LLM provider that follows OpenAI API standards.
## Requirements

- Python 3.10
- `python-dotenv`
- `requests`
- `mcp`
- `uvicorn`

## Installation

1. **Install the dependencies:**

   ```bash
   pip install -r requirements.txt
   ```

2. **Set up environment variables:**

   Create a `.env` file in the root directory and add your API key:

   ```plaintext
   LLM_API_KEY=your_api_key_here
   ```

   Note: the bundled `main.py` actually reads the key from the `GROQ_API_KEY` environment variable (see the `Configuration` class below), so name the variable to match the client you run.

3. **Configure servers:**

   The `servers_config.json` follows the same structure as Claude Desktop, allowing for easy integration of multiple servers. Here's an example:

   ```json
   {
     "mcpServers": {
       "sqlite": {
         "command": "uvx",
         "args": ["mcp-server-sqlite", "--db-path", "./test.db"]
       },
       "puppeteer": {
         "command": "npx",
         "args": ["-y", "@modelcontextprotocol/server-puppeteer"]
       }
     }
   }
   ```

   Environment variables are supported as well. Pass them as you would with the Claude Desktop App. Example:

   ```json
   {
     "mcpServers": {
       "server_name": {
         "command": "uvx",
         "args": ["mcp-server-name", "--additional-args"],
         "env": {
           "API_KEY": "your_api_key_here"
         }
       }
     }
   }
   ```

## Usage

1. **Run the client:**

   ```bash
   python main.py
   ```

2. **Interact with the assistant:**

   The assistant automatically detects available tools and can respond to queries based on the tools provided by the configured servers.

3. **Exit the session:**

   Type `quit` or `exit` to end the session.

## Architecture

- **Tool Discovery**: Tools are automatically discovered from configured servers.
- **System Prompt**: Tools are dynamically included in the system prompt, allowing the LLM to understand available capabilities.
- **Server Integration**: Supports any MCP-compatible server, tested with various server implementations including Uvicorn and Node.js.

### Class Structure

- **Configuration**: Manages environment variables and server configurations
- **Server**: Handles MCP server initialization, tool discovery, and execution
- **Tool**: Represents individual tools with their properties and formatting
- **LLMClient**: Manages communication with the LLM provider
- **ChatSession**: Orchestrates the interaction between user, LLM, and tools

### Logic Flow

1. **Tool Integration**:
   - Tools are dynamically discovered from MCP servers
   - Tool descriptions are automatically included in the system prompt
   - Tool execution is handled through the standardized MCP protocol

2. **Runtime Flow**:
   - User input is received
   - Input is sent to the LLM with context of available tools
   - The LLM response is parsed (see the tool-call format below):
     - If it's a tool call → execute the tool and return the result
     - If it's a direct response → return it to the user
   - Tool results are sent back to the LLM for interpretation
   - The final response is presented to the user
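For reference, this is the tool-call envelope that `main.py` instructs the model to emit (quoted from the system prompt built in `ChatSession.start`); the tool name and argument shown are placeholders, not real tools:

```json
{
  "tool": "tool-name",
  "arguments": {
    "argument-name": "value"
  }
}
```

`process_llm_response` attempts to `json.loads` every model reply; anything that parses to an object with both `tool` and `arguments` keys is routed to the first configured server that advertises that tool, and everything else is passed through to the user verbatim.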
========================================
File: examples/README.md
========================================

# Python SDK Examples

This folder aims to provide simple examples of using the Python SDK. Please refer to the [servers repository](https://github.com/modelcontextprotocol/servers) for real-world servers.

========================================
File: examples/servers/simple-prompt/README.md
========================================

# MCP Simple Prompt

A simple MCP server that exposes a customizable prompt template with optional context and topic parameters.

## Usage

Start the server using either stdio (default) or SSE transport:

```bash
# Using stdio transport (default)
uv run mcp-simple-prompt

# Using SSE transport on custom port
uv run mcp-simple-prompt --transport sse --port 8000
```

The server exposes a prompt named "simple" that accepts two optional arguments:

- `context`: Additional context to consider
- `topic`: Specific topic to focus on

## Example

Using the MCP client, you can retrieve the prompt like this using the STDIO transport:

```python
import asyncio

from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client


async def main():
    async with stdio_client(
        StdioServerParameters(command="uv", args=["run", "mcp-simple-prompt"])
    ) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # List available prompts
            prompts = await session.list_prompts()
            print(prompts)

            # Get the prompt with arguments
            prompt = await session.get_prompt(
                "simple",
                {
                    "context": "User is a software developer",
                    "topic": "Python async programming",
                },
            )
            print(prompt)


asyncio.run(main())
```

========================================
File: examples/servers/simple-resource/README.md
========================================

# MCP Simple Resource

A simple MCP server that exposes sample text files as resources.

## Usage

Start the server using either stdio (default) or SSE transport:

```bash
# Using stdio transport (default)
uv run mcp-simple-resource

# Using SSE transport on custom port
uv run mcp-simple-resource --transport sse --port 8000
```

The server exposes some basic text file resources that can be read by clients.

## Example

Using the MCP client, you can retrieve resources like this using the STDIO transport:

```python
import asyncio

from mcp.types import AnyUrl
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client


async def main():
    async with stdio_client(
        StdioServerParameters(command="uv", args=["run", "mcp-simple-resource"])
    ) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # List available resources
            resources = await session.list_resources()
            print(resources)

            # Get a specific resource
            resource = await session.read_resource(AnyUrl("file:///greeting.txt"))
            print(resource)


asyncio.run(main())
```

========================================
File: examples/servers/simple-tool/README.md
========================================

# MCP Simple Tool

A simple MCP server that exposes a website fetching tool.
## Usage

Start the server using either stdio (default) or SSE transport:

```bash
# Using stdio transport (default)
uv run mcp-simple-tool

# Using SSE transport on custom port
uv run mcp-simple-tool --transport sse --port 8000
```

The server exposes a tool named "fetch" that accepts one required argument:

- `url`: The URL of the website to fetch

## Example

Using the MCP client, you can use the tool like this using the STDIO transport:

```python
import asyncio

from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client


async def main():
    async with stdio_client(
        StdioServerParameters(command="uv", args=["run", "mcp-simple-tool"])
    ) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # List available tools
            tools = await session.list_tools()
            print(tools)

            # Call the fetch tool
            result = await session.call_tool("fetch", {"url": "https://example.com"})
            print(result)


asyncio.run(main())
```

========================================
File: examples/clients/simple-chatbot/mcp_simple_chatbot/requirements.txt
========================================

python-dotenv>=1.0.0
requests>=2.31.0
mcp>=1.0.0
uvicorn>=0.32.1

========================================
Section: Source Files
========================================

========================================
File: examples/servers/simple-prompt/mcp_simple_prompt/__init__.py
========================================

========================================
File: examples/servers/simple-prompt/mcp_simple_prompt/__main__.py
========================================

import sys

from .server import main

sys.exit(main())

========================================
File: examples/servers/simple-resource/mcp_simple_resource/__main__.py
========================================

import sys

from .server import main  # relative import, so `server` resolves inside the package

sys.exit(main())

========================================
File: examples/fastmcp/complex_inputs.py
========================================

"""
FastMCP Complex inputs Example

Demonstrates validation via pydantic with complex models.
"""

from typing import Annotated

from pydantic import BaseModel, Field

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Shrimp Tank")


class ShrimpTank(BaseModel):
    class Shrimp(BaseModel):
        name: Annotated[str, Field(max_length=10)]

    shrimp: list[Shrimp]


@mcp.tool()
def name_shrimp(
    tank: ShrimpTank,
    # You can use pydantic Field in function signatures for validation.
    extra_names: Annotated[list[str], Field(max_length=10)],
) -> list[str]:
    """List all shrimp names in the tank"""
    return [shrimp.name for shrimp in tank.shrimp] + extra_names
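As a quick check of the constraints above, the sketch below (assuming `ShrimpTank` is importable from the example module) shows pydantic rejecting an over-long shrimp name before the tool body would ever run:

```python
from pydantic import ValidationError

from complex_inputs import ShrimpTank  # assumes examples/fastmcp is on the path

try:
    # "name" exceeds the 10-character limit declared on Shrimp.name
    ShrimpTank(shrimp=[{"name": "a-name-well-over-ten-characters"}])
except ValidationError as exc:
    print(exc)  # reports: String should have at most 10 characters
```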
""" from pathlib import Path from mcp.server.fastmcp import FastMCP # Create server mcp = FastMCP("Demo") @mcp.resource("dir://desktop") def desktop() -> list[str]: """List the files in the user's desktop""" desktop = Path.home() / "Desktop" return [str(f) for f in desktop.iterdir()] @mcp.tool() def add(a: int, b: int) -> int: """Add two numbers""" return a + b ======================================== File: "examples/fastmcp/echo.py" Size: 487 bytes Last Modified: SystemTime { tv_sec: 1740722655, tv_nsec: 571457412 } ======================================== """ FastMCP Echo Server """ from mcp.server.fastmcp import FastMCP # Create server mcp = FastMCP("Echo Server") @mcp.tool() def echo_tool(text: str) -> str: """Echo the input text""" return text @mcp.resource("echo://static") def echo_resource() -> str: return "Echo!" @mcp.resource("echo://{text}") def echo_template(text: str) -> str: """Echo the input text""" return f"Echo: {text}" @mcp.prompt("echo") def echo_prompt(text: str) -> str: return text ======================================== File: "examples/clients/simple-chatbot/mcp_simple_chatbot/main.py" Size: 14807 bytes Last Modified: SystemTime { tv_sec: 1740722655, tv_nsec: 561956320 } ======================================== import asyncio import json import logging import os import shutil from contextlib import AsyncExitStack from typing import Any import httpx from dotenv import load_dotenv from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) class Configuration: """Manages configuration and environment variables for the MCP client.""" def __init__(self) -> None: """Initialize configuration with environment variables.""" self.load_env() self.api_key = os.getenv("GROQ_API_KEY") @staticmethod def load_env() -> None: """Load environment variables from .env file.""" load_dotenv() @staticmethod def load_config(file_path: str) -> dict[str, Any]: """Load server configuration from JSON file. Args: file_path: Path to the JSON configuration file. Returns: Dict containing server configuration. Raises: FileNotFoundError: If configuration file doesn't exist. JSONDecodeError: If configuration file is invalid JSON. """ with open(file_path, "r") as f: return json.load(f) @property def llm_api_key(self) -> str: """Get the LLM API key. Returns: The API key as a string. Raises: ValueError: If the API key is not found in environment variables. 
""" if not self.api_key: raise ValueError("LLM_API_KEY not found in environment variables") return self.api_key class Server: """Manages MCP server connections and tool execution.""" def __init__(self, name: str, config: dict[str, Any]) -> None: self.name: str = name self.config: dict[str, Any] = config self.stdio_context: Any | None = None self.session: ClientSession | None = None self._cleanup_lock: asyncio.Lock = asyncio.Lock() self.exit_stack: AsyncExitStack = AsyncExitStack() async def initialize(self) -> None: """Initialize the server connection.""" command = ( shutil.which("npx") if self.config["command"] == "npx" else self.config["command"] ) if command is None: raise ValueError("The command must be a valid string and cannot be None.") server_params = StdioServerParameters( command=command, args=self.config["args"], env={**os.environ, **self.config["env"]} if self.config.get("env") else None, ) try: stdio_transport = await self.exit_stack.enter_async_context( stdio_client(server_params) ) read, write = stdio_transport session = await self.exit_stack.enter_async_context( ClientSession(read, write) ) await session.initialize() self.session = session except Exception as e: logging.error(f"Error initializing server {self.name}: {e}") await self.cleanup() raise async def list_tools(self) -> list[Any]: """List available tools from the server. Returns: A list of available tools. Raises: RuntimeError: If the server is not initialized. """ if not self.session: raise RuntimeError(f"Server {self.name} not initialized") tools_response = await self.session.list_tools() tools = [] for item in tools_response: if isinstance(item, tuple) and item[0] == "tools": for tool in item[1]: tools.append(Tool(tool.name, tool.description, tool.inputSchema)) return tools async def execute_tool( self, tool_name: str, arguments: dict[str, Any], retries: int = 2, delay: float = 1.0, ) -> Any: """Execute a tool with retry mechanism. Args: tool_name: Name of the tool to execute. arguments: Tool arguments. retries: Number of retry attempts. delay: Delay between retries in seconds. Returns: Tool execution result. Raises: RuntimeError: If server is not initialized. Exception: If tool execution fails after all retries. """ if not self.session: raise RuntimeError(f"Server {self.name} not initialized") attempt = 0 while attempt < retries: try: logging.info(f"Executing {tool_name}...") result = await self.session.call_tool(tool_name, arguments) return result except Exception as e: attempt += 1 logging.warning( f"Error executing tool: {e}. Attempt {attempt} of {retries}." ) if attempt < retries: logging.info(f"Retrying in {delay} seconds...") await asyncio.sleep(delay) else: logging.error("Max retries reached. Failing.") raise async def cleanup(self) -> None: """Clean up server resources.""" async with self._cleanup_lock: try: await self.exit_stack.aclose() self.session = None self.stdio_context = None except Exception as e: logging.error(f"Error during cleanup of server {self.name}: {e}") class Tool: """Represents a tool with its properties and formatting.""" def __init__( self, name: str, description: str, input_schema: dict[str, Any] ) -> None: self.name: str = name self.description: str = description self.input_schema: dict[str, Any] = input_schema def format_for_llm(self) -> str: """Format tool information for LLM. Returns: A formatted string describing the tool. 
""" args_desc = [] if "properties" in self.input_schema: for param_name, param_info in self.input_schema["properties"].items(): arg_desc = ( f"- {param_name}: {param_info.get('description', 'No description')}" ) if param_name in self.input_schema.get("required", []): arg_desc += " (required)" args_desc.append(arg_desc) return f""" Tool: {self.name} Description: {self.description} Arguments: {chr(10).join(args_desc)} """ class LLMClient: """Manages communication with the LLM provider.""" def __init__(self, api_key: str) -> None: self.api_key: str = api_key def get_response(self, messages: list[dict[str, str]]) -> str: """Get a response from the LLM. Args: messages: A list of message dictionaries. Returns: The LLM's response as a string. Raises: httpx.RequestError: If the request to the LLM fails. """ url = "https://api.groq.com/openai/v1/chat/completions" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}", } payload = { "messages": messages, "model": "llama-3.2-90b-vision-preview", "temperature": 0.7, "max_tokens": 4096, "top_p": 1, "stream": False, "stop": None, } try: with httpx.Client() as client: response = client.post(url, headers=headers, json=payload) response.raise_for_status() data = response.json() return data["choices"][0]["message"]["content"] except httpx.RequestError as e: error_message = f"Error getting LLM response: {str(e)}" logging.error(error_message) if isinstance(e, httpx.HTTPStatusError): status_code = e.response.status_code logging.error(f"Status code: {status_code}") logging.error(f"Response details: {e.response.text}") return ( f"I encountered an error: {error_message}. " "Please try again or rephrase your request." ) class ChatSession: """Orchestrates the interaction between user, LLM, and tools.""" def __init__(self, servers: list[Server], llm_client: LLMClient) -> None: self.servers: list[Server] = servers self.llm_client: LLMClient = llm_client async def cleanup_servers(self) -> None: """Clean up all servers properly.""" cleanup_tasks = [] for server in self.servers: cleanup_tasks.append(asyncio.create_task(server.cleanup())) if cleanup_tasks: try: await asyncio.gather(*cleanup_tasks, return_exceptions=True) except Exception as e: logging.warning(f"Warning during final cleanup: {e}") async def process_llm_response(self, llm_response: str) -> str: """Process the LLM response and execute tools if needed. Args: llm_response: The response from the LLM. Returns: The result of tool execution or the original response. 
""" import json try: tool_call = json.loads(llm_response) if "tool" in tool_call and "arguments" in tool_call: logging.info(f"Executing tool: {tool_call['tool']}") logging.info(f"With arguments: {tool_call['arguments']}") for server in self.servers: tools = await server.list_tools() if any(tool.name == tool_call["tool"] for tool in tools): try: result = await server.execute_tool( tool_call["tool"], tool_call["arguments"] ) if isinstance(result, dict) and "progress" in result: progress = result["progress"] total = result["total"] percentage = (progress / total) * 100 logging.info( f"Progress: {progress}/{total} " f"({percentage:.1f}%)" ) return f"Tool execution result: {result}" except Exception as e: error_msg = f"Error executing tool: {str(e)}" logging.error(error_msg) return error_msg return f"No server found with tool: {tool_call['tool']}" return llm_response except json.JSONDecodeError: return llm_response async def start(self) -> None: """Main chat session handler.""" try: for server in self.servers: try: await server.initialize() except Exception as e: logging.error(f"Failed to initialize server: {e}") await self.cleanup_servers() return all_tools = [] for server in self.servers: tools = await server.list_tools() all_tools.extend(tools) tools_description = "\n".join([tool.format_for_llm() for tool in all_tools]) system_message = ( "You are a helpful assistant with access to these tools:\n\n" f"{tools_description}\n" "Choose the appropriate tool based on the user's question. " "If no tool is needed, reply directly.\n\n" "IMPORTANT: When you need to use a tool, you must ONLY respond with " "the exact JSON object format below, nothing else:\n" "{\n" ' "tool": "tool-name",\n' ' "arguments": {\n' ' "argument-name": "value"\n' " }\n" "}\n\n" "After receiving a tool's response:\n" "1. Transform the raw data into a natural, conversational response\n" "2. Keep responses concise but informative\n" "3. Focus on the most relevant information\n" "4. Use appropriate context from the user's question\n" "5. Avoid simply repeating the raw data\n\n" "Please use only the tools that are explicitly defined above." 
) messages = [{"role": "system", "content": system_message}] while True: try: user_input = input("You: ").strip().lower() if user_input in ["quit", "exit"]: logging.info("\nExiting...") break messages.append({"role": "user", "content": user_input}) llm_response = self.llm_client.get_response(messages) logging.info("\nAssistant: %s", llm_response) result = await self.process_llm_response(llm_response) if result != llm_response: messages.append({"role": "assistant", "content": llm_response}) messages.append({"role": "system", "content": result}) final_response = self.llm_client.get_response(messages) logging.info("\nFinal response: %s", final_response) messages.append( {"role": "assistant", "content": final_response} ) else: messages.append({"role": "assistant", "content": llm_response}) except KeyboardInterrupt: logging.info("\nExiting...") break finally: await self.cleanup_servers() async def main() -> None: """Initialize and run the chat session.""" config = Configuration() server_config = config.load_config("servers_config.json") servers = [ Server(name, srv_config) for name, srv_config in server_config["mcpServers"].items() ] llm_client = LLMClient(config.llm_api_key) chat_session = ChatSession(servers, llm_client) await chat_session.start() if __name__ == "__main__": asyncio.run(main()) ======================================== File: "examples/fastmcp/memory.py" Size: 10701 bytes Last Modified: SystemTime { tv_sec: 1740722655, tv_nsec: 571457412 } ======================================== # /// script # dependencies = ["pydantic-ai-slim[openai]", "asyncpg", "numpy", "pgvector"] # /// # uv pip install 'pydantic-ai-slim[openai]' asyncpg numpy pgvector """ Recursive memory system inspired by the human brain's clustering of memories. Uses OpenAI's 'text-embedding-3-small' model and pgvector for efficient similarity search. 
""" import asyncio import math import os from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Annotated, Self import asyncpg import numpy as np from openai import AsyncOpenAI from pgvector.asyncpg import register_vector # Import register_vector from pydantic import BaseModel, Field from pydantic_ai import Agent from mcp.server.fastmcp import FastMCP MAX_DEPTH = 5 SIMILARITY_THRESHOLD = 0.7 DECAY_FACTOR = 0.99 REINFORCEMENT_FACTOR = 1.1 DEFAULT_LLM_MODEL = "openai:gpt-4o" DEFAULT_EMBEDDING_MODEL = "text-embedding-3-small" mcp = FastMCP( "memory", dependencies=[ "pydantic-ai-slim[openai]", "asyncpg", "numpy", "pgvector", ], ) DB_DSN = "postgresql://postgres:postgres@localhost:54320/memory_db" # reset memory with rm ~/.fastmcp/{USER}/memory/* PROFILE_DIR = ( Path.home() / ".fastmcp" / os.environ.get("USER", "anon") / "memory" ).resolve() PROFILE_DIR.mkdir(parents=True, exist_ok=True) def cosine_similarity(a: list[float], b: list[float]) -> float: a_array = np.array(a, dtype=np.float64) b_array = np.array(b, dtype=np.float64) return np.dot(a_array, b_array) / ( np.linalg.norm(a_array) * np.linalg.norm(b_array) ) async def do_ai[T]( user_prompt: str, system_prompt: str, result_type: type[T] | Annotated, deps=None, ) -> T: agent = Agent( DEFAULT_LLM_MODEL, system_prompt=system_prompt, result_type=result_type, ) result = await agent.run(user_prompt, deps=deps) return result.data @dataclass class Deps: openai: AsyncOpenAI pool: asyncpg.Pool async def get_db_pool() -> asyncpg.Pool: async def init(conn): await conn.execute("CREATE EXTENSION IF NOT EXISTS vector;") await register_vector(conn) pool = await asyncpg.create_pool(DB_DSN, init=init) return pool class MemoryNode(BaseModel): id: int | None = None content: str summary: str = "" importance: float = 1.0 access_count: int = 0 timestamp: float = Field( default_factory=lambda: datetime.now(timezone.utc).timestamp() ) embedding: list[float] @classmethod async def from_content(cls, content: str, deps: Deps): embedding = await get_embedding(content, deps) return cls(content=content, embedding=embedding) async def save(self, deps: Deps): async with deps.pool.acquire() as conn: if self.id is None: result = await conn.fetchrow( """ INSERT INTO memories (content, summary, importance, access_count, timestamp, embedding) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id """, self.content, self.summary, self.importance, self.access_count, self.timestamp, self.embedding, ) self.id = result["id"] else: await conn.execute( """ UPDATE memories SET content = $1, summary = $2, importance = $3, access_count = $4, timestamp = $5, embedding = $6 WHERE id = $7 """, self.content, self.summary, self.importance, self.access_count, self.timestamp, self.embedding, self.id, ) async def merge_with(self, other: Self, deps: Deps): self.content = await do_ai( f"{self.content}\n\n{other.content}", "Combine the following two texts into a single, coherent text.", str, deps, ) self.importance += other.importance self.access_count += other.access_count self.embedding = [(a + b) / 2 for a, b in zip(self.embedding, other.embedding)] self.summary = await do_ai( self.content, "Summarize the following text concisely.", str, deps ) await self.save(deps) # Delete the merged node from the database if other.id is not None: await delete_memory(other.id, deps) def get_effective_importance(self): return self.importance * (1 + math.log(self.access_count + 1)) async def get_embedding(text: str, deps: Deps) -> list[float]: 
embedding_response = await deps.openai.embeddings.create( input=text, model=DEFAULT_EMBEDDING_MODEL, ) return embedding_response.data[0].embedding async def delete_memory(memory_id: int, deps: Deps): async with deps.pool.acquire() as conn: await conn.execute("DELETE FROM memories WHERE id = $1", memory_id) async def add_memory(content: str, deps: Deps): new_memory = await MemoryNode.from_content(content, deps) await new_memory.save(deps) similar_memories = await find_similar_memories(new_memory.embedding, deps) for memory in similar_memories: if memory.id != new_memory.id: await new_memory.merge_with(memory, deps) await update_importance(new_memory.embedding, deps) await prune_memories(deps) return f"Remembered: {content}" async def find_similar_memories(embedding: list[float], deps: Deps) -> list[MemoryNode]: async with deps.pool.acquire() as conn: rows = await conn.fetch( """ SELECT id, content, summary, importance, access_count, timestamp, embedding FROM memories ORDER BY embedding <-> $1 LIMIT 5 """, embedding, ) memories = [ MemoryNode( id=row["id"], content=row["content"], summary=row["summary"], importance=row["importance"], access_count=row["access_count"], timestamp=row["timestamp"], embedding=row["embedding"], ) for row in rows ] return memories async def update_importance(user_embedding: list[float], deps: Deps): async with deps.pool.acquire() as conn: rows = await conn.fetch( "SELECT id, importance, access_count, embedding FROM memories" ) for row in rows: memory_embedding = row["embedding"] similarity = cosine_similarity(user_embedding, memory_embedding) if similarity > SIMILARITY_THRESHOLD: new_importance = row["importance"] * REINFORCEMENT_FACTOR new_access_count = row["access_count"] + 1 else: new_importance = row["importance"] * DECAY_FACTOR new_access_count = row["access_count"] await conn.execute( """ UPDATE memories SET importance = $1, access_count = $2 WHERE id = $3 """, new_importance, new_access_count, row["id"], ) async def prune_memories(deps: Deps): async with deps.pool.acquire() as conn: rows = await conn.fetch( """ SELECT id, importance, access_count FROM memories ORDER BY importance DESC OFFSET $1 """, MAX_DEPTH, ) for row in rows: await conn.execute("DELETE FROM memories WHERE id = $1", row["id"]) async def display_memory_tree(deps: Deps) -> str: async with deps.pool.acquire() as conn: rows = await conn.fetch( """ SELECT content, summary, importance, access_count FROM memories ORDER BY importance DESC LIMIT $1 """, MAX_DEPTH, ) result = "" for row in rows: effective_importance = row["importance"] * ( 1 + math.log(row["access_count"] + 1) ) summary = row["summary"] or row["content"] result += f"- {summary} (Importance: {effective_importance:.2f})\n" return result @mcp.tool() async def remember( contents: list[str] = Field( description="List of observations or memories to store" ), ): deps = Deps(openai=AsyncOpenAI(), pool=await get_db_pool()) try: return "\n".join( await asyncio.gather(*[add_memory(content, deps) for content in contents]) ) finally: await deps.pool.close() @mcp.tool() async def read_profile() -> str: deps = Deps(openai=AsyncOpenAI(), pool=await get_db_pool()) profile = await display_memory_tree(deps) await deps.pool.close() return profile async def initialize_database(): pool = await asyncpg.create_pool( "postgresql://postgres:postgres@localhost:54320/postgres" ) try: async with pool.acquire() as conn: await conn.execute(""" SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pg_stat_activity.datname = 'memory_db' AND 
pid <> pg_backend_pid(); """) await conn.execute("DROP DATABASE IF EXISTS memory_db;") await conn.execute("CREATE DATABASE memory_db;") finally: await pool.close() pool = await asyncpg.create_pool(DB_DSN) try: async with pool.acquire() as conn: await conn.execute("CREATE EXTENSION IF NOT EXISTS vector;") await register_vector(conn) await conn.execute(""" CREATE TABLE IF NOT EXISTS memories ( id SERIAL PRIMARY KEY, content TEXT NOT NULL, summary TEXT, importance REAL NOT NULL, access_count INT NOT NULL, timestamp DOUBLE PRECISION NOT NULL, embedding vector(1536) NOT NULL ); CREATE INDEX IF NOT EXISTS idx_memories_embedding ON memories USING hnsw (embedding vector_l2_ops); """) finally: await pool.close() if __name__ == "__main__": asyncio.run(initialize_database()) ======================================== File: "examples/fastmcp/parameter_descriptions.py" Size: 627 bytes Last Modified: SystemTime { tv_sec: 1740722655, tv_nsec: 571457412 } ======================================== """ FastMCP Example showing parameter descriptions """ from pydantic import Field from mcp.server.fastmcp import FastMCP # Create server mcp = FastMCP("Parameter Descriptions Server") @mcp.tool() def greet_user( name: str = Field(description="The name of the person to greet"), title: str = Field(description="Optional title like Mr/Ms/Dr", default=""), times: int = Field(description="Number of times to repeat the greeting", default=1), ) -> str: """Greet a user with optional title and repetition""" greeting = f"Hello {title + ' ' if title else ''}{name}!" return "\n".join([greeting] * times) ======================================== File: "examples/fastmcp/readme-quickstart.py" Size: 371 bytes Last Modified: SystemTime { tv_sec: 1740722655, tv_nsec: 571457412 } ======================================== from mcp.server.fastmcp import FastMCP # Create an MCP server mcp = FastMCP("Demo") # Add an addition tool @mcp.tool() def add(a: int, b: int) -> int: """Add two numbers""" return a + b # Add a dynamic greeting resource @mcp.resource("greeting://{name}") def get_greeting(name: str) -> str: """Get a personalized greeting""" return f"Hello, {name}!" ======================================== File: "examples/fastmcp/screenshot.py" Size: 785 bytes Last Modified: SystemTime { tv_sec: 1740722655, tv_nsec: 571457412 } ======================================== """ FastMCP Screenshot Example Give Claude a tool to capture and view screenshots. """ import io from mcp.server.fastmcp import FastMCP from mcp.server.fastmcp.utilities.types import Image # Create server mcp = FastMCP("Screenshot Demo", dependencies=["pyautogui", "Pillow"]) @mcp.tool() def take_screenshot() -> Image: """ Take a screenshot of the user's screen and return it as an image. Use this tool anytime the user wants you to look at something they're doing. 
""" import pyautogui buffer = io.BytesIO() # if the file exceeds ~1MB, it will be rejected by Claude screenshot = pyautogui.screenshot() screenshot.convert("RGB").save(buffer, format="JPEG", quality=60, optimize=True) return Image(data=buffer.getvalue(), format="jpeg") ======================================== File: "examples/servers/simple-prompt/mcp_simple_prompt/server.py" Size: 3679 bytes Last Modified: SystemTime { tv_sec: 1740722655, tv_nsec: 571457412 } ======================================== import anyio import click import mcp.types as types from mcp.server.lowlevel import Server def create_messages( context: str | None = None, topic: str | None = None ) -> list[types.PromptMessage]: """Create the messages for the prompt.""" messages = [] # Add context if provided if context: messages.append( types.PromptMessage( role="user", content=types.TextContent( type="text", text=f"Here is some relevant context: {context}" ), ) ) # Add the main prompt prompt = "Please help me with " if topic: prompt += f"the following topic: {topic}" else: prompt += "whatever questions I may have." messages.append( types.PromptMessage( role="user", content=types.TextContent(type="text", text=prompt) ) ) return messages @click.command() @click.option("--port", default=8000, help="Port to listen on for SSE") @click.option( "--transport", type=click.Choice(["stdio", "sse"]), default="stdio", help="Transport type", ) def main(port: int, transport: str) -> int: app = Server("mcp-simple-prompt") @app.list_prompts() async def list_prompts() -> list[types.Prompt]: return [ types.Prompt( name="simple", description="A simple prompt that can take optional context and topic " "arguments", arguments=[ types.PromptArgument( name="context", description="Additional context to consider", required=False, ), types.PromptArgument( name="topic", description="Specific topic to focus on", required=False, ), ], ) ] @app.get_prompt() async def get_prompt( name: str, arguments: dict[str, str] | None = None ) -> types.GetPromptResult: if name != "simple": raise ValueError(f"Unknown prompt: {name}") if arguments is None: arguments = {} return types.GetPromptResult( messages=create_messages( context=arguments.get("context"), topic=arguments.get("topic") ), description="A simple prompt with optional context and topic arguments", ) if transport == "sse": from mcp.server.sse import SseServerTransport from starlette.applications import Starlette from starlette.routing import Mount, Route sse = SseServerTransport("/messages/") async def handle_sse(request): async with sse.connect_sse( request.scope, request.receive, request._send ) as streams: await app.run( streams[0], streams[1], app.create_initialization_options() ) starlette_app = Starlette( debug=True, routes=[ Route("/sse", endpoint=handle_sse), Mount("/messages/", app=sse.handle_post_message), ], ) import uvicorn uvicorn.run(starlette_app, host="0.0.0.0", port=port) else: from mcp.server.stdio import stdio_server async def arun(): async with stdio_server() as streams: await app.run( streams[0], streams[1], app.create_initialization_options() ) anyio.run(arun) return 0 ======================================== File: "examples/servers/simple-resource/mcp_simple_resource/server.py" Size: 2461 bytes Last Modified: SystemTime { tv_sec: 1740722655, tv_nsec: 571457412 } ======================================== import anyio import click import mcp.types as types from mcp.server.lowlevel import Server from pydantic import FileUrl SAMPLE_RESOURCES = { "greeting": "Hello! 
    "greeting": "Hello! This is a sample text resource.",
    "help": "This server provides a few sample text resources for testing.",
    "about": "This is the simple-resource MCP server implementation.",
}


@click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE")
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="stdio",
    help="Transport type",
)
def main(port: int, transport: str) -> int:
    app = Server("mcp-simple-resource")

    @app.list_resources()
    async def list_resources() -> list[types.Resource]:
        return [
            types.Resource(
                uri=FileUrl(f"file:///{name}.txt"),
                name=name,
                description=f"A sample text resource named {name}",
                mimeType="text/plain",
            )
            for name in SAMPLE_RESOURCES.keys()
        ]

    @app.read_resource()
    async def read_resource(uri: FileUrl) -> str | bytes:
        name = uri.path.replace(".txt", "").lstrip("/")

        if name not in SAMPLE_RESOURCES:
            raise ValueError(f"Unknown resource: {uri}")

        return SAMPLE_RESOURCES[name]

    if transport == "sse":
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette
        from starlette.routing import Mount, Route

        sse = SseServerTransport("/messages/")

        async def handle_sse(request):
            async with sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )

        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/messages/", app=sse.handle_post_message),
            ],
        )

        import uvicorn

        uvicorn.run(starlette_app, host="0.0.0.0", port=port)
    else:
        from mcp.server.stdio import stdio_server

        async def arun():
            async with stdio_server() as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )

        anyio.run(arun)

    return 0

========================================
File: examples/servers/simple-tool/mcp_simple_tool/server.py
========================================

import anyio
import click
import httpx
import mcp.types as types
from mcp.server.lowlevel import Server


async def fetch_website(
    url: str,
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    headers = {
        "User-Agent": "MCP Test Server (github.com/modelcontextprotocol/python-sdk)"
    }
    async with httpx.AsyncClient(follow_redirects=True, headers=headers) as client:
        response = await client.get(url)
        response.raise_for_status()
        return [types.TextContent(type="text", text=response.text)]


@click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE")
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="stdio",
    help="Transport type",
)
def main(port: int, transport: str) -> int:
    app = Server("mcp-website-fetcher")

    @app.call_tool()
    async def fetch_tool(
        name: str, arguments: dict
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        if name != "fetch":
            raise ValueError(f"Unknown tool: {name}")
        if "url" not in arguments:
            raise ValueError("Missing required argument 'url'")
        return await fetch_website(arguments["url"])

    @app.list_tools()
    async def list_tools() -> list[types.Tool]:
        return [
            types.Tool(
                name="fetch",
                description="Fetches a website and returns its content",
                inputSchema={
                    "type": "object",
                    "required": ["url"],
                    "properties": {
                        "url": {
                            "type": "string",
                            "description": "URL to fetch",
                        }
                    },
                },
            )
        ]

    if transport == "sse":
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette
        from starlette.routing import Mount, Route

        sse = SseServerTransport("/messages/")

        async def handle_sse(request):
            async with sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )

        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/messages/", app=sse.handle_post_message),
            ],
        )

        import uvicorn

        uvicorn.run(starlette_app, host="0.0.0.0", port=port)
    else:
        from mcp.server.stdio import stdio_server

        async def arun():
            async with stdio_server() as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )

        anyio.run(arun)

    return 0

========================================
File: examples/fastmcp/simple_echo.py
========================================

"""
FastMCP Echo Server
"""

from mcp.server.fastmcp import FastMCP

# Create server
mcp = FastMCP("Echo Server")


@mcp.tool()
def echo(text: str) -> str:
    """Echo the input text"""
    return text

========================================
File: examples/fastmcp/text_me.py
========================================

# /// script
# dependencies = []
# ///

"""
FastMCP Text Me Server
--------------------------------
This defines a simple FastMCP server that sends a text message to a phone
number via https://surgemsg.com/.

To run this example, create a `.env` file with the following values:

SURGE_API_KEY=...
SURGE_ACCOUNT_ID=...
SURGE_MY_PHONE_NUMBER=...
SURGE_MY_FIRST_NAME=...
SURGE_MY_LAST_NAME=...

Visit https://surgemsg.com/ and click "Get Started" to obtain these values.
"""

from typing import Annotated

import httpx
from pydantic import BeforeValidator
from pydantic_settings import BaseSettings, SettingsConfigDict

from mcp.server.fastmcp import FastMCP


class SurgeSettings(BaseSettings):
    model_config: SettingsConfigDict = SettingsConfigDict(
        env_prefix="SURGE_", env_file=".env"
    )

    api_key: str
    account_id: str
    my_phone_number: Annotated[
        str, BeforeValidator(lambda v: "+" + v if not v.startswith("+") else v)
    ]
    my_first_name: str
    my_last_name: str


# Create server
mcp = FastMCP("Text me")
surge_settings = SurgeSettings()  # type: ignore


@mcp.tool(name="textme", description="Send a text message to me")
def text_me(text_content: str) -> str:
    """Send a text message to a phone number via https://surgemsg.com/"""
    with httpx.Client() as client:
        response = client.post(
            "https://api.surgemsg.com/messages",
            headers={
                "Authorization": f"Bearer {surge_settings.api_key}",
                "Surge-Account": surge_settings.account_id,
                "Content-Type": "application/json",
            },
            json={
                "body": text_content,
                "conversation": {
                    "contact": {
                        "first_name": surge_settings.my_first_name,
                        "last_name": surge_settings.my_last_name,
                        "phone_number": surge_settings.my_phone_number,
                    }
                },
            },
        )
        response.raise_for_status()
        return f"Message sent: {text_content}"

========================================
File: examples/fastmcp/unicode_example.py
========================================

"""
Example FastMCP server that uses Unicode characters in various places to help test
Unicode handling in tools and inspectors.
""" from mcp.server.fastmcp import FastMCP mcp = FastMCP() @mcp.tool( description="🌟 A tool that uses various Unicode characters in its description: " "á é í ó ú ñ 漢字 🎉" ) def hello_unicode(name: str = "世界", greeting: str = "¡Hola") -> str: """ A simple tool that demonstrates Unicode handling in: - Tool description (emojis, accents, CJK characters) - Parameter defaults (CJK characters) - Return values (Spanish punctuation, emojis) """ return f"{greeting}, {name}! 👋" @mcp.tool(description="🎨 Tool that returns a list of emoji categories") def list_emoji_categories() -> list[str]: """Returns a list of emoji categories with emoji examples.""" return [ "😀 Smileys & Emotion", "👋 People & Body", "🐶 Animals & Nature", "🍎 Food & Drink", "⚽ Activities", "🌍 Travel & Places", "💡 Objects", "❤️ Symbols", "🚩 Flags", ] @mcp.tool(description="🔤 Tool that returns text in different scripts") def multilingual_hello() -> str: """Returns hello in different scripts and writing systems.""" return "\n".join( [ "English: Hello!", "Spanish: ¡Hola!", "French: Bonjour!", "German: Grüß Gott!", "Russian: Привет!", "Greek: Γεια σας!", "Hebrew: !שָׁלוֹם", "Arabic: !مرحبا", "Hindi: नमस्ते!", "Chinese: 你好!", "Japanese: こんにちは!", "Korean: 안녕하세요!", "Thai: สวัสดี!", ] ) if __name__ == "__main__": mcp.run()