"""Keyboard Maestro trigger integration layer.
This module handles the integration between advanced trigger specifications and
Keyboard Maestro's trigger system, generating appropriate XML and AppleScript
for trigger registration and management.
Security: All XML generation includes comprehensive escaping and validation.
Performance: Efficient trigger registration with minimal KM API calls.
Type Safety: Strong integration with trigger specification types.
"""
import re
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from typing import Any
from src.core.either import Either
from src.core.errors import IntegrationError, SecurityError
from src.core.logging import get_logger
from src.core.triggers import TriggerSpec, TriggerType
from src.core.types import MacroId
logger = get_logger(__name__)
class KMTriggerIntegrator:
"""Integrates advanced triggers with Keyboard Maestro."""
def __init__(self):
self.supported_triggers = {
TriggerType.TIME_SCHEDULED,
TriggerType.TIME_RECURRING,
TriggerType.FILE_CREATED,
TriggerType.FILE_MODIFIED,
TriggerType.APP_LAUNCHED,
TriggerType.APP_QUIT,
TriggerType.USER_IDLE,
TriggerType.SYSTEM_STARTUP,
TriggerType.NETWORK_CONNECTED,
}
async def add_trigger_to_macro(
self,
macro_id: MacroId,
trigger_spec: TriggerSpec,
replace_existing: bool = False,
) -> Either[IntegrationError, dict[str, Any]]:
"""Add an advanced trigger to a Keyboard Maestro macro.
Args:
macro_id: Target macro identifier
trigger_spec: Trigger specification with type and configuration
replace_existing: Whether to replace existing triggers
Returns:
Either integration error or success details
"""
try:
logger.info(f"Adding trigger {trigger_spec.trigger_id} to macro {macro_id}")
# Validate trigger type support
if trigger_spec.trigger_type not in self.supported_triggers:
return Either.left(
IntegrationError(
"UNSUPPORTED_TRIGGER",
f"Trigger type {trigger_spec.trigger_type.value} is not supported",
),
)
# Generate trigger XML
xml_result = self._generate_trigger_xml(trigger_spec)
if xml_result.is_left():
return Either.left(
IntegrationError(
"XML_GENERATION_FAILED",
f"Failed to generate trigger XML: {xml_result.get_left().message}",
),
)
trigger_xml = xml_result.get_right()
# Generate AppleScript for trigger registration
applescript_result = self._generate_trigger_applescript(
macro_id,
trigger_xml,
replace_existing,
)
if applescript_result.is_left():
return Either.left(applescript_result.get_left())
applescript = applescript_result.get_right()
# Execute AppleScript (simulated for now)
execution_result = await self._execute_applescript(applescript)
if execution_result.is_left():
return Either.left(execution_result.get_left())
result = {
"trigger_id": trigger_spec.trigger_id,
"macro_id": str(macro_id),
"trigger_type": trigger_spec.trigger_type.value,
"xml_generated": True,
"applescript_executed": True,
"integration_time_ms": execution_result.get_right().get(
"execution_time_ms",
0,
),
"km_trigger_id": f"km_trigger_{trigger_spec.trigger_id[:8]}",
"created_at": datetime.now().isoformat(),
}
logger.info(
f"Successfully added trigger {trigger_spec.trigger_id} to macro {macro_id}",
)
return Either.right(result)
except Exception as e:
logger.error(f"Error adding trigger to macro {macro_id}: {e!s}")
return Either.left(
IntegrationError(
"TRIGGER_INTEGRATION_ERROR",
f"Failed to integrate trigger: {e!s}",
),
)
def _generate_trigger_xml(
self,
trigger_spec: TriggerSpec,
) -> Either[SecurityError, str]:
"""Generate Keyboard Maestro trigger XML."""
try:
root = ET.Element("trigger")
root.set("type", self._map_trigger_type(trigger_spec.trigger_type))
root.set("enabled", str(trigger_spec.enabled).lower())
# Add trigger-specific configuration
config_result = self._add_trigger_config(root, trigger_spec)
if config_result.is_left():
return config_result
# Add conditions if present
if trigger_spec.conditions:
conditions_elem = ET.SubElement(root, "conditions")
for condition in trigger_spec.conditions:
condition_elem = ET.SubElement(conditions_elem, "condition")
for key, value in condition.items():
condition_elem.set(
self._escape_xml_attribute(key),
self._escape_xml_value(str(value)),
)
# Add metadata
metadata_elem = ET.SubElement(root, "metadata")
metadata_elem.set("trigger_id", trigger_spec.trigger_id)
metadata_elem.set("priority", str(trigger_spec.priority))
metadata_elem.set("timeout", str(trigger_spec.timeout_seconds))
if trigger_spec.max_executions:
metadata_elem.set("max_executions", str(trigger_spec.max_executions))
# Convert to string with proper escaping
xml_string = ET.tostring(root, encoding="unicode")
return Either.right(xml_string)
except Exception as e:
return Either.left(
SecurityError(
"XML_GENERATION_ERROR",
f"Failed to generate secure XML: {e!s}",
),
)
def _map_trigger_type(self, trigger_type: TriggerType) -> str:
"""Map trigger type to KM trigger type."""
mapping = {
TriggerType.TIME_SCHEDULED: "scheduled",
TriggerType.TIME_RECURRING: "periodic",
TriggerType.FILE_CREATED: "file_created",
TriggerType.FILE_MODIFIED: "file_changed",
TriggerType.APP_LAUNCHED: "app_launched",
TriggerType.APP_QUIT: "app_quit",
TriggerType.USER_IDLE: "user_idle",
TriggerType.SYSTEM_STARTUP: "system_startup",
TriggerType.NETWORK_CONNECTED: "network_connected",
}
return mapping.get(trigger_type, "unknown")
def _add_trigger_config(
self,
root: ET.Element,
trigger_spec: TriggerSpec,
) -> Either[SecurityError, None]:
"""Add trigger-specific configuration to XML."""
try:
config = trigger_spec.config
if trigger_spec.trigger_type in [
TriggerType.TIME_SCHEDULED,
TriggerType.TIME_RECURRING,
]:
self._add_time_config(root, config)
elif trigger_spec.trigger_type in [
TriggerType.FILE_CREATED,
TriggerType.FILE_MODIFIED,
]:
self._add_file_config(root, config)
elif trigger_spec.trigger_type in [
TriggerType.APP_LAUNCHED,
TriggerType.APP_QUIT,
]:
self._add_app_config(root, config)
elif trigger_spec.trigger_type == TriggerType.USER_IDLE:
self._add_idle_config(root, config)
elif trigger_spec.trigger_type == TriggerType.NETWORK_CONNECTED:
self._add_network_config(root, config)
return Either.right(None)
except Exception as e:
return Either.left(
SecurityError(
"CONFIG_GENERATION_ERROR",
f"Failed to generate trigger configuration: {e!s}",
),
)
def _add_time_config(self, root: ET.Element, config: dict[str, Any]) -> None:
"""Add time trigger configuration."""
time_elem = ET.SubElement(root, "time_config")
if "schedule_time" in config:
schedule_time = config["schedule_time"]
if isinstance(schedule_time, datetime):
time_elem.set("schedule", schedule_time.isoformat())
time_elem.set("timezone", config.get("timezone", "local"))
if "recurring_interval" in config:
interval = config["recurring_interval"]
if isinstance(interval, timedelta):
time_elem.set("interval_seconds", str(int(interval.total_seconds())))
if "recurring_pattern" in config:
pattern = self._escape_xml_value(config["recurring_pattern"])
time_elem.set("cron_pattern", pattern)
def _add_file_config(self, root: ET.Element, config: dict[str, Any]) -> None:
"""Add file trigger configuration."""
file_elem = ET.SubElement(root, "file_config")
watch_path = self._escape_xml_value(config.get("watch_path", ""))
file_elem.set("path", watch_path)
file_elem.set("recursive", str(config.get("recursive", False)).lower())
if "file_pattern" in config:
pattern = self._escape_xml_value(config["file_pattern"])
file_elem.set("pattern", pattern)
file_elem.set("ignore_hidden", str(config.get("ignore_hidden", True)).lower())
file_elem.set("debounce", str(config.get("debounce_seconds", 1.0)))
def _add_app_config(self, root: ET.Element, config: dict[str, Any]) -> None:
"""Add application trigger configuration."""
app_elem = ET.SubElement(root, "app_config")
if "app_bundle_id" in config:
bundle_id = self._escape_xml_value(config["app_bundle_id"])
app_elem.set("bundle_id", bundle_id)
if "app_name" in config:
app_name = self._escape_xml_value(config["app_name"])
app_elem.set("name", app_name)
def _add_idle_config(self, root: ET.Element, config: dict[str, Any]) -> None:
"""Add idle trigger configuration."""
idle_elem = ET.SubElement(root, "idle_config")
threshold = config.get("idle_threshold_seconds", 300)
idle_elem.set("threshold", str(threshold))
def _add_network_config(self, root: ET.Element, config: dict[str, Any]) -> None:
"""Add network trigger configuration."""
network_elem = ET.SubElement(root, "network_config")
if "network_interface" in config:
interface = self._escape_xml_value(config["network_interface"])
network_elem.set("interface", interface)
def _escape_xml_attribute(self, text: str) -> str:
"""Escape XML attribute names."""
# Only allow alphanumeric and underscore in attribute names
return re.sub(r"[^a-zA-Z0-9_]", "_", text)
def _escape_xml_value(self, text: str) -> str:
"""Escape XML attribute values."""
if not isinstance(text, str):
text = str(text)
# Escape XML special characters
text = text.replace("&", "&")
text = text.replace("<", "<")
text = text.replace(">", ">")
text = text.replace('"', """)
text = text.replace("'", "'")
return text
def _generate_trigger_applescript(
self,
macro_id: MacroId,
trigger_xml: str,
replace_existing: bool,
) -> Either[IntegrationError, str]:
"""Generate AppleScript for trigger registration."""
try:
# Escape XML for AppleScript string
escaped_xml = trigger_xml.replace("\\", "\\\\").replace('"', '\\"')
# Escape macro ID
escaped_macro_id = str(macro_id).replace('"', '\\"')
applescript = f"""
tell application "Keyboard Maestro Engine"
try
set targetMacro to macro "{escaped_macro_id}"
{"-- Remove existing triggers" if replace_existing else "-- Keep existing triggers"}
{"delete every trigger of targetMacro" if replace_existing else ""}
-- Add new trigger
set newTrigger to make new trigger at end of triggers of targetMacro
set trigger XML of newTrigger to "{escaped_xml}"
return "SUCCESS: Trigger added to macro {escaped_macro_id}"
on error errorMessage
return "ERROR: " & errorMessage
end try
end tell
"""
return Either.right(applescript)
except Exception as e:
return Either.left(
IntegrationError(
"APPLESCRIPT_GENERATION_ERROR",
f"Failed to generate AppleScript: {e!s}",
),
)
async def _execute_applescript(
self,
_applescript: str,
) -> Either[IntegrationError, dict[str, Any]]:
"""Execute AppleScript for trigger integration."""
# Simulated execution for now - in production this would use osascript
try:
logger.info("Executing AppleScript for trigger integration")
# Simulate execution time
import asyncio
await asyncio.sleep(0.1) # Simulate KM API call
# Simulate successful execution
result = {
"status": "success",
"execution_time_ms": 100,
"applescript_executed": True,
"km_response": "SUCCESS: Trigger added to macro",
}
return Either.right(result)
except Exception as e:
return Either.left(
IntegrationError(
"APPLESCRIPT_EXECUTION_ERROR",
f"Failed to execute AppleScript: {e!s}",
),
)
def validate_trigger_compatibility(
self,
trigger_spec: TriggerSpec,
) -> Either[IntegrationError, None]:
"""Validate that trigger is compatible with Keyboard Maestro."""
if trigger_spec.trigger_type not in self.supported_triggers:
return Either.left(
IntegrationError(
"UNSUPPORTED_TRIGGER_TYPE",
f"Trigger type {trigger_spec.trigger_type.value} is not supported by Keyboard Maestro",
),
)
# Validate configuration completeness
config = trigger_spec.config
if trigger_spec.trigger_type in [
TriggerType.TIME_SCHEDULED,
TriggerType.TIME_RECURRING,
]:
if not any(
key in config
for key in ["schedule_time", "recurring_interval", "recurring_pattern"]
):
return Either.left(
IntegrationError(
"INCOMPLETE_TIME_CONFIG",
"Time trigger requires schedule_time, recurring_interval, or recurring_pattern",
),
)
elif trigger_spec.trigger_type in [
TriggerType.FILE_CREATED,
TriggerType.FILE_MODIFIED,
]:
if "watch_path" not in config:
return Either.left(
IntegrationError(
"INCOMPLETE_FILE_CONFIG",
"File trigger requires watch_path",
),
)
elif trigger_spec.trigger_type in [
TriggerType.APP_LAUNCHED,
TriggerType.APP_QUIT,
] and not any(key in config for key in ["app_bundle_id", "app_name"]):
return Either.left(
IntegrationError(
"INCOMPLETE_APP_CONFIG",
"App trigger requires app_bundle_id or app_name",
),
)
return Either.right(None)
# Helper functions for common KM trigger patterns
def create_km_scheduled_trigger(_macro_id: MacroId, when: datetime) -> TriggerSpec:
"""Create a KM-compatible scheduled trigger."""
from src.core.triggers import TriggerBuilder
result = TriggerBuilder().scheduled_at(when).with_timeout(30).build()
if result.is_right():
return result.get_right()
raise ValueError(f"Failed to create trigger: {result.get_left().constraint}")
def create_km_file_watcher(
_macro_id: MacroId,
watch_path: str,
pattern: str | None = None,
) -> TriggerSpec:
"""Create a KM-compatible file watcher trigger."""
from src.core.triggers import TriggerBuilder
result = (
TriggerBuilder()
.when_file_modified(watch_path, pattern)
.with_timeout(60)
.build()
)
if result.is_right():
return result.get_right()
raise ValueError(f"Failed to create trigger: {result.get_left().constraint}")
def create_km_app_trigger(
_macro_id: MacroId,
app_id: str,
on_launch: bool = True,
) -> TriggerSpec:
"""Create a KM-compatible application trigger."""
from src.core.triggers import TriggerBuilder
builder = TriggerBuilder()
if on_launch:
builder = builder.when_app_launches(app_id)
else:
builder = builder.when_app_quits(app_id)
result = builder.with_timeout(30).build()
if result.is_right():
return result.get_right()
raise ValueError(f"Failed to create trigger: {result.get_left().constraint}")