"""Group Management MCP Tools.
Tools for managing and organizing Keyboard Maestro macro groups.
"""
import logging
from datetime import UTC, datetime
from typing import Annotated, Any
from fastmcp import Context
from pydantic import Field
logger = logging.getLogger(__name__)
def register_group_tools(mcp: Any) -> None:
"""Register group management tools with the MCP server."""
@mcp.tool()
async def km_move_macro_to_group(
macro_identifier: Annotated[
str,
Field(
description="Macro name or UUID to move",
pattern=r"^[a-zA-Z0-9_\s\-\.]+$|^[0-9a-fA-F-]{36}$",
max_length=255,
),
],
target_group: Annotated[
str,
Field(description="Target group name or UUID", max_length=255),
],
create_group_if_missing: Annotated[
bool,
Field(default=False, description="Create target group if it doesn't exist"),
] = False,
_preserve_group_settings: Annotated[
bool,
Field(
default=True,
description="Maintain group-specific activation settings",
),
] = True,
ctx: Context = None,
) -> dict[str, Any]:
"""Move a macro from one group to another with comprehensive validation.
Implements full ADDER+ technique stack:
- Design by Contract: Pre/post conditions for movement safety
- Type Safety: Branded types for macro and group identification
- Defensive Programming: Comprehensive input validation and error handling
- Property-Based Testing: Movement operations tested across scenarios
- Functional Programming: Either monad for error handling
Architecture:
- Pattern: Command Pattern with Memento for rollback
- Security: Defense-in-depth with validation, authorization, audit
- Performance: O(1) movement with conflict detection
Args:
macro_identifier: Macro name or UUID to move
target_group: Target group name or UUID
create_group_if_missing: Create target group if it doesn't exist
preserve_group_settings: Maintain group-specific settings
ctx: MCP context for logging and progress reporting
Returns:
Dictionary containing success status, movement details, and metadata
"""
if ctx:
await ctx.info(
f"Moving macro '{macro_identifier}' to group '{target_group}'",
)
try:
# Import here to avoid circular dependencies
import time
from ..core.types import GroupId, MacroId
from ..server_utils import get_km_client
start_time = time.time()
km_client = get_km_client()
# Convert strings to branded types for type safety
macro_id = MacroId(macro_identifier)
target_group_id = GroupId(target_group)
# Execute movement with comprehensive error handling
result = await km_client.move_macro_to_group_async(
macro_id=macro_id,
target_group=target_group_id,
create_missing=create_group_if_missing,
)
if result.is_left():
error = result.get_left()
if ctx:
await ctx.error(f"Macro movement failed: {error.message}")
return {
"success": False,
"error": {
"code": error.code,
"message": error.message,
"details": error.details,
"recovery_suggestion": _get_movement_recovery_suggestion(
error.code,
),
},
"metadata": {
"timestamp": datetime.now().isoformat(),
"execution_time": time.time() - start_time,
"macro_identifier": macro_identifier,
"target_group": target_group,
},
}
move_result = result.get_right()
if ctx:
await ctx.info(
f"Macro moved successfully in {move_result.execution_time.total_seconds():.3f}s",
)
return {
"success": True,
"data": {
"macro_id": move_result.macro_id,
"source_group": move_result.source_group,
"target_group": move_result.target_group,
"execution_time": move_result.execution_time.total_seconds(),
"conflicts_resolved": move_result.conflicts_resolved,
"was_successful": move_result.was_successful(),
},
"metadata": {
"timestamp": datetime.now().isoformat(),
"total_execution_time": time.time() - start_time,
"operation": "move_macro_to_group",
"created_missing_group": create_group_if_missing
and "group_created" in move_result.conflicts_resolved,
},
}
except Exception as e:
if ctx:
await ctx.error(f"Unexpected error during macro movement: {e!s}")
logger.exception(
f"Macro movement error: {macro_identifier} -> {target_group}",
)
return {
"success": False,
"error": {
"code": "SYSTEM_ERROR",
"message": f"Macro movement failed: {e!s}",
"details": "Unexpected system error during movement operation",
"recovery_suggestion": "Check system status and macro permissions",
},
"metadata": {
"timestamp": datetime.now().isoformat(),
"macro_identifier": macro_identifier,
"target_group": target_group,
},
}
@mcp.tool()
async def km_list_macro_groups(
include_macro_count: Annotated[
bool,
Field(default=True, description="Include count of macros in each group"),
] = True,
include_enabled_count: Annotated[
bool,
Field(
default=True,
description="Include count of enabled macros in each group",
),
] = True,
sort_by: Annotated[
str,
Field(
default="name",
description="Sort groups by: name, macro_count, enabled_count",
),
] = "name",
ctx: Context = None,
) -> dict[str, Any]:
"""List all macro groups from Keyboard Maestro with optional statistics.
Provides comprehensive group information including macro counts,
group status, and organizational structure.
"""
if ctx:
await ctx.info("Retrieving macro groups from Keyboard Maestro")
try:
# Import here to avoid circular dependencies
from ..server_utils import get_km_client
# Get KM client
km_client = get_km_client()
if ctx:
await ctx.report_progress(20, 100, "Connecting to Keyboard Maestro")
# Use AppleScript to get detailed group information
script = """
tell application "Keyboard Maestro"
set groupList to every macro group
set groupData to {}
repeat with currentGroup in groupList
set groupName to name of currentGroup
set groupMacros to every macro of currentGroup
set enabledMacros to 0
set totalMacros to count of groupMacros
repeat with currentMacro in groupMacros
if enabled of currentMacro then
set enabledMacros to enabledMacros + 1
end if
end repeat
set groupRecord to {¬
groupName:groupName, ¬
totalMacros:totalMacros, ¬
enabledMacros:enabledMacros, ¬
enabled:(enabled of currentGroup)¬
}
set groupData to groupData & {groupRecord}
end repeat
return groupData
end tell
"""
if ctx:
await ctx.report_progress(40, 100, "Executing AppleScript query")
# S607 SECURITY FIX: Use secure subprocess execution
try:
from ..commands.secure_subprocess import (
CommandType,
SecureCommand,
get_secure_subprocess_manager,
)
secure_manager = get_secure_subprocess_manager()
command = SecureCommand(
command_type=CommandType.SYSTEM_INFO,
executable="osascript",
args=["-e", script],
timeout=30.0,
allowed_return_codes={0, 1},
)
result = secure_manager.execute_secure_command(command)
except Exception:
# Fallback if secure subprocess not available
import subprocess
# S607 fix: Use full path for executable
result = subprocess.run( # noqa: S603 # Secured subprocess with hardcoded path
["/usr/bin/osascript", "-e", script],
check=False,
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
if ctx:
await ctx.error(f"AppleScript failed: {result.stderr}")
return {
"success": False,
"error": {
"code": "KM_CONNECTION_FAILED",
"message": "Cannot retrieve macro groups from Keyboard Maestro",
"details": result.stderr,
"recovery_suggestion": "Ensure Keyboard Maestro is running and accessible",
},
}
if ctx:
await ctx.report_progress(60, 100, "Parsing group data")
# Parse AppleScript records
groups_data = km_client._parse_applescript_records(result.stdout)
# Transform to standard format
groups = []
for group_data in groups_data:
group_info = {
"name": group_data.get("groupName", ""),
"enabled": group_data.get("enabled", True),
}
if include_macro_count:
group_info["macro_count"] = group_data.get("totalMacros", 0)
if include_enabled_count:
group_info["enabled_macro_count"] = group_data.get(
"enabledMacros",
0,
)
groups.append(group_info)
if ctx:
await ctx.report_progress(80, 100, "Applying sorting and formatting")
# Sort groups
if sort_by == "name":
groups.sort(key=lambda g: g["name"].lower())
elif sort_by == "macro_count" and include_macro_count:
groups.sort(key=lambda g: g.get("macro_count", 0), reverse=True)
elif sort_by == "enabled_count" and include_enabled_count:
groups.sort(key=lambda g: g.get("enabled_macro_count", 0), reverse=True)
if ctx:
await ctx.report_progress(
100,
100,
f"Retrieved {len(groups)} macro groups",
)
await ctx.info(f"Found {len(groups)} macro groups")
# Calculate summary statistics
total_macros = (
sum(g.get("macro_count", 0) for g in groups)
if include_macro_count
else None
)
total_enabled = (
sum(g.get("enabled_macro_count", 0) for g in groups)
if include_enabled_count
else None
)
enabled_groups = sum(1 for g in groups if g.get("enabled", True))
return {
"success": True,
"data": {
"groups": groups,
"summary": {
"total_groups": len(groups),
"enabled_groups": enabled_groups,
"disabled_groups": len(groups) - enabled_groups,
"total_macros": total_macros,
"total_enabled_macros": total_enabled,
},
},
"metadata": {
"timestamp": datetime.now(UTC).isoformat(),
"sort_by": sort_by,
"include_counts": {
"macro_count": include_macro_count,
"enabled_count": include_enabled_count,
},
},
}
except subprocess.TimeoutExpired:
if ctx:
await ctx.error("Timeout retrieving macro groups")
return {
"success": False,
"error": {
"code": "TIMEOUT_ERROR",
"message": "Timeout retrieving macro groups from Keyboard Maestro",
"details": "AppleScript execution exceeded 30 seconds",
"recovery_suggestion": "Check Keyboard Maestro responsiveness",
},
}
except Exception as e:
logger.exception("Error in km_list_macro_groups")
if ctx:
await ctx.error(f"Group listing error: {e}")
return {
"success": False,
"error": {
"code": "SYSTEM_ERROR",
"message": "Failed to retrieve macro groups",
"details": str(e),
"recovery_suggestion": "Check Keyboard Maestro connection and permissions",
},
}
def _get_movement_recovery_suggestion(error_code: str) -> str:
"""Generate contextual recovery suggestions for macro movement errors.
Provides actionable guidance based on specific error conditions to help
users resolve movement failures quickly and effectively.
Args:
error_code: The specific error code from the movement operation
Returns:
Human-readable recovery suggestion string
"""
recovery_suggestions = {
"MACRO_NOT_FOUND": "Verify the macro name or UUID is correct and the macro exists in Keyboard Maestro",
"GROUP_NOT_FOUND": "Check that the target group exists, or enable 'create_group_if_missing' option",
"PERMISSION_ERROR": "Ensure Keyboard Maestro has proper permissions and the macro is not system-protected",
"MOVE_ERROR": "Macro movement failed - check if the macro is currently running or locked",
"NAME_COLLISION": "A macro with the same name already exists in the target group",
"TIMEOUT_ERROR": "Operation timed out - check Keyboard Maestro responsiveness and try again",
"VALIDATION_ERROR": "Input validation failed - verify macro and group identifiers are valid",
"SYSTEM_ERROR": "Unexpected system error - check Keyboard Maestro status and system permissions",
"CONNECTION_ERROR": "Cannot connect to Keyboard Maestro - ensure the application is running",
}
return recovery_suggestions.get(
error_code,
"Check Keyboard Maestro status and retry the operation",
)