Skip to main content
Glama
suspicious-cow

Model Context Protocol (MCP) Server

server.py12.4 kB
import asyncio import websockets import json from typing import Dict, Any import sqlite3 # --- MCP Server Implementation --- # This server accepts WebSocket connections and speaks JSON-RPC 2.0. # It supports the MCP handshake (initialize), tool listing/calling, and resource listing/reading. # Server info and capabilities (as per MCP spec) SERVER_INFO = { "name": "ExamplePythonMCPServer", "version": "1.0.0" } CAPABILITIES = { "logging": {}, "prompts": {"listChanged": False}, "resources": {"subscribe": False, "listChanged": False}, "tools": {"listChanged": False} } # Example tool definition (add two numbers) TOOLS = [ { "name": "add_numbers", "description": "Add two numbers and return the sum.", "inputSchema": { "type": "object", "properties": { "a": {"type": "number", "description": "First number"}, "b": {"type": "number", "description": "Second number"} }, "required": ["a", "b"] } }, { "name": "employee_information", "description": "Get information about employees filtered by department (case-insensitive). Leave department empty to get all employees.", "inputSchema": { "type": "object", "properties": { "department": {"type": "string", "description": "Department name (optional, case-insensitive)"} }, "required": [] } } ] # Example resource definition (static text file) RESOURCES = [ { "uri": "file:///example.txt", "name": "Example Text File", "description": "A static example text file.", "mimeType": "text/plain" } ] # Resource content RESOURCE_CONTENTS = { "file:///example.txt": "Hello, this is the content of example.txt!" } # Initialize SQLite database and sample data if not exists def init_sqlite_db(): import os db_path = os.path.join(os.path.dirname(__file__), 'company.db') conn = sqlite3.connect(db_path) c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS employees ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, department TEXT NOT NULL, email TEXT NOT NULL UNIQUE, hire_date TEXT NOT NULL )''') # Insert sample data if table is empty c.execute('SELECT COUNT(*) FROM employees') if c.fetchone()[0] == 0: c.executemany('''INSERT INTO employees (name, department, email, hire_date) VALUES (?, ?, ?, ?)''', [ ('Alice Smith', 'Engineering', 'alice.smith@example.com', '2020-01-15'), ('Bob Johnson', 'Marketing', 'bob.johnson@example.com', '2019-07-23'), ('Carol Lee', 'Sales', 'carol.lee@example.com', '2021-03-10'), ('David Kim', 'Engineering', 'david.kim@example.com', '2018-11-05'), ('Eva Brown', 'HR', 'eva.brown@example.com', '2022-06-01') ]) conn.commit() conn.close() init_sqlite_db() # WebSocket connection handler (note websockets.serve() expects this signature) async def handle_jsonrpc(websocket): client_info = f"{websocket.remote_address[0]}:{websocket.remote_address[1]}" print(f"Client connected from {client_info}!") async for message in websocket: try: print(f"[DEBUG] Received message from {client_info}: {message}") request = json.loads(message) # Log method and params method = request.get("method") params = request.get("params", {}) req_id = request.get("id") print(f"[DEBUG] Processing {method} request (id={req_id}) with params: {json.dumps(params)}") # Process the request response = await handle_request(request) if response is not None: # Pretty print the response for logging response_pretty = json.dumps(response, indent=2) print(f"[DEBUG] Sending response for {method}:\n{response_pretty}") await websocket.send(json.dumps(response)) except json.JSONDecodeError as je: print(f"[DEBUG] JSON decode error: {je}") # Send JSON-RPC error response for malformed JSON error_response = { "jsonrpc": "2.0", "id": None, # We can't know the id if JSON parsing failed "error": {"code": -32700, "message": f"Parse error: {str(je)}"} } print(f"[DEBUG] Sending parse error response") await websocket.send(json.dumps(error_response)) except Exception as e: print(f"[DEBUG] Error processing request: {e}") # Send JSON-RPC error response error_response = { "jsonrpc": "2.0", "id": request.get("id") if 'request' in locals() else None, "error": {"code": -32603, "message": str(e)} } print(f"[DEBUG] Sending error response: {error_response}") await websocket.send(json.dumps(error_response)) async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]: method = request.get("method") req_id = request.get("id") params = request.get("params", {}) # --- MCP handshake --- if method == "initialize": return { "jsonrpc": "2.0", "id": req_id, "result": { "protocolVersion": "2024-11-05", "capabilities": CAPABILITIES, "serverInfo": SERVER_INFO } } # --- Tools discovery --- elif method == "tools/list": return { "jsonrpc": "2.0", "id": req_id, "result": TOOLS } # --- Tools invocation --- elif method == "tools/call": name = params.get("name") arguments = params.get("arguments", {}) print(f"[DEBUG] Calling tool '{name}' with arguments: {json.dumps(arguments)}") # Check if the tool exists tool_exists = any(tool["name"] == name for tool in TOOLS) if not tool_exists: print(f"[ERROR] Unknown tool: {name}") return { "jsonrpc": "2.0", "id": req_id, "error": {"code": -32602, "message": f"Unknown tool: {name}"} } try: if name == "add_numbers": a = arguments.get("a") b = arguments.get("b") if a is None or b is None: print(f"[ERROR] Missing arguments for add_numbers: a={a}, b={b}") return { "jsonrpc": "2.0", "id": req_id, "error": {"code": -32602, "message": f"Missing required arguments for add_numbers: a={a}, b={b}"} } # Validate types if not (isinstance(a, (int, float)) and isinstance(b, (int, float))): print(f"[ERROR] Invalid argument types for add_numbers: a={type(a)}, b={type(b)}") return { "jsonrpc": "2.0", "id": req_id, "error": {"code": -32602, "message": f"Arguments must be numbers: a={type(a)}, b={type(b)}"} } result = {"sum": a + b} print(f"[DEBUG] add_numbers result: {result}") return { "jsonrpc": "2.0", "id": req_id, "result": result } elif name == "employee_information": # Query employees from SQLite database, optionally filter by department import os db_path = os.path.join(os.path.dirname(__file__), 'company.db') if not os.path.exists(db_path): print(f"[ERROR] Database file not found: {db_path}") return { "jsonrpc": "2.0", "id": req_id, "error": {"code": -32603, "message": "Database file not found"} } try: conn = sqlite3.connect(db_path) c = conn.cursor() department = arguments.get("department") if department: print(f"[DEBUG] Filtering employees by department: {department}") # Use case-insensitive search with LOWER() function c.execute("SELECT id, name, department, email, hire_date FROM employees WHERE LOWER(department) = LOWER(?)", (department,)) else: print(f"[DEBUG] Getting all employees") c.execute("SELECT id, name, department, email, hire_date FROM employees") rows = c.fetchall() conn.close() employees = [ {"id": row[0], "name": row[1], "department": row[2], "email": row[3], "hire_date": row[4]} for row in rows ] print(f"[DEBUG] Found {len(employees)} employees") return { "jsonrpc": "2.0", "id": req_id, "result": {"employees": employees} } except sqlite3.Error as e: print(f"[ERROR] Database error: {e}") return { "jsonrpc": "2.0", "id": req_id, "error": {"code": -32603, "message": f"Database error: {str(e)}"} } else: # This shouldn't happen due to the check at the beginning, but just in case return { "jsonrpc": "2.0", "id": req_id, "error": {"code": -32602, "message": f"Unknown tool: {name}"} } except Exception as e: print(f"[ERROR] Exception while calling tool {name}: {e}") return { "jsonrpc": "2.0", "id": req_id, "error": {"code": -32603, "message": f"Internal error: {str(e)}"} } # --- Resources discovery --- elif method == "resources/list": return { "jsonrpc": "2.0", "id": req_id, "result": RESOURCES } # --- Resources read --- elif method == "resources/read": uris = params.get("uris", []) contents = [] for uri in uris: if uri in RESOURCE_CONTENTS: contents.append({ "uri": uri, "mimeType": "text/plain", "text": RESOURCE_CONTENTS[uri] }) return { "jsonrpc": "2.0", "id": req_id, "result": {"contents": contents} } # --- Prompts discovery (empty for now) --- elif method == "prompts/list": return { "jsonrpc": "2.0", "id": req_id, "result": [] } # --- Prompts get (not implemented) --- elif method == "prompts/get": return { "jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": "No prompts implemented."} } else: return { "jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Unknown method: {method}"} } async def main(): print("Starting MCP server on ws://localhost:8765 ...") # Modern websockets library uses single-argument handlers (no path) async with websockets.serve(handle_jsonrpc, "localhost", 8765): print("Server started successfully!") await asyncio.Future() # run forever if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("Server stopped by user") except Exception as e: print(f"Error starting server: {e}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/suspicious-cow/MCP2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server