Skip to main content
Glama

Fast MCP Task Manager

server.py • 16.6 kB
#!/usr/bin/env python3
"""
Fast MCP Server with Google Gemini Integration

This FastAPI server implements a Model Context Protocol (MCP) server that:
1. Provides MCP tools for task management and data processing
2. Integrates with Google Gemini CLI for AI-powered responses
3. Logs all operations for tracking and debugging
4. Supports HTTP POST requests for tool execution

Author: FAST MCP Team
Version: 1.0.0
"""

import json
import logging
import subprocess
import sys
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="Fast MCP Server",
    description="A Model Context Protocol server with Google Gemini integration",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# Global variables
DATA_FILE = Path("sample_data.json")
LOG_FILE = Path("mcp_logs.json")
SERVER_PORT = 5001

# Single source of truth for the tool catalogue served by "/" and "/tools",
# so the two endpoints cannot drift apart.
TOOL_SPECS: List[Dict[str, Any]] = [
    {
        "name": "list_tasks",
        "description": "List all tasks with optional status filtering",
        "parameters": ["status_filter (optional)"]
    },
    {
        "name": "create_task",
        "description": "Create a new task",
        "parameters": ["title", "description", "priority (optional)", "assigned_to (optional)"]
    },
    {
        "name": "update_task_status",
        "description": "Update the status of an existing task",
        "parameters": ["task_id", "new_status"]
    },
    {
        "name": "get_task_statistics",
        "description": "Get task statistics and analytics",
        "parameters": []
    },
    {
        "name": "search_tasks",
        "description": "Search tasks by title or description",
        "parameters": ["query"]
    },
    {
        "name": "export_tasks_to_csv",
        "description": "Export all tasks to CSV file",
        "parameters": []
    },
]


# Pydantic models for request/response validation
class MCPToolRequest(BaseModel):
    """Model for MCP tool requests"""
    tool_name: str
    arguments: Dict[str, Any] = {}
    user_id: Optional[str] = None


class MCPToolResponse(BaseModel):
    """Model for MCP tool responses"""
    success: bool
    result: Any
    message: str
    timestamp: str
    tool_name: str


class GeminiRequest(BaseModel):
    """Model for Gemini API requests"""
    prompt: str
    context: Optional[str] = None
    user_id: Optional[str] = None


class GeminiResponse(BaseModel):
    """Model for Gemini API responses"""
    success: bool
    response: str
    timestamp: str
    prompt: str


class MCPLogEntry(BaseModel):
    """Model for logging MCP operations"""
    timestamp: str
    operation: str
    tool_name: Optional[str] = None
    prompt: Optional[str] = None
    success: bool
    result: Any
    error: Optional[str] = None


# Utility functions
def _empty_data() -> Dict[str, Any]:
    """Return a fresh, empty data document (new lists on every call)."""
    return {"tasks": [], "users": [], "projects": []}


def load_data() -> Dict[str, Any]:
    """Load sample data from JSON file.

    Returns:
        The parsed content of ``DATA_FILE``. If the file is missing or cannot
        be decoded, the problem is logged and an empty document with
        ``tasks``, ``users`` and ``projects`` lists is returned instead.
    """
    try:
        with open(DATA_FILE, 'r', encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        logger.error(f"Data file {DATA_FILE} not found")
        return _empty_data()
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        logger.error(f"Error parsing JSON data: {e}")
        return _empty_data()


def save_data(data: Dict[str, Any]) -> bool:
    """Save data to JSON file.

    Args:
        data: The full data document to persist to ``DATA_FILE``.

    Returns:
        True on success, False if serialization or writing failed (the error
        is logged).
    """
    try:
        # Serialize BEFORE opening the file: opening with 'w' truncates it, so
        # a serialization error must not be able to wipe the existing data.
        payload = json.dumps(data, indent=2)
        with open(DATA_FILE, 'w', encoding="utf-8") as f:
            f.write(payload)
        return True
    except Exception as e:
        logger.error(f"Error saving data: {e}")
        return False


def log_operation(log_entry: MCPLogEntry) -> None:
    """Log MCP operations to JSON file.

    Appends ``log_entry`` to the JSON array stored in ``LOG_FILE``. This is
    best-effort: any failure is logged and swallowed so that logging can never
    break the request that triggered it.
    """
    try:
        logs = []
        if LOG_FILE.exists():
            with open(LOG_FILE, 'r', encoding="utf-8") as f:
                logs = json.load(f)
        logs.append(log_entry.dict())
        # Serialize first so a non-serializable result cannot truncate the
        # existing log file.
        payload = json.dumps(logs, indent=2)
        with open(LOG_FILE, 'w', encoding="utf-8") as f:
            f.write(payload)
    except Exception as e:
        logger.error(f"Error logging operation: {e}")


def _run_gemini_cli(prompt: str, context: Optional[str] = None) -> Tuple[bool, str]:
    """Run the Gemini CLI and report whether the call succeeded.

    Args:
        prompt: The user prompt.
        context: Optional context that is prepended to the prompt.

    Returns:
        ``(True, stdout)`` when the CLI exits with code 0, otherwise
        ``(False, error_text)`` where ``error_text`` is a human-readable
        description of the failure.
    """
    try:
        # Prepare the command for Gemini CLI
        # Note: This assumes Gemini CLI is installed and configured
        full_prompt = prompt
        if context:
            full_prompt = f"Context: {context}\n\nPrompt: {prompt}"

        # Argument list (no shell) so the prompt is never shell-interpreted.
        # NOTE(review): the prompt is caller-supplied and passed positionally;
        # a prompt starting with "-" might be parsed as a CLI option - confirm
        # how the Gemini CLI handles that.
        cmd = ["gemini", full_prompt]

        logger.info(f"Calling Gemini CLI with prompt: {prompt[:100]}...")

        # Execute the command
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=30  # 30 second timeout
        )

        if result.returncode == 0:
            return True, result.stdout.strip()

        error_msg = f"Gemini CLI error: {result.stderr}"
        logger.error(error_msg)
        return False, f"Error calling Gemini: {error_msg}"
    except subprocess.TimeoutExpired:
        error_msg = "Gemini CLI call timed out"
        logger.error(error_msg)
        return False, f"Error: {error_msg}"
    except FileNotFoundError:
        error_msg = "Gemini CLI not found. Please install and configure Gemini CLI."
        logger.error(error_msg)
        return False, f"Error: {error_msg}"
    except Exception as e:
        error_msg = f"Unexpected error calling Gemini CLI: {str(e)}"
        logger.error(error_msg)
        return False, f"Error: {error_msg}"


def call_gemini_cli(prompt: str, context: Optional[str] = None) -> str:
    """Call Google Gemini CLI using subprocess.

    Returns:
        The CLI output on success, or an ``"Error..."`` string describing the
        failure. This function never raises.
    """
    _, text = _run_gemini_cli(prompt, context)
    return text


# MCP Tools Implementation
class MCPTools:
    """Collection of MCP tools for task management and data processing"""

    @staticmethod
    def list_tasks(status_filter: Optional[str] = None) -> Dict[str, Any]:
        """List all tasks, optionally filtered by status.

        Args:
            status_filter: When truthy, only tasks whose ``status`` equals this
                value are returned.
        """
        data = load_data()
        tasks = data.get("tasks", [])

        if status_filter:
            tasks = [task for task in tasks if task.get("status") == status_filter]

        return {
            "total_tasks": len(tasks),
            "tasks": tasks,
            "filter_applied": status_filter
        }

    @staticmethod
    def create_task(title: str, description: str, priority: str = "medium",
                    assigned_to: Optional[str] = None) -> Dict[str, Any]:
        """Create a new task.

        The new task gets the next integer ID after the highest existing one,
        status ``"pending"`` and no due date.

        Returns:
            A dict with ``success``, ``message`` and, on success, the ``task``.
        """
        data = load_data()
        tasks = data.get("tasks", [])

        # Generate new task ID. Only integer ids take part so one malformed
        # record (missing or non-numeric id) cannot crash task creation.
        max_id = max(
            (task["id"] for task in tasks if isinstance(task.get("id"), int)),
            default=0
        )
        new_id = max_id + 1

        new_task = {
            "id": new_id,
            "title": title,
            "description": description,
            "status": "pending",
            "priority": priority,
            "assigned_to": assigned_to,
            "created_at": datetime.now().isoformat(),
            "due_date": None
        }

        tasks.append(new_task)
        data["tasks"] = tasks

        if save_data(data):
            return {"success": True, "task": new_task, "message": "Task created successfully"}
        return {"success": False, "message": "Failed to save task"}

    @staticmethod
    def update_task_status(task_id: int, new_status: str) -> Dict[str, Any]:
        """Update task status.

        Args:
            task_id: ID of the task to update. A string form of the ID (as
                JSON clients often send) matches as well.
            new_status: The status value to store on the task.

        Returns:
            A dict with ``success`` and ``message``.
        """
        data = load_data()
        tasks = data.get("tasks", [])

        task_found = False
        for task in tasks:
            stored_id = task.get("id")
            # Compare textually as a fallback so "3" matches 3.
            if stored_id == task_id or str(stored_id) == str(task_id):
                task["status"] = new_status
                task_found = True
                break

        if not task_found:
            return {"success": False, "message": f"Task with ID {task_id} not found"}

        if save_data(data):
            return {"success": True, "message": f"Task {task_id} status updated to {new_status}"}
        return {"success": False, "message": "Failed to save task update"}

    @staticmethod
    def get_task_statistics() -> Dict[str, Any]:
        """Get task statistics and analytics.

        Returns:
            Total count, per-status and per-priority distributions, and the
            percentage of tasks with status ``"completed"`` (2 decimals); or a
            message-only dict when there are no tasks.
        """
        data = load_data()
        tasks = data.get("tasks", [])

        if not tasks:
            return {"message": "No tasks found"}

        # Calculate statistics
        total_tasks = len(tasks)
        status_counts = Counter(task.get("status", "unknown") for task in tasks)
        priority_counts = Counter(task.get("priority", "unknown") for task in tasks)

        return {
            "total_tasks": total_tasks,
            # Plain dicts keep the serialized shape identical to before.
            "status_distribution": dict(status_counts),
            "priority_distribution": dict(priority_counts),
            "completion_rate": round((status_counts.get("completed", 0) / total_tasks) * 100, 2)
        }

    @staticmethod
    def search_tasks(query: str) -> Dict[str, Any]:
        """Search tasks by title or description (case-insensitive substring)."""
        data = load_data()
        tasks = data.get("tasks", [])

        query_lower = query.lower()

        def _text(task: Dict[str, Any], key: str) -> str:
            # A stored null (None) or non-string value must not crash the search.
            return str(task.get(key) or "").lower()

        matching_tasks = [
            task for task in tasks
            if query_lower in _text(task, "title") or query_lower in _text(task, "description")
        ]

        return {
            "query": query,
            "matches_found": len(matching_tasks),
            "tasks": matching_tasks
        }

    @staticmethod
    def export_tasks_to_csv() -> Dict[str, Any]:
        """Export tasks to CSV format.

        Writes ``tasks_export_<YYYYMMDD_HHMMSS>.csv`` into the current working
        directory and returns the filename and task count.
        """
        data = load_data()
        tasks = data.get("tasks", [])

        if not tasks:
            return {"success": False, "message": "No tasks to export"}

        try:
            df = pd.DataFrame(tasks)
            csv_filename = f"tasks_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            df.to_csv(csv_filename, index=False)

            return {
                "success": True,
                "message": f"Tasks exported to {csv_filename}",
                "filename": csv_filename,
                "task_count": len(tasks)
            }
        except Exception as e:
            return {"success": False, "message": f"Export failed: {str(e)}"}


def _require_argument(arguments: Dict[str, Any], name: str) -> Any:
    """Return ``arguments[name]`` or raise HTTP 400 if it is missing or null."""
    value = arguments.get(name)
    if value is None:
        raise HTTPException(status_code=400, detail=f"Missing required argument: {name}")
    return value


def _dispatch_tool(tool_name: str, arguments: Dict[str, Any]) -> Any:
    """Route a tool call to the matching ``MCPTools`` method.

    Raises:
        HTTPException: 400 for an unknown tool or a missing required argument.
    """
    if tool_name == "list_tasks":
        return MCPTools.list_tasks(arguments.get("status_filter"))
    if tool_name == "create_task":
        return MCPTools.create_task(
            arguments.get("title", ""),
            arguments.get("description", ""),
            arguments.get("priority", "medium"),
            arguments.get("assigned_to")
        )
    if tool_name == "update_task_status":
        # Without these checks a missing task_id would match id-less tasks and
        # a missing new_status would overwrite the status with None.
        return MCPTools.update_task_status(
            _require_argument(arguments, "task_id"),
            _require_argument(arguments, "new_status")
        )
    if tool_name == "get_task_statistics":
        return MCPTools.get_task_statistics()
    if tool_name == "search_tasks":
        return MCPTools.search_tasks(arguments.get("query", ""))
    if tool_name == "export_tasks_to_csv":
        return MCPTools.export_tasks_to_csv()
    raise HTTPException(status_code=400, detail=f"Unknown tool: {tool_name}")


# FastAPI Routes
@app.get("/")
async def root():
    """Root endpoint with server information"""
    return {
        "message": "Fast MCP Server with Google Gemini Integration",
        "version": "1.0.0",
        "status": "running",
        "port": SERVER_PORT,
        "docs": "/docs",
        "available_tools": [tool["name"] for tool in TOOL_SPECS]
    }


@app.get("/tools")
async def list_mcp_tools():
    """List all available MCP tools"""
    return {"tools": TOOL_SPECS}


@app.post("/mcp/tool", response_model=MCPToolResponse)
async def call_mcp_tool(request: MCPToolRequest):
    """Execute an MCP tool.

    Client errors (unknown tool, missing argument) are returned with their own
    4xx status; unexpected failures become HTTP 500. Every outcome is written
    to the operation log.
    """
    timestamp = datetime.now().isoformat()

    def _log_failure(error_msg: str) -> None:
        # Log failed operation
        logger.error(error_msg)
        log_operation(MCPLogEntry(
            timestamp=timestamp,
            operation="mcp_tool_call",
            tool_name=request.tool_name,
            success=False,
            result=None,
            error=error_msg
        ))

    try:
        logger.info(f"Executing MCP tool: {request.tool_name}")
        result = _dispatch_tool(request.tool_name, request.arguments)
    except HTTPException as exc:
        # Re-raise as-is: a blanket handler would turn this 400 into a 500.
        _log_failure(f"Error executing tool {request.tool_name}: {exc.detail}")
        raise
    except Exception as e:
        error_msg = f"Error executing tool {request.tool_name}: {str(e)}"
        _log_failure(error_msg)
        raise HTTPException(status_code=500, detail=error_msg) from e

    # Log successful operation
    log_operation(MCPLogEntry(
        timestamp=timestamp,
        operation="mcp_tool_call",
        tool_name=request.tool_name,
        success=True,
        result=result
    ))

    return MCPToolResponse(
        success=True,
        result=result,
        message=f"Tool {request.tool_name} executed successfully",
        timestamp=timestamp,
        tool_name=request.tool_name
    )


@app.post("/gemini", response_model=GeminiResponse)
async def call_gemini(request: GeminiRequest):
    """Call Google Gemini CLI and return response.

    ``success`` in the response and in the log reflects whether the CLI call
    actually succeeded; on failure ``response`` carries the error text.
    """
    timestamp = datetime.now().isoformat()

    try:
        logger.info(f"Calling Gemini with prompt: {request.prompt[:100]}...")

        # Call Gemini CLI in a worker thread: subprocess.run blocks for up to
        # 30 seconds and would otherwise stall the event loop for every client.
        ok, response = await run_in_threadpool(
            _run_gemini_cli, request.prompt, request.context
        )

        # Log Gemini interaction
        log_entry = MCPLogEntry(
            timestamp=timestamp,
            operation="gemini_call",
            prompt=request.prompt,
            success=ok,
            result={"response": response},
            error=None if ok else response
        )
        log_operation(log_entry)

        return GeminiResponse(
            success=ok,
            response=response,
            timestamp=timestamp,
            prompt=request.prompt
        )
    except Exception as e:
        error_msg = f"Error calling Gemini: {str(e)}"
        logger.error(error_msg)

        # Log failed Gemini call
        log_entry = MCPLogEntry(
            timestamp=timestamp,
            operation="gemini_call",
            prompt=request.prompt,
            success=False,
            result=None,
            error=error_msg
        )
        log_operation(log_entry)

        raise HTTPException(status_code=500, detail=error_msg) from e


@app.get("/logs")
async def get_logs():
    """Get MCP operation logs"""
    try:
        if LOG_FILE.exists():
            with open(LOG_FILE, 'r', encoding="utf-8") as f:
                logs = json.load(f)
            return {"logs": logs, "total_entries": len(logs)}
        return {"logs": [], "total_entries": 0, "message": "No logs found"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error reading logs: {str(e)}") from e


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "data_file_exists": DATA_FILE.exists(),
        "log_file_exists": LOG_FILE.exists()
    }


if __name__ == "__main__":
    logger.info("Starting Fast MCP Server...")
    logger.info(f"Server will run on port {SERVER_PORT}")
    logger.info(f"Data file: {DATA_FILE}")
    logger.info(f"Log file: {LOG_FILE}")

    # Ensure data file exists
    if not DATA_FILE.exists():
        logger.warning(f"Data file {DATA_FILE} not found. Creating with empty data.")
        save_data(_empty_data())

    # Start the server
    uvicorn.run(
        "server:app",
        host="0.0.0.0",
        port=SERVER_PORT,
        reload=True,
        log_level="info"
    )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MalikAqsa-cloud/MCP-Server-using-FAST-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.