
E-commerce Local MCP Server

factory.py (11.8 kB)
""" Factory for creating prompt enhancement components. Implements dependency injection and configuration. """ import logging from typing import Optional, Dict, Any from .interfaces import ( IEnhancementFactory, IQueryAnalyzer, IPromptEnhancer, IEnhancementValidator, IEnhancementCache, IMetricsCollector, IEnhancementOrchestrator, IModelManager ) from .analyzers.query_analyzer import IntelligentQueryAnalyzer, FastIntentPredictor from .enhancers.ai_enhancer import AIPromptEnhancer from .cache.redis_cache import InMemoryEnhancementCache from .orchestrator import EnhancementOrchestrator, SimpleMetricsCollector logger = logging.getLogger(__name__) class ModelManagerAdapter(IModelManager): """ Adapter to integrate existing model manager with enhancement system. """ def __init__(self, model_manager): """ Initialize adapter. Args: model_manager: Existing model manager instance """ self.model_manager = model_manager async def inference(self, prompt: str, max_tokens: int = 50, temperature: float = 0.2) -> Dict[str, Any]: """ Run model inference using dedicated enhancement model. Args: prompt: Input prompt max_tokens: Maximum tokens to generate temperature: Generation temperature Returns: Dict[str, Any]: Inference result """ try: # Use dedicated enhancement model (load once, use for all enhancements) optimal_model = self._get_dedicated_enhancement_model() # Load enhancement model only if not already loaded if self.model_manager.active_model != optimal_model: logger.info(f"Loading dedicated enhancement model: {optimal_model}") success = self.model_manager.load_model(optimal_model) if not success: raise RuntimeError(f"Failed to load enhancement model: {optimal_model}") # Use existing model manager result = self.model_manager.inference(prompt, max_tokens, temperature) # Ensure consistent return format if isinstance(result, dict): return result else: return { "text": str(result), "token_usage": {"total_tokens": max_tokens}, "processing_time": 0 } except Exception as e: logger.error(f"Model inference failed: {e}") raise def _get_dedicated_enhancement_model(self) -> str: """ Get the single, dedicated model for all enhancements. Returns: str: Dedicated enhancement model name """ # Priority order: balanced performance for all enhancement tasks preferred_models = [ "qwen2.5-1.5b", # Best balance of speed (3-4s) and accuracy (90%+) "qwen2.5-0.5b", # Ultra-fast (1-2s) but slightly lower accuracy (85%+) "qwen2.5-3b", # High accuracy (95%+) but slower (7-8s) "phi-3-mini" # Fallback if Qwen models unavailable ] # Return first available model for model_name in preferred_models: model_stats = self.model_manager.model_stats.get(model_name, {}) if model_stats.get("file_exists", False): return model_name # Ultimate fallback return "qwen2.5-1.5b" # Removed complex routing logic - using single dedicated model approach def is_available(self) -> bool: """Check if model is available""" try: # Auto-load a model if none is active if not self.model_manager.active_model: # Try to load the fastest available model for enhancement available_models = ['qwen2.5-1.5b', 'qwen2.5-3b', 'phi-3-mini'] for model_name in available_models: if self.model_manager.load_model(model_name): logger.info(f"Auto-loaded {model_name} for enhancement") break return hasattr(self.model_manager, 'active_model') and self.model_manager.active_model is not None except Exception as e: logger.error(f"Model availability check failed: {e}") return False class EnhancementFactory(IEnhancementFactory): """ Factory for creating enhancement system components. 
Handles dependency injection and configuration. """ def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize factory with configuration. Args: config: Configuration dictionary """ self.config = config or {} self._model_manager = None self._analyzer = None self._enhancer = None self._cache = None self._metrics_collector = None def configure_model_manager(self, model_manager) -> 'EnhancementFactory': """ Configure model manager for the factory. Args: model_manager: Model manager instance Returns: EnhancementFactory: Self for method chaining """ self._model_manager = ModelManagerAdapter(model_manager) return self def create_analyzer(self) -> IQueryAnalyzer: """Create query analyzer instance""" if self._analyzer is None: # Create intent predictor intent_predictor = FastIntentPredictor() # Create analyzer self._analyzer = IntelligentQueryAnalyzer(intent_predictor) return self._analyzer def create_enhancer(self, enhancer_type: str = "ai_dynamic") -> IPromptEnhancer: """Create prompt enhancer instance""" if self._enhancer is None: if enhancer_type != "ai_dynamic": raise ValueError(f"Unsupported enhancer type: {enhancer_type}") if not self._model_manager: raise ValueError("Model manager must be configured before creating enhancer") # Create analyzer if not exists analyzer = self.create_analyzer() # Create AI enhancer self._enhancer = AIPromptEnhancer(self._model_manager, analyzer) return self._enhancer def create_validator(self) -> IEnhancementValidator: """Create enhancement validator instance""" # Simple validator for now class SimpleValidator(IEnhancementValidator): async def validate(self, original: str, enhanced: str, context=None) -> float: if enhanced == original: return 0.0 length_ratio = len(enhanced) / len(original) if original else 1 return 0.8 if 1.2 <= length_ratio <= 3.0 else 0.5 return SimpleValidator() def create_cache(self) -> IEnhancementCache: """Create enhancement cache instance""" if self._cache is None: cache_config = self.config.get("cache", {}) max_size = cache_config.get("max_size", 1000) ttl = cache_config.get("ttl", 3600) self._cache = InMemoryEnhancementCache(max_size=max_size, default_ttl=ttl) logger.info("Using in-memory cache for enhancements") return self._cache def create_metrics_collector(self) -> IMetricsCollector: """Create metrics collector instance""" if self._metrics_collector is None: self._metrics_collector = SimpleMetricsCollector() return self._metrics_collector def create_orchestrator(self) -> IEnhancementOrchestrator: """Create enhancement orchestrator instance""" # Create all dependencies enhancer = self.create_enhancer() analyzer = self.create_analyzer() cache = self.create_cache() metrics_collector = self.create_metrics_collector() # Create orchestrator return EnhancementOrchestrator( enhancer=enhancer, analyzer=analyzer, cache=cache, metrics_collector=metrics_collector ) class EnhancementService: """ High-level service for prompt enhancement. Provides simple interface for integration. """ def __init__(self, model_manager, config: Optional[Dict[str, Any]] = None): """ Initialize enhancement service. 
Args: model_manager: Model manager instance config: Optional configuration """ self.config = config or {} # Create factory and configure factory = EnhancementFactory(config) factory.configure_model_manager(model_manager) # Create main orchestrator self.orchestrator = factory.create_orchestrator() logger.info("Enhancement service initialized successfully") async def enhance_query( self, query: str, context: Optional[Dict[str, Any]] = None, force_enhancement: bool = False ) -> Dict[str, Any]: """ Enhance a user query. Args: query: User query to enhance context: Optional context information force_enhancement: Force enhancement even if not needed Returns: Dict[str, Any]: Enhancement result """ from .models import EnhancementRequest, EnhancementContext # Create context object enhancement_context = None if context: enhancement_context = EnhancementContext( user_id=context.get("user_id"), shop_id=context.get("shop_id"), business_domain=context.get("business_domain", "ecommerce") ) # Create request request = EnhancementRequest( query=query, context=enhancement_context, force_enhancement=force_enhancement ) # Perform enhancement result = await self.orchestrator.enhance_query(request) # Return as dictionary for API response return result.to_dict() async def get_enhancement_preview( self, query: str, context: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Get enhancement preview without caching. Args: query: Query to enhance context: Optional context Returns: Dict[str, Any]: Enhancement preview """ from .models import EnhancementContext # Create context object enhancement_context = None if context: enhancement_context = EnhancementContext( user_id=context.get("user_id"), shop_id=context.get("shop_id"), business_domain=context.get("business_domain", "ecommerce") ) # Get preview result = await self.orchestrator.get_enhancement_preview(query, enhancement_context) return result.to_dict() async def health_check(self) -> Dict[str, Any]: """Check health of enhancement system""" return await self.orchestrator.health_check() async def get_metrics(self) -> Dict[str, Any]: """Get enhancement metrics""" metrics = await self.orchestrator.metrics_collector.get_metrics() return { "total_requests": metrics.total_requests, "successful_enhancements": metrics.successful_enhancements, "cache_hits": metrics.cache_hits, "success_rate": metrics.success_rate, "cache_hit_rate": metrics.cache_hit_rate, "average_processing_time": metrics.average_processing_time, "average_confidence": metrics.average_confidence }
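For orientation, here is a minimal sketch of how EnhancementService might be wired into an application. Only EnhancementService and its methods come from the file above; the import paths, the ModelManager class, and the config keys shown are assumptions for illustration and should be adjusted to the actual project layout.

import asyncio

# Both import paths below are assumptions; adjust to your project layout.
from app.services.model_manager import ModelManager          # hypothetical
from app.services.enhancement.factory import EnhancementService


async def main() -> None:
    # ModelManagerAdapter in factory.py expects an object exposing
    # active_model, model_stats, load_model(), and inference();
    # ModelManager is a hypothetical stand-in with that interface.
    model_manager = ModelManager()

    service = EnhancementService(
        model_manager,
        config={"cache": {"max_size": 500, "ttl": 1800}},  # keys read by create_cache()
    )

    result = await service.enhance_query(
        "show me last month's sales",
        context={"user_id": "u-123", "shop_id": "s-456"},
    )
    print(result)                        # EnhancementResult.to_dict() payload
    print(await service.get_metrics())   # aggregate counters from the collector


if __name__ == "__main__":
    asyncio.run(main())

Note the design choice in EnhancementFactory: each create_* method memoizes its component, so repeated calls return the same analyzer, enhancer, cache, and metrics collector, and create_orchestrator assembles one consistent object graph per factory instance.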

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AnisurRahman06046/mcptestwithmodel'
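The same request can be made from Python. A minimal sketch using the requests library, assuming the endpoint returns JSON (the response schema is Glama's and is not documented here):

import requests

response = requests.get(
    "https://glama.ai/api/mcp/v1/servers/AnisurRahman06046/mcptestwithmodel",
    timeout=10,
)
response.raise_for_status()
server_info = response.json()  # shape of this payload is an assumption
print(server_info)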

If you have feedback or need assistance with the MCP directory API, please join our Discord server.