Skip to main content
Glama

MCP Project Orchestrator

server.pyโ€ข17.3 kB
""" Main MCP Server implementation for PrintCast Agent. This server orchestrates voice-to-print workflows, integrating multiple services: - Asterisk SIP for telephony - ElevenLabs for conversational AI - GitHub/RSS for content sourcing - CUPS for printing - Delivery services for shipping """ import asyncio import json import logging from typing import Any, Dict, List, Optional, Sequence from datetime import datetime from fastmcp import FastMCP from pydantic import BaseModel, Field import structlog from ..integrations.asterisk import AsteriskManager from ..integrations.elevenlabs import ElevenLabsAgent from ..integrations.content import ContentFetcher from ..integrations.printing import PrintManager from ..integrations.delivery import DeliveryService from ..orchestration.workflow import WorkflowOrchestrator from ..utils.monitoring import MetricsCollector # Configure structured logging structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.UnicodeDecoder(), structlog.processors.JSONRenderer() ], context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), cache_logger_on_first_use=True, ) logger = structlog.get_logger(__name__) class CallSession(BaseModel): """Represents an active call session.""" session_id: str caller_id: str start_time: datetime selected_items: List[str] = Field(default_factory=list) delivery_address: Optional[str] = None status: str = "active" metadata: Dict[str, Any] = Field(default_factory=dict) class PrintCastMCPServer: """ Main MCP server for PrintCast Agent system. Provides tools and resources for: - Handling incoming calls through Asterisk - Managing AI voice conversations - Fetching and presenting content - Processing print jobs - Arranging delivery """ def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize the PrintCast MCP Server. Args: config: Configuration dictionary for all services """ self.config = config or {} self.app = FastMCP("PrintCast Agent") self.sessions: Dict[str, CallSession] = {} # Initialize service managers self.asterisk = AsteriskManager(self.config.get("asterisk", {})) self.elevenlabs = ElevenLabsAgent(self.config.get("elevenlabs", {})) self.content = ContentFetcher(self.config.get("content", {})) self.printer = PrintManager(self.config.get("printing", {})) self.delivery = DeliveryService(self.config.get("delivery", {})) self.orchestrator = WorkflowOrchestrator( asterisk=self.asterisk, elevenlabs=self.elevenlabs, content=self.content, printer=self.printer, delivery=self.delivery ) self.metrics = MetricsCollector() # Register MCP tools self._register_tools() # Register MCP resources self._register_resources() logger.info("PrintCast MCP Server initialized", config=self.config) def _register_tools(self): """Register all MCP tools.""" @self.app.tool() async def handle_incoming_call( caller_id: str, dtmf_code: Optional[str] = None, language: str = "cs" ) -> Dict[str, Any]: """ Handle an incoming call and initiate the voice workflow. Args: caller_id: The phone number of the caller dtmf_code: Optional DTMF code entered by caller language: Language preference (cs=Czech, en=English) Returns: Session information and next steps """ try: # Create new session session = CallSession( session_id=f"call_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{caller_id}", caller_id=caller_id, start_time=datetime.now(), metadata={"language": language, "dtmf_code": dtmf_code} ) self.sessions[session.session_id] = session # Start voice agent agent_response = await self.elevenlabs.start_conversation( session_id=session.session_id, language=language ) # Get initial content options content_options = await self.content.get_available_content() logger.info( "Call session started", session_id=session.session_id, caller_id=caller_id ) return { "session_id": session.session_id, "status": "connected", "agent_ready": agent_response.get("ready", False), "content_options": content_options, "message": f"Welcome! Session {session.session_id} started." } except Exception as e: logger.error("Failed to handle incoming call", error=str(e)) raise @self.app.tool() async def fetch_trending_content( content_type: str = "github", limit: int = 5, language: Optional[str] = None ) -> List[Dict[str, Any]]: """ Fetch trending content from various sources. Args: content_type: Type of content (github, rss, news) limit: Maximum number of items to fetch language: Optional language filter Returns: List of trending content items """ try: if content_type == "github": items = await self.content.fetch_github_trending( limit=limit, language=language ) elif content_type == "rss": items = await self.content.fetch_rss_feeds(limit=limit) elif content_type == "news": items = await self.content.fetch_news(limit=limit) else: raise ValueError(f"Unknown content type: {content_type}") logger.info( "Fetched trending content", type=content_type, count=len(items) ) return items except Exception as e: logger.error("Failed to fetch content", error=str(e)) raise @self.app.tool() async def process_user_selection( session_id: str, selected_items: List[str], delivery_address: str, delivery_method: str = "post" ) -> Dict[str, Any]: """ Process user's content selection and initiate print/delivery. Args: session_id: Active session ID selected_items: List of selected item IDs delivery_address: Delivery address delivery_method: Delivery method (post, courier) Returns: Order confirmation and tracking information """ try: session = self.sessions.get(session_id) if not session: raise ValueError(f"Session {session_id} not found") # Update session session.selected_items = selected_items session.delivery_address = delivery_address # Orchestrate the workflow result = await self.orchestrator.process_order( session_id=session_id, selected_items=selected_items, delivery_address=delivery_address, delivery_method=delivery_method ) # Update metrics await self.metrics.record_order(session_id, len(selected_items)) logger.info( "Order processed", session_id=session_id, items_count=len(selected_items), tracking_id=result.get("tracking_id") ) return result except Exception as e: logger.error("Failed to process selection", error=str(e)) raise @self.app.tool() async def generate_print_preview( session_id: str, selected_items: List[str], format: str = "pdf" ) -> Dict[str, Any]: """ Generate a print preview for selected items. Args: session_id: Active session ID selected_items: List of selected item IDs format: Output format (pdf, html) Returns: Preview file path and metadata """ try: # Generate preview document preview = await self.printer.generate_preview( items=selected_items, format=format ) logger.info( "Print preview generated", session_id=session_id, format=format ) return { "preview_url": preview["url"], "page_count": preview["pages"], "file_size": preview["size"], "format": format } except Exception as e: logger.error("Failed to generate preview", error=str(e)) raise @self.app.tool() async def get_delivery_quote( delivery_address: str, delivery_method: str = "post", weight_grams: int = 100 ) -> Dict[str, Any]: """ Get delivery cost estimate. Args: delivery_address: Delivery address delivery_method: Delivery method weight_grams: Estimated weight in grams Returns: Delivery quote with pricing and timing """ try: quote = await self.delivery.get_quote( address=delivery_address, method=delivery_method, weight=weight_grams ) return { "price": quote["price"], "currency": quote["currency"], "estimated_delivery": quote["estimated_delivery"], "carrier": quote["carrier"] } except Exception as e: logger.error("Failed to get delivery quote", error=str(e)) raise @self.app.tool() async def end_call_session( session_id: str, reason: str = "completed" ) -> Dict[str, Any]: """ End an active call session. Args: session_id: Session to end reason: Reason for ending (completed, cancelled, error) Returns: Session summary """ try: session = self.sessions.get(session_id) if not session: raise ValueError(f"Session {session_id} not found") # Update session status session.status = reason # Stop voice agent await self.elevenlabs.end_conversation(session_id) # Generate session summary duration = (datetime.now() - session.start_time).total_seconds() summary = { "session_id": session_id, "duration_seconds": duration, "items_selected": len(session.selected_items), "status": reason, "caller_id": session.caller_id } # Clean up session after delay asyncio.create_task(self._cleanup_session(session_id)) logger.info("Call session ended", **summary) return summary except Exception as e: logger.error("Failed to end session", error=str(e)) raise def _register_resources(self): """Register MCP resources for monitoring and configuration.""" @self.app.resource("resource://sessions/active") async def get_active_sessions() -> str: """Get list of active call sessions.""" active = [ { "session_id": s.session_id, "caller_id": s.caller_id, "start_time": s.start_time.isoformat(), "status": s.status, "items_selected": len(s.selected_items) } for s in self.sessions.values() if s.status == "active" ] return json.dumps(active, indent=2) @self.app.resource("resource://config/services") async def get_service_config() -> str: """Get current service configuration.""" config = { "asterisk": { "enabled": self.asterisk.is_connected(), "host": self.config.get("asterisk", {}).get("host", "localhost") }, "elevenlabs": { "enabled": self.elevenlabs.is_configured(), "model": self.config.get("elevenlabs", {}).get("model", "eleven_multilingual_v2") }, "printing": { "enabled": self.printer.is_available(), "printer": self.config.get("printing", {}).get("default_printer", "default") }, "delivery": { "enabled": self.delivery.is_configured(), "carriers": self.config.get("delivery", {}).get("carriers", []) } } return json.dumps(config, indent=2) @self.app.resource("resource://metrics/daily") async def get_daily_metrics() -> str: """Get daily usage metrics.""" metrics = await self.metrics.get_daily_stats() return json.dumps(metrics, indent=2) async def _cleanup_session(self, session_id: str, delay: int = 300): """ Clean up session data after delay. Args: session_id: Session to clean up delay: Delay in seconds before cleanup """ await asyncio.sleep(delay) if session_id in self.sessions: del self.sessions[session_id] logger.info("Session cleaned up", session_id=session_id) async def start(self): """Start the MCP server and all services.""" try: # Initialize all services await self.asterisk.connect() await self.elevenlabs.initialize() await self.printer.initialize() await self.delivery.initialize() # Start metrics collection asyncio.create_task(self.metrics.start_collection()) # Start the MCP server logger.info("Starting PrintCast MCP Server") await self.app.run() except Exception as e: logger.error("Failed to start server", error=str(e)) raise async def stop(self): """Stop the MCP server and cleanup.""" try: # End all active sessions for session_id in list(self.sessions.keys()): await self.end_call_session(session_id, reason="shutdown") # Disconnect services await self.asterisk.disconnect() await self.elevenlabs.shutdown() await self.printer.shutdown() await self.delivery.shutdown() logger.info("PrintCast MCP Server stopped") except Exception as e: logger.error("Error during shutdown", error=str(e)) raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sparesparrow/mcp-project-orchestrator'

If you have feedback or need assistance with the MCP directory API, please join our Discord server