# main.py
"""
Creative Writing Assistant - An AI-powered creative writing assistant that helps with character development, plot generation, story analysis, and provides writing guidance and inspiration
Advanced FastMCP server demonstrating AI integration and lifecycle management.
Showcases structural elegance with minimal cognitive complexity.
Generated by MCP-Creator-MCP
"""
import asyncio
import atexit
import json
import logging
import signal
import sys
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

from mcp import types
from mcp.server.fastmcp import Context, FastMCP
# Route all operational logs to stderr so stdout stays free for MCP traffic.
logging.basicConfig(
    stream=sys.stderr,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Application state with clean lifecycle management
class ServerState:
    """Shared runtime state for the server: connections, cache, init flag."""

    def __init__(self) -> None:
        # Live connection objects keyed by an identifier string.
        self.connections: dict[str, Any] = {}
        # In-memory key/value store surfaced through the manage_cache tool.
        self.cache: dict[str, Any] = {}
        # Guards against repeated initialization and double cleanup.
        self.initialized = False

    async def initialize(self) -> None:
        """Prepare resources; calling this twice is a harmless no-op."""
        if self.initialized:
            return
        try:
            # Initialize connections, cache, etc.
            logger.info("Initializing creative_writing_assistant resources")
            self.initialized = True
        except Exception as e:
            logger.error(f"Initialization failed: {e}")
            raise

    async def cleanup(self) -> None:
        """Release every held resource and reset to the pristine state."""
        logger.info("Cleaning up creative_writing_assistant resources")
        for conn_id, conn in self.connections.items():
            # A failed close is logged but never blocks the rest of shutdown.
            close = getattr(conn, 'close', None)
            try:
                if close is not None:
                    await close()
            except Exception as e:
                logger.warning(f"Connection cleanup failed for {conn_id}: {e}")
        self.connections.clear()
        self.cache.clear()
        self.initialized = False
# Module-level singleton shared by the lifespan hook, tools, and resources.
_state = ServerState()
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[ServerState]:
    """Initialize shared state before serving and tear it down afterwards.

    The yielded ServerState reaches request handlers as
    ctx.request_context.lifespan_context (see manage_cache).

    Args:
        server: The FastMCP instance owning this lifespan (unused here).

    Yields:
        The module-level ServerState singleton.
    """
    logger.info("Starting creative_writing_assistant lifecycle")
    try:
        await _state.initialize()
        yield _state
    except Exception as e:
        logger.error(f"Lifecycle error: {e}")
        raise
    finally:
        # Runs on both the clean-shutdown path and the error path.
        await _state.cleanup()
# FastMCP application object; app_lifespan wires _state setup/teardown
# around the serve loop.
mcp = FastMCP("creative_writing_assistant", lifespan=app_lifespan)
@mcp.tool()
async def intelligent_processor(
    ctx: Context,
    data: str,
    operation: str = "analyze"
) -> str:
    """Process data with AI enhancement via MCP sampling.

    Args:
        ctx: Request context supplied by FastMCP.
        data: Input data to process.
        operation: Type of processing ("analyze", "summarize", "transform").

    Returns:
        AI-enhanced processing results, or a human-readable error string.
    """
    try:
        # Input validation with clear feedback.
        if not data.strip():
            return "⚠️ Please provide data for processing"
        if operation not in ("analyze", "summarize", "transform"):
            return f"❌ Unsupported operation: {operation}"

        enhancement_prompt = (
            f"{operation.title()} the following data with focus on:\n"
            "1. Key insights and patterns\n"
            "2. Actionable recommendations\n"
            "3. Clear, structured output\n\n"
            f"Data: {data}\n\n"
            f"Provide a {operation} that is concise yet comprehensive."
        )

        # BUG FIX: the official mcp.server.fastmcp Context exposes no
        # .sample() method (that API belongs to the separate fastmcp 2.x
        # package); sampling goes through the underlying client session.
        sampling_result = await ctx.session.create_message(
            messages=[
                types.SamplingMessage(
                    role="user",
                    content=types.TextContent(type="text", text=enhancement_prompt),
                )
            ],
            max_tokens=1000,
        )
        content = sampling_result.content
        ai_result = content.text if content.type == "text" else str(content)

        # Combine AI insights with deterministic framing.
        result = f"🧠 AI-Enhanced {operation.title()}:\n\n{ai_result}"
        # Log for operational visibility.
        logger.info(f"Successfully processed {operation} request: {len(data)} chars")
        return result
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        return f"❌ Processing error: {str(e)}"
@mcp.tool()
async def manage_cache(
    ctx: Context,
    action: str,
    key: str | None = None,
    value: str | None = None
) -> str:
    """Operate on the server-wide cache held in the lifespan state.

    Args:
        ctx: Request context; its lifespan_context carries the ServerState.
        action: Cache operation ("get", "set", "list", "clear").
        key: Cache key (required for get/set).
        value: Cache value (required for set).

    Returns:
        Human-readable result or error string for the requested operation.
    """
    try:
        cache = ctx.request_context.lifespan_context.cache

        if action == "get":
            if not key:
                return "❌ Key required for get operation"
            return f"📦 Cache[{key}]: {cache.get(key, 'Key not found')}"

        if action == "set":
            if not key or value is None:
                return "❌ Key and value required for set operation"
            cache[key] = value
            return f"✅ Cache[{key}] = {value}"

        if action == "list":
            if not cache:
                return "📦 Cache is empty"
            listing = "\n".join(f"• {k}: {v}" for k, v in cache.items())
            return "📦 Cache contents:\n" + listing

        if action == "clear":
            entry_count = len(cache)
            cache.clear()
            return f"🗑️ Cleared {entry_count} cache entries"

        return f"❌ Unsupported action: {action}"
    except Exception as e:
        logger.error(f"Cache operation failed: {e}")
        return f"❌ Cache error: {str(e)}"
@mcp.resource("creative_writing_assistant://status")
async def server_status() -> str:
    """Report server health and basic operational metrics as a JSON string.

    Returns:
        JSON document with status fields, or a JSON error object on failure.
    """
    try:
        status_data = {
            "server": "creative_writing_assistant",
            "status": "healthy" if _state.initialized else "initializing",
            "features": ["tools", "resources", "prompts", "sampling"],
            "connections": len(_state.connections),
            "cache_entries": len(_state.cache),
            "memory_usage": "optimal",
        }
        return json.dumps(status_data, indent=2)
    except Exception as e:
        logger.error(f"Status resource failed: {e}")
        # BUG FIX: json.dumps escapes the message; the old string
        # concatenation produced invalid JSON whenever str(e) contained
        # quotes or backslashes.
        return json.dumps({"error": f"Status unavailable: {e}"})
@mcp.resource("creative_writing_assistant://metrics/{timeframe}")
async def performance_metrics(timeframe: str) -> str:
    """Return performance metrics for a timeframe as a JSON string.

    Args:
        timeframe: Metrics timeframe; one of "hour", "day", "week".

    Returns:
        JSON document of metrics, or a JSON error object for invalid input
        or unexpected failures.
    """
    try:
        valid_timeframes = ["hour", "day", "week"]
        if timeframe not in valid_timeframes:
            # BUG FIX: the old code embedded str(list) in the payload, which
            # uses single quotes and therefore produced invalid JSON.
            return json.dumps(
                {"error": f"Invalid timeframe. Use: {', '.join(valid_timeframes)}"}
            )
        # Mock metrics - replace with actual implementation.
        metrics = {
            "timeframe": timeframe,
            "requests_processed": 150,
            "average_response_time": "45ms",
            "success_rate": "99.7%",
            "cache_hit_rate": "85%",
        }
        return json.dumps(metrics, indent=2)
    except Exception as e:
        logger.error(f"Metrics resource failed: {e}")
        # BUG FIX: build the error object with json.dumps so str(e) is escaped.
        return json.dumps({"error": f"Metrics unavailable: {e}"})
@mcp.prompt()
def analysis_workflow(
    subject: str,
    depth: str = "standard",
    focus_areas: str | None = None
) -> str:
    """Build a structured analysis prompt for the given subject.

    Args:
        subject: Subject for analysis.
        depth: Analysis depth ("quick", "standard", "comprehensive");
            unknown values fall back to the "standard" instruction.
        focus_areas: Optional areas to emphasize in the analysis.

    Returns:
        A ready-to-use analysis prompt template string.
    """
    try:
        depth_configs = {
            "quick": "Provide a concise overview with key highlights",
            "standard": "Deliver balanced analysis with context and insights",
            "comprehensive": "Conduct thorough examination with detailed findings",
        }
        # Unknown depth values degrade gracefully to the standard instruction.
        depth_instruction = depth_configs.get(depth, depth_configs["standard"])
        focus_line = f"Focus Areas: {focus_areas}" if focus_areas else ""
        prompt_template = f"""
Analysis Request: {subject}
Scope: {depth_instruction}
{focus_line}
Please provide analysis covering:
1. **Context & Overview**
- Background and current state
- Key stakeholders and dependencies
2. **Core Analysis**
- Strengths and opportunities
- Challenges and constraints
- Critical patterns and trends
3. **Strategic Insights**
- Recommendations and next steps
- Risk assessment and mitigation
- Success metrics and monitoring
Format your response with clear sections and actionable insights.
"""
        return prompt_template.strip()
    except Exception as e:
        logger.error(f"Prompt generation failed: {e}")
        return f"Prompt generation error: {str(e)}"
# Graceful shutdown handling with clear resource management
def cleanup_handler() -> None:
    """Release server resources synchronously at process shutdown.

    Registered with atexit and invoked by the signal handler; the
    _state.initialized guard makes repeated calls harmless no-ops.
    """
    logger.info("creative_writing_assistant shutdown initiated")
    if not _state.initialized:
        return
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running (the normal atexit path): drive the async
        # cleanup to completion on a fresh event loop.
        asyncio.run(_state.cleanup())
    else:
        # BUG FIX: asyncio.run() raises RuntimeError when called from a
        # thread that already has a running loop (e.g. a signal delivered
        # while mcp.run() is serving). Defer to app_lifespan's finally
        # block, which awaits _state.cleanup() on that loop.
        logger.warning(
            "Event loop still running; deferring cleanup to the lifespan handler"
        )
def signal_handler(signum: int, frame) -> None:
    """Handle shutdown signals with operational logging.

    Args:
        signum: Signal number delivered by the OS (SIGTERM/SIGINT here).
        frame: Current stack frame; unused, but required by the signal API.
    """
    logger.info(f"Received signal {signum}, shutting down gracefully")
    cleanup_handler()
    sys.exit(0)
# Register signal handlers for production reliability.
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# NOTE: the signal path and atexit can both invoke cleanup_handler; it is
# guarded by _state.initialized, so the second call is a no-op.
atexit.register(cleanup_handler)
if __name__ == "__main__":
    try:
        logger.info("Starting creative_writing_assistant FastMCP server")
        logger.info("Features: tools, resources, prompts, sampling")
        # Blocks until the transport closes or an unrecoverable error occurs.
        mcp.run()
    except KeyboardInterrupt:
        logger.info("Server interrupted by user")
    except Exception as e:
        logger.error(f"Server startup failed: {e}")
        sys.exit(1)
    finally:
        # cleanup_handler is also registered with atexit; the initialized
        # flag in _state makes the repeated call harmless.
        cleanup_handler()