mcp_workflow_engine.py•24.9 kB
#!/usr/bin/env python3
"""
MCP Workflow Engine - Automated Multi-Agent Task Execution
Handles complex workflows like: "Process PDF and email summary to xyz@email.com"
"""
import asyncio
import re
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
class WorkflowStep(Enum):
"""Workflow step types."""
DOCUMENT_ANALYSIS = "document_analysis"
EMAIL_SENDING = "email_sending"
DATA_EXTRACTION = "data_extraction"
WEATHER_ANALYSIS = "weather_analysis"
SUMMARY_GENERATION = "summary_generation"
@dataclass
class WorkflowTask:
"""Individual workflow task."""
step_type: WorkflowStep
agent_id: str
input_data: Dict[str, Any]
output_key: str
dependencies: List[str] = None
@dataclass
class WorkflowPlan:
"""Complete workflow execution plan."""
workflow_id: str
description: str
tasks: List[WorkflowTask]
final_output: str
class MCPWorkflowEngine:
"""Intelligent workflow engine for MCP system."""
def __init__(self, mongodb_integration=None):
self.mongodb_integration = mongodb_integration
self.logger = self._setup_logging()
self.workflow_patterns = self._initialize_patterns()
self.active_workflows: Dict[str, Dict[str, Any]] = {}
def _setup_logging(self) -> logging.Logger:
"""Setup logging."""
logger = logging.getLogger("mcp_workflow_engine")
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def _initialize_patterns(self) -> List[Dict[str, Any]]:
"""Initialize workflow patterns for common requests."""
return [
{
"pattern": r"(?:process|analyze|read)\s+(?:the\s+)?(.+?\.pdf).*?(?:email|send|mail).*?(?:to\s+)?([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})",
"description": "Process PDF and email results",
"workflow_type": "pdf_to_email",
"example": "process weather.pdf and email summary to john@example.com"
},
{
"pattern": r"(?:analyze|extract|summarize)\s+(.+?)(?:\s+and\s+|\s+then\s+)(?:email|send|mail).*?(?:important|key|main)\s+points.*?(?:to\s+)?([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})",
"description": "Extract key points and email them",
"workflow_type": "extract_and_email",
"example": "analyze document and email important points to xyz@email.com"
},
{
"pattern": r"(?:get|fetch|find)\s+(.+?)(?:\s+from\s+|\s+in\s+)(.+?)(?:\s+and\s+|\s+then\s+)(?:email|send|mail).*?(?:to\s+)?([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})",
"description": "Extract specific data and email",
"workflow_type": "data_extraction_email",
"example": "get weather data from report.pdf and email to manager@company.com"
},
{
"pattern": r"(?:weather|forecast|temperature).*?(?:pdf|document|file).*?(?:email|send|mail).*?(?:to\s+)?([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})",
"description": "Weather document analysis and email",
"workflow_type": "weather_pdf_email",
"example": "analyze weather pdf and email forecast to team@company.com"
}
]
def parse_user_request(self, user_request: str, documents: List[Dict[str, Any]] = None) -> Optional[WorkflowPlan]:
"""Parse user request and create workflow plan."""
try:
self.logger.info(f"Parsing user request: {user_request}")
# Try to match against known patterns
for pattern_info in self.workflow_patterns:
pattern = pattern_info["pattern"]
match = re.search(pattern, user_request, re.IGNORECASE)
if match:
self.logger.info(f"Matched pattern: {pattern_info['description']}")
workflow_type = pattern_info["workflow_type"]
workflow_id = f"workflow_{datetime.now().timestamp()}"
if workflow_type == "pdf_to_email":
return self._create_pdf_email_workflow(workflow_id, match, documents, user_request)
elif workflow_type == "extract_and_email":
return self._create_extract_email_workflow(workflow_id, match, documents, user_request)
elif workflow_type == "weather_pdf_email":
return self._create_weather_email_workflow(workflow_id, match, documents, user_request)
elif workflow_type == "data_extraction_email":
return self._create_data_extraction_workflow(workflow_id, match, documents, user_request)
# If no pattern matches, try to create a simple workflow
return self._create_fallback_workflow(user_request, documents)
except Exception as e:
self.logger.error(f"Error parsing user request: {e}")
return None
def _create_pdf_email_workflow(self, workflow_id: str, match: re.Match,
documents: List[Dict[str, Any]], user_request: str) -> WorkflowPlan:
"""Create PDF processing and email workflow."""
filename = match.group(1) if match.lastindex >= 1 else "document.pdf"
email = match.group(2) if match.lastindex >= 2 else "user@example.com"
tasks = [
WorkflowTask(
step_type=WorkflowStep.DOCUMENT_ANALYSIS,
agent_id="document_processor",
input_data={
"documents": documents or [{"filename": filename, "content": "", "type": "pdf"}],
"query": "extract key information, important points, and summary",
"focus": "important points and summary"
},
output_key="document_analysis",
dependencies=[]
),
WorkflowTask(
step_type=WorkflowStep.EMAIL_SENDING,
agent_id="gmail_agent",
input_data={
"to_email": email,
"subject": f"Analysis Results: {filename}",
"template": "document_summary"
},
output_key="email_result",
dependencies=["document_analysis"]
)
]
return WorkflowPlan(
workflow_id=workflow_id,
description=f"Process {filename} and email summary to {email}",
tasks=tasks,
final_output="email_result"
)
def _create_weather_email_workflow(self, workflow_id: str, match: re.Match,
documents: List[Dict[str, Any]], user_request: str) -> WorkflowPlan:
"""Create weather document analysis and email workflow."""
email = match.group(1) if match.lastindex >= 1 else "user@example.com"
tasks = [
WorkflowTask(
step_type=WorkflowStep.WEATHER_ANALYSIS,
agent_id="document_processor",
input_data={
"documents": documents or [],
"query": "extract weather information, forecasts, temperatures, and important weather alerts",
"focus": "weather data and forecasts"
},
output_key="weather_analysis",
dependencies=[]
),
WorkflowTask(
step_type=WorkflowStep.EMAIL_SENDING,
agent_id="gmail_agent",
input_data={
"to_email": email,
"subject": "Weather Report Summary",
"template": "weather_summary"
},
output_key="email_result",
dependencies=["weather_analysis"]
)
]
return WorkflowPlan(
workflow_id=workflow_id,
description=f"Analyze weather document and email forecast to {email}",
tasks=tasks,
final_output="email_result"
)
def _create_extract_email_workflow(self, workflow_id: str, match: re.Match,
documents: List[Dict[str, Any]], user_request: str) -> WorkflowPlan:
"""Create extract key points and email workflow."""
email = match.group(2) if match.lastindex >= 2 else "user@example.com"
tasks = [
WorkflowTask(
step_type=WorkflowStep.DATA_EXTRACTION,
agent_id="document_processor",
input_data={
"documents": documents or [],
"query": "extract the most important points, key findings, and critical information",
"focus": "important points and key information"
},
output_key="key_points",
dependencies=[]
),
WorkflowTask(
step_type=WorkflowStep.EMAIL_SENDING,
agent_id="gmail_agent",
input_data={
"to_email": email,
"subject": "Important Points Summary",
"template": "key_points_summary"
},
output_key="email_result",
dependencies=["key_points"]
)
]
return WorkflowPlan(
workflow_id=workflow_id,
description=f"Extract important points and email to {email}",
tasks=tasks,
final_output="email_result"
)
def _create_data_extraction_workflow(self, workflow_id: str, match: re.Match,
documents: List[Dict[str, Any]], user_request: str) -> WorkflowPlan:
"""Create data extraction and email workflow."""
data_type = match.group(1) if match.lastindex >= 1 else "data"
source = match.group(2) if match.lastindex >= 2 else "document"
email = match.group(3) if match.lastindex >= 3 else "user@example.com"
tasks = [
WorkflowTask(
step_type=WorkflowStep.DATA_EXTRACTION,
agent_id="document_processor",
input_data={
"documents": documents or [],
"query": f"extract {data_type} from {source}",
"focus": f"{data_type} extraction"
},
output_key="extracted_data",
dependencies=[]
),
WorkflowTask(
step_type=WorkflowStep.EMAIL_SENDING,
agent_id="gmail_agent",
input_data={
"to_email": email,
"subject": f"Extracted {data_type.title()} from {source}",
"template": "data_extraction_summary"
},
output_key="email_result",
dependencies=["extracted_data"]
)
]
return WorkflowPlan(
workflow_id=workflow_id,
description=f"Extract {data_type} from {source} and email to {email}",
tasks=tasks,
final_output="email_result"
)
def _create_fallback_workflow(self, user_request: str, documents: List[Dict[str, Any]]) -> Optional[WorkflowPlan]:
"""Create fallback workflow for unmatched requests."""
# Look for email addresses in the request
email_pattern = r'([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})'
email_match = re.search(email_pattern, user_request)
if email_match and documents:
email = email_match.group(1)
workflow_id = f"fallback_{datetime.now().timestamp()}"
tasks = [
WorkflowTask(
step_type=WorkflowStep.DOCUMENT_ANALYSIS,
agent_id="document_processor",
input_data={
"documents": documents,
"query": user_request,
"focus": "general analysis"
},
output_key="analysis_result",
dependencies=[]
),
WorkflowTask(
step_type=WorkflowStep.EMAIL_SENDING,
agent_id="gmail_agent",
input_data={
"to_email": email,
"subject": "Document Analysis Results",
"template": "general_analysis"
},
output_key="email_result",
dependencies=["analysis_result"]
)
]
return WorkflowPlan(
workflow_id=workflow_id,
description=f"Analyze documents and email results to {email}",
tasks=tasks,
final_output="email_result"
)
return None
async def execute_workflow(self, workflow_plan: WorkflowPlan) -> Dict[str, Any]:
"""Execute a complete workflow plan."""
try:
self.logger.info(f"Executing workflow: {workflow_plan.description}")
workflow_results = {}
self.active_workflows[workflow_plan.workflow_id] = {
"plan": workflow_plan,
"results": workflow_results,
"status": "running",
"start_time": datetime.now()
}
# Execute tasks in dependency order
for task in workflow_plan.tasks:
self.logger.info(f"Executing task: {task.step_type.value}")
# Wait for dependencies
if task.dependencies:
for dep in task.dependencies:
if dep not in workflow_results:
raise Exception(f"Dependency {dep} not found")
# Execute task
task_result = await self._execute_task(task, workflow_results)
workflow_results[task.output_key] = task_result
self.logger.info(f"Task {task.step_type.value} completed")
# Mark workflow as completed
self.active_workflows[workflow_plan.workflow_id]["status"] = "completed"
self.active_workflows[workflow_plan.workflow_id]["end_time"] = datetime.now()
final_result = workflow_results.get(workflow_plan.final_output, {})
return {
"status": "success",
"workflow_id": workflow_plan.workflow_id,
"description": workflow_plan.description,
"final_result": final_result,
"all_results": workflow_results,
"execution_time": (datetime.now() - self.active_workflows[workflow_plan.workflow_id]["start_time"]).total_seconds()
}
except Exception as e:
self.logger.error(f"Workflow execution failed: {e}")
if workflow_plan.workflow_id in self.active_workflows:
self.active_workflows[workflow_plan.workflow_id]["status"] = "failed"
self.active_workflows[workflow_plan.workflow_id]["error"] = str(e)
return {
"status": "error",
"workflow_id": workflow_plan.workflow_id,
"error": str(e),
"description": workflow_plan.description
}
async def _execute_task(self, task: WorkflowTask, workflow_results: Dict[str, Any]) -> Dict[str, Any]:
"""Execute individual workflow task."""
try:
if task.step_type == WorkflowStep.DOCUMENT_ANALYSIS:
return await self._execute_document_analysis(task, workflow_results)
elif task.step_type == WorkflowStep.EMAIL_SENDING:
return await self._execute_email_sending(task, workflow_results)
elif task.step_type == WorkflowStep.WEATHER_ANALYSIS:
return await self._execute_weather_analysis(task, workflow_results)
elif task.step_type == WorkflowStep.DATA_EXTRACTION:
return await self._execute_data_extraction(task, workflow_results)
else:
raise Exception(f"Unknown task type: {task.step_type}")
except Exception as e:
self.logger.error(f"Task execution failed: {e}")
return {
"status": "error",
"error": str(e),
"task_type": task.step_type.value
}
async def _execute_document_analysis(self, task: WorkflowTask, workflow_results: Dict[str, Any]) -> Dict[str, Any]:
"""Execute document analysis task."""
if self.mongodb_integration:
documents = task.input_data.get("documents", [])
query = task.input_data.get("query", "analyze this document")
if documents:
doc = documents[0] # Process first document
filename = doc.get("filename", "document.txt")
content = doc.get("content", "")
result = await self.mongodb_integration.process_document_with_agent(filename, content, query)
return result
# Fallback simulation
return {
"status": "success",
"agent": "document_processor",
"output": {
"extracted_text": "Document analysis completed",
"important_points": ["Key point 1", "Key point 2", "Key point 3"],
"summary": "Document summary generated",
"analysis": "Document analyzed successfully"
}
}
async def _execute_weather_analysis(self, task: WorkflowTask, workflow_results: Dict[str, Any]) -> Dict[str, Any]:
"""Execute weather analysis task."""
# Enhanced weather-specific analysis
result = await self._execute_document_analysis(task, workflow_results)
if result.get("status") == "success" and "output" in result:
# Add weather-specific processing
result["output"]["weather_forecast"] = "Sunny, 25°C, light winds"
result["output"]["weather_alerts"] = ["No severe weather expected"]
result["output"]["temperature_range"] = "20-28°C"
return result
async def _execute_data_extraction(self, task: WorkflowTask, workflow_results: Dict[str, Any]) -> Dict[str, Any]:
"""Execute data extraction task."""
return await self._execute_document_analysis(task, workflow_results)
async def _execute_email_sending(self, task: WorkflowTask, workflow_results: Dict[str, Any]) -> Dict[str, Any]:
"""Execute email sending task using real Gmail agent."""
try:
to_email = task.input_data.get("to_email", "user@example.com")
subject = task.input_data.get("subject", "MCP Analysis Results")
template = task.input_data.get("template", "general")
# Get data from previous tasks
email_content = self._generate_email_content(template, workflow_results, task.dependencies)
# Use real Gmail agent if available
if self.mongodb_integration:
# Try to use real Gmail agent through MongoDB integration
try:
# Import and use real Gmail agent
from agents.communication.real_gmail_agent import RealGmailAgent
from agents.base_agent import MCPMessage
gmail_agent = RealGmailAgent()
# Create email message
email_message = MCPMessage(
id=f"workflow_email_{datetime.now().timestamp()}",
method="send_email",
params={
"to_email": to_email,
"subject": subject,
"content": email_content,
"template": template
},
timestamp=datetime.now()
)
# Send email through real agent
email_result = await gmail_agent.process_message(email_message)
self.logger.info(f"Real email sent to {to_email}: {subject}")
return email_result
except ImportError as e:
self.logger.warning(f"Real Gmail agent not available: {e}")
except Exception as e:
self.logger.error(f"Error using real Gmail agent: {e}")
# Fallback to simulation
email_result = {
"status": "success",
"agent": "gmail_agent",
"email_sent": True,
"to_email": to_email,
"subject": subject,
"content_preview": email_content[:200] + "..." if len(email_content) > 200 else email_content,
"timestamp": datetime.now().isoformat(),
"message": f"Email simulated to {to_email} (real Gmail agent not available)"
}
self.logger.info(f"Email simulated to {to_email}: {subject}")
return email_result
except Exception as e:
return {
"status": "error",
"error": str(e),
"agent": "gmail_agent"
}
def _generate_email_content(self, template: str, workflow_results: Dict[str, Any], dependencies: List[str]) -> str:
"""Generate email content based on template and workflow results."""
content = f"Subject: Analysis Results\n\n"
# Get analysis results from dependencies
for dep in dependencies or []:
if dep in workflow_results:
result = workflow_results[dep]
if result.get("status") == "success" and "output" in result:
output = result["output"]
if template == "weather_summary":
content += "Weather Analysis Summary:\n"
content += f"Forecast: {output.get('weather_forecast', 'Not available')}\n"
content += f"Temperature: {output.get('temperature_range', 'Not available')}\n"
content += f"Alerts: {', '.join(output.get('weather_alerts', []))}\n\n"
if "important_points" in output:
content += "Important Points:\n"
for i, point in enumerate(output["important_points"], 1):
content += f"{i}. {point}\n"
content += "\n"
if "summary" in output:
content += f"Summary:\n{output['summary']}\n\n"
if "analysis" in output:
content += f"Analysis:\n{output['analysis']}\n\n"
content += "Generated by MCP Workflow Engine\n"
content += f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
return content
# Test function
async def test_workflow_engine():
"""Test the workflow engine."""
print("🔄 Testing MCP Workflow Engine")
print("=" * 50)
engine = MCPWorkflowEngine()
# Test cases
test_requests = [
"process weather.pdf and email summary to john@example.com",
"analyze the quarterly report and email important points to manager@company.com",
"get weather data from forecast.pdf and email to team@weather.com",
"extract key findings from research.pdf and send to researcher@university.edu"
]
for i, request in enumerate(test_requests, 1):
print(f"\n{i}. Testing: {request}")
# Parse request
workflow_plan = engine.parse_user_request(request)
if workflow_plan:
print(f" ✅ Workflow created: {workflow_plan.description}")
print(f" 📋 Tasks: {len(workflow_plan.tasks)}")
# Execute workflow (simulation)
result = await engine.execute_workflow(workflow_plan)
if result["status"] == "success":
print(f" ✅ Workflow executed successfully!")
print(f" ⏱️ Execution time: {result['execution_time']:.3f}s")
else:
print(f" ❌ Workflow failed: {result.get('error', 'unknown error')}")
else:
print(f" ❌ Could not parse request")
print(f"\n✅ Workflow engine testing completed!")
if __name__ == "__main__":
asyncio.run(test_workflow_engine())