server.py•9.01 kB
"""MCP server for the Intermedia Unite Voice API."""
from __future__ import annotations
import asyncio
import os
import time
from dataclasses import dataclass
from typing import Any, Dict, Mapping, MutableMapping, Optional
import httpx
from mcp.server.fastmcp import FastMCPServer
DEFAULT_BASE_URL = "https://uniteapi.intermedia.net/calling"
DEFAULT_TOKEN_URL = "https://oauth.uniteapi.com/connect/token"
@dataclass
class IntermediaConfig:
"""Configuration for accessing the Intermedia Unite Voice API."""
client_id: str
client_secret: str
base_url: str = DEFAULT_BASE_URL
token_url: str = DEFAULT_TOKEN_URL
account_id: Optional[str] = None
@classmethod
def from_env(cls) -> "IntermediaConfig":
"""Create a configuration instance using environment variables."""
client_id = os.getenv("INTERMEDIA_CLIENT_ID")
client_secret = os.getenv("INTERMEDIA_CLIENT_SECRET")
if not client_id or not client_secret:
raise RuntimeError(
"Both INTERMEDIA_CLIENT_ID and INTERMEDIA_CLIENT_SECRET environment variables must be set."
)
base_url = os.getenv("INTERMEDIA_BASE_URL", DEFAULT_BASE_URL)
token_url = os.getenv("INTERMEDIA_TOKEN_URL", DEFAULT_TOKEN_URL)
account_id = os.getenv("INTERMEDIA_ACCOUNT_ID")
return cls(
client_id=client_id,
client_secret=client_secret,
base_url=base_url.rstrip("/"),
token_url=token_url,
account_id=account_id,
)
@dataclass
class TokenCache:
"""Lightweight cache for OAuth access tokens."""
access_token: Optional[str] = None
expires_at: float = 0
def is_valid(self) -> bool:
return bool(self.access_token) and time.time() < self.expires_at - 30
def store(self, token: str, expires_in: int) -> None:
self.access_token = token
self.expires_at = time.time() + expires_in
class IntermediaClient:
"""HTTP client for interacting with the Intermedia Unite Voice API."""
def __init__(self, config: IntermediaConfig) -> None:
self._config = config
self._token_cache = TokenCache()
self._client = httpx.AsyncClient(timeout=httpx.Timeout(30.0, connect=10.0))
self._token_lock = asyncio.Lock()
@property
def base_url(self) -> str:
return self._config.base_url
@property
def account_id(self) -> Optional[str]:
return self._config.account_id
async def close(self) -> None:
await self._client.aclose()
async def _refresh_token(self) -> str:
async with self._token_lock:
if self._token_cache.is_valid():
return self._token_cache.access_token or ""
response = await self._client.post(
self._config.token_url,
data={
"grant_type": "client_credentials",
"client_id": self._config.client_id,
"client_secret": self._config.client_secret,
"scope": "calling.read calling.write",
},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
response.raise_for_status()
payload = response.json()
access_token = payload.get("access_token")
if not access_token:
raise RuntimeError("Token endpoint response did not include an access_token")
expires_in = int(payload.get("expires_in", 3600))
self._token_cache.store(access_token, expires_in)
return access_token
async def _get_token(self) -> str:
if self._token_cache.is_valid():
return self._token_cache.access_token or ""
return await self._refresh_token()
async def request(
self,
method: str,
path: str,
*,
query: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
json_body: Optional[Any] = None,
account_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Send an authenticated request to the Unite Voice API."""
token = await self._get_token()
url = self._build_url(path, account_id=account_id)
request_headers: MutableMapping[str, str] = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
if headers:
request_headers.update(headers)
response = await self._client.request(
method=method.upper(),
url=url,
params=self._prepare_query(query),
json=json_body,
headers=request_headers,
)
if response.status_code == 401:
token = await self._refresh_token()
request_headers["Authorization"] = f"Bearer {token}"
response = await self._client.request(
method=method.upper(),
url=url,
params=self._prepare_query(query),
json=json_body,
headers=request_headers,
)
response.raise_for_status()
if not response.content:
return {"status_code": response.status_code}
content_type = response.headers.get("content-type", "")
if "json" in content_type:
return response.json()
return {
"status_code": response.status_code,
"content_type": content_type,
"text": response.text,
}
def _build_url(self, path: str, *, account_id: Optional[str]) -> str:
if path.startswith("http://") or path.startswith("https://"):
return path
normalized = path.lstrip("/")
if "{accountId}" in normalized:
resolved_account_id = account_id or self.account_id
if not resolved_account_id:
raise ValueError("The path requires an account ID but none was provided.")
normalized = normalized.replace("{accountId}", resolved_account_id)
return f"{self.base_url}/{normalized}"
@staticmethod
def _prepare_query(query: Optional[Mapping[str, Any]]) -> Optional[Dict[str, Any]]:
if not query:
return None
return {key: value for key, value in query.items() if value is not None}
def create_server(config: Optional[IntermediaConfig] = None) -> FastMCPServer:
"""Create and configure the MCP server."""
server = FastMCPServer("intermedia-unite-voice")
cfg = config
client_holder: Dict[str, IntermediaClient] = {}
async def get_client() -> IntermediaClient:
nonlocal cfg
if cfg is None:
cfg = IntermediaConfig.from_env()
if "client" not in client_holder:
client_holder["client"] = IntermediaClient(cfg)
return client_holder["client"]
@server.tool()
async def call_intermedia_api(
method: str,
path: str,
query: Optional[Mapping[str, Any]] = None,
json_body: Optional[Any] = None,
headers: Optional[Mapping[str, str]] = None,
account_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Call an arbitrary Intermedia Unite Voice API endpoint.
Parameters
----------
method:
The HTTP method to use (GET, POST, PUT, PATCH, DELETE, etc.).
path:
The API path, e.g. "/v1/accounts/{accountId}/call-history". The path
can include the placeholder "{accountId}" which will be replaced with
the configured account ID when available.
query:
Optional dictionary of query string parameters to include in the
request. Values that are ``None`` will be omitted.
json_body:
Optional JSON-serializable payload to send in the request body.
headers:
Optional additional headers to merge into the request.
account_id:
Override for the Intermedia account ID. If omitted, the value from
``INTERMEDIA_ACCOUNT_ID`` will be used when needed.
"""
client = await get_client()
return await client.request(
method=method,
path=path,
query=query,
json_body=json_body,
headers=headers,
account_id=account_id,
)
@server.tool()
async def refresh_access_token() -> Dict[str, Any]:
"""Force a refresh of the cached OAuth access token."""
client = await get_client()
token = await client._refresh_token()
return {"access_token": token}
@server.on_shutdown
async def _shutdown() -> None:
client = client_holder.get("client")
if client is not None:
await client.close()
return server
def run() -> None:
"""Entry point for running the MCP server from the command line."""
server = create_server()
server.run()
if __name__ == "__main__":
run()