Skip to main content
Glama
categorization.py (23.6 kB)
""" Advanced categorization system for slash commands. This module provides enhanced command categorization capabilities including: - Auto-categorization based on command patterns - Dynamic category management - Category hierarchies and relationships - Smart category suggestions - Category-based command filtering and organization """ import re from typing import Dict, Any, List, Optional, Set, Tuple, Pattern from dataclasses import dataclass, field from enum import Enum from collections import defaultdict, Counter import structlog from .registry import CommandCategory, Command, CommandRegistry from .parser import CommandBlock from ..utils.logging import get_logger from ..utils.errors import ValidationError logger = get_logger(__name__) class AutoCategorizationStrategy(Enum): """Strategies for automatic categorization.""" KEYWORD_BASED = "keyword_based" PATTERN_BASED = "pattern_based" DESCRIPTION_BASED = "description_based" USAGE_BASED = "usage_based" HYBRID = "hybrid" @dataclass class CategoryRule: """Rule for automatic command categorization.""" category: CommandCategory patterns: List[Pattern] = field(default_factory=list) keywords: Set[str] = field(default_factory=set) description_patterns: List[Pattern] = field(default_factory=list) priority: int = 0 def matches(self, command: Command) -> Tuple[bool, float]: """ Check if command matches this rule. 
Returns: Tuple of (matches, confidence_score) """ confidence = 0.0 total_checks = 0 # Check command name patterns if self.patterns: total_checks += 1 for pattern in self.patterns: if pattern.search(command.metadata.name): confidence += 1.0 break # Check keywords in command name if self.keywords: total_checks += 1 name_words = set(command.metadata.name.lower().split('_')) if self.keywords.intersection(name_words): confidence += 1.0 # Check description patterns if self.description_patterns: total_checks += 1 for pattern in self.description_patterns: if pattern.search(command.metadata.description.lower()): confidence += 1.0 break if total_checks == 0: return False, 0.0 final_confidence = confidence / total_checks return final_confidence > 0.5, final_confidence @dataclass class CategoryMetrics: """Metrics for a command category.""" category: CommandCategory command_count: int = 0 usage_count: int = 0 average_execution_time: float = 0.0 success_rate: float = 0.0 most_used_commands: List[str] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "category": self.category.value, "command_count": self.command_count, "usage_count": self.usage_count, "average_execution_time": self.average_execution_time, "success_rate": self.success_rate, "most_used_commands": self.most_used_commands } @dataclass class CategoryHierarchy: """Hierarchical category relationships.""" parent: Optional[CommandCategory] = None children: Set[CommandCategory] = field(default_factory=set) level: int = 0 def add_child(self, child: CommandCategory) -> None: """Add child category.""" self.children.add(child) def remove_child(self, child: CommandCategory) -> None: """Remove child category.""" self.children.discard(child) def get_descendants(self) -> Set[CommandCategory]: """Get all descendant categories.""" descendants = set(self.children) for child in self.children: # This would require access to the full hierarchy # Implementation depends on 
CategoryManager context pass return descendants class CategoryManager: """Advanced category management system.""" def __init__(self, registry: Optional[CommandRegistry] = None): """Initialize category manager.""" self.registry = registry self._rules: List[CategoryRule] = [] self._hierarchy: Dict[CommandCategory, CategoryHierarchy] = {} self._metrics: Dict[CommandCategory, CategoryMetrics] = {} # Auto-categorization settings self.auto_categorization_enabled = True self.default_strategy = AutoCategorizationStrategy.HYBRID self.confidence_threshold = 0.7 # Initialize default rules self._initialize_default_rules() self._initialize_hierarchy() logger.info("category_manager_initialized") def _initialize_default_rules(self) -> None: """Initialize default categorization rules.""" default_rules = [ # System commands CategoryRule( category=CommandCategory.SYSTEM, patterns=[ re.compile(r'^(config|settings|status|health|info|version|help)'), re.compile(r'^(start|stop|restart|shutdown|reload)') ], keywords={'system', 'config', 'status', 'health', 'admin'}, description_patterns=[ re.compile(r'system|configuration|admin|manage'), re.compile(r'server|service|daemon') ], priority=10 ), # Session management CategoryRule( category=CommandCategory.SESSION, patterns=[ re.compile(r'^(session|connect|disconnect|login|logout)'), re.compile(r'^(save|load|restore|checkpoint)') ], keywords={'session', 'connect', 'save', 'load', 'restore'}, description_patterns=[ re.compile(r'session|connection|save|load|checkpoint'), re.compile(r'restore|backup|state') ], priority=9 ), # Development commands CategoryRule( category=CommandCategory.DEVELOPMENT, patterns=[ re.compile(r'^(build|compile|test|debug|deploy)'), re.compile(r'^(git|npm|pip|docker|k8s)') ], keywords={'build', 'test', 'debug', 'deploy', 'dev', 'code'}, description_patterns=[ re.compile(r'build|compile|test|debug|development'), re.compile(r'code|programming|software|deploy') ], priority=8 ), # Analysis commands CategoryRule( 
category=CommandCategory.ANALYSIS, patterns=[ re.compile(r'^(analyze|parse|extract|process)'), re.compile(r'^(report|stats|metrics|summary)') ], keywords={'analyze', 'parse', 'extract', 'report', 'stats'}, description_patterns=[ re.compile(r'analyze|analysis|parse|extract|process'), re.compile(r'report|statistics|metrics|data|summary') ], priority=7 ), # Automation commands CategoryRule( category=CommandCategory.AUTOMATION, patterns=[ re.compile(r'^(auto|schedule|cron|batch|bulk)'), re.compile(r'^(workflow|pipeline|task)') ], keywords={'auto', 'schedule', 'batch', 'workflow', 'automation'}, description_patterns=[ re.compile(r'automat|schedule|batch|workflow|pipeline'), re.compile(r'task|job|cron|recurring') ], priority=6 ), # Integration commands CategoryRule( category=CommandCategory.INTEGRATION, patterns=[ re.compile(r'^(api|webhook|sync|import|export)'), re.compile(r'^(github|slack|discord|email)') ], keywords={'api', 'webhook', 'sync', 'import', 'export', 'integration'}, description_patterns=[ re.compile(r'api|webhook|integrat|sync|import|export'), re.compile(r'github|slack|discord|email|external') ], priority=5 ) ] self._rules = sorted(default_rules, key=lambda r: r.priority, reverse=True) logger.debug("default_categorization_rules_initialized", rule_count=len(self._rules)) def _initialize_hierarchy(self) -> None: """Initialize category hierarchy.""" # System is top-level self._hierarchy[CommandCategory.SYSTEM] = CategoryHierarchy(level=0) # Development and Analysis are major categories self._hierarchy[CommandCategory.DEVELOPMENT] = CategoryHierarchy(level=1) self._hierarchy[CommandCategory.ANALYSIS] = CategoryHierarchy(level=1) # Session management under System self._hierarchy[CommandCategory.SESSION] = CategoryHierarchy( parent=CommandCategory.SYSTEM, level=2 ) self._hierarchy[CommandCategory.SYSTEM].add_child(CommandCategory.SESSION) # Automation and Integration as specialized categories self._hierarchy[CommandCategory.AUTOMATION] = 
CategoryHierarchy(level=1) self._hierarchy[CommandCategory.INTEGRATION] = CategoryHierarchy(level=1) # Utility as catch-all self._hierarchy[CommandCategory.UTILITY] = CategoryHierarchy(level=2) # Custom as user-defined self._hierarchy[CommandCategory.CUSTOM] = CategoryHierarchy(level=0) logger.debug("category_hierarchy_initialized") def add_categorization_rule(self, rule: CategoryRule) -> None: """Add a new categorization rule.""" self._rules.append(rule) self._rules.sort(key=lambda r: r.priority, reverse=True) logger.debug( "categorization_rule_added", category=rule.category.value, priority=rule.priority ) def remove_categorization_rule(self, category: CommandCategory, index: int = 0) -> bool: """Remove a categorization rule.""" rules_for_category = [r for r in self._rules if r.category == category] if index < len(rules_for_category): rule_to_remove = rules_for_category[index] self._rules.remove(rule_to_remove) logger.debug("categorization_rule_removed", category=category.value) return True return False def auto_categorize_command( self, command: Command, strategy: Optional[AutoCategorizationStrategy] = None ) -> Tuple[CommandCategory, float]: """ Automatically categorize a command. 
Args: command: Command to categorize strategy: Categorization strategy to use Returns: Tuple of (suggested_category, confidence) """ if not self.auto_categorization_enabled: return CommandCategory.UTILITY, 0.0 strategy = strategy or self.default_strategy if strategy == AutoCategorizationStrategy.HYBRID: return self._hybrid_categorization(command) elif strategy == AutoCategorizationStrategy.KEYWORD_BASED: return self._keyword_categorization(command) elif strategy == AutoCategorizationStrategy.PATTERN_BASED: return self._pattern_categorization(command) elif strategy == AutoCategorizationStrategy.DESCRIPTION_BASED: return self._description_categorization(command) elif strategy == AutoCategorizationStrategy.USAGE_BASED: return self._usage_categorization(command) else: return CommandCategory.UTILITY, 0.0 def _hybrid_categorization(self, command: Command) -> Tuple[CommandCategory, float]: """Hybrid categorization using multiple strategies.""" best_category = CommandCategory.UTILITY best_confidence = 0.0 # Check all rules for rule in self._rules: matches, confidence = rule.matches(command) if matches and confidence > best_confidence: best_category = rule.category best_confidence = confidence # Apply confidence threshold if best_confidence < self.confidence_threshold: return CommandCategory.UTILITY, best_confidence return best_category, best_confidence def _keyword_categorization(self, command: Command) -> Tuple[CommandCategory, float]: """Categorize based on keywords in command name.""" name_words = set(command.metadata.name.lower().split('_')) best_category = CommandCategory.UTILITY best_score = 0.0 for rule in self._rules: if rule.keywords: overlap = rule.keywords.intersection(name_words) score = len(overlap) / len(rule.keywords) if score > best_score: best_category = rule.category best_score = score return best_category, best_score def _pattern_categorization(self, command: Command) -> Tuple[CommandCategory, float]: """Categorize based on name patterns.""" 
best_category = CommandCategory.UTILITY best_confidence = 0.0 for rule in self._rules: for pattern in rule.patterns: if pattern.search(command.metadata.name): confidence = 1.0 # Pattern match is binary if confidence > best_confidence: best_category = rule.category best_confidence = confidence break return best_category, best_confidence def _description_categorization(self, command: Command) -> Tuple[CommandCategory, float]: """Categorize based on command description.""" description = command.metadata.description.lower() best_category = CommandCategory.UTILITY best_confidence = 0.0 for rule in self._rules: for pattern in rule.description_patterns: if pattern.search(description): confidence = 1.0 if confidence > best_confidence: best_category = rule.category best_confidence = confidence break return best_category, best_confidence def _usage_categorization(self, command: Command) -> Tuple[CommandCategory, float]: """Categorize based on usage patterns (requires usage data).""" # This would require integration with execution history # For now, return default return CommandCategory.UTILITY, 0.0 def suggest_category_for_command_block(self, command_block: CommandBlock) -> CommandCategory: """Suggest category for a command block.""" # Create a temporary command for categorization from .registry import CommandMetadata temp_metadata = CommandMetadata( name=command_block.command_name, description=command_block.content or f"Command: {command_block.command_name}", category=CommandCategory.UTILITY ) temp_command = Command(metadata=temp_metadata, handler=lambda: None) category, confidence = self.auto_categorize_command(temp_command) logger.debug( "category_suggested", command=command_block.command_name, suggested_category=category.value, confidence=confidence ) return category def get_category_metrics(self, category: CommandCategory) -> CategoryMetrics: """Get metrics for a category.""" if category not in self._metrics: self._metrics[category] = CategoryMetrics(category=category) 
# Update metrics if registry is available if self.registry: self._update_category_metrics(category) return self._metrics[category] def _update_category_metrics(self, category: CommandCategory) -> None: """Update metrics for a category.""" commands = self.registry.list_commands(category=category) if not commands: return metrics = self._metrics.setdefault(category, CategoryMetrics(category=category)) metrics.command_count = len(commands) total_usage = sum(cmd.usage_count for cmd in commands) metrics.usage_count = total_usage # Calculate success rate and execution time (simplified) if commands: metrics.success_rate = 1.0 # Would need execution history metrics.average_execution_time = 0.0 # Would need execution history # Most used commands sorted_commands = sorted(commands, key=lambda c: c.usage_count, reverse=True) metrics.most_used_commands = [cmd.metadata.name for cmd in sorted_commands[:5]] def get_all_category_metrics(self) -> Dict[CommandCategory, CategoryMetrics]: """Get metrics for all categories.""" if self.registry: for category in CommandCategory: self._update_category_metrics(category) return self._metrics.copy() def get_category_hierarchy(self) -> Dict[CommandCategory, CategoryHierarchy]: """Get category hierarchy.""" return self._hierarchy.copy() def get_related_categories(self, category: CommandCategory) -> Set[CommandCategory]: """Get categories related to the given one.""" hierarchy = self._hierarchy.get(category) if not hierarchy: return set() related = set() # Add parent if hierarchy.parent: related.add(hierarchy.parent) # Add children related.update(hierarchy.children) # Add siblings (same parent) if hierarchy.parent: parent_hierarchy = self._hierarchy.get(hierarchy.parent) if parent_hierarchy: siblings = parent_hierarchy.children - {category} related.update(siblings) return related def reorganize_commands_by_category(self) -> Dict[CommandCategory, List[str]]: """Reorganize all commands by their optimal categories.""" if not self.registry: return {} 
reorganization = defaultdict(list) for command in self.registry.list_commands(): optimal_category, confidence = self.auto_categorize_command(command) # Only suggest reorganization if confidence is high if confidence >= self.confidence_threshold and optimal_category != command.metadata.category: reorganization[optimal_category].append(command.metadata.name) logger.debug( "command_reorganization_suggested", command=command.metadata.name, current_category=command.metadata.category.value, suggested_category=optimal_category.value, confidence=confidence ) return dict(reorganization) def validate_category_distribution(self) -> Dict[str, Any]: """Validate the distribution of commands across categories.""" if not self.registry: return {"error": "No registry available"} distribution = Counter() total_commands = 0 for category in CommandCategory: commands = self.registry.list_commands(category=category) count = len(commands) distribution[category.value] = count total_commands += count # Calculate statistics avg_per_category = total_commands / len(CommandCategory) if total_commands > 0 else 0 # Identify categories that might be over/under-utilized overloaded = {cat: count for cat, count in distribution.items() if count > avg_per_category * 2} underutilized = {cat: count for cat, count in distribution.items() if count < avg_per_category * 0.5} return { "total_commands": total_commands, "distribution": dict(distribution), "average_per_category": avg_per_category, "overloaded_categories": overloaded, "underutilized_categories": underutilized, "most_used_category": distribution.most_common(1)[0] if distribution else None, "balance_score": self._calculate_balance_score(distribution) } def _calculate_balance_score(self, distribution: Counter) -> float: """Calculate balance score for category distribution (0-1, higher is better).""" if not distribution: return 0.0 counts = list(distribution.values()) if not counts: return 0.0 # Use coefficient of variation (lower is more balanced) 
mean_count = sum(counts) / len(counts) if mean_count == 0: return 1.0 variance = sum((count - mean_count) ** 2 for count in counts) / len(counts) std_dev = variance ** 0.5 cv = std_dev / mean_count # Convert to balance score (0-1, where 1 is perfectly balanced) return max(0.0, 1.0 - min(cv, 1.0)) def get_stats(self) -> Dict[str, Any]: """Get categorization system statistics.""" stats = { "auto_categorization_enabled": self.auto_categorization_enabled, "default_strategy": self.default_strategy.value, "confidence_threshold": self.confidence_threshold, "total_rules": len(self._rules), "rules_by_category": {}, "hierarchy_levels": {} } # Rules by category rule_counts = Counter(rule.category.value for rule in self._rules) stats["rules_by_category"] = dict(rule_counts) # Hierarchy levels level_counts = Counter(hierarchy.level for hierarchy in self._hierarchy.values()) stats["hierarchy_levels"] = dict(level_counts) # Category metrics if self.registry: stats["category_metrics"] = { cat.value: metrics.to_dict() for cat, metrics in self.get_all_category_metrics().items() } stats["distribution_analysis"] = self.validate_category_distribution() return stats # Export public API __all__ = [ 'CategoryManager', 'CategoryRule', 'CategoryMetrics', 'CategoryHierarchy', 'AutoCategorizationStrategy' ]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.