
Llama 4 Maverick MCP Server

by YobieBen
llama_service.py • 15.5 kB
""" Llama Service Module Author: Yobie Benjamin Version: 0.9 Date: August 1, 2025 This module provides the interface to Llama models via Ollama. It handles model initialization, completion requests, streaming, and function calling capabilities. Features: - Automatic model pulling if not available - Support for both prompt and chat completions - Streaming response support - Function calling/tool use - Model warm-up for better performance - Retry logic with exponential backoff """ import asyncio import json from typing import Any, AsyncIterator, Dict, List, Optional, Union import httpx from ollama import AsyncClient, Client from structlog import get_logger from .config import Config logger = get_logger(__name__) class LlamaService: """ Service class for interacting with Llama models through Ollama. This class provides: - Model management (checking, pulling, listing) - Text completion generation - Chat-based completions - Streaming responses - Function calling support - Error handling and retries """ def __init__(self, config: Config): """ Initialize the Llama service. Args: config: Configuration object with Ollama settings """ self.config = config self.logger = logger.bind(service="llama") # Initialize Ollama client self.client = AsyncClient(host=self.config.llama_api_url) self.sync_client = Client(host=self.config.llama_api_url) # Model configuration self.model_name = config.llama_model_name self.default_params = config.get_model_params() # State tracking self.initialized = False self.available_models: List[str] = [] self.logger.info( "Llama service created", model=self.model_name, api_url=self.config.llama_api_url ) async def initialize(self): """ Initialize the Llama service. This method: 1. Checks Ollama connectivity 2. Verifies model availability 3. Pulls model if necessary 4. Performs warm-up """ if self.initialized: return self.logger.info("Initializing Llama service...") try: # Check Ollama connectivity await self._check_ollama_connection() # Check and pull model if needed await self._ensure_model_available() # Warm up the model await self._warm_up_model() self.initialized = True self.logger.info( "Llama service initialized successfully", model=self.model_name, available_models=self.available_models ) except Exception as e: self.logger.error(f"Failed to initialize Llama service: {e}", error=str(e)) # Don't fail completely - server can work in limited mode self.logger.warning("Running in limited mode without Ollama") async def _check_ollama_connection(self): """ Check if Ollama is accessible. Raises: ConnectionError: If Ollama is not reachable """ try: async with httpx.AsyncClient() as client: response = await client.get( f"{self.config.llama_api_url}/api/tags", timeout=5.0 ) response.raise_for_status() self.logger.debug("Ollama connection verified") except Exception as e: raise ConnectionError(f"Cannot connect to Ollama at {self.config.llama_api_url}: {e}") async def _ensure_model_available(self): """ Ensure the configured model is available locally. If the model is not found, attempts to pull it automatically. 
""" try: # List available models models = await self.client.list() self.available_models = [model['name'] for model in models.get('models', [])] self.logger.debug( "Available models", models=self.available_models, target=self.model_name ) # Check if our model is available if self.model_name not in self.available_models: self.logger.warning( f"Model {self.model_name} not found locally", available=self.available_models ) await self._pull_model() else: self.logger.info(f"Model {self.model_name} is available") except Exception as e: self.logger.error(f"Failed to check model availability: {e}", error=str(e)) raise async def _pull_model(self): """ Pull (download) the configured model from Ollama registry. This can take several minutes for large models. Progress updates are logged. """ self.logger.info(f"Pulling model {self.model_name}... This may take a while.") try: # Pull with progress tracking progress_stream = await self.client.pull( model=self.model_name, stream=True ) last_percent = 0 async for progress in progress_stream: # Log progress updates if 'completed' in progress and 'total' in progress: percent = int((progress['completed'] / progress['total']) * 100) if percent > last_percent + 10: # Log every 10% self.logger.info( f"Pull progress: {percent}%", model=self.model_name, completed=progress['completed'], total=progress['total'] ) last_percent = percent # Check for completion if progress.get('status') == 'success': self.logger.info(f"Successfully pulled model {self.model_name}") self.available_models.append(self.model_name) return except Exception as e: self.logger.error(f"Failed to pull model: {e}", error=str(e)) raise RuntimeError(f"Cannot pull model {self.model_name}: {e}") async def _warm_up_model(self): """ Warm up the model with a simple request. This improves response time for subsequent requests. """ try: self.logger.debug("Warming up model...") await self.complete( prompt="Hello", max_tokens=1, temperature=0.0 ) self.logger.debug("Model warm-up complete") except Exception as e: self.logger.warning(f"Model warm-up failed: {e}", error=str(e)) # Don't fail initialization if warm-up fails async def complete( self, prompt: str, max_tokens: Optional[int] = None, temperature: Optional[float] = None, model: Optional[str] = None, stream: bool = False, **kwargs ) -> Union[str, AsyncIterator[str]]: """ Generate a completion for a single prompt. 
Args: prompt: Input prompt text max_tokens: Maximum tokens to generate temperature: Sampling temperature (0.0-2.0) model: Model to use (overrides default) stream: Whether to stream the response **kwargs: Additional model parameters Returns: Generated text completion (or async iterator if streaming) """ model_name = model or self.model_name # Merge parameters params = self.default_params.copy() if temperature is not None: params['temperature'] = temperature params.update(kwargs) # Add max_tokens if specified options = {} if max_tokens: options['num_predict'] = max_tokens options.update(params) self.logger.debug( "Generating completion", model=model_name, prompt_length=len(prompt), stream=stream, options=options ) try: if stream and self.config.enable_streaming: return self._stream_completion(prompt, model_name, options) else: response = await self.client.generate( model=model_name, prompt=prompt, stream=False, options=options ) completion = response.get('response', '') self.logger.debug( "Completion generated", length=len(completion), tokens_used=response.get('eval_count', 0) ) return completion except Exception as e: self.logger.error(f"Completion failed: {e}", error=str(e)) raise async def _stream_completion( self, prompt: str, model: str, options: Dict[str, Any] ) -> AsyncIterator[str]: """ Stream a completion token by token. Args: prompt: Input prompt model: Model name options: Generation options Yields: Generated text tokens """ try: stream = await self.client.generate( model=model, prompt=prompt, stream=True, options=options ) async for chunk in stream: if 'response' in chunk: yield chunk['response'] except Exception as e: self.logger.error(f"Streaming failed: {e}", error=str(e)) raise async def complete_chat( self, messages: List[Dict[str, str]], max_tokens: Optional[int] = None, temperature: Optional[float] = None, model: Optional[str] = None, tools: Optional[List[Dict[str, Any]]] = None, **kwargs ) -> str: """ Generate a completion for a chat conversation. 
Args: messages: List of chat messages with 'role' and 'content' max_tokens: Maximum tokens to generate temperature: Sampling temperature model: Model to use tools: Available tools for function calling **kwargs: Additional parameters Returns: Generated assistant response """ model_name = model or self.model_name # Convert messages to Ollama format ollama_messages = [] for msg in messages: ollama_messages.append({ 'role': msg.get('role', 'user'), 'content': msg.get('content', '') }) # Merge parameters params = self.default_params.copy() if temperature is not None: params['temperature'] = temperature params.update(kwargs) # Add max_tokens if specified options = {} if max_tokens: options['num_predict'] = max_tokens options.update(params) self.logger.debug( "Generating chat completion", model=model_name, message_count=len(messages), has_tools=bool(tools) ) try: # Add tools if provided and function calling is enabled if tools and self.config.enable_function_calling: response = await self.client.chat( model=model_name, messages=ollama_messages, tools=tools, stream=False, options=options ) else: response = await self.client.chat( model=model_name, messages=ollama_messages, stream=False, options=options ) # Extract response message = response.get('message', {}) content = message.get('content', '') # Check for tool calls if 'tool_calls' in message: self.logger.debug( "Model made tool calls", tool_calls=message['tool_calls'] ) # Return tool calls as JSON for processing return json.dumps({ 'content': content, 'tool_calls': message['tool_calls'] }) self.logger.debug( "Chat completion generated", length=len(content), tokens_used=response.get('eval_count', 0) ) return content except Exception as e: self.logger.error(f"Chat completion failed: {e}", error=str(e)) raise async def list_models(self) -> List[str]: """ List all available models. Returns: List of model names """ try: models = await self.client.list() model_names = [model['name'] for model in models.get('models', [])] self.available_models = model_names return model_names except Exception as e: self.logger.error(f"Failed to list models: {e}", error=str(e)) return [] async def get_model_info(self, model: Optional[str] = None) -> Dict[str, Any]: """ Get detailed information about a model. Args: model: Model name (uses default if not specified) Returns: Model information dictionary """ model_name = model or self.model_name try: info = await self.client.show(model=model_name) return { 'name': model_name, 'parameters': info.get('parameters', {}), 'template': info.get('template', ''), 'model_info': info.get('model_info', {}), } except Exception as e: self.logger.error(f"Failed to get model info: {e}", error=str(e)) return {'name': model_name, 'error': str(e)} async def cleanup(self): """ Clean up resources. This method ensures proper cleanup of: - HTTP connections - Model resources - Temporary data """ self.logger.debug("Cleaning up Llama service...") try: # Close client connections if needed pass # Ollama client handles this automatically except Exception as e: self.logger.error(f"Error during cleanup: {e}", error=str(e))
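
For context, here is a minimal usage sketch showing how the class above might be driven from an async entry point. The import path `llama4_maverick_mcp` is hypothetical (the real package layout may differ), and it assumes `Config` can be constructed without arguments, which this file does not show; treat it as an illustration of the call sequence, not part of the repository.

# Usage sketch (not part of llama_service.py).
# Assumptions: the package imports as `llama4_maverick_mcp` and Config() builds
# itself from defaults/environment; adjust both to the real project layout.

import asyncio

from llama4_maverick_mcp.config import Config          # hypothetical import path
from llama4_maverick_mcp.llama_service import LlamaService


async def main() -> None:
    config = Config()                      # assumption: default construction works
    service = LlamaService(config)
    await service.initialize()             # checks Ollama, pulls the model, warms up

    # Plain prompt completion
    text = await service.complete(prompt="Say hello in one word.", max_tokens=16)
    print(text)

    # Chat-style completion
    reply = await service.complete_chat(
        messages=[{"role": "user", "content": "What model are you?"}]
    )
    print(reply)

    # Streaming: complete() returns an async iterator when stream=True and
    # config.enable_streaming is on; otherwise it falls back to a plain string.
    result = await service.complete(prompt="Count to five.", stream=True)
    if isinstance(result, str):
        print(result)
    else:
        async for token in result:
            print(token, end="", flush=True)

    await service.cleanup()


if __name__ == "__main__":
    asyncio.run(main())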

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/YobieBen/llama4-maverick-mcp-python'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.