Skip to main content
Glama
controller_job_management.py8.86 kB
#!/usr/bin/env python3 """ AAP Controller Job & Execution Management Tool """ from typing import Any, Dict, Optional, Union from fastmcp import FastMCP from pydantic import Field from connectors.aap_connector import get_aap_connector def register_job_tools(mcp: FastMCP): """Register job execution management tools with the MCP server""" @mcp.tool() def job_execution_management( action: str = Field(description="Action: list_jobs, launch, cancel, relaunch, get_status, get_logs, get_events, list_adhoc, launch_adhoc, cancel_adhoc, get_adhoc_status, get_adhoc_logs, get_adhoc_stdout, list_execution_envs, create_execution_env, update_execution_env, delete_execution_env, list_unified_jobs, get_stdout, slice_job, job_host_summaries"), job_template_id: Optional[Union[int, float]] = Field(None, description="Job template ID for launching jobs"), job_id: Optional[Union[int, float]] = Field(None, description="Job ID for operations on specific jobs"), adhoc_id: Optional[Union[int, float]] = Field(None, description="Ad-hoc command ID"), execution_env_id: Optional[Union[int, float]] = Field(None, description="Execution environment ID"), inventory_id: Optional[Union[int, float]] = Field(None, description="Inventory ID for ad-hoc commands"), module_name: Optional[str] = Field(None, description="Ansible module name for ad-hoc commands"), module_args: Optional[str] = Field(None, description="Module arguments for ad-hoc commands"), credential_id: Optional[Union[int, float]] = Field(None, description="Credential ID for ad-hoc commands"), extra_vars: Optional[Dict[str, Any]] = Field(None, description="Extra variables"), limit: Optional[str] = Field(None, description="Limit hosts for execution"), execution_env_data: Optional[Dict[str, Any]] = Field(None, description="Execution environment data"), filters: Optional[Dict[str, Any]] = Field(None, description="Filters for listing") ) -> Dict[str, Any]: """ Enhanced job and execution management tool. Handles jobs, ad-hoc commands, execution environments, and job events. """ try: # Job Operations if action == "list_jobs": params = filters or {} return get_aap_connector().get("jobs/", params) elif action == "launch": if not job_template_id: return {"error": "job_template_id is required for launching jobs"} launch_data = {} if extra_vars: launch_data["extra_vars"] = extra_vars if limit: launch_data["limit"] = limit return get_aap_connector().post(f"job_templates/{job_template_id}/launch/", launch_data) elif action == "cancel": if not job_id: return {"error": "job_id is required for canceling jobs"} return get_aap_connector().post(f"jobs/{job_id}/cancel/") elif action == "relaunch": if not job_id: return {"error": "job_id is required for relaunching jobs"} return get_aap_connector().post(f"jobs/{job_id}/relaunch/") elif action == "get_status": if not job_id: return {"error": "job_id is required for getting job status"} return get_aap_connector().get(f"jobs/{job_id}/") elif action == "get_logs": if not job_id: return {"error": "job_id is required for getting job logs"} return get_aap_connector().get_stdout(f"jobs/{job_id}/stdout/") elif action == "get_events": if not job_id: return {"error": "job_id is required for getting job events"} params = filters or {} return get_aap_connector().get(f"jobs/{job_id}/job_events/", params) elif action == "job_host_summaries": if not job_id: return {"error": "job_id is required for getting job host summaries"} params = filters or {} return get_aap_connector().get(f"jobs/{job_id}/job_host_summaries/", params) elif action == "slice_job": if not job_id: return {"error": "job_id is required for slicing jobs"} return get_aap_connector().get(f"jobs/{job_id}/slice_jobs/") # Ad-hoc Command Operations elif action == "list_adhoc": params = filters or {} return get_aap_connector().get("ad_hoc_commands/", params) elif action == "launch_adhoc": if not inventory_id or not module_name: return {"error": "inventory_id and module_name are required for ad-hoc commands"} adhoc_data = { "inventory": int(inventory_id), "module_name": module_name, "module_args": module_args or "", } if credential_id: adhoc_data["credential"] = int(credential_id) if limit: adhoc_data["limit"] = limit if extra_vars: adhoc_data["extra_vars"] = extra_vars return get_aap_connector().post("ad_hoc_commands/", adhoc_data) elif action == "cancel_adhoc": if not adhoc_id: return {"error": "adhoc_id is required for canceling ad-hoc commands"} return get_aap_connector().post(f"ad_hoc_commands/{adhoc_id}/cancel/") elif action == "get_adhoc_status": if not adhoc_id: return {"error": "adhoc_id is required for getting ad-hoc status"} return get_aap_connector().get(f"ad_hoc_commands/{adhoc_id}/") elif action == "get_adhoc_logs": if not adhoc_id: return {"error": "adhoc_id is required for getting ad-hoc logs"} return get_aap_connector().get_stdout(f"ad_hoc_commands/{adhoc_id}/stdout/") elif action == "get_adhoc_stdout": if not adhoc_id: return {"error": "adhoc_id is required for getting ad-hoc stdout"} return get_aap_connector().get_stdout(f"ad_hoc_commands/{adhoc_id}/stdout/") elif action == "relaunch_adhoc": if not adhoc_id: return {"error": "adhoc_id is required for relaunching ad-hoc commands"} return get_aap_connector().post(f"ad_hoc_commands/{adhoc_id}/relaunch/") # Execution Environment Operations elif action == "list_execution_envs": params = filters or {} return get_aap_connector().get("execution_environments/", params) elif action == "create_execution_env": if not execution_env_data: return {"error": "execution_env_data is required for creating execution environments"} return get_aap_connector().post("execution_environments/", execution_env_data) elif action == "update_execution_env": if not execution_env_id or not execution_env_data: return {"error": "execution_env_id and execution_env_data are required"} return get_aap_connector().patch(f"execution_environments/{execution_env_id}/", execution_env_data) elif action == "delete_execution_env": if not execution_env_id: return {"error": "execution_env_id is required for deleting execution environments"} return get_aap_connector().delete(f"execution_environments/{execution_env_id}/") # Unified Jobs (covers all job types) elif action == "list_unified_jobs": params = filters or {} return get_aap_connector().get("unified_jobs/", params) elif action == "get_stdout": if not job_id: return {"error": "job_id is required for getting stdout"} params = filters or {} return get_aap_connector().get(f"unified_jobs/{job_id}/stdout/", params) else: return {"error": f"Unknown action: {action}"} except Exception as e: return {"error": f"Job/Execution management failed: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/anshulbehl/aap-mcp-pilot'

If you have feedback or need assistance with the MCP directory API, please join our Discord server