"""Natural language processing for intelligent workflow creation and analysis.
This module provides comprehensive NLP capabilities for workflow intelligence including:
- Intent recognition from natural language descriptions
- Workflow component extraction and parsing
- Action and condition suggestion from descriptions
- Template matching based on semantic analysis
Security: Input sanitization and validation for all NLP operations.
Performance: <2s processing for typical workflow descriptions.
Type Safety: Complete type system with contracts and validation.
"""
import logging
import re
from dataclasses import dataclass
from enum import Enum
from typing import Any
from ..core.contracts import require
from ..core.either import Either
from ..orchestration.ecosystem_architecture import OrchestrationError
class IntentType(Enum):
"""Types of workflow intents that can be recognized."""
AUTOMATION = "automation" # Automate a process
INTEGRATION = "integration" # Connect systems/tools
NOTIFICATION = "notification" # Send alerts/messages
DATA_PROCESSING = "data_processing" # Process/transform data
SCHEDULING = "scheduling" # Time-based operations
CONDITIONAL = "conditional" # If/then logic
MONITORING = "monitoring" # Watch/track something
COMMUNICATION = "communication" # Email/SMS/chat operations
FILE_OPERATION = "file_operation" # File/folder operations
APPLICATION_CONTROL = "application_control" # App launch/control
class ActionIntent(Enum):
"""Specific action intents within workflows."""
CREATE = "create"
READ = "read"
UPDATE = "update"
DELETE = "delete"
SEND = "send"
RECEIVE = "receive"
PROCESS = "process"
TRANSFORM = "transform"
VALIDATE = "validate"
MONITOR = "monitor"
TRIGGER = "trigger"
SCHEDULE = "schedule"
NOTIFY = "notify"
CONTROL = "control"
@dataclass
class IntentRecognition:
"""Result of intent recognition from natural language."""
primary_intent: IntentType
secondary_intents: list[IntentType]
action_intents: list[ActionIntent]
confidence_score: float # 0.0 to 1.0
entities: dict[str, list[str]] # Extracted entities by type
keywords: list[str]
suggested_tools: list[str] # Tool IDs that match the intent
complexity_level: str # simple, intermediate, advanced
estimated_steps: int
@require(lambda self: 0.0 <= self.confidence_score <= 1.0)
@require(lambda self: self.estimated_steps > 0)
def __post_init__(self):
pass
@dataclass
class WorkflowGeneration:
"""Generated workflow from natural language description."""
workflow_id: str
name: str
description: str
intent_recognition: IntentRecognition
generated_steps: list[dict[str, Any]]
conditions: list[dict[str, Any]]
error_handling: list[dict[str, Any]]
estimated_duration: float # minutes
required_tools: list[str]
optional_enhancements: list[str]
quality_score: float # 0.0 to 1.0
alternative_approaches: list[str]
@require(lambda self: 0.0 <= self.quality_score <= 1.0)
@require(lambda self: self.estimated_duration > 0)
def __post_init__(self):
pass
@dataclass
class TemplateMatch:
"""Template matching result for workflow generation."""
template_id: str
template_name: str
similarity_score: float # 0.0 to 1.0
matching_elements: list[str]
required_adaptations: list[str]
confidence_level: str # low, medium, high
@require(lambda self: 0.0 <= self.similarity_score <= 1.0)
def __post_init__(self):
pass
class NLPProcessor:
"""Natural language processor for workflow intelligence."""
def __init__(self):
self.logger = logging.getLogger(__name__)
# Intent recognition patterns
self.intent_patterns = self._initialize_intent_patterns()
self.action_patterns = self._initialize_action_patterns()
self.entity_patterns = self._initialize_entity_patterns()
# Tool mapping for intent recognition
self.tool_intent_mapping = self._initialize_tool_mapping()
# Template library for matching
self.workflow_templates = self._initialize_templates()
def _initialize_intent_patterns(self) -> dict[IntentType, list[str]]:
"""Initialize regex patterns for intent recognition."""
return {
IntentType.AUTOMATION: [
r"\b(automat[ie]|when.*then|every.*do|trigger|run automatically)\b",
r"\b(schedule|repeat|periodic|recurring|background)\b",
r"\b(watch|monitor.*and|if.*happens)\b",
],
IntentType.INTEGRATION: [
r"\b(connect|integrate|sync|link|bridge)\b",
r"\b(between.*and|from.*to|with|using)\b",
r"\b(api|webhook|service|system)\b",
],
IntentType.NOTIFICATION: [
r"\b(notify|alert|inform|tell|send.*message)\b",
r"\b(email|sms|slack|teams|notification)\b",
r"\b(remind|warning|status|update)\b",
],
IntentType.DATA_PROCESSING: [
r"\b(process|transform|convert|parse|extract)\b",
r"\b(data|information|content|file|document)\b",
r"\b(analyze|calculate|compute|format)\b",
],
IntentType.SCHEDULING: [
r"\b(schedule|at.*time|every.*day|daily|weekly|monthly)\b",
r"\b(timer|deadline|appointment|calendar)\b",
r"\b(after.*minutes|in.*hours|delay)\b",
],
IntentType.CONDITIONAL: [
r"\b(if|when|unless|provided|condition|check)\b",
r"\b(then|else|otherwise|depending|based on)\b",
r"\b(equals|contains|greater|less|matches)\b",
],
IntentType.MONITORING: [
r"\b(monitor|watch|track|observe|check)\b",
r"\b(status|health|performance|availability)\b",
r"\b(changes|updates|errors|issues)\b",
],
IntentType.COMMUNICATION: [
r"\b(send|email|message|chat|call|communicate)\b",
r"\b(reply|respond|forward|broadcast)\b",
r"\b(mail|sms|phone|video|meeting)\b",
],
IntentType.FILE_OPERATION: [
r"\b(file|folder|directory|document|download)\b",
r"\b(copy|move|delete|rename|create|upload)\b",
r"\b(backup|archive|organize|sort)\b",
],
IntentType.APPLICATION_CONTROL: [
r"\b(open|launch|start|close|quit|kill)\b",
r"\b(application|app|program|software)\b",
r"\b(window|focus|activate|minimize)\b",
],
}
def _initialize_action_patterns(self) -> dict[ActionIntent, list[str]]:
"""Initialize action intent patterns."""
return {
ActionIntent.CREATE: [r"\b(create|make|generate|build|add|new)\b"],
ActionIntent.READ: [r"\b(read|get|fetch|retrieve|find|search)\b"],
ActionIntent.UPDATE: [r"\b(update|modify|change|edit|alter)\b"],
ActionIntent.DELETE: [r"\b(delete|remove|destroy|purge|clean)\b"],
ActionIntent.SEND: [r"\b(send|transmit|deliver|post|publish)\b"],
ActionIntent.RECEIVE: [r"\b(receive|get|accept|collect|gather)\b"],
ActionIntent.PROCESS: [r"\b(process|handle|execute|run|perform)\b"],
ActionIntent.TRANSFORM: [r"\b(transform|convert|translate|format)\b"],
ActionIntent.VALIDATE: [r"\b(validate|verify|check|confirm|test)\b"],
ActionIntent.MONITOR: [r"\b(monitor|watch|track|observe)\b"],
ActionIntent.TRIGGER: [r"\b(trigger|activate|start|initiate)\b"],
ActionIntent.SCHEDULE: [r"\b(schedule|plan|arrange|time)\b"],
ActionIntent.NOTIFY: [r"\b(notify|alert|inform|message)\b"],
ActionIntent.CONTROL: [r"\b(control|manage|operate|command)\b"],
}
def _initialize_entity_patterns(self) -> dict[str, str]:
"""Initialize entity extraction patterns."""
return {
"time": r"\b(\d{1,2}:\d{2}|\d{1,2}\s*(am|pm)|morning|afternoon|evening|night)\b",
"duration": r"\b(\d+\s*(minutes?|hours?|days?|weeks?|months?))\b",
"file_types": r"\b(\w+\.(pdf|doc|txt|csv|json|xml|xlsx|png|jpg|mp4))\b",
"applications": r"\b(chrome|safari|mail|finder|terminal|slack|teams|zoom|outlook)\b",
"frequency": r"\b(daily|weekly|monthly|yearly|every\s+\d+\s+\w+)\b",
"conditions": r"\b(if|when|unless|provided|while|until)\b",
"operators": r"\b(equals?|contains?|greater|less|matches?|includes?)\b",
"numbers": r"\b(\d+(?:\.\d+)?)\b",
"urls": r"\b(https?://[^\s]+)\b",
"emails": r"\b([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b",
}
def _initialize_tool_mapping(self) -> dict[IntentType, list[str]]:
"""Initialize mapping of intents to relevant tools."""
return {
IntentType.AUTOMATION: [
"km_create_macro",
"km_create_trigger_advanced",
"km_control_flow",
],
IntentType.INTEGRATION: [
"km_web_automation",
"km_remote_triggers",
"km_enterprise_sync",
],
IntentType.NOTIFICATION: ["km_notifications", "km_email_sms_integration"],
IntentType.DATA_PROCESSING: [
"km_dictionary_manager",
"km_token_processor",
"km_calculator",
],
IntentType.SCHEDULING: ["km_create_trigger_advanced", "km_create_macro"],
IntentType.CONDITIONAL: ["km_add_condition", "km_control_flow"],
IntentType.MONITORING: ["km_analytics_engine", "km_ecosystem_orchestrator"],
IntentType.COMMUNICATION: ["km_email_sms_integration", "km_web_automation"],
IntentType.FILE_OPERATION: ["km_file_operations", "km_clipboard_manager"],
IntentType.APPLICATION_CONTROL: ["km_app_control", "km_window_manager"],
}
def _initialize_templates(self) -> dict[str, dict[str, Any]]:
"""Initialize workflow templates for matching."""
return {
"email_automation": {
"name": "Email Automation",
"description": "Send emails based on conditions or schedules",
"intents": [IntentType.COMMUNICATION, IntentType.AUTOMATION],
"tools": ["km_email_sms_integration", "km_add_condition"],
"complexity": "intermediate",
"steps": [
{"action": "check_condition", "tool": "km_add_condition"},
{"action": "send_email", "tool": "km_email_sms_integration"},
],
},
"file_processing": {
"name": "File Processing Workflow",
"description": "Process files when they appear in a folder",
"intents": [IntentType.AUTOMATION, IntentType.FILE_OPERATION],
"tools": ["km_file_operations", "km_create_trigger_advanced"],
"complexity": "intermediate",
"steps": [
{"action": "watch_folder", "tool": "km_create_trigger_advanced"},
{"action": "process_file", "tool": "km_file_operations"},
],
},
"data_sync": {
"name": "Data Synchronization",
"description": "Sync data between systems",
"intents": [IntentType.INTEGRATION, IntentType.DATA_PROCESSING],
"tools": ["km_web_automation", "km_dictionary_manager"],
"complexity": "advanced",
"steps": [
{"action": "fetch_data", "tool": "km_web_automation"},
{"action": "transform_data", "tool": "km_dictionary_manager"},
{"action": "upload_data", "tool": "km_web_automation"},
],
},
}
@require(lambda description: len(description.strip()) >= 10)
async def recognize_intent(
self,
description: str,
) -> Either[OrchestrationError, IntentRecognition]:
"""Recognize intent from natural language workflow description."""
try:
description_lower = description.lower()
# Score each intent type
intent_scores = {}
for intent_type, patterns in self.intent_patterns.items():
score = 0.0
for pattern in patterns:
matches = re.findall(pattern, description_lower, re.IGNORECASE)
score += len(matches) * 0.2 # Each match adds 0.2 to score
if score > 0:
intent_scores[intent_type] = min(1.0, score)
if not intent_scores:
return Either.left(
OrchestrationError.workflow_execution_failed(
"No clear intent recognized in description",
),
)
# Determine primary and secondary intents
sorted_intents = sorted(
intent_scores.items(),
key=lambda x: x[1],
reverse=True,
)
primary_intent = sorted_intents[0][0]
secondary_intents = [
intent for intent, score in sorted_intents[1:3] if score >= 0.3
]
# Recognize action intents
action_intents = []
for action_intent, patterns in self.action_patterns.items():
for pattern in patterns:
if re.search(pattern, description_lower, re.IGNORECASE):
action_intents.append(action_intent)
break
# Extract entities
entities = {}
for entity_type, pattern in self.entity_patterns.items():
matches = re.findall(pattern, description, re.IGNORECASE)
if matches:
entities[entity_type] = list(set(matches))
# Extract keywords (simple approach)
keywords = self._extract_keywords(description)
# Suggest relevant tools
suggested_tools = []
for intent in [primary_intent] + secondary_intents:
if intent in self.tool_intent_mapping:
suggested_tools.extend(self.tool_intent_mapping[intent])
suggested_tools = list(set(suggested_tools))
# Determine complexity and estimate steps
complexity_level = self._assess_complexity(
description,
action_intents,
entities,
)
estimated_steps = self._estimate_steps(
action_intents,
entities,
complexity_level,
)
# Calculate overall confidence
confidence_score = (
sorted_intents[0][1] * 0.7
+ (len(action_intents) * 0.1)
+ (len(entities) * 0.05)
)
confidence_score = min(1.0, confidence_score)
recognition = IntentRecognition(
primary_intent=primary_intent,
secondary_intents=secondary_intents,
action_intents=action_intents,
confidence_score=confidence_score,
entities=entities,
keywords=keywords,
suggested_tools=suggested_tools,
complexity_level=complexity_level,
estimated_steps=estimated_steps,
)
return Either.right(recognition)
except Exception as e:
return Either.left(
OrchestrationError.workflow_execution_failed(
f"Intent recognition failed: {e}",
),
)
def _extract_keywords(self, description: str) -> list[str]:
"""Extract important keywords from description."""
# Remove common words
stop_words = {
"a",
"an",
"and",
"are",
"as",
"at",
"be",
"by",
"for",
"from",
"has",
"he",
"in",
"is",
"it",
"its",
"of",
"on",
"that",
"the",
"to",
"was",
"were",
"will",
"with",
"would",
"i",
"me",
"my",
"we",
"our",
"you",
"your",
"they",
"them",
"their",
"this",
"these",
"those",
"when",
"where",
"how",
"what",
"why",
"which",
"who",
"if",
"then",
"else",
"do",
"does",
"did",
"have",
"had",
"should",
"could",
"can",
"may",
"might",
"must",
"need",
"want",
"like",
"get",
"go",
}
# Extract words and filter
words = re.findall(r"\b[a-zA-Z]{3,}\b", description.lower())
keywords = [word for word in words if word not in stop_words]
# Count frequency and return top keywords
word_counts = {}
for word in keywords:
word_counts[word] = word_counts.get(word, 0) + 1
sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
return [word for word, count in sorted_words[:10]]
def _assess_complexity(
self,
description: str,
action_intents: list[ActionIntent],
entities: dict[str, list[str]],
) -> str:
"""Assess workflow complexity based on description elements."""
complexity_indicators = 0
# Count action intents
if len(action_intents) > 5:
complexity_indicators += 2
elif len(action_intents) > 2:
complexity_indicators += 1
# Count conditional keywords
conditional_words = [
"if",
"when",
"unless",
"while",
"until",
"depending",
"based on",
]
for word in conditional_words:
if word in description.lower():
complexity_indicators += 1
# Count integration indicators
integration_words = [
"api",
"webhook",
"database",
"sync",
"integrate",
"connect",
]
for word in integration_words:
if word in description.lower():
complexity_indicators += 1
# Count entities
if len(entities) > 5:
complexity_indicators += 1
# Determine complexity level
if complexity_indicators <= 2:
return "simple"
if complexity_indicators <= 5:
return "intermediate"
return "advanced"
def _estimate_steps(
self,
action_intents: list[ActionIntent],
entities: dict[str, list[str]],
complexity: str,
) -> int:
"""Estimate number of workflow steps."""
base_steps = len(action_intents) or 1
# Add steps for complexity
if complexity == "intermediate":
base_steps += 2
elif complexity == "advanced":
base_steps += 4
# Add steps for conditions
if "conditions" in entities:
base_steps += len(entities["conditions"])
# Add steps for error handling
base_steps += 1 # Always include basic error handling
return max(3, min(15, base_steps)) # Reasonable range
# Global NLP processor instance
_global_nlp_processor: NLPProcessor | None = None
def get_nlp_processor() -> NLPProcessor:
"""Get or create the global NLP processor instance."""
global _global_nlp_processor
if _global_nlp_processor is None:
_global_nlp_processor = NLPProcessor()
return _global_nlp_processor