Skip to main content
Glama

MCP Orchestration Server

mcp_workflow_engine.py24.9 kB
#!/usr/bin/env python3 """ MCP Workflow Engine - Automated Multi-Agent Task Execution Handles complex workflows like: "Process PDF and email summary to xyz@email.com" """ import asyncio import re import logging from datetime import datetime from typing import Dict, List, Any, Optional, Tuple from dataclasses import dataclass from enum import Enum class WorkflowStep(Enum): """Workflow step types.""" DOCUMENT_ANALYSIS = "document_analysis" EMAIL_SENDING = "email_sending" DATA_EXTRACTION = "data_extraction" WEATHER_ANALYSIS = "weather_analysis" SUMMARY_GENERATION = "summary_generation" @dataclass class WorkflowTask: """Individual workflow task.""" step_type: WorkflowStep agent_id: str input_data: Dict[str, Any] output_key: str dependencies: List[str] = None @dataclass class WorkflowPlan: """Complete workflow execution plan.""" workflow_id: str description: str tasks: List[WorkflowTask] final_output: str class MCPWorkflowEngine: """Intelligent workflow engine for MCP system.""" def __init__(self, mongodb_integration=None): self.mongodb_integration = mongodb_integration self.logger = self._setup_logging() self.workflow_patterns = self._initialize_patterns() self.active_workflows: Dict[str, Dict[str, Any]] = {} def _setup_logging(self) -> logging.Logger: """Setup logging.""" logger = logging.getLogger("mcp_workflow_engine") logger.setLevel(logging.INFO) if not logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) logger.addHandler(handler) return logger def _initialize_patterns(self) -> List[Dict[str, Any]]: """Initialize workflow patterns for common requests.""" return [ { "pattern": r"(?:process|analyze|read)\s+(?:the\s+)?(.+?\.pdf).*?(?:email|send|mail).*?(?:to\s+)?([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})", "description": "Process PDF and email results", "workflow_type": "pdf_to_email", "example": "process weather.pdf and email summary to john@example.com" }, { "pattern": r"(?:analyze|extract|summarize)\s+(.+?)(?:\s+and\s+|\s+then\s+)(?:email|send|mail).*?(?:important|key|main)\s+points.*?(?:to\s+)?([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})", "description": "Extract key points and email them", "workflow_type": "extract_and_email", "example": "analyze document and email important points to xyz@email.com" }, { "pattern": r"(?:get|fetch|find)\s+(.+?)(?:\s+from\s+|\s+in\s+)(.+?)(?:\s+and\s+|\s+then\s+)(?:email|send|mail).*?(?:to\s+)?([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})", "description": "Extract specific data and email", "workflow_type": "data_extraction_email", "example": "get weather data from report.pdf and email to manager@company.com" }, { "pattern": r"(?:weather|forecast|temperature).*?(?:pdf|document|file).*?(?:email|send|mail).*?(?:to\s+)?([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})", "description": "Weather document analysis and email", "workflow_type": "weather_pdf_email", "example": "analyze weather pdf and email forecast to team@company.com" } ] def parse_user_request(self, user_request: str, documents: List[Dict[str, Any]] = None) -> Optional[WorkflowPlan]: """Parse user request and create workflow plan.""" try: self.logger.info(f"Parsing user request: {user_request}") # Try to match against known patterns for pattern_info in self.workflow_patterns: pattern = pattern_info["pattern"] match = re.search(pattern, user_request, re.IGNORECASE) if match: self.logger.info(f"Matched pattern: {pattern_info['description']}") workflow_type = pattern_info["workflow_type"] workflow_id = f"workflow_{datetime.now().timestamp()}" if workflow_type == "pdf_to_email": return self._create_pdf_email_workflow(workflow_id, match, documents, user_request) elif workflow_type == "extract_and_email": return self._create_extract_email_workflow(workflow_id, match, documents, user_request) elif workflow_type == "weather_pdf_email": return self._create_weather_email_workflow(workflow_id, match, documents, user_request) elif workflow_type == "data_extraction_email": return self._create_data_extraction_workflow(workflow_id, match, documents, user_request) # If no pattern matches, try to create a simple workflow return self._create_fallback_workflow(user_request, documents) except Exception as e: self.logger.error(f"Error parsing user request: {e}") return None def _create_pdf_email_workflow(self, workflow_id: str, match: re.Match, documents: List[Dict[str, Any]], user_request: str) -> WorkflowPlan: """Create PDF processing and email workflow.""" filename = match.group(1) if match.lastindex >= 1 else "document.pdf" email = match.group(2) if match.lastindex >= 2 else "user@example.com" tasks = [ WorkflowTask( step_type=WorkflowStep.DOCUMENT_ANALYSIS, agent_id="document_processor", input_data={ "documents": documents or [{"filename": filename, "content": "", "type": "pdf"}], "query": "extract key information, important points, and summary", "focus": "important points and summary" }, output_key="document_analysis", dependencies=[] ), WorkflowTask( step_type=WorkflowStep.EMAIL_SENDING, agent_id="gmail_agent", input_data={ "to_email": email, "subject": f"Analysis Results: {filename}", "template": "document_summary" }, output_key="email_result", dependencies=["document_analysis"] ) ] return WorkflowPlan( workflow_id=workflow_id, description=f"Process {filename} and email summary to {email}", tasks=tasks, final_output="email_result" ) def _create_weather_email_workflow(self, workflow_id: str, match: re.Match, documents: List[Dict[str, Any]], user_request: str) -> WorkflowPlan: """Create weather document analysis and email workflow.""" email = match.group(1) if match.lastindex >= 1 else "user@example.com" tasks = [ WorkflowTask( step_type=WorkflowStep.WEATHER_ANALYSIS, agent_id="document_processor", input_data={ "documents": documents or [], "query": "extract weather information, forecasts, temperatures, and important weather alerts", "focus": "weather data and forecasts" }, output_key="weather_analysis", dependencies=[] ), WorkflowTask( step_type=WorkflowStep.EMAIL_SENDING, agent_id="gmail_agent", input_data={ "to_email": email, "subject": "Weather Report Summary", "template": "weather_summary" }, output_key="email_result", dependencies=["weather_analysis"] ) ] return WorkflowPlan( workflow_id=workflow_id, description=f"Analyze weather document and email forecast to {email}", tasks=tasks, final_output="email_result" ) def _create_extract_email_workflow(self, workflow_id: str, match: re.Match, documents: List[Dict[str, Any]], user_request: str) -> WorkflowPlan: """Create extract key points and email workflow.""" email = match.group(2) if match.lastindex >= 2 else "user@example.com" tasks = [ WorkflowTask( step_type=WorkflowStep.DATA_EXTRACTION, agent_id="document_processor", input_data={ "documents": documents or [], "query": "extract the most important points, key findings, and critical information", "focus": "important points and key information" }, output_key="key_points", dependencies=[] ), WorkflowTask( step_type=WorkflowStep.EMAIL_SENDING, agent_id="gmail_agent", input_data={ "to_email": email, "subject": "Important Points Summary", "template": "key_points_summary" }, output_key="email_result", dependencies=["key_points"] ) ] return WorkflowPlan( workflow_id=workflow_id, description=f"Extract important points and email to {email}", tasks=tasks, final_output="email_result" ) def _create_data_extraction_workflow(self, workflow_id: str, match: re.Match, documents: List[Dict[str, Any]], user_request: str) -> WorkflowPlan: """Create data extraction and email workflow.""" data_type = match.group(1) if match.lastindex >= 1 else "data" source = match.group(2) if match.lastindex >= 2 else "document" email = match.group(3) if match.lastindex >= 3 else "user@example.com" tasks = [ WorkflowTask( step_type=WorkflowStep.DATA_EXTRACTION, agent_id="document_processor", input_data={ "documents": documents or [], "query": f"extract {data_type} from {source}", "focus": f"{data_type} extraction" }, output_key="extracted_data", dependencies=[] ), WorkflowTask( step_type=WorkflowStep.EMAIL_SENDING, agent_id="gmail_agent", input_data={ "to_email": email, "subject": f"Extracted {data_type.title()} from {source}", "template": "data_extraction_summary" }, output_key="email_result", dependencies=["extracted_data"] ) ] return WorkflowPlan( workflow_id=workflow_id, description=f"Extract {data_type} from {source} and email to {email}", tasks=tasks, final_output="email_result" ) def _create_fallback_workflow(self, user_request: str, documents: List[Dict[str, Any]]) -> Optional[WorkflowPlan]: """Create fallback workflow for unmatched requests.""" # Look for email addresses in the request email_pattern = r'([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})' email_match = re.search(email_pattern, user_request) if email_match and documents: email = email_match.group(1) workflow_id = f"fallback_{datetime.now().timestamp()}" tasks = [ WorkflowTask( step_type=WorkflowStep.DOCUMENT_ANALYSIS, agent_id="document_processor", input_data={ "documents": documents, "query": user_request, "focus": "general analysis" }, output_key="analysis_result", dependencies=[] ), WorkflowTask( step_type=WorkflowStep.EMAIL_SENDING, agent_id="gmail_agent", input_data={ "to_email": email, "subject": "Document Analysis Results", "template": "general_analysis" }, output_key="email_result", dependencies=["analysis_result"] ) ] return WorkflowPlan( workflow_id=workflow_id, description=f"Analyze documents and email results to {email}", tasks=tasks, final_output="email_result" ) return None async def execute_workflow(self, workflow_plan: WorkflowPlan) -> Dict[str, Any]: """Execute a complete workflow plan.""" try: self.logger.info(f"Executing workflow: {workflow_plan.description}") workflow_results = {} self.active_workflows[workflow_plan.workflow_id] = { "plan": workflow_plan, "results": workflow_results, "status": "running", "start_time": datetime.now() } # Execute tasks in dependency order for task in workflow_plan.tasks: self.logger.info(f"Executing task: {task.step_type.value}") # Wait for dependencies if task.dependencies: for dep in task.dependencies: if dep not in workflow_results: raise Exception(f"Dependency {dep} not found") # Execute task task_result = await self._execute_task(task, workflow_results) workflow_results[task.output_key] = task_result self.logger.info(f"Task {task.step_type.value} completed") # Mark workflow as completed self.active_workflows[workflow_plan.workflow_id]["status"] = "completed" self.active_workflows[workflow_plan.workflow_id]["end_time"] = datetime.now() final_result = workflow_results.get(workflow_plan.final_output, {}) return { "status": "success", "workflow_id": workflow_plan.workflow_id, "description": workflow_plan.description, "final_result": final_result, "all_results": workflow_results, "execution_time": (datetime.now() - self.active_workflows[workflow_plan.workflow_id]["start_time"]).total_seconds() } except Exception as e: self.logger.error(f"Workflow execution failed: {e}") if workflow_plan.workflow_id in self.active_workflows: self.active_workflows[workflow_plan.workflow_id]["status"] = "failed" self.active_workflows[workflow_plan.workflow_id]["error"] = str(e) return { "status": "error", "workflow_id": workflow_plan.workflow_id, "error": str(e), "description": workflow_plan.description } async def _execute_task(self, task: WorkflowTask, workflow_results: Dict[str, Any]) -> Dict[str, Any]: """Execute individual workflow task.""" try: if task.step_type == WorkflowStep.DOCUMENT_ANALYSIS: return await self._execute_document_analysis(task, workflow_results) elif task.step_type == WorkflowStep.EMAIL_SENDING: return await self._execute_email_sending(task, workflow_results) elif task.step_type == WorkflowStep.WEATHER_ANALYSIS: return await self._execute_weather_analysis(task, workflow_results) elif task.step_type == WorkflowStep.DATA_EXTRACTION: return await self._execute_data_extraction(task, workflow_results) else: raise Exception(f"Unknown task type: {task.step_type}") except Exception as e: self.logger.error(f"Task execution failed: {e}") return { "status": "error", "error": str(e), "task_type": task.step_type.value } async def _execute_document_analysis(self, task: WorkflowTask, workflow_results: Dict[str, Any]) -> Dict[str, Any]: """Execute document analysis task.""" if self.mongodb_integration: documents = task.input_data.get("documents", []) query = task.input_data.get("query", "analyze this document") if documents: doc = documents[0] # Process first document filename = doc.get("filename", "document.txt") content = doc.get("content", "") result = await self.mongodb_integration.process_document_with_agent(filename, content, query) return result # Fallback simulation return { "status": "success", "agent": "document_processor", "output": { "extracted_text": "Document analysis completed", "important_points": ["Key point 1", "Key point 2", "Key point 3"], "summary": "Document summary generated", "analysis": "Document analyzed successfully" } } async def _execute_weather_analysis(self, task: WorkflowTask, workflow_results: Dict[str, Any]) -> Dict[str, Any]: """Execute weather analysis task.""" # Enhanced weather-specific analysis result = await self._execute_document_analysis(task, workflow_results) if result.get("status") == "success" and "output" in result: # Add weather-specific processing result["output"]["weather_forecast"] = "Sunny, 25°C, light winds" result["output"]["weather_alerts"] = ["No severe weather expected"] result["output"]["temperature_range"] = "20-28°C" return result async def _execute_data_extraction(self, task: WorkflowTask, workflow_results: Dict[str, Any]) -> Dict[str, Any]: """Execute data extraction task.""" return await self._execute_document_analysis(task, workflow_results) async def _execute_email_sending(self, task: WorkflowTask, workflow_results: Dict[str, Any]) -> Dict[str, Any]: """Execute email sending task using real Gmail agent.""" try: to_email = task.input_data.get("to_email", "user@example.com") subject = task.input_data.get("subject", "MCP Analysis Results") template = task.input_data.get("template", "general") # Get data from previous tasks email_content = self._generate_email_content(template, workflow_results, task.dependencies) # Use real Gmail agent if available if self.mongodb_integration: # Try to use real Gmail agent through MongoDB integration try: # Import and use real Gmail agent from agents.communication.real_gmail_agent import RealGmailAgent from agents.base_agent import MCPMessage gmail_agent = RealGmailAgent() # Create email message email_message = MCPMessage( id=f"workflow_email_{datetime.now().timestamp()}", method="send_email", params={ "to_email": to_email, "subject": subject, "content": email_content, "template": template }, timestamp=datetime.now() ) # Send email through real agent email_result = await gmail_agent.process_message(email_message) self.logger.info(f"Real email sent to {to_email}: {subject}") return email_result except ImportError as e: self.logger.warning(f"Real Gmail agent not available: {e}") except Exception as e: self.logger.error(f"Error using real Gmail agent: {e}") # Fallback to simulation email_result = { "status": "success", "agent": "gmail_agent", "email_sent": True, "to_email": to_email, "subject": subject, "content_preview": email_content[:200] + "..." if len(email_content) > 200 else email_content, "timestamp": datetime.now().isoformat(), "message": f"Email simulated to {to_email} (real Gmail agent not available)" } self.logger.info(f"Email simulated to {to_email}: {subject}") return email_result except Exception as e: return { "status": "error", "error": str(e), "agent": "gmail_agent" } def _generate_email_content(self, template: str, workflow_results: Dict[str, Any], dependencies: List[str]) -> str: """Generate email content based on template and workflow results.""" content = f"Subject: Analysis Results\n\n" # Get analysis results from dependencies for dep in dependencies or []: if dep in workflow_results: result = workflow_results[dep] if result.get("status") == "success" and "output" in result: output = result["output"] if template == "weather_summary": content += "Weather Analysis Summary:\n" content += f"Forecast: {output.get('weather_forecast', 'Not available')}\n" content += f"Temperature: {output.get('temperature_range', 'Not available')}\n" content += f"Alerts: {', '.join(output.get('weather_alerts', []))}\n\n" if "important_points" in output: content += "Important Points:\n" for i, point in enumerate(output["important_points"], 1): content += f"{i}. {point}\n" content += "\n" if "summary" in output: content += f"Summary:\n{output['summary']}\n\n" if "analysis" in output: content += f"Analysis:\n{output['analysis']}\n\n" content += "Generated by MCP Workflow Engine\n" content += f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" return content # Test function async def test_workflow_engine(): """Test the workflow engine.""" print("🔄 Testing MCP Workflow Engine") print("=" * 50) engine = MCPWorkflowEngine() # Test cases test_requests = [ "process weather.pdf and email summary to john@example.com", "analyze the quarterly report and email important points to manager@company.com", "get weather data from forecast.pdf and email to team@weather.com", "extract key findings from research.pdf and send to researcher@university.edu" ] for i, request in enumerate(test_requests, 1): print(f"\n{i}. Testing: {request}") # Parse request workflow_plan = engine.parse_user_request(request) if workflow_plan: print(f" ✅ Workflow created: {workflow_plan.description}") print(f" 📋 Tasks: {len(workflow_plan.tasks)}") # Execute workflow (simulation) result = await engine.execute_workflow(workflow_plan) if result["status"] == "success": print(f" ✅ Workflow executed successfully!") print(f" ⏱️ Execution time: {result['execution_time']:.3f}s") else: print(f" ❌ Workflow failed: {result.get('error', 'unknown error')}") else: print(f" ❌ Could not parse request") print(f"\n✅ Workflow engine testing completed!") if __name__ == "__main__": asyncio.run(test_workflow_engine())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Nisarg-123-web/MCP2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server