Skip to main content
Glama
executor.py19.2 kB
""" Command execution framework for slash commands. This module provides the execution engine that integrates the markdown parser and command registry to execute slash commands with full context management. """ import asyncio import inspect from pathlib import Path from typing import Dict, Any, List, Optional, Union, Set from dataclasses import dataclass, field from datetime import datetime from enum import Enum import structlog from .parser import MarkdownParser, CommandBlock, FrontmatterData, CommandBlockType from .registry import CommandRegistry, Command, CommandCategory from ..utils.logging import get_logger from ..utils.errors import ValidationError, SystemError from ..utils.notifications import emit, EventCategory, EventPriority logger = get_logger(__name__) class ExecutionStatus(Enum): """Status of command execution.""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" TIMEOUT = "timeout" class ExecutionPriority(Enum): """Priority levels for command execution.""" LOW = "low" NORMAL = "normal" HIGH = "high" CRITICAL = "critical" @dataclass class ExecutionContext: """Context for command execution.""" # Session information session_id: Optional[str] = None user_id: Optional[str] = None user_roles: Set[str] = field(default_factory=set) # File context file_path: Optional[Path] = None working_directory: Optional[Path] = None # Execution environment environment: Dict[str, str] = field(default_factory=dict) variables: Dict[str, Any] = field(default_factory=dict) # Frontmatter data frontmatter: Optional[FrontmatterData] = None # Execution settings timeout: Optional[float] = None priority: ExecutionPriority = ExecutionPriority.NORMAL dry_run: bool = False # Metadata metadata: Dict[str, Any] = field(default_factory=dict) def get_variable(self, name: str, default: Any = None) -> Any: """Get variable with fallback to environment.""" if name in self.variables: return self.variables[name] if name in self.environment: return self.environment[name] if self.frontmatter and self.frontmatter.has(name): return self.frontmatter.get(name) return default def set_variable(self, name: str, value: Any) -> None: """Set variable in context.""" self.variables[name] = value def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "session_id": self.session_id, "user_id": self.user_id, "user_roles": list(self.user_roles), "file_path": str(self.file_path) if self.file_path else None, "working_directory": str(self.working_directory) if self.working_directory else None, "environment": self.environment, "variables": self.variables, "frontmatter": self.frontmatter.to_dict() if self.frontmatter else None, "timeout": self.timeout, "priority": self.priority.value, "dry_run": self.dry_run, "metadata": self.metadata } @dataclass class ExecutionResult: """Result of command execution.""" # Command information command_name: str command_block: CommandBlock # Execution details status: ExecutionStatus start_time: datetime end_time: Optional[datetime] = None duration: Optional[float] = None # Results return_value: Any = None output: str = "" error: Optional[str] = None exit_code: Optional[int] = None # Context context: Optional[ExecutionContext] = None # Metadata metadata: Dict[str, Any] = field(default_factory=dict) @property def success(self) -> bool: """Check if execution was successful.""" return self.status == ExecutionStatus.COMPLETED and ( self.exit_code is None or self.exit_code == 0 ) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "command_name": self.command_name, "command_block": self.command_block.to_dict(), "status": self.status.value, "start_time": self.start_time.isoformat(), "end_time": self.end_time.isoformat() if self.end_time else None, "duration": self.duration, "return_value": self.return_value, "output": self.output, "error": self.error, "exit_code": self.exit_code, "context": self.context.to_dict() if self.context else None, "metadata": self.metadata, "success": self.success } class CommandExecutor: """Command execution engine.""" def __init__(self, registry: Optional[CommandRegistry] = None): """Initialize command executor.""" self.registry = registry or CommandRegistry() self.parser = MarkdownParser() # Execution tracking self._active_executions: Dict[str, ExecutionResult] = {} self._execution_history: List[ExecutionResult] = [] self._execution_lock = asyncio.Lock() # Configuration self.max_concurrent_executions = 10 self.max_history_size = 1000 self.default_timeout = 300.0 # 5 minutes self.enable_history = True # Execution hooks self._pre_execution_hooks: List[callable] = [] self._post_execution_hooks: List[callable] = [] logger.info("command_executor_initialized") def add_pre_execution_hook(self, hook: callable) -> None: """Add pre-execution hook.""" self._pre_execution_hooks.append(hook) logger.debug("pre_execution_hook_added", hook=hook.__name__) def add_post_execution_hook(self, hook: callable) -> None: """Add post-execution hook.""" self._post_execution_hooks.append(hook) logger.debug("post_execution_hook_added", hook=hook.__name__) async def execute_markdown( self, content: str, context: Optional[ExecutionContext] = None, file_path: Optional[Path] = None ) -> List[ExecutionResult]: """ Execute all commands found in markdown content. Args: content: Markdown content to parse and execute context: Execution context file_path: Optional file path for context Returns: List of execution results """ # Parse markdown content parsed = self.parser.parse(content, file_path) # Create or update context if context is None: context = ExecutionContext() if file_path: context.file_path = file_path context.working_directory = file_path.parent # Add frontmatter to context if parsed.get('frontmatter'): context.frontmatter = FrontmatterData(**parsed['frontmatter']) # Execute all command blocks results = [] for block_data in parsed.get('command_blocks', []): command_block = CommandBlock(**block_data) try: result = await self.execute_command_block(command_block, context) results.append(result) except Exception as e: # Create failed result result = ExecutionResult( command_name=command_block.command_name, command_block=command_block, status=ExecutionStatus.FAILED, start_time=datetime.utcnow(), end_time=datetime.utcnow(), duration=0.0, error=str(e), context=context ) results.append(result) logger.error( "command_block_execution_failed", command=command_block.command_name, error=str(e) ) return results async def execute_markdown_file( self, file_path: Path, context: Optional[ExecutionContext] = None ) -> List[ExecutionResult]: """Execute commands in a markdown file.""" content = file_path.read_text(encoding='utf-8') return await self.execute_markdown(content, context, file_path) async def execute_command_block( self, command_block: CommandBlock, context: Optional[ExecutionContext] = None ) -> ExecutionResult: """ Execute a single command block. Args: command_block: Command block to execute context: Execution context Returns: Execution result """ if context is None: context = ExecutionContext() # Create execution result execution_id = f"{command_block.command_name}_{datetime.utcnow().timestamp()}" result = ExecutionResult( command_name=command_block.command_name, command_block=command_block, status=ExecutionStatus.PENDING, start_time=datetime.utcnow(), context=context ) # Check concurrent execution limit async with self._execution_lock: if len(self._active_executions) >= self.max_concurrent_executions: result.status = ExecutionStatus.FAILED result.error = "Maximum concurrent executions exceeded" result.end_time = datetime.utcnow() result.duration = 0.0 return result self._active_executions[execution_id] = result try: # Run pre-execution hooks for hook in self._pre_execution_hooks: try: if asyncio.iscoroutinefunction(hook): await hook(command_block, context) else: hook(command_block, context) except Exception as e: logger.warning( "pre_execution_hook_failed", hook=hook.__name__, error=str(e) ) # Update status result.status = ExecutionStatus.RUNNING # Execute command timeout = context.timeout or self.default_timeout if context.dry_run: # Dry run mode - don't actually execute result.status = ExecutionStatus.COMPLETED result.output = f"[DRY RUN] Would execute: {command_block.command_name}" result.return_value = {"dry_run": True} else: # Execute with timeout try: result.return_value = await asyncio.wait_for( self._execute_command(command_block, context), timeout=timeout ) result.status = ExecutionStatus.COMPLETED except asyncio.TimeoutError: result.status = ExecutionStatus.TIMEOUT result.error = f"Command timed out after {timeout}s" except Exception as e: result.status = ExecutionStatus.FAILED result.error = str(e) raise # Run post-execution hooks for hook in self._post_execution_hooks: try: if asyncio.iscoroutinefunction(hook): await hook(command_block, context, result) else: hook(command_block, context, result) except Exception as e: logger.warning( "post_execution_hook_failed", hook=hook.__name__, error=str(e) ) except Exception as e: result.status = ExecutionStatus.FAILED result.error = str(e) logger.error( "command_execution_error", command=command_block.command_name, error=str(e), exc_info=True ) finally: # Update result timing result.end_time = datetime.utcnow() result.duration = (result.end_time - result.start_time).total_seconds() # Remove from active executions async with self._execution_lock: self._active_executions.pop(execution_id, None) # Add to history if self.enable_history: self._execution_history.append(result) # Trim history if needed if len(self._execution_history) > self.max_history_size: self._execution_history = self._execution_history[-self.max_history_size:] # Emit execution event await self._emit_execution_event(result) return result async def _execute_command( self, command_block: CommandBlock, context: ExecutionContext ) -> Any: """Execute the actual command.""" # Get command from registry command = self.registry.get_command(command_block.command_name) if not command: raise ValidationError( "command", command_block.command_name, "Command not found in registry" ) # Execute using registry return await self.registry.execute_command( command_block.command_name, command_block.arguments, command_block.options, context=context.to_dict(), user_id=context.user_id, user_roles=context.user_roles ) async def execute_single_command( self, command_name: str, arguments: List[str] = None, options: Dict[str, Any] = None, context: Optional[ExecutionContext] = None ) -> ExecutionResult: """Execute a single command by name.""" if arguments is None: arguments = [] if options is None: options = {} if context is None: context = ExecutionContext() # Create command block command_block = CommandBlock( command_type=CommandBlockType.SLASH_COMMAND, command_name=command_name, arguments=arguments, options=options ) return await self.execute_command_block(command_block, context) def get_active_executions(self) -> Dict[str, ExecutionResult]: """Get currently active executions.""" return self._active_executions.copy() def get_execution_history( self, limit: Optional[int] = None, status: Optional[ExecutionStatus] = None ) -> List[ExecutionResult]: """Get execution history.""" history = self._execution_history.copy() # Filter by status if status: history = [r for r in history if r.status == status] # Apply limit if limit: history = history[-limit:] return history async def cancel_execution(self, execution_id: str) -> bool: """Cancel an active execution.""" async with self._execution_lock: if execution_id in self._active_executions: result = self._active_executions[execution_id] result.status = ExecutionStatus.CANCELLED result.end_time = datetime.utcnow() result.duration = (result.end_time - result.start_time).total_seconds() logger.info( "execution_cancelled", execution_id=execution_id, command=result.command_name ) return True return False async def _emit_execution_event(self, result: ExecutionResult) -> None: """Emit execution event.""" await emit( "command_executed", EventCategory.SYSTEM, { "command_name": result.command_name, "status": result.status.value, "duration": result.duration, "success": result.success }, priority=EventPriority.LOW ) def get_stats(self) -> Dict[str, Any]: """Get execution statistics.""" history = self._execution_history total_executions = len(history) if total_executions == 0: return { "total_executions": 0, "active_executions": len(self._active_executions), "success_rate": 0.0, "average_duration": 0.0, "by_status": {}, "by_command": {} } # Calculate statistics successful = len([r for r in history if r.success]) success_rate = successful / total_executions durations = [r.duration for r in history if r.duration is not None] average_duration = sum(durations) / len(durations) if durations else 0.0 # Group by status by_status = {} for status in ExecutionStatus: count = len([r for r in history if r.status == status]) if count > 0: by_status[status.value] = count # Group by command by_command = {} for result in history: cmd = result.command_name if cmd not in by_command: by_command[cmd] = {"count": 0, "success": 0, "avg_duration": 0.0} by_command[cmd]["count"] += 1 if result.success: by_command[cmd]["success"] += 1 if result.duration: current_avg = by_command[cmd]["avg_duration"] count = by_command[cmd]["count"] by_command[cmd]["avg_duration"] = ( (current_avg * (count - 1) + result.duration) / count ) return { "total_executions": total_executions, "active_executions": len(self._active_executions), "success_rate": success_rate, "average_duration": average_duration, "by_status": by_status, "by_command": by_command } # Export public API __all__ = [ 'CommandExecutor', 'ExecutionContext', 'ExecutionResult', 'ExecutionStatus', 'ExecutionPriority' ]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server