# main.py
"""
Task Manager - A simple task management MCP server for tracking and organizing work items
Advanced FastMCP server demonstrating AI integration and lifecycle management.
Showcases structural elegance with minimal cognitive complexity.
Generated by MCP-Creator-MCP
"""
import asyncio
import atexit
import logging
import signal
import sys
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any
from mcp.server.fastmcp import Context, FastMCP
# Configure logging for operational clarity.
# stderr is used so log lines never interleave with the MCP stdio protocol.
logging.basicConfig(
    stream=sys.stderr,
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Application state with clean lifecycle management
class ServerState:
    """Shared runtime state for the server: connections, cache, init flag."""

    def __init__(self):
        # Live connection objects keyed by identifier.
        self.connections: dict[str, Any] = {}
        # Simple in-memory key/value cache used by the manage_cache tool.
        self.cache: dict[str, Any] = {}
        # True only after initialize() has completed successfully.
        self.initialized = False

    async def initialize(self):
        """Prepare resources; idempotent, so repeated calls are no-ops."""
        if self.initialized:
            return
        try:
            # Initialize connections, cache, etc.
            logger.info("Initializing task_manager resources")
            self.initialized = True
        except Exception as e:
            logger.error(f"Initialization failed: {e}")
            raise

    async def cleanup(self):
        """Release every held resource and return to the uninitialized state."""
        logger.info("Cleaning up task_manager resources")
        # Best-effort close: one failing connection must not block the rest.
        for identifier, connection in self.connections.items():
            try:
                closer = getattr(connection, "close", None)
                if closer is not None:
                    await closer()
            except Exception as e:
                logger.warning(f"Connection cleanup failed for {identifier}: {e}")
        self.connections.clear()
        self.cache.clear()
        self.initialized = False
# Global state instance shared by the lifespan hook, tools, and resources.
_state = ServerState()
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[ServerState]:
    """Elegant lifecycle management with clear resource boundaries."""
    # Runs once around the whole server lifetime: initialize before serving,
    # always clean up afterwards — even when startup or serving fails.
    logger.info("Starting task_manager lifecycle")
    try:
        await _state.initialize()
        # Handlers receive this object via ctx.request_context.lifespan_context.
        yield _state
    except Exception as e:
        logger.error(f"Lifecycle error: {e}")
        raise
    finally:
        await _state.cleanup()
# Initialize FastMCP with lifecycle management
# (app_lifespan supplies the shared ServerState to every request context).
mcp = FastMCP("task_manager", lifespan=app_lifespan)
@mcp.tool()
async def intelligent_processor(
    ctx: Context,
    data: str,
    operation: str = "analyze"
) -> str:
    """
    Process data with AI enhancement and intelligent feedback.

    Demonstrates sampling integration with clean error boundaries
    and predictable behavior patterns.

    Args:
        ctx: FastMCP request context (accepted for tool API symmetry; not read)
        data: Input data to process
        operation: Type of processing (analyze, summarize, transform)

    Returns:
        AI-enhanced processing results with clear status
    """
    try:
        # Input validation with clear feedback
        if not data.strip():
            return "⚠️ Please provide data for processing"
        if operation not in ("analyze", "summarize", "transform"):
            return f"❌ Unsupported operation: {operation}"

        # Note: AI sampling is not currently supported in Claude Desktop.
        # Providing structured processing based on operation type.
        if operation == "analyze":
            result = f"📊 Analysis of {len(data)} characters:\n\n"
            result += "• Key patterns identified\n"
            result += "• Structure and organization assessed\n"
            result += "• Actionable insights provided\n\n"
            # Long inputs are truncated to a 200-character preview.
            if len(data) > 200:
                result += f"Summary: {data[:200]}..."
            else:
                result += f"Content: {data}"
        elif operation == "summarize":
            result = f"📝 Summary of {len(data)} characters:\n\n"
            words = data.split()
            # Slicing already caps at len(words), so no length check is needed.
            key_points = words[:10]
            result += f"Key points: {', '.join(key_points)}\n"
            result += f"Length: {len(words)} words, {len(data)} characters"
        else:
            # operation == "transform" — the only remaining validated value
            # (the original had an extra unreachable fallback branch here).
            result = f"🔄 Transformation of {len(data)} characters:\n\n"
            result += "Original format: Text\n"
            result += "Processed format: Structured data\n"
            result += f"Character count: {len(data)}\n"
            result += f"Word count: {len(data.split())}"

        # Log for operational visibility
        logger.info(f"Successfully processed {operation} request: {len(data)} chars")
        return result
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        return f"❌ Processing error: {str(e)}"
@mcp.tool()
async def manage_cache(
    ctx: Context,
    action: str,
    key: str | None = None,
    value: str | None = None
) -> str:
    """
    Manage server cache with clean state operations.

    Args:
        ctx: FastMCP request context carrying the lifespan ServerState
        action: Cache operation (get, set, list, clear)
        key: Cache key (for get/set operations)
        value: Cache value (for set operations)

    Returns:
        Cache operation results with clear status
    """
    try:
        shared_state = ctx.request_context.lifespan_context
        cache = shared_state.cache

        if action == "get":
            if not key:
                return "❌ Key required for get operation"
            return f"📦 Cache[{key}]: {cache.get(key, 'Key not found')}"

        if action == "set":
            if not key or value is None:
                return "❌ Key and value required for set operation"
            cache[key] = value
            return f"✅ Cache[{key}] = {value}"

        if action == "list":
            if not cache:
                return "📦 Cache is empty"
            listing = "\n".join(f"• {k}: {v}" for k, v in cache.items())
            return "📦 Cache contents:\n" + listing

        if action == "clear":
            entry_count = len(cache)
            cache.clear()
            return f"🗑️ Cleared {entry_count} cache entries"

        return f"❌ Unsupported action: {action}"
    except Exception as e:
        logger.error(f"Cache operation failed: {e}")
        return f"❌ Cache error: {str(e)}"
@mcp.resource("task_manager://status")
async def server_status() -> str:
    """
    Provide comprehensive server status with operational metrics.

    Returns:
        A JSON document describing the global server state; on failure,
        a JSON object with a single "error" key.
    """
    import json

    try:
        status_data = {
            "server": "task_manager",
            "status": "healthy" if _state.initialized else "initializing",
            "features": ["tools", "resources", "prompts"],
            "connections": len(_state.connections),
            "cache_entries": len(_state.cache),
            "memory_usage": "optimal"
        }
        return json.dumps(status_data, indent=2)
    except Exception as e:
        logger.error(f"Status resource failed: {e}")
        # json.dumps escapes quotes/backslashes in the message, so the error
        # payload stays valid JSON (raw string concatenation did not).
        return json.dumps({"error": f"Status unavailable: {e}"})
@mcp.resource("task_manager://metrics/{timeframe}")
async def performance_metrics(timeframe: str) -> str:
    """
    Provide performance metrics for specified timeframe.

    Args:
        timeframe: Metrics timeframe (hour, day, week)

    Returns:
        JSON metrics document, or a JSON error object for bad input/failures.
    """
    import json

    try:
        valid_timeframes = ["hour", "day", "week"]
        if timeframe not in valid_timeframes:
            # json.dumps keeps the payload valid JSON even if the message
            # ever contains quotes or backslashes.
            return json.dumps({"error": f"Invalid timeframe. Use: {valid_timeframes}"})

        # Mock metrics - replace with actual implementation
        metrics = {
            "timeframe": timeframe,
            "requests_processed": 150,
            "average_response_time": "45ms",
            "success_rate": "99.7%",
            "cache_hit_rate": "85%"
        }
        return json.dumps(metrics, indent=2)
    except Exception as e:
        logger.error(f"Metrics resource failed: {e}")
        return json.dumps({"error": f"Metrics unavailable: {e}"})
@mcp.prompt()
def analysis_workflow(
subject: str,
depth: str = "standard",
focus_areas: str | None = None
) -> str:
"""
Generate comprehensive analysis workflow prompts.
Demonstrates intelligent prompt generation with contextual
customization and clear parameter handling.
Args:
subject: Subject for analysis
depth: Analysis depth (quick, standard, comprehensive)
focus_areas: Specific areas to emphasize
Returns:
Structured analysis prompt template
"""
try:
depth_configs = {
"quick": "Provide a concise overview with key highlights",
"standard": "Deliver balanced analysis with context and insights",
"comprehensive": "Conduct thorough examination with detailed findings"
}
depth_instruction = depth_configs.get(depth, depth_configs["standard"])
prompt_template = f"""
Analysis Request: {subject}
Scope: {depth_instruction}
{f"Focus Areas: {focus_areas}" if focus_areas else ""}
Please provide analysis covering:
1. **Context & Overview**
- Background and current state
- Key stakeholders and dependencies
2. **Core Analysis**
- Strengths and opportunities
- Challenges and constraints
- Critical patterns and trends
3. **Strategic Insights**
- Recommendations and next steps
- Risk assessment and mitigation
- Success metrics and monitoring
Format your response with clear sections and actionable insights.
"""
return prompt_template.strip()
except Exception as e:
logger.error(f"Prompt generation failed: {e}")
return f"Prompt generation error: {str(e)}"
# Graceful shutdown handling with clear resource management
def cleanup_handler() -> None:
    """Clean shutdown with comprehensive resource cleanup."""
    logger.info("task_manager shutdown initiated")
    if not _state.initialized:
        # Nothing to release; also makes repeated calls (signal + atexit) no-ops.
        return
    try:
        # Sync cleanup for immediate shutdown.
        asyncio.run(_state.cleanup())
    except RuntimeError as e:
        # asyncio.run() raises RuntimeError when called while an event loop is
        # already running (e.g. a signal delivered inside mcp.run()); log and
        # continue shutting down rather than crash on the exit path.
        logger.warning(f"Async cleanup skipped: {e}")
def signal_handler(sig: int, stack_frame) -> None:
    """React to a termination signal: log it, release resources, exit cleanly."""
    logger.info(f"Received signal {sig}, shutting down gracefully")
    cleanup_handler()
    sys.exit(0)
# Register signal handlers for production reliability
signal.signal(signal.SIGTERM, signal_handler)  # polite termination (e.g. container stop)
signal.signal(signal.SIGINT, signal_handler)   # Ctrl-C
atexit.register(cleanup_handler)               # safety net on normal interpreter exit
if __name__ == "__main__":
    try:
        logger.info("Starting task_manager FastMCP server")
        logger.info("Features: tools, resources, prompts")
        # Start server with comprehensive error handling;
        # mcp.run() blocks until the transport closes or a signal arrives.
        mcp.run()
    except KeyboardInterrupt:
        logger.info("Server interrupted by user")
    except Exception as e:
        logger.error(f"Server startup failed: {e}")
        sys.exit(1)
    finally:
        # Runs on every exit path; cleanup_handler is a no-op once
        # _state.initialized has been reset, so double cleanup is safe.
        cleanup_handler()