# server.py • 16.6 kB
#!/usr/bin/env python3
"""
Fast MCP Server with Google Gemini Integration
This FastAPI server implements a Model Context Protocol (MCP) server that:
1. Provides MCP tools for task management and data processing
2. Integrates with Google Gemini CLI for AI-powered responses
3. Logs all operations for tracking and debugging
4. Supports HTTP POST requests for tool execution
Author: FAST MCP Team
Version: 1.0.0
"""
import json
import logging
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import pandas as pd
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# Configure logging (root logger: timestamped INFO-level output to stderr)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger; inherits the basicConfig handler/format above.
logger = logging.getLogger(__name__)
# Initialize FastAPI app with interactive docs served at /docs and /redoc
app = FastAPI(
    title="Fast MCP Server",
    description="A Model Context Protocol server with Google Gemini integration",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)
# Global variables
DATA_FILE = Path("sample_data.json")  # JSON store holding tasks/users/projects
LOG_FILE = Path("mcp_logs.json")      # JSON list of MCPLogEntry records
SERVER_PORT = 5001                    # port uvicorn binds in the __main__ block
# Pydantic models for request/response validation
class MCPToolRequest(BaseModel):
    """Request payload for POST /mcp/tool."""
    tool_name: str                  # name of the MCP tool to invoke
    # Tool-specific keyword arguments; pydantic deep-copies this default per
    # instance, so the mutable {} default is safe here.
    arguments: Dict[str, Any] = {}
    user_id: Optional[str] = None   # optional caller identifier (not used for auth)
class MCPToolResponse(BaseModel):
    """Response envelope returned by POST /mcp/tool."""
    success: bool    # True when the tool executed without raising
    result: Any      # tool-specific result dict
    message: str     # human-readable status message
    timestamp: str   # ISO-8601 timestamp taken when the request started
    tool_name: str   # echo of the invoked tool's name
class GeminiRequest(BaseModel):
    """Request payload for POST /gemini."""
    prompt: str                    # prompt text forwarded to the Gemini CLI
    context: Optional[str] = None  # optional context prepended to the prompt
    user_id: Optional[str] = None  # optional caller identifier
class GeminiResponse(BaseModel):
    """Response envelope returned by POST /gemini."""
    success: bool    # True when the endpoint completed without raising
    response: str    # Gemini CLI stdout (or an "Error: ..." string)
    timestamp: str   # ISO-8601 timestamp taken when the request started
    prompt: str      # echo of the original prompt
class MCPLogEntry(BaseModel):
    """One record appended to LOG_FILE by log_operation()."""
    timestamp: str                   # ISO-8601 timestamp of the operation
    operation: str                   # e.g. "mcp_tool_call" or "gemini_call"
    tool_name: Optional[str] = None  # set for tool calls only
    prompt: Optional[str] = None     # set for Gemini calls only
    success: bool                    # whether the operation succeeded
    result: Any                      # operation result payload (may be None)
    error: Optional[str] = None      # error message when success is False
# Utility functions
def load_data() -> Dict[str, Any]:
    """Read the JSON data store, falling back to an empty structure.

    A missing or unparseable DATA_FILE is logged and an empty
    tasks/users/projects dict is returned instead of raising.
    """
    fallback: Dict[str, Any] = {"tasks": [], "users": [], "projects": []}
    try:
        with open(DATA_FILE, 'r') as fh:
            return json.load(fh)
    except FileNotFoundError:
        logger.error(f"Data file {DATA_FILE} not found")
        return fallback
    except json.JSONDecodeError as exc:
        logger.error(f"Error parsing JSON data: {exc}")
        return fallback
def save_data(data: Dict[str, Any]) -> bool:
    """Write *data* to the JSON store; return True on success, False on any error."""
    try:
        with open(DATA_FILE, 'w') as fh:
            json.dump(data, fh, indent=2)
    except Exception as exc:
        # Best-effort persistence: report failure via the return value.
        logger.error(f"Error saving data: {exc}")
        return False
    return True
def log_operation(log_entry: MCPLogEntry) -> None:
    """Append one MCP operation record to LOG_FILE.

    Reads the existing JSON list (if any), appends the entry, and rewrites
    the whole file. Best-effort: any failure is logged and swallowed so
    logging can never break the request path.
    """
    try:
        logs: List[Dict[str, Any]] = []
        if LOG_FILE.exists():
            try:
                with open(LOG_FILE, 'r') as f:
                    logs = json.load(f)
                if not isinstance(logs, list):
                    # Defensive: a non-list document would break .append below.
                    logger.warning("Log file did not contain a list; resetting it")
                    logs = []
            except json.JSONDecodeError as e:
                # Bug fix: previously a corrupted log file made every future
                # call fail here, permanently disabling logging. Start fresh.
                logger.warning(f"Corrupted log file, starting a new log: {e}")
                logs = []
        logs.append(log_entry.dict())
        with open(LOG_FILE, 'w') as f:
            json.dump(logs, f, indent=2)
    except Exception as e:
        logger.error(f"Error logging operation: {e}")
def call_gemini_cli(prompt: str, context: Optional[str] = None) -> str:
    """Invoke the Google Gemini CLI and return its stdout.

    On any failure (missing binary, timeout, non-zero exit, unexpected
    exception) an "Error..." string is returned instead of raising.
    Note: assumes the Gemini CLI is installed and configured.
    """
    # Prepend optional context to the prompt.
    full_prompt = f"Context: {context}\n\nPrompt: {prompt}" if context else prompt
    logger.info(f"Calling Gemini CLI with prompt: {prompt[:100]}...")
    try:
        completed = subprocess.run(
            ["gemini", full_prompt],
            capture_output=True,
            text=True,
            timeout=30  # 30 second timeout
        )
    except subprocess.TimeoutExpired:
        error_msg = "Gemini CLI call timed out"
        logger.error(error_msg)
        return f"Error: {error_msg}"
    except FileNotFoundError:
        error_msg = "Gemini CLI not found. Please install and configure Gemini CLI."
        logger.error(error_msg)
        return f"Error: {error_msg}"
    except Exception as e:
        error_msg = f"Unexpected error calling Gemini CLI: {str(e)}"
        logger.error(error_msg)
        return f"Error: {error_msg}"
    if completed.returncode != 0:
        error_msg = f"Gemini CLI error: {completed.stderr}"
        logger.error(error_msg)
        return f"Error calling Gemini: {error_msg}"
    return completed.stdout.strip()
# MCP Tools Implementation
class MCPTools:
    """Collection of MCP tools for task management and data processing.

    Every tool is a stateless @staticmethod that loads the JSON store via
    load_data(), optionally persists changes via save_data(), and returns a
    plain dict that the HTTP layer serializes as-is.
    """

    @staticmethod
    def list_tasks(status_filter: Optional[str] = None) -> Dict[str, Any]:
        """List all tasks, optionally filtered by exact status value."""
        data = load_data()
        tasks = data.get("tasks", [])
        if status_filter:
            tasks = [task for task in tasks if task.get("status") == status_filter]
        return {
            "total_tasks": len(tasks),
            "tasks": tasks,
            "filter_applied": status_filter
        }

    @staticmethod
    def create_task(title: str, description: str, priority: str = "medium",
                    assigned_to: Optional[str] = None) -> Dict[str, Any]:
        """Create a new task in status 'pending' and persist it."""
        data = load_data()
        tasks = data.get("tasks", [])
        # New ID = highest existing ID + 1 (1 for an empty store).
        max_id = max([task.get("id", 0) for task in tasks], default=0)
        new_task = {
            "id": max_id + 1,
            "title": title,
            "description": description,
            "status": "pending",
            "priority": priority,
            "assigned_to": assigned_to,
            "created_at": datetime.now().isoformat(),
            "due_date": None
        }
        tasks.append(new_task)
        data["tasks"] = tasks
        if save_data(data):
            return {"success": True, "task": new_task, "message": "Task created successfully"}
        else:
            return {"success": False, "message": "Failed to save task"}

    @staticmethod
    def update_task_status(task_id: int, new_status: str) -> Dict[str, Any]:
        """Set the status of the task with the given ID.

        Returns a failure dict when the ID is unknown or the store could not
        be written; new_status is stored verbatim (not validated).
        """
        data = load_data()
        tasks = data.get("tasks", [])
        task_found = False
        for task in tasks:
            if task.get("id") == task_id:
                task["status"] = new_status
                task_found = True
                break
        if not task_found:
            return {"success": False, "message": f"Task with ID {task_id} not found"}
        if save_data(data):
            return {"success": True, "message": f"Task {task_id} status updated to {new_status}"}
        else:
            return {"success": False, "message": "Failed to save task update"}

    @staticmethod
    def get_task_statistics() -> Dict[str, Any]:
        """Return counts by status/priority plus the completion percentage."""
        data = load_data()
        tasks = data.get("tasks", [])
        if not tasks:
            return {"message": "No tasks found"}
        # Tally status and priority distributions in a single pass.
        total_tasks = len(tasks)
        status_counts = {}
        priority_counts = {}
        for task in tasks:
            status = task.get("status", "unknown")
            priority = task.get("priority", "unknown")
            status_counts[status] = status_counts.get(status, 0) + 1
            priority_counts[priority] = priority_counts.get(priority, 0) + 1
        return {
            "total_tasks": total_tasks,
            "status_distribution": status_counts,
            "priority_distribution": priority_counts,
            # Percentage of tasks whose status is exactly "completed".
            "completion_rate": round((status_counts.get("completed", 0) / total_tasks) * 100, 2)
        }

    @staticmethod
    def search_tasks(query: str) -> Dict[str, Any]:
        """Case-insensitive substring search over task titles and descriptions."""
        data = load_data()
        tasks = data.get("tasks", [])
        query_lower = query.lower()
        matching_tasks = []
        for task in tasks:
            # Bug fix: a task carrying an explicit None title/description used
            # to crash on .lower() (dict.get's default only applies to missing
            # keys); coerce missing OR None values to "".
            title = (task.get("title") or "").lower()
            description = (task.get("description") or "").lower()
            if query_lower in title or query_lower in description:
                matching_tasks.append(task)
        return {
            "query": query,
            "matches_found": len(matching_tasks),
            "tasks": matching_tasks
        }

    @staticmethod
    def export_tasks_to_csv() -> Dict[str, Any]:
        """Export all tasks to a timestamped CSV file in the working directory."""
        data = load_data()
        tasks = data.get("tasks", [])
        if not tasks:
            return {"success": False, "message": "No tasks to export"}
        try:
            df = pd.DataFrame(tasks)
            csv_filename = f"tasks_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            df.to_csv(csv_filename, index=False)
            return {
                "success": True,
                "message": f"Tasks exported to {csv_filename}",
                "filename": csv_filename,
                "task_count": len(tasks)
            }
        except Exception as e:
            return {"success": False, "message": f"Export failed: {str(e)}"}
# FastAPI Routes
@app.get("/")
async def root():
    """Root endpoint: basic server metadata and the tool catalog."""
    tool_names = [
        "list_tasks",
        "create_task",
        "update_task_status",
        "get_task_statistics",
        "search_tasks",
        "export_tasks_to_csv",
    ]
    return {
        "message": "Fast MCP Server with Google Gemini Integration",
        "version": "1.0.0",
        "status": "running",
        "port": SERVER_PORT,
        "docs": "/docs",
        "available_tools": tool_names
    }
@app.get("/tools")
async def list_mcp_tools():
    """Describe every available MCP tool (name, description, parameters)."""
    # (name, description, parameter list) for each tool, expanded below.
    catalog = [
        ("list_tasks",
         "List all tasks with optional status filtering",
         ["status_filter (optional)"]),
        ("create_task",
         "Create a new task",
         ["title", "description", "priority (optional)", "assigned_to (optional)"]),
        ("update_task_status",
         "Update the status of an existing task",
         ["task_id", "new_status"]),
        ("get_task_statistics",
         "Get task statistics and analytics",
         []),
        ("search_tasks",
         "Search tasks by title or description",
         ["query"]),
        ("export_tasks_to_csv",
         "Export all tasks to CSV file",
         []),
    ]
    return {
        "tools": [
            {"name": name, "description": description, "parameters": params}
            for name, description, params in catalog
        ]
    }
@app.post("/mcp/tool", response_model=MCPToolResponse)
async def call_mcp_tool(request: MCPToolRequest):
    """Execute an MCP tool by name.

    Dispatches request.tool_name to the matching MCPTools method, logs the
    outcome via log_operation, and wraps the result in an MCPToolResponse.

    Raises:
        HTTPException 400: request.tool_name is not a known tool.
        HTTPException 500: the tool itself raised an error.
    """
    timestamp = datetime.now().isoformat()
    try:
        logger.info(f"Executing MCP tool: {request.tool_name}")
        args = request.arguments
        # Route to the appropriate tool method
        if request.tool_name == "list_tasks":
            result = MCPTools.list_tasks(args.get("status_filter"))
        elif request.tool_name == "create_task":
            result = MCPTools.create_task(
                args.get("title", ""),
                args.get("description", ""),
                args.get("priority", "medium"),
                args.get("assigned_to")
            )
        elif request.tool_name == "update_task_status":
            result = MCPTools.update_task_status(
                args.get("task_id"),
                args.get("new_status")
            )
        elif request.tool_name == "get_task_statistics":
            result = MCPTools.get_task_statistics()
        elif request.tool_name == "search_tasks":
            result = MCPTools.search_tasks(args.get("query", ""))
        elif request.tool_name == "export_tasks_to_csv":
            result = MCPTools.export_tasks_to_csv()
        else:
            raise HTTPException(status_code=400, detail=f"Unknown tool: {request.tool_name}")
        # Log successful operation
        log_entry = MCPLogEntry(
            timestamp=timestamp,
            operation="mcp_tool_call",
            tool_name=request.tool_name,
            success=True,
            result=result
        )
        log_operation(log_entry)
        return MCPToolResponse(
            success=True,
            result=result,
            message=f"Tool {request.tool_name} executed successfully",
            timestamp=timestamp,
            tool_name=request.tool_name
        )
    except HTTPException:
        # Bug fix: the 400 "unknown tool" HTTPException was previously caught
        # by the generic handler below and re-raised as a 500 with a wrapped
        # detail message. Let it propagate unchanged.
        raise
    except Exception as e:
        error_msg = f"Error executing tool {request.tool_name}: {str(e)}"
        logger.error(error_msg)
        # Log failed operation
        log_entry = MCPLogEntry(
            timestamp=timestamp,
            operation="mcp_tool_call",
            tool_name=request.tool_name,
            success=False,
            result=None,
            error=error_msg
        )
        log_operation(log_entry)
        raise HTTPException(status_code=500, detail=error_msg)
@app.post("/gemini", response_model=GeminiResponse)
async def call_gemini(request: GeminiRequest):
    """Forward a prompt to the Gemini CLI and return its reply.

    NOTE(review): call_gemini_cli reports CLI-level failures as "Error..."
    strings rather than raising, so this handler normally answers with
    success=True even for those cases; the 500 path covers only unexpected
    exceptions in this function itself.
    """
    timestamp = datetime.now().isoformat()
    try:
        logger.info(f"Calling Gemini with prompt: {request.prompt[:100]}...")
        # Call Gemini CLI
        answer = call_gemini_cli(request.prompt, request.context)
        # Record the interaction in the operation log.
        log_operation(MCPLogEntry(
            timestamp=timestamp,
            operation="gemini_call",
            prompt=request.prompt,
            success=True,
            result={"response": answer}
        ))
        return GeminiResponse(
            success=True,
            response=answer,
            timestamp=timestamp,
            prompt=request.prompt
        )
    except Exception as exc:
        error_msg = f"Error calling Gemini: {str(exc)}"
        logger.error(error_msg)
        # Record the failed call before surfacing it as a 500.
        log_operation(MCPLogEntry(
            timestamp=timestamp,
            operation="gemini_call",
            prompt=request.prompt,
            success=False,
            result=None,
            error=error_msg
        ))
        raise HTTPException(status_code=500, detail=error_msg)
@app.get("/logs")
async def get_logs():
    """Return every recorded MCP operation log entry."""
    try:
        # Guard clause: no log file yet means no entries.
        if not LOG_FILE.exists():
            return {"logs": [], "total_entries": 0, "message": "No logs found"}
        with open(LOG_FILE, 'r') as fh:
            entries = json.load(fh)
        return {"logs": entries, "total_entries": len(entries)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error reading logs: {str(e)}")
@app.get("/health")
async def health_check():
    """Liveness probe: reports server status and data/log file presence."""
    now = datetime.now().isoformat()
    return {
        "status": "healthy",
        "timestamp": now,
        "data_file_exists": DATA_FILE.exists(),
        "log_file_exists": LOG_FILE.exists()
    }
if __name__ == "__main__":
    logger.info("Starting Fast MCP Server...")
    logger.info(f"Server will run on port {SERVER_PORT}")
    logger.info(f"Data file: {DATA_FILE}")
    logger.info(f"Log file: {LOG_FILE}")
    # Ensure data file exists so the first tool call does not hit the
    # FileNotFoundError fallback in load_data().
    if not DATA_FILE.exists():
        logger.warning(f"Data file {DATA_FILE} not found. Creating with empty data.")
        save_data({"tasks": [], "users": [], "projects": []})
    # Start the server. The "server:app" import string is required because
    # reload=True spawns a fresh process that re-imports this module.
    # NOTE(review): reload=True and host 0.0.0.0 are development settings;
    # reconsider both before deploying.
    uvicorn.run(
        "server:app",
        host="0.0.0.0",
        port=SERVER_PORT,
        reload=True,
        log_level="info"
    )