MCP DuckDuckGo Search Server
by spences10
Verified
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Dict, List, Optional
import json
import asyncio
import logging
import os
from datetime import datetime, timezone, timedelta
from collections import deque
import database as db
import config as cfg
import services
from contextlib import asynccontextmanager
import colorama
from colorama import Fore, Style
colorama.init()
# Load configuration
config = cfg.Config()
def format_log_message(record):
"""Format log messages with colors and better structure"""
timestamp = datetime.fromtimestamp(record.created).strftime('%Y-%m-%d %H:%M:%S')
timestamp = f"{Fore.CYAN}{timestamp}{Style.RESET_ALL}"
level = record.levelname
level_color = {
'INFO': Fore.GREEN,
'WARNING': Fore.YELLOW,
'ERROR': Fore.RED,
'CRITICAL': Fore.RED + Style.BRIGHT
}.get(level, '')
level_formatted = f"{level_color}{level:8}{Style.RESET_ALL}"
message = record.getMessage()
return f"{timestamp} - {level_formatted} - {message}"
# Configure logging with custom formatter
class ColoredFormatter(logging.Formatter):
def format(self, record):
timestamp = datetime.fromtimestamp(record.created).strftime('%Y-%m-%d %H:%M:%S')
timestamp = f"{Fore.CYAN}{timestamp}{Style.RESET_ALL}"
level = record.levelname
level_color = {
'INFO': Fore.GREEN,
'WARNING': Fore.YELLOW,
'ERROR': Fore.RED,
'CRITICAL': Fore.RED + Style.BRIGHT
}.get(level, '')
level_formatted = f"{level_color}{level:8}{Style.RESET_ALL}"
message = record.getMessage()
return f"{timestamp} - {level_formatted} - {message}"
# Set up root logger
root = logging.getLogger()
root.setLevel(logging.INFO)
# Configure our application logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Create console handler with custom formatter
console_handler = logging.StreamHandler()
console_handler.setFormatter(ColoredFormatter())
console_handler.setLevel(logging.INFO)
# Remove any existing handlers from all loggers
root.handlers = []
logger.handlers = []
# Add console handler to root logger
root.addHandler(console_handler)
logger.addHandler(console_handler)
# Configure uvicorn loggers
for logger_name in ["uvicorn", "uvicorn.error", "uvicorn.access"]:
uvicorn_logger = logging.getLogger(logger_name)
uvicorn_logger.handlers = []
uvicorn_logger.propagate = True
uvicorn_logger.setLevel(logging.INFO)
# Set log level for watchfiles logger
watchfiles_logger = logging.getLogger('watchfiles')
watchfiles_logger.setLevel(logging.INFO)
watchfiles_logger.propagate = True
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan events handler"""
# Start background tasks
await manager.start_history_tracking()
yield
app = FastAPI(
title="MCP Server",
description="ModelContextProtocol Server with Web UI",
lifespan=lifespan
)
# CORS middleware configuration with explicit methods
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["*"],
expose_headers=["*"],
)
# WebSocket connection manager
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.client_info: Dict[str, dict] = {}
self.start_time = datetime.now(timezone.utc)
self.update_task = None
async def start_history_tracking(self):
"""Start the connection history tracking task"""
self.update_task = asyncio.create_task(self._update_connection_history())
async def _update_connection_history(self):
"""Background task to update connection history"""
while True:
current_connections = len(self.active_connections)
current_time = datetime.now(timezone.utc)
# Store in database with current time
db.add_connection_record(current_connections, current_time)
# Clean up old records
db.cleanup_old_records()
await asyncio.sleep(10) # Update every 10 seconds instead of every minute
async def connect(self, websocket: WebSocket, client_id: str):
await websocket.accept()
self.active_connections[client_id] = websocket
self.client_info[client_id] = {
"connected_at": datetime.now(timezone.utc).isoformat(),
"last_ping": datetime.now(timezone.utc).isoformat()
}
logger.info(f"Client {client_id} connected")
def disconnect(self, client_id: str):
if client_id in self.active_connections:
del self.active_connections[client_id]
if client_id in self.client_info:
del self.client_info[client_id]
logger.info(f"Client {client_id} disconnected")
async def broadcast(self, message: str):
for connection in self.active_connections.values():
await connection.send_text(message)
def get_active_clients(self) -> List[dict]:
return [
{"client_id": client_id, **info}
for client_id, info in self.client_info.items()
]
def get_connection_history(self) -> List[dict]:
# Get history from database
history = db.get_connection_history()
# Return the raw data without aggregation
return [
{
"time": entry["timestamp"],
"connections": int(entry["connections"]) # Ensure integer values
}
for entry in history
]
def get_uptime(self) -> str:
uptime = datetime.now(timezone.utc) - self.start_time
days = uptime.days
hours, remainder = divmod(uptime.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
parts = []
if days > 0:
parts.append(f"{days}d")
if hours > 0:
parts.append(f"{hours}h")
if minutes > 0:
parts.append(f"{minutes}m")
parts.append(f"{seconds}s")
return " ".join(parts)
manager = ConnectionManager()
# WebSocket endpoint for MCP connections
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket, client_id)
try:
while True:
data = await websocket.receive_text()
try:
message = json.loads(data)
# Handle different message types here
if message.get("type") == "ping":
await websocket.send_json({"type": "pong"})
manager.client_info[client_id]["last_ping"] = datetime.now(timezone.utc).isoformat()
else:
# Echo back the message for now
await websocket.send_text(data)
except json.JSONDecodeError:
logger.error(f"Invalid JSON received from client {client_id}")
await websocket.send_json({"error": "Invalid JSON format"})
except WebSocketDisconnect:
manager.disconnect(client_id)
except Exception as e:
logger.error(f"Error in websocket connection: {str(e)}")
manager.disconnect(client_id)
# REST API endpoints for configuration
class ServerSettingsUpdate(BaseModel):
host: str
port: int
max_connections: int
ping_timeout: int
class MCPServer(BaseModel):
name: str
type: str
command: str
args: List[str]
env: Dict[str, str]
class MCPSettingsUpdate(BaseModel):
protocol_version: str
max_context_length: int
default_temperature: float
max_tokens: int
mcpServers: List[MCPServer]
class SettingsUpdate(BaseModel):
server: ServerSettingsUpdate
mcp: MCPSettingsUpdate
@app.get("/api/settings")
async def get_settings():
"""Get current server and MCP settings"""
try:
# Convert Pydantic models to dictionaries for JSON serialization
server_config = {
"host": config.server.host,
"port": config.server.port,
"max_connections": config.server.max_connections,
"ping_timeout": config.server.ping_timeout
}
mcp_config = {
"protocol_version": config.mcp.protocol_version,
"max_context_length": config.mcp.max_context_length,
"default_temperature": config.mcp.default_temperature,
"max_tokens": config.mcp.max_tokens,
"mcpServers": config.mcp.mcpServers
}
return {
"server": server_config,
"mcp": mcp_config
}
except Exception as e:
logger.error(f"Error getting settings: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/settings")
async def update_settings(settings: SettingsUpdate):
"""Update server and MCP settings"""
try:
# Update server settings
config.update_server_config(settings.server.dict())
# Update MCP settings
config.update_mcp_config(settings.mcp.dict())
# Only restart if MCP servers configuration changed
needs_restart = False
if settings.mcp.mcpServers != config.mcp.mcpServers:
needs_restart = True
return {
"status": "success",
"message": "Settings updated successfully",
"requiresRestart": needs_restart
}
except Exception as e:
logger.error(f"Error updating settings: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/clients")
async def get_clients():
"""Get list of connected clients"""
return manager.get_active_clients()
@app.get("/api/status")
async def get_status(hours: float = 1):
"""Get server status"""
history = db.get_connection_history(hours=hours)
# Ensure timestamps are in ISO format with timezone
for entry in history:
if isinstance(entry["timestamp"], datetime):
entry["time"] = entry["timestamp"].isoformat()
else:
entry["time"] = entry["timestamp"]
del entry["timestamp"]
return {
"status": "running",
"active_clients": len(manager.active_connections),
"uptime": manager.get_uptime(),
"version": "1.0.0",
"connection_history": history
}
@app.delete("/api/clients/{client_id}")
async def disconnect_client(client_id: str):
"""Disconnect a specific client"""
if client_id not in manager.active_connections:
raise HTTPException(status_code=404, detail="Client not found")
try:
# Close the WebSocket connection
await manager.active_connections[client_id].close()
# Remove from manager
manager.disconnect(client_id)
return {"status": "success", "message": f"Client {client_id} disconnected"}
except Exception as e:
logger.error(f"Error disconnecting client {client_id}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# Define the service config update model
class ServiceConfigUpdate(BaseModel):
enabled: bool
config: Optional[Dict[str, str]] = None
@app.get("/api/services")
async def get_services():
"""Get all services configuration"""
return services.services_manager.get_all_services()
@app.get("/api/services/{service_id}")
async def get_service(service_id: str):
"""Get configuration for a specific service"""
service_config = services.services_manager.get_service_config(service_id)
if service_config is None:
raise HTTPException(status_code=404, detail="Service not found")
return service_config
@app.put("/api/services/{service_id}")
async def update_service(service_id: str, config_update: ServiceConfigUpdate):
"""Update configuration for a specific service"""
try:
logger.info(f"Updating service {service_id} - enabled: {config_update.enabled}")
# Get current service config
current_config = services.services_manager.get_service_config(service_id)
if current_config is None:
logger.warning(f"Service {service_id} not found")
raise HTTPException(status_code=404, detail=f"Service {service_id} not found")
# Update the service
updated_config = await services.services_manager.update_service_config(
service_id,
enabled=config_update.enabled,
config=config_update.config or {}
)
logger.info(f"Service {service_id} updated successfully")
return {
"status": "success",
"message": f"Service {service_id} updated successfully",
"enabled": updated_config.enabled,
"config": updated_config.config
}
except ValueError as e:
logger.error(f"Invalid configuration for service {service_id}: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error updating service {service_id}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# Only serve static files in production mode
if os.path.exists("frontend/build"):
app.mount("/", StaticFiles(directory="frontend/build", html=True), name="static")
logger.info("Serving static files from frontend/build directory")
else:
logger.info("Development mode: Using React development server on port 3000")
if __name__ == "__main__":
import uvicorn
logger.info(f"Starting server on {config.server.host}:{config.server.port}")
uvicorn.run(
"server:app",
host=config.server.host,
port=config.server.port,
log_level="info",
reload=True
)