Skip to main content
Glama

MCP Project Orchestrator

asterisk.py16 kB
"""
Asterisk SIP integration for PrintCast Agent.

Handles telephony operations including:
- Call routing and management
- IVR interactions
- DTMF processing
- Call recording
"""

import asyncio
import inspect
import logging
from typing import Any, Callable, Dict, List, Optional, Set
from datetime import datetime

import structlog
from panoramisk import Manager
from pydantic import BaseModel, Field

logger = structlog.get_logger(__name__)


async def _maybe_await(result: Any) -> Any:
    """Await *result* if it is awaitable, otherwise return it unchanged."""
    if inspect.isawaitable(result):
        return await result
    return result


def _handler_name(handler: Callable[..., Any]) -> str:
    """Return a printable name for *handler*, even for partials/callable objects."""
    return getattr(handler, "__name__", None) or repr(handler)


class CallInfo(BaseModel):
    """Information about an active call."""

    channel: str
    caller_id: str
    called_number: str
    start_time: datetime
    state: str
    unique_id: str
    # Free-form per-call data; the DTMF handler stores "dtmf_buffer" here.
    metadata: Dict[str, Any] = Field(default_factory=dict)


class AsteriskManager:
    """
    Manages Asterisk SIP server integration.

    Provides high-level interface for:
    - AMI (Asterisk Manager Interface) operations
    - Call control and routing
    - IVR menu handling
    - DTMF input processing
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize Asterisk manager.

        Args:
            config: Asterisk configuration including:
                - host: Asterisk server hostname
                - port: AMI port (default 5038)
                - username: AMI username
                - password: AMI password
                - context: Default dialplan context
                - channel_technology: Channel driver prefix used when
                  originating calls (default "SIP")
        """
        self.config = config
        self.ami: Optional[Manager] = None
        self.connected = False
        self.active_calls: Dict[str, CallInfo] = {}
        self.event_handlers: Dict[str, List[Callable]] = {}
        # Strong references to in-flight custom handler tasks; the event loop
        # only keeps weak references, so unreferenced tasks may be collected.
        self._handler_tasks: Set["asyncio.Future[Any]"] = set()

        # Configuration
        self.host = config.get("host", "localhost")
        self.port = config.get("port", 5038)
        self.username = config.get("username", "admin")
        self.password = config.get("password", "")
        self.context = config.get("context", "printcast-ivr")
        self.channel_technology = config.get("channel_technology", "SIP")

        logger.info(
            "Asterisk manager initialized",
            host=self.host,
            port=self.port,
            context=self.context
        )

    async def connect(self) -> bool:
        """
        Connect to Asterisk AMI.

        Returns:
            True if connection successful
        """
        try:
            self.ami = Manager(
                host=self.host,
                port=self.port,
                username=self.username,
                secret=self.password
            )

            # Register before connecting so no early events are missed.
            # panoramisk invokes callbacks as callback(manager, message),
            # hence the adapter rather than _handle_ami_event directly.
            self.ami.register_event("*", self._on_ami_message)

            # Connect to AMI
            await self.ami.connect()

            self.connected = True
            logger.info("Connected to Asterisk AMI", host=self.host)
            return True

        except Exception as e:
            logger.error("Failed to connect to Asterisk", error=str(e))
            self.connected = False
            return False

    async def disconnect(self):
        """
        Disconnect from Asterisk AMI.

        The connection is always closed and the connected flag cleared, even
        when the logoff step fails. Errors are logged, not raised.
        """
        if not (self.ami and self.connected):
            return

        try:
            # Not every Manager implementation exposes logoff(); treat it as
            # best-effort and tolerate both sync and async variants.
            logoff = getattr(self.ami, "logoff", None)
            if callable(logoff):
                await _maybe_await(logoff())
        except Exception as e:
            logger.error("Error disconnecting from Asterisk", error=str(e))
        finally:
            try:
                # close() may be a plain function or a coroutine function.
                await _maybe_await(self.ami.close())
            except Exception as e:
                logger.error("Error disconnecting from Asterisk", error=str(e))
            self.connected = False
            logger.info("Disconnected from Asterisk")

    def is_connected(self) -> bool:
        """Check if connected to Asterisk."""
        return self.connected

    async def _on_ami_message(self, manager: Any, message: Dict[str, Any]):
        """
        Adapter for panoramisk's ``callback(manager, message)`` convention.

        Args:
            manager: The panoramisk Manager that received the event (unused)
            message: AMI event data
        """
        await self._handle_ami_event(message)

    async def _handle_ami_event(self, event: Dict[str, Any]):
        """
        Handle AMI events.

        Runs the built-in handler for the event type (if any), then
        dispatches the event to custom handlers registered through
        register_event_handler. Errors are logged, never raised.

        Args:
            event: AMI event data
        """
        event_type = event.get("Event", "")

        try:
            # Handle specific events
            if event_type == "Newchannel":
                await self._handle_new_channel(event)
            elif event_type == "Hangup":
                await self._handle_hangup(event)
            elif event_type in ("DTMF", "DTMFEnd"):
                # "DTMF" is the legacy event name; Asterisk 12+ emits
                # DTMFBegin/DTMFEnd instead.
                await self._handle_dtmf(event)
            elif event_type == "NewCallerid":
                await self._handle_caller_id(event)

            # Call registered handlers (copy: a handler may (un)register)
            for handler in list(self.event_handlers.get(event_type, ())):
                self._dispatch_handler(event_type, handler, event)

        except Exception as e:
            logger.error(
                "Error handling AMI event",
                event_type=event_type,
                error=str(e)
            )

    def _dispatch_handler(
        self,
        event_type: str,
        handler: Callable[[Dict[str, Any]], Any],
        event: Dict[str, Any]
    ) -> None:
        """
        Invoke one custom handler without letting it affect the others.

        Synchronous handlers run inline; awaitable results are scheduled on
        the running loop and tracked until completion.
        """
        try:
            result = handler(event)
        except Exception as e:
            logger.error(
                "Event handler failed",
                event_type=event_type,
                handler=_handler_name(handler),
                error=str(e)
            )
            return

        if inspect.isawaitable(result):
            task = asyncio.ensure_future(result)
            self._handler_tasks.add(task)
            task.add_done_callback(self._on_handler_done)

    def _on_handler_done(self, task: "asyncio.Future[Any]") -> None:
        """Release a finished handler task and log its exception, if any."""
        self._handler_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Event handler failed", error=str(exc))

    async def _handle_new_channel(self, event: Dict[str, Any]):
        """Handle new channel creation by recording it as an active call."""
        channel = event.get("Channel", "")
        caller_id = event.get("CallerIDNum", "")
        unique_id = event.get("Uniqueid", "")

        call_info = CallInfo(
            channel=channel,
            caller_id=caller_id,
            called_number=event.get("Exten", ""),
            start_time=datetime.now(),
            state="ringing",
            unique_id=unique_id
        )

        self.active_calls[unique_id] = call_info

        logger.info(
            "New call detected",
            channel=channel,
            caller_id=caller_id,
            unique_id=unique_id
        )

    async def _handle_hangup(self, event: Dict[str, Any]):
        """Handle call hangup by logging the duration and dropping the call."""
        unique_id = event.get("Uniqueid", "")

        call_info = self.active_calls.pop(unique_id, None)
        if call_info is None:
            return

        duration = (datetime.now() - call_info.start_time).total_seconds()

        logger.info(
            "Call ended",
            unique_id=unique_id,
            duration=duration,
            caller_id=call_info.caller_id
        )

    async def _handle_dtmf(self, event: Dict[str, Any]):
        """
        Handle DTMF digit press.

        Appends the digit to the call's ``dtmf_buffer`` metadata entry.
        """
        digit = event.get("Digit", "")
        unique_id = event.get("Uniqueid", "")

        # Legacy "DTMF" events are emitted twice per key press (Begin: Yes,
        # then End: Yes). Skip the begin half so digits are not duplicated.
        if (
            event.get("Event") == "DTMF"
            and str(event.get("Begin", "")).lower() == "yes"
        ):
            return

        call_info = self.active_calls.get(unique_id)
        if call_info is None:
            return

        # Store DTMF in metadata
        buffer = call_info.metadata.get("dtmf_buffer", "") + digit
        call_info.metadata["dtmf_buffer"] = buffer

        logger.debug(
            "DTMF received",
            digit=digit,
            unique_id=unique_id,
            buffer=buffer
        )

    async def _handle_caller_id(self, event: Dict[str, Any]):
        """Handle caller ID update for a tracked call."""
        unique_id = event.get("Uniqueid", "")
        caller_id = event.get("CallerIDNum", "")

        if unique_id in self.active_calls:
            self.active_calls[unique_id].caller_id = caller_id

    async def originate_call(
        self,
        destination: str,
        caller_id: str = "PrintCast",
        timeout: int = 30,
        variables: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """
        Originate an outbound call.

        Args:
            destination: Destination number
            caller_id: Caller ID to present
            timeout: Call timeout in seconds
            variables: Channel variables to set

        Returns:
            Call result information with keys ``success``, ``message`` and
            ``action_id``

        Raises:
            RuntimeError: If not connected to Asterisk
            Exception: Any error raised while sending the action is logged
                and re-raised
        """
        if not self.connected:
            raise RuntimeError("Not connected to Asterisk")

        action: Dict[str, Any] = {
            "Action": "Originate",
            "Channel": f"{self.channel_technology}/{destination}",
            "Context": self.context,
            "Exten": "s",
            "Priority": "1",
            "CallerID": caller_id,
            # AMI expects the timeout in milliseconds
            "Timeout": str(timeout * 1000),
        }
        if variables:
            # AMI takes channel variables as "name=value" pairs; several
            # pairs may share one header separated by commas.
            # NOTE(review): values containing commas are not escaped here.
            action["Variable"] = ",".join(
                f"{name}={value}" for name, value in variables.items()
            )

        try:
            response = await self.ami.send_action(action)

            logger.info(
                "Call originated",
                destination=destination,
                caller_id=caller_id
            )

            return {
                "success": response.get("Response") == "Success",
                "message": response.get("Message", ""),
                "action_id": response.get("ActionID", "")
            }

        except Exception as e:
            logger.error("Failed to originate call", error=str(e))
            raise

    async def transfer_call(
        self,
        channel: str,
        destination: str,
        context: Optional[str] = None
    ) -> bool:
        """
        Transfer an active call.

        Args:
            channel: Channel to transfer
            destination: Transfer destination
            context: Optional context (uses default if not specified)

        Returns:
            True if transfer successful

        Raises:
            RuntimeError: If not connected to Asterisk
        """
        if not self.connected:
            raise RuntimeError("Not connected to Asterisk")

        try:
            response = await self.ami.send_action({
                "Action": "Redirect",
                "Channel": channel,
                "Context": context or self.context,
                "Exten": destination,
                "Priority": "1"
            })

            success = response.get("Response") == "Success"

            logger.info(
                "Call transferred",
                channel=channel,
                destination=destination,
                success=success
            )

            return success

        except Exception as e:
            logger.error("Failed to transfer call", error=str(e))
            return False

    async def hangup_call(self, channel: str, cause: int = 16) -> bool:
        """
        Hangup an active call.

        Args:
            channel: Channel to hangup
            cause: Hangup cause code (16 = normal clearing)

        Returns:
            True if hangup successful

        Raises:
            RuntimeError: If not connected to Asterisk
        """
        if not self.connected:
            raise RuntimeError("Not connected to Asterisk")

        try:
            response = await self.ami.send_action({
                "Action": "Hangup",
                "Channel": channel,
                "Cause": str(cause)
            })

            success = response.get("Response") == "Success"

            logger.info(
                "Call hangup requested",
                channel=channel,
                success=success
            )

            return success

        except Exception as e:
            logger.error("Failed to hangup call", error=str(e))
            return False

    async def play_audio(
        self,
        channel: str,
        audio_file: str,
        interrupt_dtmf: bool = True
    ) -> bool:
        """
        Play audio file to channel.

        Args:
            channel: Channel to play audio to
            audio_file: Path to audio file
            interrupt_dtmf: Allow DTMF to interrupt playback

        Returns:
            True if playback started

        Raises:
            RuntimeError: If not connected to Asterisk
        """
        if not self.connected:
            raise RuntimeError("Not connected to Asterisk")

        try:
            # NOTE(review): stock AMI does not appear to define a "Playback"
            # action (Playback is a dialplan application) - confirm that the
            # target server accepts this, or route through AGI instead.
            response = await self.ami.send_action({
                "Action": "Playback",
                "Channel": channel,
                "Filename": audio_file,
                "Interrupt": "yes" if interrupt_dtmf else "no"
            })

            success = response.get("Response") == "Success"

            logger.info(
                "Audio playback started",
                channel=channel,
                file=audio_file,
                success=success
            )

            return success

        except Exception as e:
            logger.error("Failed to play audio", error=str(e))
            return False

    async def get_channel_variable(
        self,
        channel: str,
        variable: str
    ) -> Optional[str]:
        """
        Get channel variable value.

        Args:
            channel: Channel name
            variable: Variable name

        Returns:
            Variable value or None (also None when not connected or on error)
        """
        if not self.connected:
            return None

        try:
            response = await self.ami.send_action({
                "Action": "GetVar",
                "Channel": channel,
                "Variable": variable
            })

            if response.get("Response") == "Success":
                return response.get("Value")

            return None

        except Exception as e:
            logger.error("Failed to get channel variable", error=str(e))
            return None

    async def set_channel_variable(
        self,
        channel: str,
        variable: str,
        value: str
    ) -> bool:
        """
        Set channel variable.

        Args:
            channel: Channel name
            variable: Variable name
            value: Variable value

        Returns:
            True if variable set successfully (False when not connected or
            on error)
        """
        if not self.connected:
            return False

        try:
            response = await self.ami.send_action({
                "Action": "SetVar",
                "Channel": channel,
                "Variable": variable,
                "Value": value
            })

            return response.get("Response") == "Success"

        except Exception as e:
            logger.error("Failed to set channel variable", error=str(e))
            return False

    def register_event_handler(
        self,
        event_type: str,
        handler: Callable[[Dict[str, Any]], Any]
    ):
        """
        Register custom event handler.

        Args:
            event_type: AMI event type
            handler: Handler called with the event data; may be a plain
                function or an async function
        """
        self.event_handlers.setdefault(event_type, []).append(handler)

        logger.debug(
            "Event handler registered",
            event_type=event_type,
            # partials and callable objects have no __name__
            handler=_handler_name(handler)
        )

    async def execute_agi_command(
        self,
        channel: str,
        command: str,
        args: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Execute AGI command on channel.

        Args:
            channel: Channel name
            command: AGI command
            args: Command arguments

        Returns:
            Command result with keys ``success``, ``result`` and ``data``

        Raises:
            RuntimeError: If not connected to Asterisk
            Exception: Any error raised while sending the action is logged
                and re-raised
        """
        if not self.connected:
            raise RuntimeError("Not connected to Asterisk")

        command_line = " ".join([command, *args]) if args else command

        try:
            response = await self.ami.send_action({
                "Action": "AGI",
                "Channel": channel,
                "Command": command_line
            })

            return {
                "success": response.get("Response") == "Success",
                "result": response.get("Result", ""),
                "data": response.get("ResultData", "")
            }

        except Exception as e:
            logger.error("Failed to execute AGI command", error=str(e))
            raise

    def get_active_calls(self) -> List[CallInfo]:
        """Get list of active calls."""
        return list(self.active_calls.values())

    def get_call_by_caller_id(self, caller_id: str) -> Optional[CallInfo]:
        """Get call info by caller ID (first match), or None if not found."""
        return next(
            (
                call for call in self.active_calls.values()
                if call.caller_id == caller_id
            ),
            None
        )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sparesparrow/mcp-project-orchestrator'

If you have feedback or need assistance with the MCP directory API, please join our Discord server