Skip to main content
Glama
client.py24.7 kB
"""Jenkins API client."""

import logging
import re
import time
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple

import requests

from ..config import get_jenkins_servers
from .exceptions import JenkinsBuildNotFoundError
from .exceptions import JenkinsError
from .exceptions import JenkinsJobNotFoundError
from .exceptions import JenkinsPermissionError
from .exceptions import JenkinsServerNotFoundError
from .types import BuildInfo
from .types import JenkinsClient
from .types import JenkinsServerConfig
from .types import JobInfo
from .types import JobParameter
from .types import ParameterDict
from .types import QueueInfo
from .types import StopResult
from .types import TriggerResult

logger = logging.getLogger(__name__)

# Number of status checks (and the pause between them) used when waiting for a
# queued build to start or for a build to finish stopping.
_POLL_ATTEMPTS = 10
_POLL_INTERVAL_SECONDS = 1

# Extracts the numeric queue item id from the ``Location`` header that Jenkins
# returns after a build is triggered. The trailing slash is not required.
_QUEUE_ITEM_RE = re.compile(r"/queue/item/(\d+)")

# Configuration document posted to ``createItem`` when creating a folder.
_FOLDER_CONFIG_XML = (
    "<?xml version='1.1' encoding='UTF-8'?> "
    '<com.cloudbees.hudson.plugins.folder.Folder plugin="cloudbees-folder"> '
    "<actions/> "
    "<description></description> "
    "<properties/> "
    '<folderViews class="com.cloudbees.hudson.plugins.folder.views.DefaultFolderViewHolder"> '
    "<views> "
    "<hudson.model.AllView> "
    '<owner class="com.cloudbees.hudson.plugins.folder.Folder" reference="../../../.."/> '
    "<name>all</name> "
    "<filterExecutors>false</filterExecutors> "
    "<filterQueue>false</filterQueue> "
    '<properties class="hudson.model.View$PropertyList"/> '
    "</hudson.model.AllView> "
    "</views> "
    '<tabBar class="hudson.views.DefaultViewsTabBar"/> '
    "</folderViews> "
    "<healthMetrics/> "
    '<icon class="com.cloudbees.hudson.plugins.folder.icons.StockFolderIcon"/> '
    "</com.cloudbees.hudson.plugins.folder.Folder>"
)


class JenkinsAPIClient:
    """Jenkins API client class.

    Requests are sent with ``requests.request`` using the credentials and base
    URL held by the ``JenkinsClient`` built from the named server's config.

    Note on errors: transport failures are converted to ``JenkinsError`` by
    ``_make_request``. Unexpected HTTP error statuses are surfaced through
    ``response.raise_for_status()`` and therefore propagate as
    ``requests.HTTPError``.
    """

    def __init__(self, server_name: str, timeout: int = 30) -> None:
        """Initialize Jenkins API client.

        Args:
            server_name: Jenkins server name
            timeout: Request timeout (seconds)

        Raises:
            JenkinsServerNotFoundError: Server not found
        """
        self.server_name = server_name
        self.timeout = timeout
        self._server_config = self._get_server_config(server_name)
        self._client = JenkinsClient(self._server_config, timeout)

    @staticmethod
    def _get_server_config(server_name: str) -> JenkinsServerConfig:
        """Get server config.

        Args:
            server_name: Server name

        Returns:
            The first configured server whose ``name`` equals ``server_name``

        Raises:
            JenkinsServerNotFoundError: Server not found
        """
        for server in get_jenkins_servers():
            if server["name"] == server_name:
                return server
        raise JenkinsServerNotFoundError(server_name)

    def _make_request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> requests.Response:
        """Send HTTP request.

        The response is returned whatever its status code; callers decide how
        to treat 4xx/5xx answers.

        Args:
            method: HTTP method
            url: Request URL
            params: Query parameters
            **kwargs: Other request parameters, forwarded to ``requests.request``

        Returns:
            HTTP response object

        Raises:
            JenkinsError: The request could not be completed
        """
        try:
            response = requests.request(
                method=method,
                url=url,
                auth=self._client.auth,
                params=params,
                timeout=self.timeout,
                **kwargs,
            )
        except requests.exceptions.RequestException as e:
            logger.error("Jenkins API request failed: %s", e)
            raise JenkinsError(f"Jenkins API request failed: {e}") from e

        logger.debug(
            "Jenkins API request: %s %s -> %s", method, url, response.status_code
        )
        return response

    @staticmethod
    def _job_path(path: str) -> str:
        """Convert a slash-separated item path into a Jenkins URL path.

        ``"a/b"`` becomes ``"/job/a/job/b"``. Empty segments (leading, trailing
        or doubled slashes) are skipped so they cannot produce ``/job//``.

        Args:
            path: Slash-separated job or folder path; may be empty

        Returns:
            URL path fragment, or an empty string when ``path`` has no segments
        """
        return "".join(f"/job/{part}" for part in path.split("/") if part)

    def _build_job_url(self, job_full_name: str) -> str:
        """Build job URL.

        Args:
            job_full_name: Full job name

        Returns:
            Job URL
        """
        return f"{self._client.base_url}{self._job_path(job_full_name)}"

    def get_job_info(self, job_full_name: str) -> JobInfo:
        """Get job info.

        Args:
            job_full_name: Full job name

        Returns:
            Job info

        Raises:
            JenkinsJobNotFoundError: Job not found
            JenkinsError: The request could not be completed
            requests.HTTPError: Jenkins answered with another error status
        """
        job_url = self._build_job_url(job_full_name)
        api_url = f"{job_url}/api/json?tree=name,fullName,url,description,buildable,color,lastBuild[number,url]"

        response = self._make_request("GET", api_url)
        if response.status_code == 404:
            raise JenkinsJobNotFoundError(job_full_name, self.server_name)
        response.raise_for_status()

        data = response.json()

        # ``lastBuild`` is absent or null for jobs that have never been built.
        last_build = data.get("lastBuild") or {}

        return {
            "name": data.get("name", ""),
            "fullName": data.get("fullName", job_full_name),
            "url": data.get("url", ""),
            "description": data.get("description"),
            "buildable": data.get("buildable", False),
            "color": data.get("color", "grey"),
            "is_parameterized": self._is_job_parameterized(job_full_name),
            "last_build_number": last_build.get("number"),
            "last_build_url": last_build.get("url"),
        }

    def _is_job_parameterized(self, job_full_name: str) -> bool:
        """Check if job is parameterized.

        This is a best-effort check: any failure is logged and reported as
        "not parameterized" rather than raised.

        Args:
            job_full_name: Full job name

        Returns:
            Whether parameterized
        """
        try:
            job_url = self._build_job_url(job_full_name)
            api_url = f"{job_url}/api/json?tree=property[parameterDefinitions[name]]"

            response = self._make_request("GET", api_url)
            response.raise_for_status()

            data = response.json()
            return any(
                prop.get("parameterDefinitions")
                for prop in data.get("property") or []
            )
        except Exception as e:
            logger.warning(
                "Failed to check if job %s is parameterized: %s", job_full_name, e
            )
            return False

    def get_job_parameters(self, job_full_name: str) -> List[JobParameter]:
        """Get job parameter definitions.

        Definitions are read from the ``property`` field first and then from
        the ``actions`` field (compatibility). A definition that appears in
        both fields is reported once.

        Args:
            job_full_name: Full job name

        Returns:
            List of parameter definitions

        Raises:
            JenkinsError: The request could not be completed
            requests.HTTPError: Jenkins answered with an error status
        """
        job_url = self._build_job_url(job_full_name)
        api_url = (
            f"{job_url}/api/json?tree=actions[parameterDefinitions[name,type,"
            "defaultParameterValue[value],choices]],property[parameterDefinitions"
            "[name,type,defaultParameterValue[value],choices]]"
        )

        response = self._make_request("GET", api_url)
        response.raise_for_status()
        data = response.json()

        def process_parameter(param_def: Dict[str, Any]) -> JobParameter:
            """Process parameter definition."""
            # ``defaultParameterValue`` may be present but null, so ``or {}``
            # is needed in addition to the missing-key case.
            default_value = param_def.get("defaultParameterValue") or {}
            param_info: JobParameter = {
                "name": param_def.get("name", ""),
                "type": param_def.get("type", ""),
                "default": default_value.get("value"),
                "choices": None,
            }
            # If Choice Parameter, add choices list
            if (
                param_def.get("type") == "ChoiceParameterDefinition"
                and "choices" in param_def
            ):
                param_info["choices"] = param_def.get("choices", [])
            return param_info

        params: List[JobParameter] = []
        seen: Set[Tuple[Any, Any]] = set()

        for section in ("property", "actions"):
            for entry in data.get(section) or []:
                for param_def in (entry or {}).get("parameterDefinitions") or []:
                    key = (param_def.get("name", ""), param_def.get("type", ""))
                    if key in seen:
                        continue
                    seen.add(key)
                    params.append(process_parameter(param_def))

        return params

    def trigger_build(
        self, job_full_name: str, params: Optional[ParameterDict] = None
    ) -> TriggerResult:
        """Trigger build.

        Jobs that define parameters are triggered through
        ``buildWithParameters`` with ``params``; other jobs are triggered
        through ``build`` and ``params`` is not sent.

        Args:
            job_full_name: Full job name
            params: Build parameters

        Returns:
            Trigger result

        Raises:
            JenkinsError: The request could not be completed
            requests.HTTPError: Jenkins answered with an error status
        """
        job_url = self._build_job_url(job_full_name)

        if self.get_job_parameters(job_full_name):
            # Parameterized build
            build_url = f"{job_url}/buildWithParameters"
            build_params = params or {}
        else:
            # Non-parameterized build
            build_url = f"{job_url}/build"
            build_params = {}

        response = self._make_request("POST", build_url, params=build_params)
        response.raise_for_status()

        # Get queue location
        queue_location = response.headers.get("Location", "")
        match = _QUEUE_ITEM_RE.search(queue_location) if queue_location else None
        queue_id = int(match.group(1)) if match else None

        # Wait for build to start
        return self._wait_for_build_start(queue_id, queue_location)

    def _wait_for_build_start(
        self, queue_id: Optional[int], queue_location: str
    ) -> TriggerResult:
        """Wait for build to start.

        Polls the queue item until it reports a build number or the attempts
        are used up. Without a queue id there is nothing to poll, so the
        method returns immediately instead of sleeping.

        Args:
            queue_id: Queue ID, or None when it could not be determined
            queue_location: Queue location URL

        Returns:
            Trigger result
        """
        if queue_id is None:
            return {
                "status": "QUEUED",
                "build_number": None,
                "build_url": None,
                "queue_id": None,
                "queue_url": queue_location,
                "message": "Build was triggered but no queue item id was "
                "returned, so its start could not be tracked",
            }

        for attempt in range(_POLL_ATTEMPTS):
            queue_info = self.get_queue_info(queue_id)
            build_number = queue_info.get("build_number")
            if build_number is not None:
                return {
                    "status": "BUILD_STARTED",
                    "build_number": build_number,
                    "build_url": queue_info.get("build_url"),
                    "queue_id": queue_id,
                    "queue_url": queue_location,
                    "message": None,
                }

            # No pause after the final check.
            if attempt < _POLL_ATTEMPTS - 1:
                time.sleep(_POLL_INTERVAL_SECONDS)

        # Timeout, return queue info
        waited = _POLL_ATTEMPTS * _POLL_INTERVAL_SECONDS
        return {
            "status": "QUEUED",
            "build_number": None,
            "build_url": None,
            "queue_id": queue_id,
            "queue_url": queue_location,
            "message": f"Build is queued but did not start within {waited} seconds",
        }

    def get_queue_info(self, queue_id: int) -> QueueInfo:
        """Get queue info.

        A 404 answer is not an error here: it is reported as a result with
        status ``NOT_FOUND``.

        Args:
            queue_id: Queue ID

        Returns:
            Queue info

        Raises:
            JenkinsError: The request could not be completed
            requests.HTTPError: Jenkins answered with another error status
        """
        api_url = f"{self._client.base_url}/queue/item/{queue_id}/api/json"

        response = self._make_request("GET", api_url)
        if response.status_code == 404:
            return {
                "queue_id": queue_id,
                "blocked": False,
                "buildable": False,
                "stuck": False,
                "why": "Item not found",
                "build_number": None,
                "build_url": None,
                "status": "NOT_FOUND",
            }
        response.raise_for_status()

        data = response.json()
        result: QueueInfo = {
            "queue_id": queue_id,
            "blocked": data.get("blocked", False),
            "buildable": data.get("buildable", False),
            "stuck": data.get("stuck", False),
            "why": data.get("why"),
            "build_number": None,
            "build_url": None,
            "status": "QUEUED",
        }

        # Check if build has started
        executable = data.get("executable")
        if executable:
            result["build_number"] = executable.get("number")
            result["build_url"] = executable.get("url")
            result["status"] = "BUILD_STARTED"

        return result

    def get_build_status(self, job_full_name: str, build_number: int) -> BuildInfo:
        """Get build status.

        Args:
            job_full_name: Full job name
            build_number: Build number

        Returns:
            Build info

        Raises:
            JenkinsBuildNotFoundError: Build not found
            JenkinsError: The request could not be completed
            requests.HTTPError: Jenkins answered with another error status
        """
        job_url = self._build_job_url(job_full_name)
        api_url = f"{job_url}/{build_number}/api/json"

        response = self._make_request("GET", api_url)
        if response.status_code == 404:
            raise JenkinsBuildNotFoundError(
                build_number, job_full_name, self.server_name
            )
        response.raise_for_status()

        data = response.json()
        return {
            "number": data.get("number", build_number),
            "result": data.get("result"),
            "building": data.get("building", False),
            "url": data.get("url", ""),
            "timestamp": data.get("timestamp", 0),
            "duration": data.get("duration", 0),
        }

    def stop_build(self, job_full_name: str, build_number: int) -> StopResult:
        """Stop build.

        Args:
            job_full_name: Full job name
            build_number: Build number

        Returns:
            Stop result: ``NOT_FOUND`` on 404, ``STOP_REQUESTED`` on success,
            or the outcome of the permission-error handling on 403

        Raises:
            JenkinsPermissionError: Stop was refused and the build kept running
            JenkinsError: The request could not be completed
            requests.HTTPError: Jenkins answered with another error status
        """
        job_url = self._build_job_url(job_full_name)
        stop_url = f"{job_url}/{build_number}/stop"

        response = self._make_request("POST", stop_url)

        if response.status_code == 404:
            return {"status": "NOT_FOUND", "url": None}
        if response.status_code == 403:
            # Permission error, check build status
            return self._handle_stop_permission_error(job_full_name, build_number)

        response.raise_for_status()
        return {"status": "STOP_REQUESTED", "url": stop_url}

    def _handle_stop_permission_error(
        self, job_full_name: str, build_number: int
    ) -> StopResult:
        """Handle permission error when stopping build.

        Repeatedly checks the build; if it is no longer building (or cannot be
        queried) the stop is treated as already done.

        Args:
            job_full_name: Full job name
            build_number: Build number

        Returns:
            Stop result

        Raises:
            JenkinsPermissionError: Build is still running after all checks
        """
        already_terminated: StopResult = {"status": "ALREADY_TERMINATED", "url": None}

        for attempt in range(_POLL_ATTEMPTS):
            try:
                build_info = self.get_build_status(job_full_name, build_number)
            except (JenkinsBuildNotFoundError, JenkinsError):
                # Build not found or query failed, consider as terminated
                return already_terminated

            if not build_info.get("building", True):
                return already_terminated

            # No pause after the final check.
            if attempt < _POLL_ATTEMPTS - 1:
                time.sleep(_POLL_INTERVAL_SECONDS)

        # Still building after all checks, raise permission error
        raise JenkinsPermissionError("stop build", f"{job_full_name}#{build_number}")

    def get_build_log(self, job_full_name: str, build_number: int) -> str:
        """Get build log.

        Args:
            job_full_name: Full job name
            build_number: Build number

        Returns:
            Build log text

        Raises:
            JenkinsBuildNotFoundError: Build not found
            JenkinsError: The request could not be completed
            requests.HTTPError: Jenkins answered with another error status
        """
        job_url = self._build_job_url(job_full_name)
        log_url = f"{job_url}/{build_number}/consoleText"

        response = self._make_request("GET", log_url)
        if response.status_code == 404:
            raise JenkinsBuildNotFoundError(
                build_number, job_full_name, self.server_name
            )
        response.raise_for_status()

        return response.text

    def search_jobs(self, keyword: str) -> List[JobInfo]:
        """Search jobs.

        Matching is a case-insensitive substring test against each job's name
        and full name. The job tree is fetched to the nesting depth spelled
        out in the query below; full details are then fetched per match.

        Args:
            keyword: Search keyword

        Returns:
            List of matching jobs

        Raises:
            JenkinsError: The request could not be completed
            requests.HTTPError: Jenkins answered with an error status
        """
        api_url = (
            f"{self._client.base_url}/api/json?tree=jobs[name,url,fullName,"
            "jobs[name,url,fullName,jobs[name,url,fullName,jobs[name,url,fullName]]]]"
        )

        response = self._make_request("GET", api_url)
        response.raise_for_status()
        data = response.json()

        all_jobs = self._collect_all_jobs(data.get("jobs", []))

        # Filter matching jobs
        needle = keyword.lower()
        return [
            self.get_job_info(job["fullName"])
            for job in all_jobs
            if needle in job["name"].lower()
            or needle in job.get("fullName", "").lower()
        ]

    def _collect_all_jobs(
        self, jobs: List[Dict[str, Any]], parent: str = ""
    ) -> List[Dict[str, Any]]:
        """Recursively collect all jobs.

        Args:
            jobs: List of jobs
            parent: Parent job path, used to derive a full name when the job
                entry does not carry one

        Returns:
            Flattened job list (each parent precedes its children)
        """
        result: List[Dict[str, Any]] = []
        for job in jobs:
            name = job.get("fullName") or (
                f"{parent}/{job['name']}" if parent else job["name"]
            )
            result.append({"name": job["name"], "fullName": name, "url": job["url"]})

            children = job.get("jobs")
            if children:
                result.extend(self._collect_all_jobs(children, name))

        return result

    def create_job(self, job_name: str, job_config: str, folder_path: str = "") -> dict:
        """Create a new Jenkins job.

        Args:
            job_name: Name of the new job
            job_config: XML configuration for the job
            folder_path: Optional folder path (e.g., "test/folder1" for nested
                folders); missing folders are created first

        Returns:
            Dict containing creation result with status and job_url

        Raises:
            JenkinsError: Jenkins rejected the job (HTTP 400), a folder could
                not be created, or a request could not be completed
            requests.HTTPError: Jenkins answered with another error status
        """
        # Create folders if they don't exist
        if folder_path:
            self._ensure_folders_exist(folder_path)

        # With an empty folder path this is just the server base URL.
        parent_url = self._build_job_url(folder_path)
        create_url = f"{parent_url}/createItem"
        job_url = f"{parent_url}/job/{job_name}"

        # Create job
        response = self._make_request(
            "POST",
            create_url,
            params={"name": job_name},
            data=job_config,
            headers={"Content-Type": "application/xml"},
        )

        if response.status_code == 400:
            raise JenkinsError(
                f"Job creation failed: Job '{job_name}' already exists or invalid configuration"
            )
        response.raise_for_status()

        return {
            "status": "CREATED",
            "job_name": job_name,
            "job_url": job_url,
            "folder_path": folder_path,
        }

    def _ensure_folders_exist(self, folder_path: str) -> None:
        """Ensure all folders in the path exist, create them if they don't.

        Folders are checked from the outermost to the innermost so a parent
        always exists before its child is created.

        Args:
            folder_path: Folder path (e.g., "MCPS/username/subfolder")

        Raises:
            JenkinsError: Folder creation failed
        """
        current_path = ""
        for folder in (part for part in folder_path.split("/") if part):
            current_path = f"{current_path}/{folder}" if current_path else folder

            # Check if folder exists
            if not self._folder_exists(current_path):
                self._create_folder(current_path, folder)

    def _folder_exists(self, folder_path: str) -> bool:
        """Check if a folder exists.

        Args:
            folder_path: Folder path to check

        Returns:
            True if the item's API endpoint answers 200; False for any other
            status or when the request could not be completed
        """
        api_url = f"{self._build_job_url(folder_path)}/api/json"
        try:
            response = self._make_request("GET", api_url)
        except JenkinsError:
            return False
        return response.status_code == 200

    def _create_folder(self, folder_path: str, folder_name: str) -> None:
        """Create a folder.

        Args:
            folder_path: Full folder path
            folder_name: Name of the folder to create

        Raises:
            JenkinsError: Folder creation failed
            requests.HTTPError: Jenkins answered with an error status other
                than 400
        """
        # The folder is created inside its parent; a top-level folder has an
        # empty parent path and is created at the server root.
        parent_path, _, _ = folder_path.rpartition("/")
        create_url = f"{self._build_job_url(parent_path)}/createItem"

        response = self._make_request(
            "POST",
            create_url,
            params={
                "name": folder_name,
                "mode": "com.cloudbees.hudson.plugins.folder.Folder",
            },
            data=_FOLDER_CONFIG_XML,
            headers={"Content-Type": "application/xml"},
        )

        if response.status_code == 400:
            # Folder might already exist, check again
            if not self._folder_exists(folder_path):
                raise JenkinsError(f"Failed to create folder '{folder_name}'")
            return

        response.raise_for_status()

    def update_job(self, job_name: str, job_config: str, folder_path: str = "") -> dict:
        """Update an existing Jenkins job.

        Args:
            job_name: Name of the job to update
            job_config: New XML configuration for the job
            folder_path: Folder path where the job is located

        Returns:
            Dict containing update result with status and job_url

        Raises:
            JenkinsError: Job not found, or the request could not be completed
            requests.HTTPError: Jenkins answered with another error status
        """
        # Build job URL based on folder path
        job_url = f"{self._build_job_url(folder_path)}/job/{job_name}"
        update_url = f"{job_url}/config.xml"

        # Get CSRF token if needed. ``_get_crumb`` returns None when no crumb
        # is available, in which case the request is sent without one.
        headers = {"Content-Type": "application/xml"}
        crumb = self._get_crumb()
        if isinstance(crumb, dict):
            crumb_field = crumb.get("crumbRequestField")
            crumb_value = crumb.get("crumb")
            if crumb_field and crumb_value:
                headers[crumb_field] = crumb_value

        # Try to update using POST to config.xml endpoint
        response = self._make_request(
            "POST", update_url, data=job_config, headers=headers
        )

        if response.status_code == 404:
            raise JenkinsError(f"Job update failed: Job '{job_name}' not found")
        response.raise_for_status()

        return {
            "status": "UPDATED",
            "job_name": job_name,
            "job_url": job_url,
            "folder_path": folder_path,
        }

    def _get_crumb(self) -> Optional[Dict[str, str]]:
        """Get CSRF crumb from Jenkins.

        Returns:
            Parsed crumb response when the crumb issuer answers 200; None for
            any other status, a failed request, or an unparsable body
        """
        crumb_url = f"{self._client.base_url}/crumbIssuer/api/json"
        try:
            response = self._make_request("GET", crumb_url)
            if response.status_code == 200:
                return response.json()
        except (JenkinsError, ValueError) as e:
            logger.debug("Could not obtain Jenkins crumb: %s", e)
        return None

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/xhuaustc/jenkins-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.