TwilioManager MCP

by errajibadr
Verified
""" Asynchronous Twilio API Manager implementation. This module provides an async wrapper around the Twilio API for efficient operations. """ import asyncio import logging import os from typing import Any, Dict, List, Optional from twilio.rest import Client from api.async_twilio_wrapper import AsyncTwilioHttpClient _logger = logging.getLogger(__name__) class AsyncTwilioManager: """Asynchronous wrapper for Twilio API operations.""" def __init__( self, account_sid: str, auth_token: str, timeout: Optional[float] = None, logger: logging.Logger = _logger, ): """Initialize the Twilio manager with credentials.""" self.account_sid = account_sid self.auth_token = auth_token self.timeout = timeout self.logger = logger self._http_client = AsyncTwilioHttpClient() self._client = None self._lock = asyncio.Lock() @property def client(self) -> Client: """Get the Twilio client, raising an error if it's not initialized.""" if not self._client: self._client = Client(self.account_sid, self.auth_token, http_client=self._http_client) return self._client async def __aenter__(self): """Async context manager entry.""" await self._lock.acquire() await self._http_client.init_session() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" await self._http_client.close_session() self._lock.release() async def list_subaccounts( self, friendly_name: Optional[str] = None, with_token: bool = False ) -> List[Dict]: """ List all subaccounts or filter by friendly name. Args: friendly_name: Optional filter by friendly name Returns: List of subaccount details """ try: params = {} if friendly_name: params["friendly_name"] = friendly_name accounts = await self.client.api.v2010.accounts.list_async(**params) return [ { "sid": account.sid, "friendly_name": account.friendly_name, "auth_token": account.auth_token if with_token else None, } for account in accounts ] except Exception as e: self.logger.error(f"Failed to list subaccounts: {str(e)}") raise async def get_account_numbers(self, account_sid: Optional[str] = None) -> List[Dict]: """ Get all phone numbers associated with a subaccount. Args: account_sid: The subaccount SID Returns: List of phone numbers and their details """ try: numbers = [] if account_sid: local_numbers = await self.client.api.v2010.accounts( account_sid ).incoming_phone_numbers.local.list_async() mobile_numbers = await self.client.api.v2010.accounts( account_sid ).incoming_phone_numbers.mobile.list_async() else: local_numbers = await self.client.incoming_phone_numbers.local.list_async() mobile_numbers = await self.client.incoming_phone_numbers.mobile.list_async() numbers.extend( [{**number.__dict__, "number_type": "national"} for number in local_numbers] ) numbers.extend( [{**number.__dict__, "number_type": "mobile"} for number in mobile_numbers] ) return numbers except Exception as e: self.logger.error(f"Failed to fetch phone numbers: {str(e)}") raise async def get_addresses(self, account_sid: Optional[str] = None) -> List[Dict[str, Any]]: """ Get all addresses associated with a subaccount. Args: account_sid: The subaccount SID Returns: List of addresses and their details """ try: addresses = await self.client.api.v2010.accounts(account_sid).addresses.list_async() # type: ignore return [address.__dict__ for address in addresses] except Exception as e: self.logger.error(f"Failed to fetch addresses: {str(e)}") raise async def create_address( self, account_sid: str, customer_name: str, friendly_name: str, street: str, city: str, region: str, postal_code: str, iso_country: str, ) -> dict[str, str]: """ Create an address for a subaccount. """ address = await self.client.api.v2010.accounts(account_sid).addresses.create_async( customer_name=customer_name, friendly_name=friendly_name, street=street, city=city, region=region, postal_code=postal_code, iso_country=iso_country, ) return address.__dict__ async def duplicate_regulatory_bundle( self, bundle_sid: str, target_account_sid: str, friendly_name: Optional[str] = None ) -> Dict: """ Duplicate a regulatory bundle to a subaccount. Args: bundle_sid: The SID of the regulatory bundle to duplicate target_account_sid: The target subaccount SID friendly_name: Optional name for the new bundle Returns: Dict containing the new bundle information """ try: new_bundle = await self.client.numbers.v2.bundle_clone( bundle_sid=bundle_sid ).create_async(target_account_sid=target_account_sid, friendly_name=friendly_name) return new_bundle.__dict__ except Exception as e: self.logger.error(f"Failed to duplicate bundle: {str(e)}") raise async def duplicate_own_bundles_to_subaccount( self, target_account_sid: str, ) -> List[Dict]: """ Duplicate all own bundles to a subaccount. """ bundles = await self.client.numbers.v2.regulatory_compliance.bundles.list_async() for bundle in bundles: if bundle.sid is None: self.logger.error(f"Bundle {bundle.friendly_name} has no SID") continue await self.duplicate_regulatory_bundle( bundle_sid=bundle.sid, target_account_sid=target_account_sid, friendly_name=bundle.friendly_name, ) # type: ignore return [bundle.__dict__ for bundle in bundles] async def get_bundle_sid(self, subaccount_sid: Optional[str] = None) -> Optional[str]: """ Get the bundle SID for a subaccount. """ bundles = await self.client.numbers.v2.regulatory_compliance.bundles.list_async() for bundle in bundles: if bundle.account_sid == subaccount_sid: return bundle.sid return None async def get_number_type_from_sid( self, sid: str, account_sid: Optional[str] = None ) -> Optional[str]: """ Get the number type from a phone number SID. """ numbers = await self.get_account_numbers(account_sid) for number in numbers: if number["sid"] == sid: return number["number_type"] return None async def transfer_phone_number( self, source_account_sid: str, phone_number_sid: str, target_account_sid: str, address_sid: Optional[str] = None, bundle_sid: Optional[str] = None, ) -> Dict: """ Transfer a phone number to a different subaccount. Args: source_account_sid: The source subaccount SID phone_number_sid: The SID of the phone number to transfer target_account_sid: The target subaccount SID address_sid: The address SID bundle_sid: The bundle SID Returns: Dict containing the updated phone number information """ try: # Get or create bundle if not provided bundle_sid = await self._get_or_create_bundle( bundle_sid, phone_number_sid, source_account_sid, target_account_sid ) # Get or create address if not provided address_sid = await self._get_or_create_address(address_sid, target_account_sid) return await self._execute_number_transfer( source_account_sid, phone_number_sid, target_account_sid, address_sid, bundle_sid ) except Exception as e: self.logger.error(f"Failed to transfer phone number: {str(e)}") raise async def _get_or_create_bundle( self, bundle_sid: Optional[str], phone_number_sid: str, source_account_sid: str, target_account_sid: str, ) -> str: """Helper method to get or create a regulatory bundle""" if bundle_sid is not None: return bundle_sid number_type = await self.get_number_type_from_sid(phone_number_sid, source_account_sid) reg_bundle = await self.list_regulatory_bundles( account_sid=target_account_sid, number_type=number_type ) if not reg_bundle: self.logger.info("No bundle found, duplicating own bundles from main account") await self.duplicate_own_bundles_to_subaccount(target_account_sid) reg_bundle = await self.list_regulatory_bundles( account_sid=target_account_sid, number_type=number_type ) if not reg_bundle: raise Exception("No bundle found, creating one") return reg_bundle[0]["sid"] async def _get_or_create_address( self, address_sid: Optional[str], target_account_sid: str ) -> str: """Helper method to get or create an address""" if address_sid is not None: return address_sid addresses: list[dict[str, str]] = await self.get_addresses(target_account_sid) if len(addresses) == 0: self.logger.info("No address found, creating one") address = await self.create_address( account_sid=target_account_sid, customer_name=os.environ.get("ADDRESS_CUSTOMER_NAME", ""), friendly_name=os.environ.get("ADDRESS_FRIENDLY_NAME", ""), street=os.environ.get("ADDRESS_STREET", ""), city=os.environ.get("ADDRESS_CITY", ""), region=os.environ.get("ADDRESS_REGION", ""), postal_code=os.environ.get("ADDRESS_POSTAL_CODE", ""), iso_country=os.environ.get("ADDRESS_ISO_COUNTRY", ""), ) address_sid = address["sid"] address_friendly_name = address["friendly_name"] else: self.logger.info("Address found, using it") address_sid = addresses[0]["sid"] address_friendly_name = addresses[0]["friendly_name"] self.logger.info(f"Using address_sid {address_sid}, friendly_name {address_friendly_name}") return address_sid async def _execute_number_transfer( self, source_account_sid: str, phone_number_sid: str, target_account_sid: str, address_sid: str, bundle_sid: str, ) -> Dict: """Helper method to execute the actual number transfer""" try: updated_number = ( await self.client.api.v2010.accounts(source_account_sid) .incoming_phone_numbers(phone_number_sid) .update_async( account_sid=target_account_sid, address_sid=address_sid, bundle_sid=bundle_sid ) ) self.logger.info( f"Successfully transferred number {phone_number_sid} from account {source_account_sid} to account {target_account_sid}" ) return updated_number.__dict__ except Exception as transfer_error: return await self._verify_transfer(transfer_error, phone_number_sid, target_account_sid) async def _verify_transfer( self, transfer_error: Exception, phone_number_sid: str, target_account_sid: str ) -> Dict: """Helper method to verify transfer status in case of errors""" try: await asyncio.sleep(2) target_numbers = await self.get_account_numbers(target_account_sid) for number in target_numbers: if number.get("sid") == phone_number_sid: self.logger.info( f"Number {phone_number_sid} found in target account despite error. Transfer likely successful." ) return number raise transfer_error except Exception as verify_error: self.logger.error(f"Error during transfer verification: {str(verify_error)}") raise transfer_error async def list_regulatory_bundles( self, account_sid: Optional[str] = None, number_type: Optional[str] = None, iso_country: Optional[str] = "FR", ) -> List[Dict]: """ List regulatory bundles for a specific subaccount or main account. Args: account_sid: Optional subaccount SID. If not provided, lists bundles for the main account number_type: Optional filter for bundle type ('national' or 'mobile') iso_country: Country code for the bundles (default: 'FR') Returns: List of regulatory bundles and their details """ try: # Select the appropriate client client = self.client subaccount_http_client = None if account_sid: auth_token = await self.get_subaccount_auth_token(account_sid) if auth_token is None: raise Exception("Auth token not found") # Create a new HTTP client with proper lifecycle management subaccount_http_client = AsyncTwilioHttpClient() await subaccount_http_client.__aenter__() client = Client(account_sid, auth_token, http_client=subaccount_http_client) try: bundles = [] # If specific number type requested, fetch only that type if number_type: bundles_list = await client.numbers.v2.regulatory_compliance.bundles.list_async( number_type=number_type, iso_country=iso_country ) bundles.extend( [{**bundle.__dict__, "number_type": number_type} for bundle in bundles_list] ) else: # Fetch both types number_types = { "national": "national", # Map local to national in the response "mobile": "mobile", } for api_type, response_type in number_types.items(): type_bundles = ( await client.numbers.v2.regulatory_compliance.bundles.list_async( number_type=api_type, iso_country=iso_country ) ) bundles.extend( [ {**bundle.__dict__, "number_type": response_type} for bundle in type_bundles ] ) return bundles finally: # Clean up the subaccount HTTP client if we created one if subaccount_http_client: await subaccount_http_client.__aexit__(None, None, None) except Exception as e: self.logger.error(f"Failed to list regulatory bundles: {str(e)}") raise async def get_subaccount_auth_token(self, account_sid: str) -> str | None: """ Get the auth token for a specific subaccount. Args: account_sid: The subaccount SID Returns: The auth token for the subaccount """ try: account = await self.client.api.v2010.accounts(account_sid).fetch_async() if account is None: raise Exception("Account not found") return account.auth_token except Exception as e: self.logger.error(f"Failed to get subaccount auth token: {str(e)}") raise