client.py
#!/usr/bin/env python3
"""
MCP Router Client - Execute queries using routed models
"""

import os
from typing import Optional, Dict, Any, Iterator

# Support running both as a package module and as a standalone script
try:
    from .router import MCPRouter, RoutingDecision, QueryContext
except ImportError:
    from router import MCPRouter, RoutingDecision, QueryContext

# Provider SDKs are optional; a missing SDK simply disables that provider
try:
    from openai import OpenAI as OpenAIClient
    HAS_OPENAI = True
except ImportError:
    HAS_OPENAI = False

try:
    import anthropic
    HAS_ANTHROPIC = True
except ImportError:
    HAS_ANTHROPIC = False


class MCPRouterClient:
    """Client for executing queries through the router."""

    def __init__(self, router: Optional[MCPRouter] = None):
        """Initialize the client."""
        self.router = router or MCPRouter()
        # Cache one SDK client per API key so repeated calls reuse connections
        self.openai_clients = {}
        self.anthropic_clients = {}

    def _get_openai_client(self, api_key: Optional[str] = None) -> Optional[Any]:
        """Get or create an OpenAI client."""
        if not HAS_OPENAI:
            return None
        key = api_key or os.getenv("OPENAI_API_KEY")
        if not key:
            return None
        if key not in self.openai_clients:
            self.openai_clients[key] = OpenAIClient(api_key=key)
        return self.openai_clients[key]

    def _get_anthropic_client(self, api_key: Optional[str] = None) -> Optional[Any]:
        """Get or create an Anthropic client."""
        if not HAS_ANTHROPIC:
            return None
        key = api_key or os.getenv("ANTHROPIC_API_KEY")
        if not key:
            return None
        if key not in self.anthropic_clients:
            self.anthropic_clients[key] = anthropic.Anthropic(api_key=key)
        return self.anthropic_clients[key]

    def execute(
        self,
        query: str,
        system_prompt: Optional[str] = None,
        strategy: str = "balanced",
        **kwargs
    ) -> Dict[str, Any]:
        """
        Execute a query using the routed model.

        Args:
            query: User query
            system_prompt: Optional system prompt
            strategy: Routing strategy
            **kwargs: Additional parameters for the model

        Returns:
            Response dictionary with content, model info, and metadata
        """
        # Route query
        decision = self.router.route(query, strategy=strategy)
        model = decision.selected_model

        # Execute based on provider
        if model.provider == "openai":
            return self._execute_openai(model, query, system_prompt, **kwargs)
        elif model.provider == "anthropic":
            return self._execute_anthropic(model, query, system_prompt, **kwargs)
        else:
            raise ValueError(f"Unsupported provider: {model.provider}")

    def _execute_openai(
        self,
        model: Any,
        query: str,
        system_prompt: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """Execute query using OpenAI."""
        client = self._get_openai_client()
        if not client:
            raise ValueError("OpenAI API key not found")

        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": query})

        response = client.chat.completions.create(
            model=model.model_id,
            messages=messages,
            **kwargs
        )

        return {
            "content": response.choices[0].message.content,
            "model": model.model_id,
            "provider": model.provider,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            },
            "metadata": {
                "finish_reason": response.choices[0].finish_reason
            }
        }

    def _execute_anthropic(
        self,
        model: Any,
        query: str,
        system_prompt: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """Execute query using Anthropic."""
        client = self._get_anthropic_client()
        if not client:
            raise ValueError("Anthropic API key not found")

        messages = [{"role": "user", "content": query}]

        # Note: Anthropic's messages.create requires max_tokens, so callers
        # must supply it via **kwargs
        response = client.messages.create(
            model=model.model_id,
            messages=messages,
            system=system_prompt or "",
            **kwargs
        )

        return {
            "content": response.content[0].text,
            "model": model.model_id,
            "provider": model.provider,
            "usage": {
                "input_tokens": response.usage.input_tokens,
                "output_tokens": response.usage.output_tokens
            },
            "metadata": {}
        }

    def stream(
        self,
        query: str,
        system_prompt: Optional[str] = None,
        strategy: str = "balanced",
        **kwargs
    ) -> Iterator[Dict[str, Any]]:
        """
        Stream response from routed model.

        Yields:
            Chunks of the response
        """
        decision = self.router.route(query, strategy=strategy)
        model = decision.selected_model

        if not model.supports_streaming:
            # Fallback to non-streaming
            result = self.execute(query, system_prompt, strategy, **kwargs)
            yield result
            return

        if model.provider == "openai":
            yield from self._stream_openai(model, query, system_prompt, **kwargs)
        elif model.provider == "anthropic":
            yield from self._stream_anthropic(model, query, system_prompt, **kwargs)
        else:
            raise ValueError(f"Unsupported provider: {model.provider}")

    def _stream_openai(
        self,
        model: Any,
        query: str,
        system_prompt: Optional[str] = None,
        **kwargs
    ) -> Iterator[Dict[str, Any]]:
        """Stream response from OpenAI."""
        client = self._get_openai_client()
        if not client:
            raise ValueError("OpenAI API key not found")

        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": query})

        stream = client.chat.completions.create(
            model=model.model_id,
            messages=messages,
            stream=True,
            **kwargs
        )

        for chunk in stream:
            # Some chunks (e.g. the final one) may carry no choices or no delta text
            if chunk.choices and chunk.choices[0].delta.content:
                yield {
                    "content": chunk.choices[0].delta.content,
                    "model": model.model_id,
                    "done": False
                }

        yield {"done": True, "model": model.model_id}

    def _stream_anthropic(
        self,
        model: Any,
        query: str,
        system_prompt: Optional[str] = None,
        **kwargs
    ) -> Iterator[Dict[str, Any]]:
        """Stream response from Anthropic."""
        client = self._get_anthropic_client()
        if not client:
            raise ValueError("Anthropic API key not found")

        messages = [{"role": "user", "content": query}]

        # messages.stream also requires max_tokens (supplied via **kwargs)
        with client.messages.stream(
            model=model.model_id,
            messages=messages,
            system=system_prompt or "",
            **kwargs
        ) as stream:
            for text in stream.text_stream:
                yield {
                    "content": text,
                    "model": model.model_id,
                    "done": False
                }

        yield {"done": True, "model": model.model_id}
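For orientation, here is a hypothetical usage sketch, not part of the repository. It assumes an OPENAI_API_KEY or ANTHROPIC_API_KEY is set in the environment, that MCPRouter's default catalog can resolve the "balanced" strategy, and that max_tokens is forwarded through **kwargs (Anthropic requires it; OpenAI accepts it):

from client import MCPRouterClient

client = MCPRouterClient()

# One-shot execution: the router selects a model, the client dispatches to it
result = client.execute(
    "Summarize the MCP protocol in two sentences.",
    system_prompt="You are a concise technical assistant.",
    strategy="balanced",
    max_tokens=256,
)
print(result["model"], result["provider"], result["usage"])
print(result["content"])

# Streaming: partial chunks arrive until a final {"done": True} marker
for chunk in client.stream("Explain model routing.", max_tokens=256):
    if not chunk.get("done"):
        print(chunk["content"], end="", flush=True)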

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AI-Castle-Labs/mcp-router'
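The same lookup from Python, as a minimal sketch using the requests library (assuming the endpoint returns JSON):

import requests

resp = requests.get("https://glama.ai/api/mcp/v1/servers/AI-Castle-Labs/mcp-router")
resp.raise_for_status()
server_info = resp.json()
print(server_info)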

If you have feedback or need assistance with the MCP directory API, please join our Discord server.