completion_client.py
"""High-level client for LLM completion operations.""" import asyncio from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple from ultimate_mcp_server.constants import Provider from ultimate_mcp_server.core.providers.base import BaseProvider, get_provider from ultimate_mcp_server.services.cache import get_cache_service from ultimate_mcp_server.utils import get_logger logger = get_logger("ultimate_mcp_server.clients.completion") class CompletionClient: """ High-level client for LLM text generation operations with advanced features. The CompletionClient provides a unified interface for interacting with various LLM providers (OpenAI, Anthropic, etc.) through a simple, consistent API. It abstracts away the complexity of provider-specific implementations, offering a range of features that enhance reliability and performance. Key features: - Multi-provider support with unified interface - Automatic fallback between providers - Result caching for improved performance and reduced costs - Streaming support for real-time text generation - Provider initialization and error handling - Comprehensive error handling and logging Architecture: The client follows a layered architecture pattern: 1. High-level methods (generate_completion, generate_completion_stream) provide the main API 2. Provider abstraction layer manages provider-specific implementation details 3. Caching layer intercepts requests to reduce redundant API calls 4. Error handling layer provides graceful fallbacks and informative errors Performance Considerations: - Caching is enabled by default and can significantly reduce API costs and latency - For time-sensitive or unique responses, caching can be disabled per request - Streaming mode reduces time-to-first-token but cannot leverage caching - Provider fallback adds resilience but may increase latency if primary providers fail This client is designed for MCP tools that require text generation using LLMs, making interactions more robust by handling common issues like rate limits, timeouts, and provider-specific errors. Example: ```python # Create client with default settings client = CompletionClient() # Generate text non-streaming with specific provider and model result = await client.generate_completion( prompt="Explain quantum computing", provider="anthropic", model="claude-3-5-haiku-20241022", temperature=0.5, max_tokens=1000 ) print(f"Generated by {result.model} in {result.processing_time:.2f}s") print(result.text) # Generate text with streaming for real-time output async for chunk, metadata in client.generate_completion_stream( prompt="Write a short story about robots", temperature=0.8 ): print(chunk, end="") if metadata.get("done", False): print("\nGeneration complete!") # Use provider fallback for high availability try: result = await client.try_providers( prompt="Summarize this article", providers=["openai", "anthropic", "gemini"], models=["gpt-4", "claude-instant-1", "gemini-pro"], temperature=0.3 ) except Exception as e: print(f"All providers failed: {e}") ``` """ def __init__(self, default_provider: str = Provider.OPENAI.value, use_cache_by_default: bool = True): """Initialize the completion client. 
Args: default_provider: Default provider to use for completions use_cache_by_default: Whether to use cache by default """ self.default_provider = default_provider self.cache_service = get_cache_service() self.use_cache_by_default = use_cache_by_default async def initialize_provider(self, provider_name: str, api_key: Optional[str] = None) -> BaseProvider: """ Initialize and return a provider instance ready for LLM interactions. This method handles the creation and initialization of a specific LLM provider, ensuring it's properly configured and ready to generate completions. It abstracts the details of provider initialization, including async initialization methods that some providers might require. The method performs several steps: 1. Retrieves the provider implementation based on the provider name 2. Applies the API key if provided (otherwise uses environment configuration) 3. Runs any provider-specific async initialization if required 4. Returns the ready-to-use provider instance Provider initialization follows these architecture principles: - Late binding: Providers are initialized on-demand, not at client creation - Dependency injection: API keys can be injected at runtime rather than relying only on environment - Fail-fast: Validation occurs during initialization rather than at generation time - Extensibility: New providers can be added without changing client code Common provider names include: - "openai": OpenAI API (GPT models) - "anthropic": Anthropic API (Claude models) - "google": Google AI/Vertex API (Gemini models) - "mistral": Mistral AI API (Mistral, Mixtral models) - "ollama": Local Ollama server for various open-source models Error handling: - Invalid provider names are caught and reported immediately - Authentication issues (e.g., invalid API keys) are detected during initialization - Provider-specific initialization failures are propagated with detailed error messages Args: provider_name: Identifier for the desired provider (e.g., "openai", "anthropic") api_key: Optional API key to use instead of environment-configured keys Returns: A fully initialized BaseProvider instance ready to generate completions Raises: ValueError: If the provider name is invalid or not supported Exception: If initialization fails (e.g., invalid API key, network issues) Note: This method is typically called internally by other client methods, but can be used directly when you need a specific provider instance for specialized operations not covered by the main client methods. Example: ```python # Get a specific provider instance for custom operations openai_provider = await client.initialize_provider("openai") # Custom operation using provider-specific features response = await openai_provider.some_specialized_method(...) ``` """ try: provider = await get_provider(provider_name, api_key=api_key) # Ensure the provider is initialized (some might need async init) if hasattr(provider, 'initialize') and asyncio.iscoroutinefunction(provider.initialize): await provider.initialize() return provider except Exception as e: logger.error(f"Failed to initialize provider {provider_name}: {e}", emoji_key="error") raise async def generate_completion( self, prompt: str, provider: Optional[str] = None, model: Optional[str] = None, temperature: float = 0.7, max_tokens: Optional[int] = None, use_cache: bool = True, cache_ttl: int = 3600, **kwargs ): """ Generate text completion from an LLM with optional caching. This method provides a unified interface for generating text completions from any supported LLM provider. 
It includes intelligent caching to avoid redundant API calls for identical inputs, reducing costs and latency. The caching system: - Creates a unique key based on the prompt, provider, model, and parameters - Checks for cached results before making API calls - Stores successful responses with a configurable TTL - Can be disabled per-request with the use_cache parameter Args: prompt: The text prompt to send to the LLM provider: The LLM provider to use (e.g., "openai", "anthropic", "google") If None, uses the client's default_provider model: Specific model to use (e.g., "gpt-4", "claude-instant-1") If None, uses the provider's default model temperature: Sampling temperature for controlling randomness (0.0-1.0) Lower values are more deterministic, higher values more creative max_tokens: Maximum number of tokens to generate If None, uses provider-specific defaults use_cache: Whether to use the caching system (default: True) cache_ttl: Time-to-live for cache entries in seconds (default: 1 hour) **kwargs: Additional provider-specific parameters (e.g., top_p, frequency_penalty, presence_penalty) Returns: CompletionResult object with attributes: - text: The generated completion text - provider: The provider that generated the text - model: The model used - processing_time: Time taken to generate the completion (in seconds) - tokens: Token usage information (if available) - error: Error information (if an error occurred but was handled) Raises: ValueError: For invalid parameters Exception: For provider errors or other issues during generation Example: ```python result = await client.generate_completion( prompt="Write a poem about artificial intelligence", temperature=0.8, max_tokens=1000 ) print(f"Generated by {result.model} in {result.processing_time:.2f}s") print(result.text) ``` """ provider_name = provider or self.default_provider # Check cache if enabled if use_cache and self.cache_service.enabled: # Create a robust cache key provider_instance = await self.initialize_provider(provider_name) model_id = model or provider_instance.get_default_model() # Include relevant parameters in the cache key params_hash = hash((prompt, temperature, max_tokens, str(kwargs))) cache_key = f"completion:{provider_name}:{model_id}:{params_hash}" cached_result = await self.cache_service.get(cache_key) if cached_result is not None: logger.success("Cache hit! Using cached result", emoji_key="cache") # Set a nominal processing time for cached results cached_result.processing_time = 0.001 return cached_result # Cache miss or cache disabled if use_cache and self.cache_service.enabled: logger.info("Cache miss. 
Generating new completion...", emoji_key="processing") else: logger.info("Generating completion...", emoji_key="processing") # Initialize provider and generate completion try: provider_instance = await self.initialize_provider(provider_name) model_id = model or provider_instance.get_default_model() result = await provider_instance.generate_completion( prompt=prompt, model=model_id, temperature=temperature, max_tokens=max_tokens, **kwargs ) # Save to cache if enabled if use_cache and self.cache_service.enabled: await self.cache_service.set( key=cache_key, value=result, ttl=cache_ttl ) logger.info(f"Result saved to cache (key: ...{cache_key[-10:]})", emoji_key="cache") return result except Exception as e: logger.error(f"Error generating completion: {str(e)}", emoji_key="error") raise async def generate_completion_stream( self, prompt: str, provider: Optional[str] = None, model: Optional[str] = None, temperature: float = 0.7, max_tokens: Optional[int] = None, **kwargs ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]: """ Generate a streaming text completion with real-time chunks. This method provides a streaming interface to LLM text generation, where text is returned incrementally as it's generated, rather than waiting for the entire response. This enables real-time UI updates, faster apparent response times, and the ability to process partial responses. Unlike the non-streaming version, this method: - Does not support caching (each streaming response is unique) - Returns an async generator that yields content incrementally - Provides metadata with each chunk for tracking generation progress Args: prompt: The text prompt to send to the LLM provider: The LLM provider to use (e.g., "openai", "anthropic") If None, uses the client's default_provider model: Specific model to use (e.g., "gpt-4", "claude-instant-1") If None, uses the provider's default model temperature: Sampling temperature for controlling randomness (0.0-1.0) Lower values are more deterministic, higher values more creative max_tokens: Maximum number of tokens to generate If None, uses provider-specific defaults **kwargs: Additional provider-specific parameters Yields: Tuples of (chunk_text, metadata), where: - chunk_text: A string containing the next piece of generated text - metadata: A dictionary with information about the generation process: - done: Boolean indicating if this is the final chunk - chunk_index: Index of the current chunk (0-based) - token_count: Number of tokens in this chunk (if available) - total_tokens: Running total of tokens generated so far (if available) Raises: ValueError: For invalid parameters Exception: For provider errors or other issues during streaming Example: ```python # Display text as it's generated async for chunk, metadata in client.generate_completion_stream( prompt="Explain the theory of relativity", temperature=0.3 ): print(chunk, end="") if metadata.get("done", False): print("\nGeneration complete!") ``` Note: Not all providers support streaming completions. Check the provider documentation for compatibility. 
""" provider_name = provider or self.default_provider logger.info("Generating streaming completion...", emoji_key="processing") # Initialize provider and generate streaming completion try: provider_instance = await self.initialize_provider(provider_name) model_id = model or provider_instance.get_default_model() stream = provider_instance.generate_completion_stream( prompt=prompt, model=model_id, temperature=temperature, max_tokens=max_tokens, **kwargs ) async for chunk, metadata in stream: yield chunk, metadata except Exception as e: logger.error(f"Error generating streaming completion: {str(e)}", emoji_key="error") raise async def try_providers( self, prompt: str, providers: List[str], models: Optional[List[str]] = None, **kwargs ): """ Try multiple providers in sequence until one succeeds. This method implements an automatic fallback mechanism that attempts to generate a completion using a list of providers in order, continuing to the next provider if the current one fails. This provides resilience against provider downtime, rate limits, or other temporary failures. The method tries each provider exactly once in the order they're specified, with an optional corresponding model for each. This is useful for scenarios where you need high availability or want to implement prioritized provider selection. Args: prompt: The text prompt to send to the LLM providers: An ordered list of provider names to try (e.g., ["openai", "anthropic", "google"]) Providers are tried in the specified order until one succeeds models: Optional list of models to use with each provider If provided, must be the same length as providers If None, each provider's default model is used **kwargs: Additional parameters passed to generate_completion Applies to all provider attempts Returns: CompletionResult from the first successful provider, with the same structure as generate_completion results Raises: ValueError: If no providers are specified or if models list length doesn't match providers Exception: If all specified providers fail, with details of the last error Example: ```python # Try OpenAI first, fall back to Anthropic, then Google result = await client.try_providers( prompt="Write a sonnet about programming", providers=["openai", "anthropic", "google"], models=["gpt-4", "claude-2", "gemini-pro"], temperature=0.7, max_tokens=800 ) print(f"Successfully used {result.provider} with model {result.model}") print(result.text) ``` Note: Each provider attempt is logged, making it easy to track which providers succeeded or failed during the fallback sequence. """ if not providers: raise ValueError("No providers specified") models = models or [None] * len(providers) if len(models) != len(providers): raise ValueError("If models are specified, there must be one for each provider") last_error = None for i, provider_name in enumerate(providers): try: logger.info(f"Trying provider: {provider_name}", emoji_key="provider") result = await self.generate_completion( prompt=prompt, provider=provider_name, model=models[i], **kwargs ) return result except Exception as e: logger.warning(f"Provider {provider_name} failed: {str(e)}", emoji_key="warning") last_error = e # If we get here, all providers failed raise Exception(f"All providers failed. Last error: {str(last_error)}")

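The methods above can be exercised end to end with a small driver script. The sketch below is illustrative only: it assumes the `ultimate_mcp_server` package is installed, that provider API keys are configured in the environment, and that this module is importable as `ultimate_mcp_server.clients.completion` (an import path inferred from the logger name in the file, not confirmed by the source).

```python
# Minimal usage sketch. Assumptions: ultimate_mcp_server is installed, API keys
# are set in the environment, and the import path below (inferred from the
# logger name "ultimate_mcp_server.clients.completion") is correct.
import asyncio

from ultimate_mcp_server.clients.completion import CompletionClient  # assumed path


async def main() -> None:
    client = CompletionClient(default_provider="openai")

    # Non-streaming completion; caching is on by default, so repeating the
    # same prompt and parameters within the TTL should hit the cache.
    result = await client.generate_completion(
        prompt="Give me three facts about octopuses",
        temperature=0.7,
        max_tokens=300,
    )
    print(f"[{result.provider}/{result.model}] {result.text}")

    # Streaming completion; chunks are printed as they arrive.
    async for chunk, metadata in client.generate_completion_stream(
        prompt="Write a haiku about caching",
    ):
        print(chunk, end="")
        if metadata.get("done", False):
            print()


if __name__ == "__main__":
    asyncio.run(main())
```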