# dynamic_context_manager.py
"""
Dynamic Context Manager
Automatically adapts to model capabilities queried from provider APIs
"""
import os
import asyncio
import logging
from typing import Dict, Optional, List, Any
from pathlib import Path
from .context_manager import ContextManager, ContextItem
from .model_info_manager import ModelInfoManager, ModelInfo, get_model_manager
logger = logging.getLogger(__name__)
class DynamicContextManager(ContextManager):
"""
Context manager that dynamically adapts to model capabilities
Queries provider APIs for actual context windows and limits
"""
    def __init__(self, model: Optional[str] = None, provider: Optional[str] = None,
                 target_utilization: float = 0.9,
                 enable_compression: bool = True,
                 auto_detect: bool = True):
        """
        Initialize dynamic context manager
        Args:
            model: Model name (auto-detected if not specified)
            provider: Provider name (auto-detected from model name)
            target_utilization: Target fraction of the context window to use (e.g. 0.9)
            enable_compression: Whether to enable smart compression
            auto_detect: Whether to auto-detect model from environment
        """
# Auto-detect model if not specified
if auto_detect and not model:
model = self._detect_current_model()
logger.info(f"Auto-detected model: {model}")
# Initialize model info manager
self.model_manager = get_model_manager()
self.provider = provider
self.model_info: Optional[ModelInfo] = None
self.auto_detect = auto_detect
# Store configuration
self.requested_model = model or 'gpt-4'
self.target_utilization = target_utilization
# Initialize with conservative defaults first
super().__init__(
model='gpt-4', # Safe default with 8k context
target_utilization=target_utilization,
enable_compression=enable_compression
)
# Schedule async initialization if we have a specific model
if model:
# Try synchronous initialization first
self._init_model_info_sync(model, provider)
def _detect_current_model(self) -> str:
"""
Detect current model from environment or MCP connection
Returns:
Model name
"""
# Check various environment variables
model_env_vars = [
'ANTHROPIC_MODEL',
'CLAUDE_MODEL',
'OPENAI_MODEL',
'GPT_MODEL',
'GEMINI_MODEL',
'GOOGLE_MODEL',
'AI_MODEL',
'LLM_MODEL'
]
for var in model_env_vars:
value = os.getenv(var)
if value:
logger.debug(f"Detected model from {var}: {value}")
return value
        # No explicit model configured; fall back to provider API keys as a hint
if os.getenv('ANTHROPIC_API_KEY'):
# Default to Claude 3 Opus if we have Anthropic API key
return 'claude-3-opus-20240229'
elif os.getenv('OPENAI_API_KEY'):
# Default to GPT-4 Turbo if we have OpenAI API key
return 'gpt-4-turbo-preview'
# Ultimate default
return 'claude-3-opus-20240229'
    def _init_model_info_sync(self, model: str, provider: Optional[str] = None):
"""
Synchronously initialize model information
Args:
model: Model name
provider: Provider name
"""
try:
self.model_info = self.model_manager.get_model_info_sync(model, provider)
self._update_limits_from_info()
logger.info(f"Model info loaded: {model} - {self.model_info.context_window:,} tokens")
except Exception as e:
logger.warning(f"Failed to load model info synchronously: {e}")
    async def init_model_info(self, model: Optional[str] = None, provider: Optional[str] = None):
"""
Asynchronously initialize model information
Args:
model: Model name (uses stored model if not specified)
provider: Provider name
"""
model = model or self.requested_model
try:
self.model_info = await self.model_manager.get_model_info(model, provider)
self._update_limits_from_info()
logger.info(f"Model info loaded async: {model} - {self.model_info.context_window:,} tokens")
except Exception as e:
logger.error(f"Failed to load model info: {e}")
# Fall back to sync version
self._init_model_info_sync(model, provider)
def _update_limits_from_info(self):
"""Update context limits from model info"""
if not self.model_info:
return
# Update context window limits
self.max_tokens = self.model_info.context_window
self.response_buffer = self.model_info.max_output_tokens
self.target_tokens = int(
(self.max_tokens - self.response_buffer) * self.target_utilization
)
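        # Worked example: a 200,000-token window with a 4,096-token output
        # buffer at 0.9 utilization gives int((200000 - 4096) * 0.9) = 176,313
        # tokens available for prompt context.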
# Update model name
self.model = self.model_info.name
# Adjust compression based on context size
self._adjust_compression_strategy()
def _adjust_compression_strategy(self):
"""Adjust compression strategy based on model context size"""
if not self.model_info:
return
context = self.model_info.context_window
if context < 10000:
# Small context - aggressive compression
logger.debug(f"Small context ({context}), using aggressive compression")
self.compression_threshold = 0.5 # Compress when 50% full
self.compression_ratio = 0.2 # Keep only 20% of content
elif context < 50000:
# Medium context - moderate compression
logger.debug(f"Medium context ({context}), using moderate compression")
self.compression_threshold = 0.7
self.compression_ratio = 0.4
elif context < 200000:
# Large context - light compression
logger.debug(f"Large context ({context}), using light compression")
self.compression_threshold = 0.85
self.compression_ratio = 0.6
else:
# Very large context - minimal compression
logger.debug(f"Very large context ({context}), using minimal compression")
self.compression_threshold = 0.9
self.compression_ratio = 0.8
    async def adapt_to_model(self, new_model: str, provider: Optional[str] = None):
"""
Adapt context to a different model
Args:
new_model: New model name
provider: Provider name
"""
old_info = self.model_info
# Get new model info
new_info = await self.model_manager.get_model_info(new_model, provider)
# Check if we need to adjust context
if old_info and new_info.context_window < old_info.context_window:
# Moving to smaller model - need to compress
logger.info(f"Adapting from {old_info.context_window:,} to {new_info.context_window:,} tokens")
await self._compress_for_smaller_window(new_info.context_window)
# Update model info
self.model_info = new_info
self._update_limits_from_info()
logger.info(f"Adapted to {new_model}: {new_info.context_window:,} tokens")
async def _compress_for_smaller_window(self, new_window_size: int):
"""
Compress context to fit in smaller window
Args:
new_window_size: New context window size in tokens
"""
# Calculate how much we need to reduce
current = self.current_tokens
target = int(new_window_size * 0.8) # Leave 20% buffer
if current <= target:
return # Already fits
reduction_needed = current - target
logger.info(f"Need to reduce by {reduction_needed:,} tokens")
        # Sort so the most important items come first (a lower priority number is
        # treated as more important below; newest first within a priority level)
        sorted_items = sorted(
            self.context_items,
            key=lambda x: (x.priority, -x.timestamp.timestamp())
        )
        # Walk from the least important items, removing or compressing until we fit
for item in reversed(sorted_items):
if self.current_tokens <= target:
break
if item.priority >= 7:
# Remove low priority items
self.context_items.remove(item)
self.current_tokens -= item.tokens
logger.debug(f"Removed low priority item: {item.content_type}")
elif item.priority >= 5:
# Compress medium priority items
compressed = self._compress_content(
item.content,
item.content_type,
target_ratio=0.3
)
new_tokens = self.estimate_tokens(compressed)
if new_tokens < item.tokens:
saved = item.tokens - new_tokens
item.content = compressed
item.tokens = new_tokens
self.current_tokens -= saved
logger.debug(f"Compressed {item.content_type}: saved {saved} tokens")
def suggest_model_for_content(self, tokens_needed: int) -> Optional[str]:
"""
Suggest a model that can handle the content size
Args:
tokens_needed: Number of tokens needed
Returns:
Suggested model name or None
"""
# First check if current model can handle it
if self.model_info and tokens_needed <= self.model_info.usable_context:
return self.model_info.name
# Get suggestion from model manager
suggested = self.model_manager.suggest_model_for_size(
tokens_needed,
prefer_provider=self.provider
)
if suggested:
logger.info(f"Suggested model for {tokens_needed:,} tokens: {suggested}")
else:
logger.warning(f"No model found that can handle {tokens_needed:,} tokens")
return suggested
    def get_cost_estimate(self, input_tokens: Optional[int] = None,
                          output_tokens: Optional[int] = None) -> Dict[str, Any]:
"""
Estimate cost for current operation
Args:
input_tokens: Input tokens (uses current context if not specified)
output_tokens: Expected output tokens
Returns:
Cost breakdown dictionary
"""
if not self.model_info:
return {'error': 'Model info not available', 'total': 0.0}
# Use current context as input if not specified
if input_tokens is None:
input_tokens = self.current_tokens
# Use reasonable default for output if not specified
if output_tokens is None:
output_tokens = min(2000, self.model_info.max_output_tokens // 2)
input_cost = (input_tokens / 1000) * self.model_info.input_cost_per_1k
output_cost = (output_tokens / 1000) * self.model_info.output_cost_per_1k
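        # Illustrative math: 50,000 input tokens at a hypothetical $0.015/1k plus
        # 2,000 output tokens at $0.075/1k -> (50 * 0.015) + (2 * 0.075) = $0.90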
return {
'model': self.model_info.name,
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'input_cost': round(input_cost, 4),
'output_cost': round(output_cost, 4),
'total': round(input_cost + output_cost, 4),
'cost_per_1k_input': self.model_info.input_cost_per_1k,
'cost_per_1k_output': self.model_info.output_cost_per_1k
}
async def find_cheapest_capable_model(self, tokens_needed: int) -> Optional[str]:
"""
Find the cheapest model that can handle the token requirement
Args:
tokens_needed: Number of tokens needed
Returns:
Model name or None
"""
candidates = []
# Check all available models
available = self.model_manager.get_available_models()
for provider, models in available.items():
for model in models:
info = await self.model_manager.get_model_info(model, provider)
if info.usable_context >= tokens_needed:
# Calculate cost for typical operation
cost = self.model_manager.estimate_cost(
model,
tokens_needed,
2000 # Typical output
)
candidates.append({
'model': model,
'provider': provider,
'cost': cost,
'context': info.context_window
})
if not candidates:
return None
# Sort by cost (cheapest first)
candidates.sort(key=lambda x: x['cost'])
cheapest = candidates[0]
logger.info(
f"Cheapest model for {tokens_needed:,} tokens: "
f"{cheapest['model']} (${cheapest['cost']:.4f})"
)
return cheapest['model']
def get_model_comparison(self) -> Dict[str, Any]:
"""
Get comparison of current model with alternatives
Returns:
Comparison dictionary
"""
if not self.model_info:
return {'error': 'Model info not available'}
# Compare with common alternatives
alternatives = []
if self.model_info.provider == 'anthropic':
alternatives = ['gpt-4-turbo-preview', 'gemini-1.5-pro']
elif self.model_info.provider == 'openai':
alternatives = ['claude-3-opus-20240229', 'gemini-1.5-pro']
else:
alternatives = ['claude-3-opus-20240229', 'gpt-4-turbo-preview']
comparisons = {}
for alt in alternatives:
comparison = self.model_manager.compare_models(
self.model_info.name,
alt
)
comparisons[alt] = comparison
return {
'current_model': self.model_info.to_dict(),
'comparisons': comparisons,
'recommendation': self._get_model_recommendation()
}
def _get_model_recommendation(self) -> str:
"""Get recommendation based on current context usage"""
if not self.model_info:
return "Model info not available"
utilization = self.current_tokens / self.model_info.context_window
if utilization < 0.1:
return f"Consider using a smaller, cheaper model. Current utilization: {utilization:.1%}"
elif utilization > 0.8:
return f"Consider using a larger model for more headroom. Current utilization: {utilization:.1%}"
else:
return f"Model size appropriate for current usage. Utilization: {utilization:.1%}"
def export_config(self) -> Dict[str, Any]:
"""
Export current configuration for persistence
Returns:
Configuration dictionary
"""
config = {
'model': self.model_info.name if self.model_info else self.model,
'provider': self.model_info.provider if self.model_info else 'unknown',
'context_window': self.max_tokens,
'target_utilization': self.target_utilization,
'enable_compression': self.enable_compression,
'auto_detect': self.auto_detect,
'current_usage': {
'tokens': self.current_tokens,
'items': len(self.context_items),
'utilization': f"{(self.current_tokens / self.max_tokens) * 100:.1f}%"
}
}
if self.model_info:
config['model_info'] = self.model_info.to_dict()
config['cost_estimate'] = self.get_cost_estimate()
return config
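# Note (illustrative): export_config() returns plain data, so it can be persisted
# with the standard library, e.g., where `manager` is a DynamicContextManager:
#   import json
#   Path("context_config.json").write_text(json.dumps(manager.export_config(), indent=2))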
class ModelAdaptiveOrchestrator:
"""
Orchestrator that automatically adapts to different models
"""
def __init__(self, db, project_root: str = "."):
"""
Initialize model-adaptive orchestrator
Args:
db: Database connection
project_root: Project root directory
"""
self.db = db
self.project_root = Path(project_root)
# Initialize dynamic context manager
self.context_manager = DynamicContextManager(auto_detect=True)
# Track model switches
self.model_history = []
async def execute_with_best_model(self, flow_name: str, inputs: Dict) -> Dict:
"""
Execute flow with the best model for the content size
Args:
flow_name: Flow to execute
inputs: Input parameters
Returns:
Execution results
"""
# Estimate input size
input_tokens = self.context_manager.estimate_tokens(inputs)
# Find best model
if input_tokens > self.context_manager.max_tokens * 0.8:
# Need larger model
suggested = self.context_manager.suggest_model_for_content(input_tokens)
            if suggested and suggested != self.context_manager.model:
                # Capture the outgoing model before adapting, otherwise 'from'
                # would record the new model
                previous_model = self.context_manager.model
                logger.info(f"Switching to {suggested} for large input")
                await self.context_manager.adapt_to_model(suggested)
                self.model_history.append({
                    'from': previous_model,
                    'to': suggested,
                    'reason': 'large_input',
                    'tokens': input_tokens
                })
# Import orchestrator here to avoid circular import
from .context_aware_orchestrator import ContextAwareOrchestrator
# Create orchestrator with adapted context
orchestrator = ContextAwareOrchestrator(
self.db,
str(self.project_root),
self.context_manager.model
)
# Use our dynamic context manager
orchestrator.context_manager = self.context_manager
# Execute flow
result = await orchestrator.execute_flow(flow_name, inputs)
# Add model info to result
result['_model_info'] = {
'model': self.context_manager.model,
'context_window': self.context_manager.max_tokens,
'cost_estimate': self.context_manager.get_cost_estimate()
}
return result
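

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). It assumes the inherited
# ContextManager exposes the attributes used above (max_tokens, current_tokens,
# context_items, estimate_tokens, ...) and that ModelInfoManager can reach the
# provider APIs or its local fallbacks; the token counts below are examples.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo():
        manager = DynamicContextManager(auto_detect=True)
        # Refresh limits from the provider API (falls back to the sync lookup on error)
        await manager.init_model_info()
        # Inspect the adapted limits and a rough cost estimate
        print(f"Model: {manager.model}, window: {manager.max_tokens:,} tokens")
        print(manager.get_cost_estimate(input_tokens=50_000, output_tokens=2_000))
        # Ask which model could hold a hypothetical 150k-token payload
        print(manager.suggest_model_for_content(150_000))
        # Export the current configuration for persistence
        print(manager.export_config())

    asyncio.run(_demo())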