Skip to main content
Glama
ambivo-corp

Ambivo MCP Server

Official
by ambivo-corp
server.py13.7 kB
#!/usr/bin/env python3 """ MCP Server for Ambivo API Endpoints This MCP server provides access to Ambivo's entity/natural_query endpoint through standardized MCP tools. It handles authentication via JWT Bearer tokens with enhanced security, configuration management, and error handling. """ import asyncio import json import logging import os import time from typing import Any, Dict, List, Optional import httpx from mcp import types from mcp.server import NotificationOptions, Server from mcp.server.models import InitializationOptions # Import from package modules from .config import ServerConfig, load_config from .security import InputValidator, RateLimiter, TokenValidator # Load configuration try: config_path = os.getenv("AMBIVO_CONFIG_FILE") config = load_config(config_path) logger = config.setup_logging() except Exception as e: # Fallback logging if config fails logging.basicConfig(level=logging.INFO) logger = logging.getLogger("ambivo-mcp") logger.error(f"Failed to load configuration: {e}") # Use default config config = ServerConfig() # Initialize security components rate_limiter = RateLimiter( max_requests=config.rate_limit_requests, window_seconds=config.rate_limit_window ) input_validator = InputValidator( max_query_length=config.max_query_length, max_payload_size=config.max_payload_size ) token_validator = TokenValidator(cache_ttl=config.token_cache_ttl) # Server configuration server = Server(config.server_name) class AmbivoAPIClient: """Client for interacting with Ambivo API endpoints with enhanced error handling""" def __init__(self, config: ServerConfig, auth_token: Optional[str] = None): self.config = config self.base_url = config.base_url.rstrip("/") self.auth_token = auth_token self.client = httpx.AsyncClient(timeout=config.timeout) self.logger = logging.getLogger("ambivo-mcp.client") def set_auth_token(self, token: str): """Set the authentication token with validation""" try: if self.config.token_validation_enabled: token_validator.validate_token_format(token) self.auth_token = token self.logger.info("Authentication token set successfully") except ValueError as e: self.logger.error(f"Invalid token format: {e}") raise def _get_headers(self) -> Dict[str, str]: """Get headers for API requests""" headers = {"Content-Type": "application/json", "Accept": "application/json"} if self.auth_token: headers["Authorization"] = f"Bearer {self.auth_token}" return headers async def _make_request_with_retry( self, method: str, url: str, **kwargs ) -> httpx.Response: """Make HTTP request with retry logic""" last_exception = None for attempt in range(self.config.max_retries + 1): try: response = await self.client.request(method, url, **kwargs) return response except (httpx.TimeoutException, httpx.ConnectError, httpx.ReadError) as e: last_exception = e if attempt < self.config.max_retries: wait_time = 2**attempt # Exponential backoff self.logger.warning( f"Request attempt {attempt + 1} failed, retrying in {wait_time}s: {e}" ) await asyncio.sleep(wait_time) else: self.logger.error( f"All {self.config.max_retries + 1} request attempts failed" ) raise # This should never be reached, but just in case raise last_exception async def natural_query( self, query: str, response_format: str = "both" ) -> Dict[str, Any]: """ Execute a natural language query against entity data with validation and error handling Args: query: Natural language query string response_format: Response format - "table", "natural", or "both" Returns: API response dictionary """ # Validate inputs input_validator.validate_query(query) if response_format not in ["table", "natural", "both"]: raise ValueError( "Invalid response_format. Must be 'table', 'natural', or 'both'" ) payload = {"query": query, "response_format": response_format} url = f"{self.base_url}/entity/natural_query" try: self.logger.info(f"Executing natural query: {query[:100]}...") start_time = time.time() response = await self._make_request_with_retry( "POST", url, json=payload, headers=self._get_headers() ) elapsed_time = time.time() - start_time self.logger.info(f"Natural query completed in {elapsed_time:.2f}s") response.raise_for_status() result = response.json() self.logger.debug(f"API response: {json.dumps(result, indent=2)[:500]}...") return result except httpx.TimeoutException as e: self.logger.error(f"Natural query timeout: {e}") raise Exception(f"Request timeout after {self.config.timeout}s") except httpx.HTTPStatusError as e: self.logger.error( f"Natural query HTTP error: {e.response.status_code} - {e.response.text}" ) raise except Exception as e: self.logger.error(f"Natural query unexpected error: {e}") raise async def close(self): """Close the HTTP client""" await self.client.aclose() # Global client instance api_client = AmbivoAPIClient(config, auth_token=config.auth_token) @server.list_tools() async def handle_list_tools() -> List[types.Tool]: """ List available tools. """ return [ types.Tool( name="natural_query", description="Execute natural language queries against Ambivo entity data. " "This tool processes natural language queries and returns structured data " "about leads, contacts, opportunities, and other entities.", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Natural language query describing what data you want to retrieve. " "Examples: 'Show me leads created this week', 'Find contacts with gmail addresses', " "'List opportunities worth more than $10,000'", }, "response_format": { "type": "string", "enum": ["table", "natural", "both"], "default": "both", "description": "Format of the response: 'table' for structured data, " "'natural' for natural language description, 'both' for both formats", }, }, "required": ["query"], }, ), types.Tool( name="set_auth_token", description="Set the authentication token for API requests. " "This must be called before using other tools to authenticate with the Ambivo API.", inputSchema={ "type": "object", "properties": { "token": { "type": "string", "description": "JWT Bearer token for authentication with Ambivo API", } }, "required": ["token"], }, ), ] @server.call_tool() async def handle_call_tool( name: str, arguments: Dict[str, Any] | None ) -> List[types.TextContent]: """ Handle tool calls with security and rate limiting. """ if arguments is None: arguments = {} start_time = time.time() logger.info(f"Tool call started: {name}") try: # Rate limiting (except for auth token setting) if name != "set_auth_token" and api_client.auth_token: client_id = token_validator.get_client_id_from_token(api_client.auth_token) if not rate_limiter.is_allowed(client_id): stats = rate_limiter.get_client_stats(client_id) return [ types.TextContent( type="text", text=f"Rate limit exceeded. Requests: {stats['requests']}/{config.rate_limit_requests}. " f"Reset in {stats['reset_time'] - time.time():.0f}s", ) ] if name == "set_auth_token": token = arguments.get("token") if not token: return [ types.TextContent( type="text", text="Error: Authentication token is required" ) ] # Validate and set token api_client.set_auth_token(token) # Cache the token if validation is enabled if config.token_validation_enabled: token_validator.cache_token(token) return [ types.TextContent( type="text", text="Authentication token set successfully. You can now use other tools to query the Ambivo API.", ) ] elif name == "natural_query": if not api_client.auth_token: return [ types.TextContent( type="text", text="Error: Authentication required. Please use the 'set_auth_token' tool first.", ) ] query = arguments.get("query") if not query: return [ types.TextContent( type="text", text="Error: Query parameter is required" ) ] response_format = arguments.get("response_format", "both") try: result = await api_client.natural_query(query, response_format) return [ types.TextContent( type="text", text=f"Natural Query Results:\n\n{json.dumps(result, indent=2)}", ) ] except httpx.HTTPStatusError as e: error_msg = f"HTTP {e.response.status_code}: {e.response.text}" return [types.TextContent(type="text", text=f"API Error: {error_msg}")] except Exception as e: return [ types.TextContent( type="text", text=f"Error executing natural query: {str(e)}" ) ] else: return [types.TextContent(type="text", text=f"Unknown tool: {name}")] except ValueError as e: # Input validation errors logger.warning(f"Validation error in tool {name}: {e}") return [types.TextContent(type="text", text=f"Validation Error: {str(e)}")] except httpx.HTTPStatusError as e: # HTTP errors from API logger.error(f"HTTP error in tool {name}: {e.response.status_code}") error_msg = f"API Error (HTTP {e.response.status_code})" try: error_detail = e.response.json() if "error_code" in error_detail: error_msg += f": {error_detail['error_code']}" except: error_msg += f": {e.response.text[:200]}" return [types.TextContent(type="text", text=error_msg)] except Exception as e: # Unexpected errors logger.exception(f"Unexpected error in tool {name}") return [types.TextContent(type="text", text=f"Unexpected error: {str(e)}")] finally: elapsed_time = time.time() - start_time logger.info(f"Tool call completed: {name} in {elapsed_time:.2f}s") async def main(): """Main entry point for the MCP server with enhanced initialization""" logger.info(f"Starting {config.server_name} v{config.server_version}") logger.info(f"Configuration: Base URL: {config.base_url}") logger.info( f"Security: Rate limit: {config.rate_limit_requests}/{config.rate_limit_window}s" ) # Import here to avoid issues with event loops import mcp.server.stdio try: async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name=config.server_name, server_version=config.server_version, capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) except KeyboardInterrupt: logger.info("Server shutdown requested") except Exception as e: logger.error(f"Server error: {e}") raise finally: # Cleanup await api_client.close() logger.info("Server shutdown complete") def run_server(): """Synchronous wrapper for the async main function""" import asyncio asyncio.run(main()) if __name__ == "__main__": run_server()

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ambivo-corp/ambivo-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server