"""
Llama Service Module
Author: Yobie Benjamin
Version: 0.9
Date: August 1, 2025
This module provides the interface to Llama models via Ollama.
It handles model initialization, completion requests, streaming,
and function calling capabilities.
Features:
- Automatic model pulling if not available
- Support for both prompt and chat completions
- Streaming response support
- Function calling/tool use
- Model warm-up for better performance
- Retry logic with exponential backoff
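Typical usage (an illustrative sketch; how the Config instance is built depends
on your configuration module):
    async def main() -> None:
        service = LlamaService(config)  # config: a populated Config instance
        await service.initialize()
        print(await service.complete("Hello"))
        await service.cleanup()
    asyncio.run(main())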
"""
import asyncio
import json
from typing import Any, AsyncIterator, Dict, List, Optional, Union
import httpx
from ollama import AsyncClient, Client
from structlog import get_logger
from .config import Config
logger = get_logger(__name__)
class LlamaService:
"""
Service class for interacting with Llama models through Ollama.
This class provides:
- Model management (checking, pulling, listing)
- Text completion generation
- Chat-based completions
- Streaming responses
- Function calling support
- Error handling and retries
"""
def __init__(self, config: Config):
"""
Initialize the Llama service.
Args:
config: Configuration object with Ollama settings
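        Note:
            complete() and complete_chat() do not check model availability
            themselves, so call initialize() first in normal use.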
"""
self.config = config
self.logger = logger.bind(service="llama")
# Initialize Ollama client
self.client = AsyncClient(host=self.config.llama_api_url)
self.sync_client = Client(host=self.config.llama_api_url)
# Model configuration
self.model_name = config.llama_model_name
self.default_params = config.get_model_params()
# State tracking
self.initialized = False
self.available_models: List[str] = []
self.logger.info(
"Llama service created",
model=self.model_name,
api_url=self.config.llama_api_url
)
async def initialize(self):
"""
Initialize the Llama service.
This method:
1. Checks Ollama connectivity
2. Verifies model availability
3. Pulls model if necessary
4. Performs warm-up
"""
if self.initialized:
return
self.logger.info("Initializing Llama service...")
try:
# Check Ollama connectivity
await self._check_ollama_connection()
# Check and pull model if needed
await self._ensure_model_available()
# Warm up the model
await self._warm_up_model()
self.initialized = True
self.logger.info(
"Llama service initialized successfully",
model=self.model_name,
available_models=self.available_models
)
except Exception as e:
self.logger.error(f"Failed to initialize Llama service: {e}", error=str(e))
# Don't fail completely - server can work in limited mode
self.logger.warning("Running in limited mode without Ollama")
async def _check_ollama_connection(self):
"""
Check if Ollama is accessible.
Raises:
ConnectionError: If Ollama is not reachable
"""
try:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.config.llama_api_url}/api/tags",
timeout=5.0
)
response.raise_for_status()
self.logger.debug("Ollama connection verified")
except Exception as e:
raise ConnectionError(f"Cannot connect to Ollama at {self.config.llama_api_url}: {e}")
async def _ensure_model_available(self):
"""
Ensure the configured model is available locally.
If the model is not found, attempts to pull it automatically.
"""
try:
# List available models
models = await self.client.list()
self.available_models = [model['name'] for model in models.get('models', [])]
self.logger.debug(
"Available models",
models=self.available_models,
target=self.model_name
)
            # Check if our model is available (Ollama lists models with an explicit
            # tag such as "llama3:latest", so also accept a match on the base name)
            known = set(self.available_models)
            known |= {name.split(':')[0] for name in self.available_models}
            if self.model_name not in known:
self.logger.warning(
f"Model {self.model_name} not found locally",
available=self.available_models
)
await self._pull_model()
else:
self.logger.info(f"Model {self.model_name} is available")
except Exception as e:
self.logger.error(f"Failed to check model availability: {e}", error=str(e))
raise
async def _pull_model(self):
"""
Pull (download) the configured model from Ollama registry.
This can take several minutes for large models.
Progress updates are logged.
"""
self.logger.info(f"Pulling model {self.model_name}... This may take a while.")
try:
# Pull with progress tracking
progress_stream = await self.client.pull(
model=self.model_name,
stream=True
)
last_percent = 0
async for progress in progress_stream:
# Log progress updates
if 'completed' in progress and 'total' in progress:
percent = int((progress['completed'] / progress['total']) * 100)
                    if percent >= last_percent + 10:  # Log roughly every 10%
self.logger.info(
f"Pull progress: {percent}%",
model=self.model_name,
completed=progress['completed'],
total=progress['total']
)
last_percent = percent
# Check for completion
if progress.get('status') == 'success':
self.logger.info(f"Successfully pulled model {self.model_name}")
self.available_models.append(self.model_name)
return
except Exception as e:
self.logger.error(f"Failed to pull model: {e}", error=str(e))
raise RuntimeError(f"Cannot pull model {self.model_name}: {e}")
async def _warm_up_model(self):
"""
Warm up the model with a simple request.
This improves response time for subsequent requests.
"""
try:
self.logger.debug("Warming up model...")
await self.complete(
prompt="Hello",
max_tokens=1,
temperature=0.0
)
self.logger.debug("Model warm-up complete")
except Exception as e:
self.logger.warning(f"Model warm-up failed: {e}", error=str(e))
# Don't fail initialization if warm-up fails
async def complete(
self,
prompt: str,
max_tokens: Optional[int] = None,
temperature: Optional[float] = None,
model: Optional[str] = None,
stream: bool = False,
**kwargs
) -> Union[str, AsyncIterator[str]]:
"""
Generate a completion for a single prompt.
Args:
prompt: Input prompt text
max_tokens: Maximum tokens to generate
temperature: Sampling temperature (0.0-2.0)
model: Model to use (overrides default)
stream: Whether to stream the response
**kwargs: Additional model parameters
Returns:
Generated text completion (or async iterator if streaming)
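        Example (illustrative; assumes an initialized service and, for the
        streaming form, enable_streaming=True in the config):
            text = await service.complete("Why is the sky blue?", max_tokens=64)
            async for token in await service.complete("Tell a story", stream=True):
                print(token, end="")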
"""
model_name = model or self.model_name
# Merge parameters
params = self.default_params.copy()
if temperature is not None:
params['temperature'] = temperature
params.update(kwargs)
        # Build generation options; an explicit max_tokens argument takes
        # precedence over any num_predict value in the default parameters
        options = dict(params)
        if max_tokens:
            options['num_predict'] = max_tokens
self.logger.debug(
"Generating completion",
model=model_name,
prompt_length=len(prompt),
stream=stream,
options=options
)
try:
if stream and self.config.enable_streaming:
return self._stream_completion(prompt, model_name, options)
else:
response = await self.client.generate(
model=model_name,
prompt=prompt,
stream=False,
options=options
)
completion = response.get('response', '')
self.logger.debug(
"Completion generated",
length=len(completion),
tokens_used=response.get('eval_count', 0)
)
return completion
except Exception as e:
self.logger.error(f"Completion failed: {e}", error=str(e))
raise
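    # The module header advertises retry with exponential backoff; complete()
    # itself does not retry, so one option is a caller-side loop. A rough,
    # hypothetical sketch (not part of this class):
    #
    #     for attempt in range(3):
    #         try:
    #             return await service.complete(prompt)
    #         except Exception:
    #             await asyncio.sleep(2 ** attempt)  # back off 1s, 2s, 4s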
async def _stream_completion(
self,
prompt: str,
model: str,
options: Dict[str, Any]
) -> AsyncIterator[str]:
"""
Stream a completion token by token.
Args:
prompt: Input prompt
model: Model name
options: Generation options
Yields:
Generated text tokens
"""
try:
stream = await self.client.generate(
model=model,
prompt=prompt,
stream=True,
options=options
)
async for chunk in stream:
if 'response' in chunk:
yield chunk['response']
except Exception as e:
self.logger.error(f"Streaming failed: {e}", error=str(e))
raise
async def complete_chat(
self,
messages: List[Dict[str, str]],
max_tokens: Optional[int] = None,
temperature: Optional[float] = None,
model: Optional[str] = None,
tools: Optional[List[Dict[str, Any]]] = None,
**kwargs
) -> str:
"""
Generate a completion for a chat conversation.
Args:
messages: List of chat messages with 'role' and 'content'
max_tokens: Maximum tokens to generate
temperature: Sampling temperature
model: Model to use
tools: Available tools for function calling
**kwargs: Additional parameters
Returns:
Generated assistant response
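        Example (illustrative; the tool schema mirrors the function-calling
        format accepted by the Ollama chat API, and get_weather is hypothetical):
            reply = await service.complete_chat(
                messages=[{"role": "user", "content": "Weather in Paris?"}],
                tools=[{
                    "type": "function",
                    "function": {
                        "name": "get_weather",
                        "description": "Look up current weather for a city",
                        "parameters": {
                            "type": "object",
                            "properties": {"city": {"type": "string"}},
                            "required": ["city"],
                        },
                    },
                }],
            )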
"""
model_name = model or self.model_name
# Convert messages to Ollama format
ollama_messages = []
for msg in messages:
ollama_messages.append({
'role': msg.get('role', 'user'),
'content': msg.get('content', '')
})
# Merge parameters
params = self.default_params.copy()
if temperature is not None:
params['temperature'] = temperature
params.update(kwargs)
        # Build generation options; an explicit max_tokens argument takes
        # precedence over any num_predict value in the default parameters
        options = dict(params)
        if max_tokens:
            options['num_predict'] = max_tokens
self.logger.debug(
"Generating chat completion",
model=model_name,
message_count=len(messages),
has_tools=bool(tools)
)
try:
# Add tools if provided and function calling is enabled
if tools and self.config.enable_function_calling:
response = await self.client.chat(
model=model_name,
messages=ollama_messages,
tools=tools,
stream=False,
options=options
)
else:
response = await self.client.chat(
model=model_name,
messages=ollama_messages,
stream=False,
options=options
)
# Extract response
message = response.get('message', {})
content = message.get('content', '')
# Check for tool calls
if 'tool_calls' in message:
self.logger.debug(
"Model made tool calls",
tool_calls=message['tool_calls']
)
# Return tool calls as JSON for processing
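                # Illustrative shape of the returned string (field names follow the
                # dict-style Ollama response handled above; exact structure may vary
                # by Ollama version):
                #   {"content": "", "tool_calls": [{"function": {"name": "get_weather",
                #    "arguments": {"city": "Paris"}}}]}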
return json.dumps({
'content': content,
'tool_calls': message['tool_calls']
})
self.logger.debug(
"Chat completion generated",
length=len(content),
tokens_used=response.get('eval_count', 0)
)
return content
except Exception as e:
self.logger.error(f"Chat completion failed: {e}", error=str(e))
raise
async def list_models(self) -> List[str]:
"""
List all available models.
Returns:
List of model names
"""
try:
models = await self.client.list()
model_names = [model['name'] for model in models.get('models', [])]
self.available_models = model_names
return model_names
except Exception as e:
self.logger.error(f"Failed to list models: {e}", error=str(e))
return []
async def get_model_info(self, model: Optional[str] = None) -> Dict[str, Any]:
"""
Get detailed information about a model.
Args:
model: Model name (uses default if not specified)
Returns:
Model information dictionary
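        Example (illustrative):
            info = await service.get_model_info()
            print(info.get('parameters'))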
"""
model_name = model or self.model_name
try:
info = await self.client.show(model=model_name)
return {
'name': model_name,
'parameters': info.get('parameters', {}),
'template': info.get('template', ''),
'model_info': info.get('model_info', {}),
}
except Exception as e:
self.logger.error(f"Failed to get model info: {e}", error=str(e))
return {'name': model_name, 'error': str(e)}
async def cleanup(self):
"""
Clean up resources.
This method ensures proper cleanup of:
- HTTP connections
- Model resources
- Temporary data
"""
self.logger.debug("Cleaning up Llama service...")
        # The Ollama AsyncClient manages its HTTP connections internally, so
        # there is nothing to close explicitly here yet.