
Smart Code Search MCP Server

dynamic_context_manager.py (19 kB)
""" Dynamic Context Manager Automatically adapts to model capabilities queried from provider APIs """ import os import asyncio import logging from typing import Dict, Optional, List, Any from pathlib import Path from .context_manager import ContextManager, ContextItem from .model_info_manager import ModelInfoManager, ModelInfo, get_model_manager logger = logging.getLogger(__name__) class DynamicContextManager(ContextManager): """ Context manager that dynamically adapts to model capabilities Queries provider APIs for actual context windows and limits """ def __init__(self, model: str = None, provider: str = None, target_utilization: float = 0.9, enable_compression: bool = True, auto_detect: bool = True): """ Initialize dynamic context manager Args: model: Model name (auto-detected if not specified) provider: Provider name (auto-detected from model name) target_utilization: Target % of context window to use enable_compression: Whether to enable smart compression auto_detect: Whether to auto-detect model from environment """ # Auto-detect model if not specified if auto_detect and not model: model = self._detect_current_model() logger.info(f"Auto-detected model: {model}") # Initialize model info manager self.model_manager = get_model_manager() self.provider = provider self.model_info: Optional[ModelInfo] = None self.auto_detect = auto_detect # Store configuration self.requested_model = model or 'gpt-4' self.target_utilization = target_utilization # Initialize with conservative defaults first super().__init__( model='gpt-4', # Safe default with 8k context target_utilization=target_utilization, enable_compression=enable_compression ) # Schedule async initialization if we have a specific model if model: # Try synchronous initialization first self._init_model_info_sync(model, provider) def _detect_current_model(self) -> str: """ Detect current model from environment or MCP connection Returns: Model name """ # Check various environment variables model_env_vars = [ 'ANTHROPIC_MODEL', 'CLAUDE_MODEL', 'OPENAI_MODEL', 'GPT_MODEL', 'GEMINI_MODEL', 'GOOGLE_MODEL', 'AI_MODEL', 'LLM_MODEL' ] for var in model_env_vars: value = os.getenv(var) if value: logger.debug(f"Detected model from {var}: {value}") return value # Check if we're in Claude Desktop environment if os.getenv('ANTHROPIC_API_KEY'): # Default to Claude 3 Opus if we have Anthropic API key return 'claude-3-opus-20240229' elif os.getenv('OPENAI_API_KEY'): # Default to GPT-4 Turbo if we have OpenAI API key return 'gpt-4-turbo-preview' # Ultimate default return 'claude-3-opus-20240229' def _init_model_info_sync(self, model: str, provider: str = None): """ Synchronously initialize model information Args: model: Model name provider: Provider name """ try: self.model_info = self.model_manager.get_model_info_sync(model, provider) self._update_limits_from_info() logger.info(f"Model info loaded: {model} - {self.model_info.context_window:,} tokens") except Exception as e: logger.warning(f"Failed to load model info synchronously: {e}") async def init_model_info(self, model: str = None, provider: str = None): """ Asynchronously initialize model information Args: model: Model name (uses stored model if not specified) provider: Provider name """ model = model or self.requested_model try: self.model_info = await self.model_manager.get_model_info(model, provider) self._update_limits_from_info() logger.info(f"Model info loaded async: {model} - {self.model_info.context_window:,} tokens") except Exception as e: logger.error(f"Failed to load model info: {e}") # Fall 
back to sync version self._init_model_info_sync(model, provider) def _update_limits_from_info(self): """Update context limits from model info""" if not self.model_info: return # Update context window limits self.max_tokens = self.model_info.context_window self.response_buffer = self.model_info.max_output_tokens self.target_tokens = int( (self.max_tokens - self.response_buffer) * self.target_utilization ) # Update model name self.model = self.model_info.name # Adjust compression based on context size self._adjust_compression_strategy() def _adjust_compression_strategy(self): """Adjust compression strategy based on model context size""" if not self.model_info: return context = self.model_info.context_window if context < 10000: # Small context - aggressive compression logger.debug(f"Small context ({context}), using aggressive compression") self.compression_threshold = 0.5 # Compress when 50% full self.compression_ratio = 0.2 # Keep only 20% of content elif context < 50000: # Medium context - moderate compression logger.debug(f"Medium context ({context}), using moderate compression") self.compression_threshold = 0.7 self.compression_ratio = 0.4 elif context < 200000: # Large context - light compression logger.debug(f"Large context ({context}), using light compression") self.compression_threshold = 0.85 self.compression_ratio = 0.6 else: # Very large context - minimal compression logger.debug(f"Very large context ({context}), using minimal compression") self.compression_threshold = 0.9 self.compression_ratio = 0.8 async def adapt_to_model(self, new_model: str, provider: str = None): """ Adapt context to a different model Args: new_model: New model name provider: Provider name """ old_info = self.model_info # Get new model info new_info = await self.model_manager.get_model_info(new_model, provider) # Check if we need to adjust context if old_info and new_info.context_window < old_info.context_window: # Moving to smaller model - need to compress logger.info(f"Adapting from {old_info.context_window:,} to {new_info.context_window:,} tokens") await self._compress_for_smaller_window(new_info.context_window) # Update model info self.model_info = new_info self._update_limits_from_info() logger.info(f"Adapted to {new_model}: {new_info.context_window:,} tokens") async def _compress_for_smaller_window(self, new_window_size: int): """ Compress context to fit in smaller window Args: new_window_size: New context window size in tokens """ # Calculate how much we need to reduce current = self.current_tokens target = int(new_window_size * 0.8) # Leave 20% buffer if current <= target: return # Already fits reduction_needed = current - target logger.info(f"Need to reduce by {reduction_needed:,} tokens") # Sort items by priority (keep high priority) sorted_items = sorted( self.context_items, key=lambda x: (x.priority, -x.timestamp.timestamp()) ) # Aggressively compress or remove low priority items for item in reversed(sorted_items): if self.current_tokens <= target: break if item.priority >= 7: # Remove low priority items self.context_items.remove(item) self.current_tokens -= item.tokens logger.debug(f"Removed low priority item: {item.content_type}") elif item.priority >= 5: # Compress medium priority items compressed = self._compress_content( item.content, item.content_type, target_ratio=0.3 ) new_tokens = self.estimate_tokens(compressed) if new_tokens < item.tokens: saved = item.tokens - new_tokens item.content = compressed item.tokens = new_tokens self.current_tokens -= saved logger.debug(f"Compressed 
{item.content_type}: saved {saved} tokens") def suggest_model_for_content(self, tokens_needed: int) -> Optional[str]: """ Suggest a model that can handle the content size Args: tokens_needed: Number of tokens needed Returns: Suggested model name or None """ # First check if current model can handle it if self.model_info and tokens_needed <= self.model_info.usable_context: return self.model_info.name # Get suggestion from model manager suggested = self.model_manager.suggest_model_for_size( tokens_needed, prefer_provider=self.provider ) if suggested: logger.info(f"Suggested model for {tokens_needed:,} tokens: {suggested}") else: logger.warning(f"No model found that can handle {tokens_needed:,} tokens") return suggested def get_cost_estimate(self, input_tokens: int = None, output_tokens: int = None) -> Dict[str, float]: """ Estimate cost for current operation Args: input_tokens: Input tokens (uses current context if not specified) output_tokens: Expected output tokens Returns: Cost breakdown dictionary """ if not self.model_info: return {'error': 'Model info not available', 'total': 0.0} # Use current context as input if not specified if input_tokens is None: input_tokens = self.current_tokens # Use reasonable default for output if not specified if output_tokens is None: output_tokens = min(2000, self.model_info.max_output_tokens // 2) input_cost = (input_tokens / 1000) * self.model_info.input_cost_per_1k output_cost = (output_tokens / 1000) * self.model_info.output_cost_per_1k return { 'model': self.model_info.name, 'input_tokens': input_tokens, 'output_tokens': output_tokens, 'input_cost': round(input_cost, 4), 'output_cost': round(output_cost, 4), 'total': round(input_cost + output_cost, 4), 'cost_per_1k_input': self.model_info.input_cost_per_1k, 'cost_per_1k_output': self.model_info.output_cost_per_1k } async def find_cheapest_capable_model(self, tokens_needed: int) -> Optional[str]: """ Find the cheapest model that can handle the token requirement Args: tokens_needed: Number of tokens needed Returns: Model name or None """ candidates = [] # Check all available models available = self.model_manager.get_available_models() for provider, models in available.items(): for model in models: info = await self.model_manager.get_model_info(model, provider) if info.usable_context >= tokens_needed: # Calculate cost for typical operation cost = self.model_manager.estimate_cost( model, tokens_needed, 2000 # Typical output ) candidates.append({ 'model': model, 'provider': provider, 'cost': cost, 'context': info.context_window }) if not candidates: return None # Sort by cost (cheapest first) candidates.sort(key=lambda x: x['cost']) cheapest = candidates[0] logger.info( f"Cheapest model for {tokens_needed:,} tokens: " f"{cheapest['model']} (${cheapest['cost']:.4f})" ) return cheapest['model'] def get_model_comparison(self) -> Dict[str, Any]: """ Get comparison of current model with alternatives Returns: Comparison dictionary """ if not self.model_info: return {'error': 'Model info not available'} # Compare with common alternatives alternatives = [] if self.model_info.provider == 'anthropic': alternatives = ['gpt-4-turbo-preview', 'gemini-1.5-pro'] elif self.model_info.provider == 'openai': alternatives = ['claude-3-opus-20240229', 'gemini-1.5-pro'] else: alternatives = ['claude-3-opus-20240229', 'gpt-4-turbo-preview'] comparisons = {} for alt in alternatives: comparison = self.model_manager.compare_models( self.model_info.name, alt ) comparisons[alt] = comparison return { 'current_model': 
self.model_info.to_dict(), 'comparisons': comparisons, 'recommendation': self._get_model_recommendation() } def _get_model_recommendation(self) -> str: """Get recommendation based on current context usage""" if not self.model_info: return "Model info not available" utilization = self.current_tokens / self.model_info.context_window if utilization < 0.1: return f"Consider using a smaller, cheaper model. Current utilization: {utilization:.1%}" elif utilization > 0.8: return f"Consider using a larger model for more headroom. Current utilization: {utilization:.1%}" else: return f"Model size appropriate for current usage. Utilization: {utilization:.1%}" def export_config(self) -> Dict[str, Any]: """ Export current configuration for persistence Returns: Configuration dictionary """ config = { 'model': self.model_info.name if self.model_info else self.model, 'provider': self.model_info.provider if self.model_info else 'unknown', 'context_window': self.max_tokens, 'target_utilization': self.target_utilization, 'enable_compression': self.enable_compression, 'auto_detect': self.auto_detect, 'current_usage': { 'tokens': self.current_tokens, 'items': len(self.context_items), 'utilization': f"{(self.current_tokens / self.max_tokens) * 100:.1f}%" } } if self.model_info: config['model_info'] = self.model_info.to_dict() config['cost_estimate'] = self.get_cost_estimate() return config class ModelAdaptiveOrchestrator: """ Orchestrator that automatically adapts to different models """ def __init__(self, db, project_root: str = "."): """ Initialize model-adaptive orchestrator Args: db: Database connection project_root: Project root directory """ self.db = db self.project_root = Path(project_root) # Initialize dynamic context manager self.context_manager = DynamicContextManager(auto_detect=True) # Track model switches self.model_history = [] async def execute_with_best_model(self, flow_name: str, inputs: Dict) -> Dict: """ Execute flow with the best model for the content size Args: flow_name: Flow to execute inputs: Input parameters Returns: Execution results """ # Estimate input size input_tokens = self.context_manager.estimate_tokens(inputs) # Find best model if input_tokens > self.context_manager.max_tokens * 0.8: # Need larger model suggested = self.context_manager.suggest_model_for_content(input_tokens) if suggested and suggested != self.context_manager.model: logger.info(f"Switching to {suggested} for large input") await self.context_manager.adapt_to_model(suggested) self.model_history.append({ 'from': self.context_manager.model, 'to': suggested, 'reason': 'large_input', 'tokens': input_tokens }) # Import orchestrator here to avoid circular import from .context_aware_orchestrator import ContextAwareOrchestrator # Create orchestrator with adapted context orchestrator = ContextAwareOrchestrator( self.db, str(self.project_root), self.context_manager.model ) # Use our dynamic context manager orchestrator.context_manager = self.context_manager # Execute flow result = await orchestrator.execute_flow(flow_name, inputs) # Add model info to result result['_model_info'] = { 'model': self.context_manager.model, 'context_window': self.context_manager.max_tokens, 'cost_estimate': self.context_manager.get_cost_estimate() } return result
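
For orientation, here is a minimal usage sketch. It assumes the sibling modules shown in the imports above (context_manager, model_info_manager, context_aware_orchestrator) are importable as part of the same package, and that `db` is whatever database handle ContextAwareOrchestrator expects; the flow name and inputs are placeholders, not flows defined by this project.

# Illustrative only; not part of dynamic_context_manager.py.
import asyncio

async def demo(db):
    # Adapt limits to whatever model the environment advertises
    ctx = DynamicContextManager(auto_detect=True)
    await ctx.init_model_info()

    # Inspect cost and capacity for a hypothetical workload
    print(ctx.get_cost_estimate(input_tokens=50_000, output_tokens=2_000))
    print(ctx.suggest_model_for_content(tokens_needed=250_000))

    # Let the orchestrator switch models per request when inputs are large
    orchestrator = ModelAdaptiveOrchestrator(db, project_root=".")
    result = await orchestrator.execute_with_best_model(
        "example_flow",                      # placeholder flow name
        {"query": "where is auth handled?"}  # placeholder inputs
    )
    print(result.get("_model_info"))

# asyncio.run(demo(db))  # supply a real db handle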

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stevenjjobson/scs-mcp'
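
The same entry can be fetched programmatically. A minimal Python sketch follows, assuming the endpoint is publicly readable and returns JSON; the response schema and any authentication requirements are not documented here, so check the Glama API docs.

# Fetch this server's directory entry from the Glama MCP API (stdlib only).
import json
from urllib.request import urlopen

with urlopen("https://glama.ai/api/mcp/v1/servers/stevenjjobson/scs-mcp") as resp:
    server_info = json.load(resp)

print(json.dumps(server_info, indent=2))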

If you have feedback or need assistance with the MCP directory API, please join our Discord server.