from typing import Any, Dict, Optional
import httpx
try:
    from mcp.server.fastmcp import FastMCP as MCPServer
except ImportError as exc:
    # There is no fallback implementation: re-raise with an actionable
    # installation hint, explicitly chained to the original error so the
    # underlying import failure stays visible in the traceback.
    raise ImportError(
        "Could not import FastMCP. Please ensure 'mcp[cli]' is installed or check version."
    ) from exc
from auth import OHIPAuthenticator, OHIPConfig
# Module-level server object created with the name "OHIP Gateway"; it is used
# below as `@mcp.tool()` to decorate tool functions and is run by the script
# entry point at the bottom of the file.
mcp = MCPServer("OHIP Gateway")
@mcp.tool()
async def call_ohip_api(
    method: str,
    endpoint: str,
    hotel_id: str,
    body: Optional[Dict[str, Any]] = None,
    query_params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Send one authenticated HTTP request to the OHIP API and report the outcome.

    The request URL is the configured base URL joined with ``endpoint`` by a
    single slash. Every request carries a bearer token, the configured
    application key (``x-app-key``) and the hotel id (``x-hotelid``).

    Args:
        method: HTTP method name. Surrounding whitespace is ignored and the
            value is upper-cased before sending.
        endpoint: Path appended to the base URL; leading slashes are ignored.
        hotel_id: Value sent in the ``x-hotelid`` header.
        body: Optional payload sent as the JSON request body.
        query_params: Optional query-string parameters.

    Returns:
        A dict with three keys:

        * ``status_code`` - the HTTP status code, or ``0`` when no response
          was obtained (transport failure or malformed URL).
        * ``success`` - ``True`` when a response arrived with a status code
          below 400.
        * ``data`` - the decoded JSON body, the raw response text when the
          body is not valid JSON, or an error description on failure.

    Errors raised while loading the configuration or obtaining a token are
    not handled here and propagate to the caller.
    """
    config = OHIPConfig.from_env()
    authenticator = await OHIPAuthenticator.get_instance()
    token = await authenticator.get_valid_token()

    url = f"{config.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
    headers = {
        "Authorization": f"Bearer {token}",
        "x-app-key": config.app_key,
        "x-hotelid": hotel_id,
        "Accept": "application/json",
    }
    method_upper = method.strip().upper()

    # Keep the try body limited to the network call so only transport-level
    # failures are converted into the status_code == 0 result.
    try:
        async with httpx.AsyncClient(timeout=60) as client:
            response = await client.request(
                method_upper,
                url,
                headers=headers,
                json=body,
                params=query_params,
            )
    except (httpx.RequestError, httpx.InvalidURL) as exc:
        # httpx.InvalidURL does not derive from RequestError, so it must be
        # listed explicitly. Some transport errors (e.g. timeouts) carry an
        # empty message; fall back to the exception class name in that case.
        return {
            "status_code": 0,
            "success": False,
            "data": str(exc) or type(exc).__name__,
        }

    status_code = response.status_code
    try:
        data = response.json()
    except ValueError:
        # Body is empty or not JSON: hand back the raw text instead.
        data = response.text

    return {
        "status_code": status_code,
        "success": status_code < 400,
        "data": data,
    }
if __name__ == "__main__":
    # Run the server only when this file is executed as a script, so that
    # importing the module does not start it.
    mcp.run()