MCP DuckDuckGo Search Server

by spences10
Verified
from fastapi import FastAPI, Request, HTTPException, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from sse_starlette.sse import EventSourceResponse import json import asyncio import logging from typing import Dict, List, Optional, Any from pydantic import BaseModel import services from datetime import datetime, timedelta from enum import Enum import time import traceback # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('mcp_server.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) def set_debug_mode(enabled: bool): """Update logging level based on debug mode""" if enabled: logger.setLevel(logging.DEBUG) logger.debug("Debug mode enabled - verbose logging activated") else: logger.setLevel(logging.INFO) logger.info("Debug mode disabled - returning to normal logging") app = FastAPI(title="MCP Server for Cursor") # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], expose_headers=["*"], max_age=3600 ) # Store service configurations and settings service_configs = { 'github': {'enabled': False, 'config': {}}, 'slack': {'enabled': False, 'config': {}}, 'google_drive': {'enabled': False, 'config': {}}, 'azure': {'enabled': False, 'config': {}}, 'vscode': {'enabled': False, 'config': {}} } server_settings = { 'host': '0.0.0.0', 'port': 8765, 'maxConnections': 100, 'pingTimeout': 30, 'debugMode': False, 'sslEnabled': False } mcp_settings = { 'protocolVersion': '1.0.0', 'maxContextLength': 4096, 'defaultTemperature': 0.7, 'maxTokens': 2048 } class ContentType(str, Enum): TEXT = "text" CODE = "code" IMAGE = "image" class Content(BaseModel): type: ContentType text: str language: Optional[str] = None alt: Optional[str] = None class ToolResponse(BaseModel): content: List[Content] isError: Optional[bool] = False # Define available tools with proper schema for Cursor TOOLS = [ { "name": "test", "display_name": "Test", "description": "A simple test tool to verify MCP is working", "schema": { "type": "object", "properties": { "message": { "type": "string", "description": "Message to echo back" } }, "required": ["message"] }, "enabled": True, "version": "1.0" }, { "name": "google_drive", "display_name": "Google Drive", "description": "Access and manage files in Google Drive", "schema": { "type": "object", "properties": { "operation": { "type": "string", "description": "Operation to perform", "enum": ["list", "upload", "create_folder", "delete"] }, "folder_id": { "type": "string", "description": "Google Drive folder ID (optional)" }, "file_id": { "type": "string", "description": "Google Drive file ID (required for delete operation)" }, "content": { "type": "string", "description": "File content for upload (required for upload operation)" }, "filename": { "type": "string", "description": "Name of the file (required for upload and create_folder operations)" } }, "required": ["operation"] }, "enabled": True, "version": "1.0" } ] class ConnectionManager: def __init__(self): self.connected_clients: Dict[str, dict] = {} self.connection_history: List[dict] = [] self.start_time = time.time() def add_client(self, client_id: str, websocket: WebSocket, headers: dict, client_info: dict) -> None: self.connected_clients[client_id] = { "websocket": websocket, "status": "Connected", "connected_since": datetime.now().isoformat(), "last_ping": datetime.now().isoformat(), "client_info": client_info } self.add_history_entry("connect", client_id, client_info) def remove_client(self, client_id: str, reason: str = "Connection closed") -> None: if client_id in self.connected_clients: del self.connected_clients[client_id] self.add_history_entry("disconnect", client_id, {"reason": reason}) def update_client_ping(self, client_id: str) -> None: if client_id in self.connected_clients: self.connected_clients[client_id]["last_ping"] = datetime.now().isoformat() def add_history_entry(self, event: str, client_id: str, extra_info: dict = None) -> None: current_time = datetime.now() self.connection_history.append({ "time": current_time.isoformat(), "connections": len(self.connected_clients), "debug_info": { "event": event, "client_id": client_id, **extra_info } if server_settings['debugMode'] else None }) def get_history(self, minutes: int = 30) -> List[dict]: current_time = datetime.now() cutoff_time = current_time - timedelta(minutes=minutes) # Clean up old entries self.connection_history = [ entry for entry in self.connection_history if datetime.fromisoformat(entry["time"]) > cutoff_time ] # If there are no recent entries but we have active clients, # add a current data point if len(self.connection_history) == 0 and len(self.connected_clients) > 0: self.add_history_entry("history_backfill", "system") # If the last entry was more than 10 seconds ago, add a current point elif len(self.connection_history) > 0: last_entry_time = datetime.fromisoformat(self.connection_history[-1]["time"]) if (current_time - last_entry_time).total_seconds() > 10: self.add_history_entry("history_update", "system") return self.connection_history def get_active_clients(self) -> List[dict]: return [ { "id": client_id, "status": client["status"], "connectedSince": client["connected_since"], "lastPing": client["last_ping"] } for client_id, client in self.connected_clients.items() ] def get_status(self) -> dict: uptime = str(timedelta(seconds=int(time.time() - self.start_time))) return { "status": "Online", "activeClients": len(self.connected_clients), "uptime": uptime, "version": "1.0.0" } async def start_history_tracking(self): async def update_history(): while True: try: self.add_history_entry("periodic_update", "system") except Exception as e: logger.error(f"Error updating connection history: {str(e)}") await asyncio.sleep(10) # Update every 10 seconds asyncio.create_task(update_history()) # Create a global connection manager connection_manager = ConnectionManager() @app.get("/sse") async def sse(request: Request): """SSE endpoint that sends capabilities and heartbeat messages""" logger.info(f"Received SSE connection from {request.client}") logger.info(f"Request headers: {dict(request.headers)}") async def event_generator(): # Send initial capabilities message capabilities = { "type": "capabilities", "capabilities": { "tools": TOOLS } } logger.info(f"Sending capabilities: {json.dumps(capabilities, indent=2)}") yield { "data": json.dumps(capabilities) } # Send heartbeat every 5 seconds while True: try: await asyncio.sleep(5) heartbeat = { "type": "heartbeat", "timestamp": datetime.now().isoformat() } logger.info(f"Sending heartbeat: {json.dumps(heartbeat, indent=2)}") yield { "data": json.dumps(heartbeat) } except Exception as e: logger.error(f"Error in event stream: {str(e)}") break return EventSourceResponse(event_generator()) @app.post("/invoke/{tool_name}") async def invoke_tool(tool_name: str, request: Request): """Endpoint to invoke a specific tool""" try: logger.info(f"Tool invocation request for {tool_name}") data = await request.json() if server_settings['debugMode']: logger.debug(f"Tool parameters: {json.dumps(data, indent=2)}") logger.debug(f"Request headers: {dict(request.headers)}") else: logger.info(f"Tool parameters: {json.dumps(data, indent=2)}") # Find the requested tool tool = next((t for t in TOOLS if t["name"] == tool_name), None) if not tool: logger.error(f"Tool {tool_name} not found") raise HTTPException(status_code=404, detail=f"Tool {tool_name} not found") # Handle test tool if tool_name == "test": message = data.get("arguments", {}).get("message", "No message provided") return ToolResponse( content=[ Content( type=ContentType.TEXT, text=f"Test successful! Message: {message}" ) ] ) # Handle Google Drive tool if tool_name == "google_drive": # Map the operation to the appropriate handler method operation = data.get("arguments", {}).get("operation") if not operation: raise ValueError("Operation not specified") handler = services.services_manager.get_handler("google_drive") if not handler: raise ValueError("Google Drive service not initialized") # Execute the operation result = None if operation == "list": result = await handler.list_files( folder_id=data.get("arguments", {}).get("folder_id"), page_size=10 ) elif operation == "upload": content = data.get("arguments", {}).get("content") filename = data.get("arguments", {}).get("filename") if not content or not filename: raise ValueError("Content and filename required for upload") result = await handler.upload_file( file_content=content.encode(), filename=filename, mime_type="text/plain", folder_id=data.get("arguments", {}).get("folder_id") ) elif operation == "create_folder": folder_name = data.get("arguments", {}).get("filename") if not folder_name: raise ValueError("Folder name required") result = await handler.create_folder( folder_name=folder_name, parent_folder_id=data.get("arguments", {}).get("folder_id") ) elif operation == "delete": file_id = data.get("arguments", {}).get("file_id") if not file_id: raise ValueError("File ID required for delete") result = await handler.delete_file(file_id) else: raise ValueError(f"Unknown operation: {operation}") # Format response return ToolResponse( content=[ Content( type=ContentType.TEXT, text=json.dumps(result, indent=2) if isinstance(result, (dict, list)) else str(result) ) ] ) except Exception as e: logger.error(f"Error invoking tool {tool_name}: {str(e)}") error_response = { "content": [ Content( type=ContentType.TEXT, text=str(e) ) ], "isError": True } # Add stack trace in debug mode if server_settings['debugMode']: error_response["content"].append( Content( type=ContentType.TEXT, text=f"Stack trace:\n{traceback.format_exc()}" ) ) logger.debug(f"Full stack trace:\n{traceback.format_exc()}") return ToolResponse(**error_response) @app.on_event("startup") async def startup_event(): await connection_manager.start_history_tracking() @app.get("/api/status") async def get_status(): """Get current server status""" return connection_manager.get_status() @app.get("/api/connections/history") async def get_connection_history(): """Get connection history for the last 30 minutes""" return connection_manager.get_history() @app.get("/api/clients") async def get_active_clients(): """Get list of currently connected clients""" return connection_manager.get_active_clients() @app.post("/api/clients/{client_id}/disconnect") async def disconnect_client(client_id: str): """Disconnect a specific client""" if client_id in connection_manager.connected_clients: connection_manager.remove_client(client_id, "Manual disconnect") return {"message": f"Client {client_id} disconnected"} raise HTTPException(status_code=404, detail="Client not found") @app.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: str): """WebSocket endpoint for real-time updates""" # If client is already connected, close the old connection if client_id in connection_manager.connected_clients: try: old_websocket = connection_manager.connected_clients[client_id]["websocket"] await old_websocket.close(code=1000, reason="New connection established") logger.info(f"Closed existing connection for client {client_id}") except Exception as e: logger.error(f"Error closing existing connection for {client_id}: {str(e)}") finally: connection_manager.remove_client(client_id, "New connection") await websocket.accept() if server_settings['debugMode']: logger.debug(f"New WebSocket connection from {client_id}") logger.debug(f"WebSocket headers: {dict(websocket.headers)}") # Register the new client client_info = { "user_agent": websocket.headers.get("user-agent"), "ip": websocket.client.host, "port": websocket.client.port } if server_settings['debugMode'] else {} connection_manager.add_client(client_id, websocket, dict(websocket.headers), client_info) try: while True: try: data = await websocket.receive_text() if server_settings['debugMode']: logger.debug(f"Received WebSocket message from {client_id}: {data}") if data == "ping": connection_manager.update_client_ping(client_id) await websocket.send_text("pong") if server_settings['debugMode']: logger.debug(f"Sent pong response to {client_id}") except WebSocketDisconnect: raise # Re-raise to handle in outer try/except except Exception as e: logger.error(f"Error handling message from {client_id}: {str(e)}") if server_settings['debugMode']: logger.debug(f"Stack trace:\n{traceback.format_exc()}") continue # Continue listening for messages except WebSocketDisconnect: if server_settings['debugMode']: logger.debug(f"WebSocket connection closed for {client_id}") except Exception as e: logger.error(f"Unexpected error in WebSocket connection for {client_id}: {str(e)}") if server_settings['debugMode']: logger.debug(f"Stack trace:\n{traceback.format_exc()}") finally: connection_manager.remove_client(client_id, "Connection closed") @app.get("/api/services/{service_id}") async def get_service_config(service_id: str): """Get configuration for a specific service""" if service_id not in service_configs: raise HTTPException(status_code=404, detail=f"Service {service_id} not found") return service_configs[service_id] @app.put("/api/services/{service_id}") async def update_service_config(service_id: str, config: dict): """Update configuration for a specific service""" if service_id not in service_configs: raise HTTPException(status_code=404, detail=f"Service {service_id} not found") service_configs[service_id] = config return {"message": f"Service {service_id} configuration updated"} @app.post("/api/services/{service_id}/toggle") async def toggle_service(service_id: str, data: dict): """Toggle a service on/off""" if service_id not in service_configs: raise HTTPException(status_code=404, detail=f"Service {service_id} not found") service_configs[service_id]['enabled'] = data.get('enabled', False) return {"message": f"Service {service_id} {'enabled' if data.get('enabled') else 'disabled'}"} @app.get("/api/settings") async def get_settings(): """Get server and MCP settings""" return { "server": server_settings, "mcp": mcp_settings } @app.put("/api/settings") async def update_settings(settings: dict): """Update server and MCP settings""" if 'server' in settings: # Check if debug mode is being changed if 'debugMode' in settings['server'] and settings['server']['debugMode'] != server_settings['debugMode']: set_debug_mode(settings['server']['debugMode']) server_settings.update(settings['server']) if 'mcp' in settings: mcp_settings.update(settings['mcp']) return {"message": "Settings updated successfully"} if __name__ == "__main__": import uvicorn logger.info("Starting MCP Server with tools:") for tool in TOOLS: logger.info(f" - {tool['name']} ({tool['display_name']})") uvicorn.run(app, host="0.0.0.0", port=8765, log_level="info")