Skip to main content
Glama
main.py (11.4 kB)
"""
Task Manager - A simple task management MCP server for tracking and organizing work items

Advanced FastMCP server demonstrating AI integration and lifecycle management.
Showcases structural elegance with minimal cognitive complexity.

Generated by MCP-Creator-MCP
"""

import asyncio
import atexit
import inspect
import json
import logging
import signal
import sys
from collections.abc import AsyncIterator, Callable
from contextlib import asynccontextmanager
from types import FrameType
from typing import Any

from mcp.server.fastmcp import Context, FastMCP

# Configure logging for operational clarity.
# Logs go to stderr so they never mix with anything written to stdout.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    stream=sys.stderr,
)
logger = logging.getLogger(__name__)


# Application state with clean lifecycle management
class ServerState:
    """Centralized state management with clear boundaries.

    Attributes:
        connections: Open connection objects keyed by an identifier.
        cache: Key/value store exposed through the ``manage_cache`` tool.
        initialized: True between a successful ``initialize()`` and the
            next ``cleanup()``.
    """

    def __init__(self) -> None:
        self.connections: dict[str, Any] = {}
        self.cache: dict[str, Any] = {}
        self.initialized = False

    async def initialize(self) -> None:
        """Initialize resources; a no-op when already initialized.

        Raises:
            Exception: Any error raised during setup is logged and re-raised.
        """
        if self.initialized:
            return

        try:
            # Initialize connections, cache, etc.
            logger.info("Initializing task_manager resources")
            self.initialized = True
        except Exception:
            logger.exception("Initialization failed")
            raise

    async def cleanup(self) -> None:
        """Close every connection and reset the state to uninitialized.

        A failure to close one connection is logged and does not prevent
        the remaining connections from being closed.
        """
        logger.info("Cleaning up task_manager resources")

        # Iterate over a snapshot: awaiting close() yields control, and the
        # dict must not change size underneath the iteration.
        for conn_id, conn in list(self.connections.items()):
            try:
                close = getattr(conn, "close", None)
                if callable(close):
                    outcome = close()
                    # Support both coroutine and plain synchronous close().
                    if inspect.isawaitable(outcome):
                        await outcome
            except Exception as e:
                logger.warning("Connection cleanup failed for %s: %s", conn_id, e)

        self.connections.clear()
        self.cache.clear()
        self.initialized = False


# Global state instance
_state = ServerState()


@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[ServerState]:
    """Initialize the shared state for the server's lifetime and clean it up.

    Args:
        server: The FastMCP instance this lifespan is attached to (unused).

    Yields:
        The module-level ``ServerState`` instance.
    """
    logger.info("Starting task_manager lifecycle")

    try:
        await _state.initialize()
        yield _state
    except Exception:
        logger.exception("Lifecycle error")
        raise
    finally:
        await _state.cleanup()


# Initialize FastMCP with lifecycle management
mcp = FastMCP("task_manager", lifespan=app_lifespan)


def _analyze(data: str) -> str:
    """Build the 'analyze' report: fixed bullet list plus a content preview."""
    if len(data) > 200:
        preview = f"Summary: {data[:200]}..."
    else:
        preview = f"Content: {data}"
    return (
        f"📊 Analysis of {len(data)} characters:\n\n"
        "• Key patterns identified\n"
        "• Structure and organization assessed\n"
        "• Actionable insights provided\n\n"
        f"{preview}"
    )


def _summarize(data: str) -> str:
    """Build the 'summarize' report: first ten words plus length statistics."""
    words = data.split()
    return (
        f"📝 Summary of {len(data)} characters:\n\n"
        f"Key points: {', '.join(words[:10])}\n"
        f"Length: {len(words)} words, {len(data)} characters"
    )


def _transform(data: str) -> str:
    """Build the 'transform' report: format labels plus character/word counts."""
    return (
        f"🔄 Transformation of {len(data)} characters:\n\n"
        "Original format: Text\n"
        "Processed format: Structured data\n"
        f"Character count: {len(data)}\n"
        f"Word count: {len(data.split())}"
    )


# Single source of truth for the supported operations and their handlers.
_PROCESSORS: dict[str, Callable[[str], str]] = {
    "analyze": _analyze,
    "summarize": _summarize,
    "transform": _transform,
}


@mcp.tool()
async def intelligent_processor(
    ctx: Context,
    data: str,
    operation: str = "analyze"
) -> str:
    """
    Process data with structured, operation-specific feedback.

    Args:
        ctx: Request context (unused by this tool)
        data: Input data to process
        operation: Type of processing (analyze, summarize, transform)

    Returns:
        The processing report, or a human-readable message when the input
        is blank, the operation is unsupported, or processing fails.
    """
    try:
        # Input validation with clear feedback
        if not data.strip():
            return "⚠️ Please provide data for processing"

        processor = _PROCESSORS.get(operation)
        if processor is None:
            return f"❌ Unsupported operation: {operation}"

        # Note: AI sampling is not currently supported in Claude Desktop
        # Providing structured processing based on operation type
        result = processor(data)

        # Log for operational visibility
        logger.info(
            "Successfully processed %s request: %d chars", operation, len(data)
        )
        return result

    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        logger.exception("Processing failed")
        return f"❌ Processing error: {e}"


@mcp.tool()
async def manage_cache(
    ctx: Context,
    action: str,
    key: str | None = None,
    value: str | None = None
) -> str:
    """
    Manage the server cache held in the lifespan state.

    Args:
        ctx: Request context; the cache is read from its lifespan context
        action: Cache operation (get, set, list, clear)
        key: Cache key (for get/set operations)
        value: Cache value (for set operations)

    Returns:
        Cache operation results with clear status
    """
    try:
        state = ctx.request_context.lifespan_context

        if action == "get":
            if not key:
                return "❌ Key required for get operation"
            found = state.cache.get(key, "Key not found")
            return f"📦 Cache[{key}]: {found}"

        if action == "set":
            # An empty-string value is allowed; only a missing value is not.
            if not key or value is None:
                return "❌ Key and value required for set operation"
            state.cache[key] = value
            return f"✅ Cache[{key}] = {value}"

        if action == "list":
            if not state.cache:
                return "📦 Cache is empty"
            items = [f"• {k}: {v}" for k, v in state.cache.items()]
            return "📦 Cache contents:\n" + "\n".join(items)

        if action == "clear":
            count = len(state.cache)
            state.cache.clear()
            return f"🗑️ Cleared {count} cache entries"

        return f"❌ Unsupported action: {action}"

    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        logger.exception("Cache operation failed")
        return f"❌ Cache error: {e}"


@mcp.resource("task_manager://status")
async def server_status() -> str:
    """
    Report server status as a JSON document.

    Returns:
        JSON text with the server name, health, feature list and the current
        connection and cache-entry counts; on failure, a JSON object with a
        single "error" key.
    """
    try:
        status_data = {
            "server": "task_manager",
            "status": "healthy" if _state.initialized else "initializing",
            "features": ["tools", "resources", "prompts"],
            "connections": len(_state.connections),
            "cache_entries": len(_state.cache),
            "memory_usage": "optimal",  # static placeholder, not measured
        }
        return json.dumps(status_data, indent=2)

    except Exception as e:
        logger.exception("Status resource failed")
        # json.dumps escapes quotes/backslashes in the exception text, so the
        # error payload is always valid JSON.
        return json.dumps({"error": f"Status unavailable: {e}"})


@mcp.resource("task_manager://metrics/{timeframe}")
async def performance_metrics(timeframe: str) -> str:
    """
    Provide performance metrics for specified timeframe.

    Args:
        timeframe: Metrics timeframe (hour, day, week)

    Returns:
        JSON text with the metrics, or a JSON object with a single "error"
        key when the timeframe is invalid or the lookup fails.
    """
    try:
        valid_timeframes = ["hour", "day", "week"]
        if timeframe not in valid_timeframes:
            return json.dumps(
                {"error": f"Invalid timeframe. Use: {valid_timeframes}"}
            )

        # Mock metrics - replace with actual implementation
        metrics = {
            "timeframe": timeframe,
            "requests_processed": 150,
            "average_response_time": "45ms",
            "success_rate": "99.7%",
            "cache_hit_rate": "85%",
        }
        return json.dumps(metrics, indent=2)

    except Exception as e:
        logger.exception("Metrics resource failed")
        return json.dumps({"error": f"Metrics unavailable: {e}"})


@mcp.prompt()
def analysis_workflow(
    subject: str,
    depth: str = "standard",
    focus_areas: str | None = None
) -> str:
    """
    Generate comprehensive analysis workflow prompts.

    Args:
        subject: Subject for analysis
        depth: Analysis depth (quick, standard, comprehensive); any other
            value falls back to "standard"
        focus_areas: Specific areas to emphasize

    Returns:
        Structured analysis prompt template
    """
    try:
        depth_configs = {
            "quick": "Provide a concise overview with key highlights",
            "standard": "Deliver balanced analysis with context and insights",
            "comprehensive": "Conduct thorough examination with detailed findings",
        }
        depth_instruction = depth_configs.get(depth, depth_configs["standard"])

        lines = [
            f"Analysis Request: {subject}",
            "",
            f"Scope: {depth_instruction}",
            "",
        ]
        # Only emit the focus line when focus areas were actually given.
        if focus_areas:
            lines += [f"Focus Areas: {focus_areas}", ""]
        lines += [
            "Please provide analysis covering:",
            "",
            "1. **Context & Overview**",
            "   - Background and current state",
            "   - Key stakeholders and dependencies",
            "",
            "2. **Core Analysis**",
            "   - Strengths and opportunities",
            "   - Challenges and constraints",
            "   - Critical patterns and trends",
            "",
            "3. **Strategic Insights**",
            "   - Recommendations and next steps",
            "   - Risk assessment and mitigation",
            "   - Success metrics and monitoring",
            "",
            "Format your response with clear sections and actionable insights.",
        ]
        return "\n".join(lines).strip()

    except Exception as e:
        logger.exception("Prompt generation failed")
        return f"Prompt generation error: {e}"


# Graceful shutdown handling with clear resource management
def cleanup_handler() -> None:
    """Run state cleanup synchronously when it is safe to do so.

    Called from the signal handlers, from ``atexit`` and from the
    ``__main__`` guard, so it must tolerate being invoked repeatedly and
    from inside a running event loop.
    """
    logger.info("task_manager shutdown initiated")

    if not _state.initialized:
        return

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: safe to drive cleanup ourselves.
        asyncio.run(_state.cleanup())
    else:
        # asyncio.run() raises RuntimeError inside a running loop (e.g. when
        # a signal arrives while the server is serving). The lifespan's
        # ``finally`` performs the cleanup as the loop unwinds.
        logger.info("Event loop is running; cleanup deferred to lifespan shutdown")


def signal_handler(signum: int, frame: FrameType | None) -> None:
    """Handle shutdown signals: log, clean up, and exit with status 0.

    Args:
        signum: Number of the received signal.
        frame: Interrupted stack frame (unused).
    """
    logger.info("Received signal %s, shutting down gracefully", signum)
    cleanup_handler()
    sys.exit(0)


# Register signal handlers for production reliability
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
atexit.register(cleanup_handler)


if __name__ == "__main__":
    try:
        logger.info("Starting task_manager FastMCP server")
        logger.info("Features: tools, resources, prompts")

        # Start server with comprehensive error handling
        mcp.run()

    except KeyboardInterrupt:
        logger.info("Server interrupted by user")
    except Exception:
        logger.exception("Server startup failed")
        sys.exit(1)
    finally:
        cleanup_handler()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/angrysky56/mcp-creator-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.