# context_strategies.py
"""
Context Strategies for Different Orchestration Flows
Defines how each flow should manage its context window
"""
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
@dataclass
class ContextStrategy:
    """Context-management settings for a single orchestration flow.

    Instances are plain configuration records; ``StrategyManager`` reads
    them to answer priority, compression, and token-budget questions.
    """

    # Identifier of the strategy; also used as its registry key.
    name: str
    # Token cap applied to "file" / "file_chunk" content.
    max_file_tokens: int
    # Overall token budget; None means no explicit total is configured.
    max_total_tokens: Optional[int]
    # When True, content of type "error" is never compressed.
    keep_errors: bool
    # When True, results of successful operations are compressed.
    compress_success: bool
    # NOTE(review): presumably the number of history entries to retain;
    # not consumed anywhere in this module - confirm with the caller.
    history_window: int
    # Content type -> priority. Lower numbers are more important;
    # StrategyManager treats priorities >= 7 as compressible.
    priority_mapping: Dict[str, int]
    # Compression category -> ratio in the range 0.0-1.0.
    # NOTE(review): the inline comments on the predefined strategies suggest
    # the ratio is the fraction of content kept - confirm with the compressor.
    compression_ratios: Dict[str, float]
    # Optional list of topics the flow concentrates on.
    focus_areas: Optional[List[str]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the strategy as a plain dictionary keyed by field name.

        The mapping and list fields are copied, so mutating the returned
        dictionary never alters this strategy (the predefined strategies
        are module-level objects shared by every manager).

        Returns:
            Dictionary with one entry per dataclass field.
        """
        return {
            "name": self.name,
            "max_file_tokens": self.max_file_tokens,
            "max_total_tokens": self.max_total_tokens,
            "keep_errors": self.keep_errors,
            "compress_success": self.compress_success,
            "history_window": self.history_window,
            "priority_mapping": dict(self.priority_mapping),
            "compression_ratios": dict(self.compression_ratios),
            "focus_areas": (
                None if self.focus_areas is None else list(self.focus_areas)
            ),
        }
# Predefined strategies for different flows, keyed by flow name.
#
# Conventions used by StrategyManager when reading these entries:
#   * priority_mapping: lower number = more important. Content types that are
#     not listed fall back to priority 5, and priorities >= 7 are treated as
#     low priority (eligible for compression).
#   * compression_ratios: categories that are not listed fall back to 0.5.
#     "file"/"file_chunk" content is looked up under "code" and
#     "result"/"output" content under "analysis".
#   * Unknown flow names resolve to the "default" entry, so it must exist.
CONTEXT_STRATEGIES = {
    # Code review needs full context but can compress successful checks
    "instant_review": ContextStrategy(
        name="instant_review",
        max_file_tokens=25000,
        max_total_tokens=50000,
        keep_errors=True,
        compress_success=True,
        history_window=5,
        priority_mapping={
            "error": 1,
            "critical_issue": 2,
            "high_complexity": 3,
            "target_file": 3,
            "related_file": 5,
            "analysis_result": 6,
            "suggestion": 7,
            "metrics": 8,
            "history": 9
        },
        compression_ratios={
            "success": 0.2,
            "analysis": 0.4,
            "metrics": 0.3,
            "history": 0.1
        },
        focus_areas=["errors", "complexity", "test_coverage"]
    ),
    # Technical debt needs financial context
    "debt_orchestrator": ContextStrategy(
        name="debt_orchestrator",
        max_file_tokens=20000,
        max_total_tokens=40000,
        keep_errors=True,
        compress_success=False,  # Keep all debt items
        history_window=10,
        priority_mapping={
            "high_debt": 1,
            "roi_calculation": 2,
            "complexity_hotspot": 3,
            "duplication": 4,
            "metrics": 5,
            "file_content": 6,
            "suggestions": 7,
            "history": 9
        },
        compression_ratios={
            "code": 0.5,
            "analysis": 0.6,
            "history": 0.2
        },
        focus_areas=["debt_items", "roi", "quick_wins"]
    ),
    # Test generation needs minimal context
    "test_gap_analyzer": ContextStrategy(
        name="test_gap_analyzer",
        max_file_tokens=10000,
        max_total_tokens=20000,
        keep_errors=False,
        compress_success=True,
        history_window=3,
        priority_mapping={
            "untested_functions": 1,
            "function_signatures": 2,
            "existing_tests": 3,
            "test_patterns": 4,
            "generated_tests": 5,
            "file_content": 7,
            "metrics": 8
        },
        compression_ratios={
            "code": 0.3,  # Keep only function signatures
            "existing_tests": 0.5,
            "analysis": 0.4
        },
        focus_areas=["functions", "test_patterns", "coverage"]
    ),
    # Import optimization needs all imports but little else
    "import_optimizer": ContextStrategy(
        name="import_optimizer",
        max_file_tokens=15000,
        max_total_tokens=20000,
        keep_errors=True,
        compress_success=True,
        history_window=1,
        priority_mapping={
            "imports": 1,
            "usage_analysis": 2,
            "unused_imports": 2,
            "suggestions": 4,
            "file_content": 6,
            "metrics": 8
        },
        compression_ratios={
            "code": 0.2,  # Keep only import section
            "analysis": 0.5,
            "history": 0.1
        },
        focus_areas=["imports", "usage"]
    ),
    # Debugging needs maximum context (largest token budgets in this table)
    "debug_analyzer": ContextStrategy(
        name="debug_analyzer",
        max_file_tokens=50000,
        max_total_tokens=100000,
        keep_errors=True,
        compress_success=False,
        history_window=20,
        priority_mapping={
            "error": 1,
            "stack_trace": 1,
            "error_context": 2,
            "related_code": 3,
            "variable_state": 4,
            "execution_flow": 5,
            "file_content": 6,
            "history": 7
        },
        compression_ratios={
            "success": 0.8,  # Keep most info for debugging
            "code": 0.7,
            "analysis": 0.8,
            "history": 0.5
        },
        focus_areas=["errors", "stack_traces", "variables", "flow"]
    ),
    # Performance analysis needs metrics focus
    "performance_analyzer": ContextStrategy(
        name="performance_analyzer",
        max_file_tokens=15000,
        max_total_tokens=30000,
        keep_errors=False,
        compress_success=True,
        history_window=5,
        priority_mapping={
            "bottlenecks": 1,
            "performance_metrics": 2,
            "hot_paths": 3,
            "profiling_data": 4,
            "optimization_suggestions": 5,
            "code": 7,
            "history": 9
        },
        compression_ratios={
            "code": 0.3,
            "metrics": 0.6,  # Keep more metrics
            "analysis": 0.5,
            "history": 0.2
        },
        focus_areas=["performance", "bottlenecks", "optimization"]
    ),
    # Security analysis needs vulnerability focus
    "security_analyzer": ContextStrategy(
        name="security_analyzer",
        max_file_tokens=30000,
        max_total_tokens=60000,
        keep_errors=True,
        compress_success=False,  # Keep all security findings
        history_window=10,
        priority_mapping={
            "vulnerability": 1,
            "security_issue": 1,
            "sensitive_data": 2,
            "authentication": 3,
            "authorization": 3,
            "input_validation": 4,
            "code": 5,
            "suggestions": 6,
            "history": 9
        },
        compression_ratios={
            "code": 0.5,
            "analysis": 0.7,  # Keep security details
            "history": 0.3
        },
        focus_areas=["vulnerabilities", "authentication", "validation"]
    ),
    # Browser/UI correlation needs different context
    # NOTE(review): there is no "error" entry in this priority mapping even
    # though keep_errors is True and "errors" is a focus area, so "error"
    # content falls back to the default priority of 5 - confirm intended.
    "ui_analyzer": ContextStrategy(
        name="ui_analyzer",
        max_file_tokens=10000,
        max_total_tokens=30000,
        keep_errors=True,
        compress_success=True,
        history_window=5,
        priority_mapping={
            "ui_state": 2,
            "dom_snapshot": 3,
            "user_interaction": 2,
            "screenshot_url": 4,
            "console_logs": 5,
            "network_requests": 6,
            "related_code": 4,
            "test_suggestions": 5,
            "history": 8
        },
        compression_ratios={
            "dom": 0.3,  # Compress DOM heavily
            "console": 0.2,
            "network": 0.3,
            "code": 0.4,
            "history": 0.1
        },
        focus_areas=["ui_elements", "interactions", "errors"]
    ),
    # Default strategy for unknown flows.
    # focus_areas is omitted here, so it takes the dataclass default (None).
    "default": ContextStrategy(
        name="default",
        max_file_tokens=20000,
        max_total_tokens=40000,
        keep_errors=True,
        compress_success=True,
        history_window=5,
        priority_mapping={
            "error": 1,
            "critical": 2,
            "important": 3,
            "normal": 5,
            "low": 7,
            "history": 9
        },
        compression_ratios={
            "code": 0.5,
            "analysis": 0.5,
            "history": 0.2
        }
    )
}
class StrategyManager:
    """Manages context strategies for different flows.

    A flow name is resolved in this order: custom strategies registered on
    this manager, then the predefined ``CONTEXT_STRATEGIES``, then the
    ``"default"`` strategy.
    """

    # Content types that share a compression category with another key.
    _COMPRESSION_ALIASES = {
        "file": "code",
        "file_chunk": "code",
        "result": "analysis",
        "output": "analysis",
    }

    def __init__(self):
        # Shallow copy: the registry dict is private to this manager, but the
        # strategy objects themselves are shared with CONTEXT_STRATEGIES.
        self.strategies = CONTEXT_STRATEGIES.copy()
        self.custom_strategies = {}

    @staticmethod
    def _detached(value: Any) -> Any:
        """Return a shallow copy of dict/list values; other values as-is."""
        if isinstance(value, dict):
            return dict(value)
        if isinstance(value, list):
            return list(value)
        return value

    def get_strategy(self, flow_name: str) -> "ContextStrategy":
        """
        Get strategy for a flow

        Args:
            flow_name: Name of the orchestration flow

        Returns:
            The custom strategy for the flow if one is registered, otherwise
            the predefined one, otherwise the "default" strategy
        """
        # Custom strategies shadow predefined ones of the same name.
        if flow_name in self.custom_strategies:
            return self.custom_strategies[flow_name]
        if flow_name in self.strategies:
            return self.strategies[flow_name]
        return self.strategies["default"]

    def register_custom_strategy(self, strategy: "ContextStrategy"):
        """Register a custom strategy under its own ``name``.

        An existing custom strategy with the same name is replaced.
        """
        self.custom_strategies[strategy.name] = strategy

    def update_strategy(self, flow_name: str, updates: Dict[str, Any]):
        """
        Derive a custom strategy for a flow from its current strategy

        The flow's current strategy (as resolved by ``get_strategy``) is
        copied, ``updates`` is applied on top, and the result is stored as a
        custom strategy for ``flow_name``. The source strategy is not
        modified.

        Args:
            flow_name: Name of the flow
            updates: Dictionary of fields to update

        Raises:
            TypeError: If ``updates`` contains a key that is not a
                ContextStrategy field
        """
        base = self.get_strategy(flow_name)
        # Detach the mutable fields so the new strategy never shares its
        # mappings with the (possibly module-global) strategy it came from.
        fields = {
            key: self._detached(value)
            for key, value in base.to_dict().items()
        }
        # An unknown flow resolves to the default strategy; name the derived
        # strategy after the flow it is registered under so registry key and
        # name stay in sync. An explicit "name" in updates still wins.
        fields["name"] = flow_name
        fields.update(updates)
        self.custom_strategies[flow_name] = ContextStrategy(**fields)

    def get_priority_for_content(self, flow_name: str, content_type: str) -> int:
        """
        Get priority for a content type in a flow

        Args:
            flow_name: Name of the flow
            content_type: Type of content

        Returns:
            Priority from the flow's priority mapping (lower is more
            important), or 5 when the content type is not listed
        """
        strategy = self.get_strategy(flow_name)
        return strategy.priority_mapping.get(content_type, 5)

    def get_compression_ratio(self, flow_name: str, content_type: str) -> float:
        """
        Get compression ratio for content type

        "file"/"file_chunk" are looked up under the "code" category and
        "result"/"output" under "analysis"; every other content type is used
        as the category name directly.

        Args:
            flow_name: Name of the flow
            content_type: Type of content

        Returns:
            Compression ratio (0.0-1.0), or 0.5 when the category is not
            configured for the flow
        """
        strategy = self.get_strategy(flow_name)
        compression_key = self._COMPRESSION_ALIASES.get(content_type, content_type)
        return strategy.compression_ratios.get(compression_key, 0.5)

    def should_compress(self, flow_name: str, content_type: str, success: bool) -> bool:
        """
        Determine if content should be compressed

        Args:
            flow_name: Name of the flow
            content_type: Type of content
            success: Whether the operation was successful

        Returns:
            True if should compress
        """
        strategy = self.get_strategy(flow_name)
        # Errors are protected first, even when the operation succeeded.
        if content_type == "error" and strategy.keep_errors:
            return False
        # Successful results are compressed regardless of their priority.
        if success and strategy.compress_success:
            return True
        # Otherwise only low-priority content (priority 7 or above) is compressed.
        return self.get_priority_for_content(flow_name, content_type) >= 7

    def get_max_tokens(self, flow_name: str, content_type: str) -> int:
        """
        Get maximum tokens for content type

        Args:
            flow_name: Name of the flow
            content_type: Type of content

        Returns:
            The strategy's file token cap for "file"/"file_chunk" content;
            for anything else a quarter of the total budget, or 10000 when
            no total budget is set
        """
        strategy = self.get_strategy(flow_name)
        if content_type in ("file", "file_chunk"):
            return strategy.max_file_tokens
        # A missing (None) or zero total budget falls back to a fixed cap.
        if not strategy.max_total_tokens:
            return 10000
        return strategy.max_total_tokens // 4

    def list_strategies(self) -> List[str]:
        """List all available strategy names without duplicates.

        Predefined names come first in registry order, followed by custom
        names that do not shadow a predefined one, so the result is
        deterministic.
        """
        return list(dict.fromkeys([*self.strategies, *self.custom_strategies]))

    def get_focus_areas(self, flow_name: str) -> Optional[List[str]]:
        """Get focus areas for a flow (None when the strategy defines none)."""
        return self.get_strategy(flow_name).focus_areas