"""High-level client for LLM completion operations."""
import asyncio
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import BaseProvider, get_provider
from ultimate_mcp_server.services.cache import get_cache_service
from ultimate_mcp_server.utils import get_logger
logger = get_logger("ultimate_mcp_server.clients.completion")
class CompletionClient:
"""
High-level client for LLM text generation operations with advanced features.
The CompletionClient provides a unified interface for interacting with various LLM providers
(OpenAI, Anthropic, etc.) through a simple, consistent API. It abstracts away the complexity
of provider-specific implementations, offering a range of features that enhance reliability
and performance.
Key features:
- Multi-provider support with unified interface
- Automatic fallback between providers
- Result caching for improved performance and reduced costs
- Streaming support for real-time text generation
- Lazy, on-demand provider initialization
- Comprehensive error handling and logging
Architecture:
The client follows a layered architecture pattern:
1. High-level methods (generate_completion, generate_completion_stream) provide the main API
2. Provider abstraction layer manages provider-specific implementation details
3. Caching layer intercepts requests to reduce redundant API calls
4. Error handling layer provides graceful fallbacks and informative errors
Performance Considerations:
- Caching is enabled by default and can significantly reduce API costs and latency
- For time-sensitive or unique responses, caching can be disabled per request
- Streaming mode reduces time-to-first-token but cannot leverage caching
- Provider fallback adds resilience but may increase latency if primary providers fail
This client is designed for MCP tools that require text generation using LLMs,
making interactions more robust by handling common issues like rate limits,
timeouts, and provider-specific errors.
Example:
```python
# Create client with default settings
client = CompletionClient()
# Generate text non-streaming with specific provider and model
result = await client.generate_completion(
prompt="Explain quantum computing",
provider="anthropic",
model="claude-3-5-haiku-20241022",
temperature=0.5,
max_tokens=1000
)
print(f"Generated by {result.model} in {result.processing_time:.2f}s")
print(result.text)
# Generate text with streaming for real-time output
async for chunk, metadata in client.generate_completion_stream(
prompt="Write a short story about robots",
temperature=0.8
):
print(chunk, end="")
if metadata.get("done", False):
print("\nGeneration complete!")
# Use provider fallback for high availability
try:
result = await client.try_providers(
prompt="Summarize this article",
providers=["openai", "anthropic", "gemini"],
models=["gpt-4", "claude-instant-1", "gemini-pro"],
temperature=0.3
)
except Exception as e:
print(f"All providers failed: {e}")
```
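A non-default provider can also be chosen at construction time (a small sketch; assumes the Provider enum exposes an ANTHROPIC member alongside OPENAI):
```python
# Default to Anthropic for all completions issued through this client
client = CompletionClient(default_provider=Provider.ANTHROPIC.value)
result = await client.generate_completion(prompt="Hello!")
```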
"""
def __init__(self, default_provider: str = Provider.OPENAI.value, use_cache_by_default: bool = True):
"""Initialize the completion client.
Args:
default_provider: Default provider to use for completions
use_cache_by_default: Whether to use cache by default
"""
self.default_provider = default_provider
self.cache_service = get_cache_service()
self.use_cache_by_default = use_cache_by_default
async def initialize_provider(self, provider_name: str, api_key: Optional[str] = None) -> BaseProvider:
"""
Initialize and return a provider instance ready for LLM interactions.
This method handles the creation and initialization of a specific LLM provider,
ensuring it's properly configured and ready to generate completions. It abstracts
the details of provider initialization, including async initialization methods
that some providers might require.
The method performs several steps:
1. Retrieves the provider implementation based on the provider name
2. Applies the API key if provided (otherwise uses environment configuration)
3. Runs any provider-specific async initialization if required
4. Returns the ready-to-use provider instance
Provider initialization follows these architecture principles:
- Late binding: Providers are initialized on-demand, not at client creation
- Dependency injection: API keys can be injected at runtime rather than relying solely on environment configuration
- Fail-fast: Validation occurs during initialization rather than at generation time
- Extensibility: New providers can be added without changing client code
Common provider names include:
- "openai": OpenAI API (GPT models)
- "anthropic": Anthropic API (Claude models)
- "google": Google AI/Vertex API (Gemini models)
- "mistral": Mistral AI API (Mistral, Mixtral models)
- "ollama": Local Ollama server for various open-source models
Error handling:
- Invalid provider names are caught and reported immediately
- Authentication issues (e.g., invalid API keys) are detected during initialization
- Provider-specific initialization failures are propagated with detailed error messages
Args:
provider_name: Identifier for the desired provider (e.g., "openai", "anthropic")
api_key: Optional API key to use instead of environment-configured keys
Returns:
A fully initialized BaseProvider instance ready to generate completions
Raises:
ValueError: If the provider name is invalid or not supported
Exception: If initialization fails (e.g., invalid API key, network issues)
Note:
This method is typically called internally by other client methods,
but can be used directly when you need a specific provider instance
for specialized operations not covered by the main client methods.
Example:
```python
# Get a specific provider instance for custom operations
openai_provider = await client.initialize_provider("openai")
# Custom operation using provider-specific features
response = await openai_provider.some_specialized_method(...)
```
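The api_key argument supports injecting credentials at runtime instead of reading them from the environment (illustrative sketch; load_key_from_secret_manager is a hypothetical helper):
```python
# Fetch a key at runtime (hypothetical helper) and inject it into the provider
runtime_key = load_key_from_secret_manager("anthropic")
anthropic_provider = await client.initialize_provider("anthropic", api_key=runtime_key)
```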
"""
try:
provider = await get_provider(provider_name, api_key=api_key)
# Ensure the provider is initialized (some might need async init)
if hasattr(provider, 'initialize') and asyncio.iscoroutinefunction(provider.initialize):
await provider.initialize()
return provider
except Exception as e:
logger.error(f"Failed to initialize provider {provider_name}: {e}", emoji_key="error")
raise
async def generate_completion(
self,
prompt: str,
provider: Optional[str] = None,
model: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
use_cache: bool = True,
cache_ttl: int = 3600,
**kwargs
):
"""
Generate text completion from an LLM with optional caching.
This method provides a unified interface for generating text completions from
any supported LLM provider. It includes intelligent caching to avoid redundant
API calls for identical inputs, reducing costs and latency.
The caching system:
- Creates a unique key based on the prompt, provider, model, and parameters
- Checks for cached results before making API calls
- Stores successful responses with a configurable TTL
- Can be disabled per-request with the use_cache parameter
Args:
prompt: The text prompt to send to the LLM
provider: The LLM provider to use (e.g., "openai", "anthropic", "google")
If None, uses the client's default_provider
model: Specific model to use (e.g., "gpt-4", "claude-instant-1")
If None, uses the provider's default model
temperature: Sampling temperature for controlling randomness (0.0-1.0)
Lower values are more deterministic, higher values more creative
max_tokens: Maximum number of tokens to generate
If None, uses provider-specific defaults
use_cache: Whether to use the caching system (default: True)
cache_ttl: Time-to-live for cache entries in seconds (default: 1 hour)
**kwargs: Additional provider-specific parameters
(e.g., top_p, frequency_penalty, presence_penalty)
Returns:
CompletionResult object with attributes:
- text: The generated completion text
- provider: The provider that generated the text
- model: The model used
- processing_time: Time taken to generate the completion (in seconds)
- tokens: Token usage information (if available)
- error: Error information (if an error occurred but was handled)
Raises:
ValueError: For invalid parameters
Exception: For provider errors or other issues during generation
Example:
```python
result = await client.generate_completion(
prompt="Write a poem about artificial intelligence",
temperature=0.8,
max_tokens=1000
)
print(f"Generated by {result.model} in {result.processing_time:.2f}s")
print(result.text)
```
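Caching can also be tuned per call via the use_cache and cache_ttl parameters described above (a short sketch):
```python
# Bypass the cache when a fresh, non-repeated response is required
fresh = await client.generate_completion(
    prompt="Suggest a unique project codename",
    use_cache=False
)
# Keep a stable answer cached for 24 hours instead of the default hour
stable = await client.generate_completion(
    prompt="Define the term 'tokenizer'",
    cache_ttl=86400
)
```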
"""
provider_name = provider or self.default_provider
# Check cache if enabled
if use_cache and self.cache_service.enabled:
# Build a cache key from the request parameters
provider_instance = await self.initialize_provider(provider_name)
model_id = model or provider_instance.get_default_model()
# Include relevant parameters in the cache key.
# Note: built-in hash() is salted per process for strings, so these keys
# are only stable within a single interpreter run.
params_hash = hash((prompt, temperature, max_tokens, str(kwargs)))
cache_key = f"completion:{provider_name}:{model_id}:{params_hash}"
cached_result = await self.cache_service.get(cache_key)
if cached_result is not None:
logger.success("Cache hit! Using cached result", emoji_key="cache")
# Set a nominal processing time for cached results
cached_result.processing_time = 0.001
return cached_result
# Cache miss or cache disabled
if use_cache and self.cache_service.enabled:
logger.info("Cache miss. Generating new completion...", emoji_key="processing")
else:
logger.info("Generating completion...", emoji_key="processing")
# Initialize provider and generate completion
try:
provider_instance = await self.initialize_provider(provider_name)
model_id = model or provider_instance.get_default_model()
result = await provider_instance.generate_completion(
prompt=prompt,
model=model_id,
temperature=temperature,
max_tokens=max_tokens,
**kwargs
)
# Save to cache if enabled
if use_cache and self.cache_service.enabled:
await self.cache_service.set(
key=cache_key,
value=result,
ttl=cache_ttl
)
logger.info(f"Result saved to cache (key: ...{cache_key[-10:]})", emoji_key="cache")
return result
except Exception as e:
logger.error(f"Error generating completion: {str(e)}", emoji_key="error")
raise
async def generate_completion_stream(
self,
prompt: str,
provider: Optional[str] = None,
model: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
**kwargs
) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
"""
Generate a streaming text completion with real-time chunks.
This method provides a streaming interface to LLM text generation, where
text is returned incrementally as it's generated, rather than waiting for
the entire response. This enables real-time UI updates, faster apparent
response times, and the ability to process partial responses.
Unlike the non-streaming version, this method:
- Does not support caching (each streaming response is unique)
- Returns an async generator that yields content incrementally
- Provides metadata with each chunk for tracking generation progress
Args:
prompt: The text prompt to send to the LLM
provider: The LLM provider to use (e.g., "openai", "anthropic")
If None, uses the client's default_provider
model: Specific model to use (e.g., "gpt-4", "claude-instant-1")
If None, uses the provider's default model
temperature: Sampling temperature for controlling randomness (0.0-1.0)
Lower values are more deterministic, higher values more creative
max_tokens: Maximum number of tokens to generate
If None, uses provider-specific defaults
**kwargs: Additional provider-specific parameters
Yields:
Tuples of (chunk_text, metadata), where:
- chunk_text: A string containing the next piece of generated text
- metadata: A dictionary with information about the generation process:
- done: Boolean indicating if this is the final chunk
- chunk_index: Index of the current chunk (0-based)
- token_count: Number of tokens in this chunk (if available)
- total_tokens: Running total of tokens generated so far (if available)
Raises:
ValueError: For invalid parameters
Exception: For provider errors or other issues during streaming
Example:
```python
# Display text as it's generated
async for chunk, metadata in client.generate_completion_stream(
prompt="Explain the theory of relativity",
temperature=0.3
):
print(chunk, end="")
if metadata.get("done", False):
print("\nGeneration complete!")
```
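The per-chunk metadata can also drive simple progress tracking while accumulating the full response (sketch; done and chunk_index are the fields documented above, and their contents depend on the provider):
```python
# Collect chunks into the full completion and report how many arrived
parts = []
async for chunk, metadata in client.generate_completion_stream(prompt="List three prime numbers"):
    parts.append(chunk)
    if metadata.get("done", False):
        print(f"\nReceived {metadata.get('chunk_index', 0) + 1} chunks")
full_text = "".join(parts)
```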
Note:
Not all providers support streaming completions. Check the provider
documentation for compatibility.
"""
provider_name = provider or self.default_provider
logger.info("Generating streaming completion...", emoji_key="processing")
# Initialize provider and generate streaming completion
try:
provider_instance = await self.initialize_provider(provider_name)
model_id = model or provider_instance.get_default_model()
stream = provider_instance.generate_completion_stream(
prompt=prompt,
model=model_id,
temperature=temperature,
max_tokens=max_tokens,
**kwargs
)
async for chunk, metadata in stream:
yield chunk, metadata
except Exception as e:
logger.error(f"Error generating streaming completion: {str(e)}", emoji_key="error")
raise
async def try_providers(
self,
prompt: str,
providers: List[str],
models: Optional[List[str]] = None,
**kwargs
):
"""
Try multiple providers in sequence until one succeeds.
This method implements an automatic fallback mechanism that attempts to generate
a completion using a list of providers in order, continuing to the next provider
if the current one fails. This provides resilience against provider downtime,
rate limits, or other temporary failures.
The method tries each provider exactly once in the order they're specified, with
an optional corresponding model for each. This is useful for scenarios where you
need high availability or want to implement prioritized provider selection.
Args:
prompt: The text prompt to send to the LLM
providers: An ordered list of provider names to try (e.g., ["openai", "anthropic", "google"])
Providers are tried in the specified order until one succeeds
models: Optional list of models to use with each provider
If provided, must be the same length as providers
If None, each provider's default model is used
**kwargs: Additional parameters passed to generate_completion
Applies to all provider attempts
Returns:
CompletionResult from the first successful provider,
with the same structure as generate_completion results
Raises:
ValueError: If no providers are specified or if models list length doesn't match providers
Exception: If all specified providers fail, with details of the last error
Example:
```python
# Try OpenAI first, fall back to Anthropic, then Google
result = await client.try_providers(
prompt="Write a sonnet about programming",
providers=["openai", "anthropic", "google"],
models=["gpt-4", "claude-2", "gemini-pro"],
temperature=0.7,
max_tokens=800
)
print(f"Successfully used {result.provider} with model {result.model}")
print(result.text)
```
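When per-provider model choices are not important, models can be omitted so each provider falls back to its default model (sketch of the documented behavior):
```python
# Priority-ordered fallback using each provider's default model
result = await client.try_providers(
    prompt="Summarize this article",
    providers=["openai", "anthropic"],
    max_tokens=500
)
```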
Note:
Each provider attempt is logged, making it easy to track which providers
succeeded or failed during the fallback sequence.
"""
if not providers:
raise ValueError("No providers specified")
models = models or [None] * len(providers)
if len(models) != len(providers):
raise ValueError("If models are specified, there must be one for each provider")
last_error = None
for i, provider_name in enumerate(providers):
try:
logger.info(f"Trying provider: {provider_name}", emoji_key="provider")
result = await self.generate_completion(
prompt=prompt,
provider=provider_name,
model=models[i],
**kwargs
)
return result
except Exception as e:
logger.warning(f"Provider {provider_name} failed: {str(e)}", emoji_key="warning")
last_error = e
# If we get here, all providers failed; chain the last error for context
raise Exception(f"All providers failed. Last error: {last_error}") from last_error