Skip to main content
Glama

Jira-GitLab MCP Server

by gabbar910
jira_client.py18.7 kB
import asyncio import logging import re from typing import Dict, List, Any, Optional from atlassian import Jira from requests.exceptions import HTTPError, ConnectionError, Timeout from utils.error_handler import JiraError, AuthenticationError, retry_on_failure, safe_get_nested logger = logging.getLogger(__name__) # Constants for tag filtering MAX_TAGS_PER_REQUEST = 10 class JiraClient: def __init__(self, config: Dict[str, str]): self.config = config self.project_key = config.get("project_key", "DEMO") # Default fallback self.jira = Jira( url=config["base_url"], username=config["email"], password=config["api_token"] ) logger.info(f"Jira client initialized for {config['base_url']} with project {self.project_key}") @retry_on_failure(max_retries=3, delay=1.0) async def test_connection(self) -> bool: """Test the Jira connection""" try: # Run in thread pool since atlassian library is synchronous loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._test_connection_sync) logger.info("Jira connection test successful") return result except Exception as e: logger.error(f"Jira connection test failed: {e}") raise AuthenticationError(f"Failed to connect to Jira: {str(e)}", "jira") def _test_connection_sync(self) -> bool: """Synchronous connection test""" try: # Try to get current user info to test authentication user = self.jira.myself() return user is not None except HTTPError as e: if e.response.status_code == 401: raise AuthenticationError("Invalid Jira credentials", "jira") elif e.response.status_code == 403: raise AuthenticationError("Insufficient Jira permissions", "jira") else: raise JiraError(f"HTTP error: {e.response.status_code}", e.response.status_code) except ConnectionError: raise JiraError("Failed to connect to Jira server") except Exception as e: raise JiraError(f"Unexpected error: {str(e)}") @retry_on_failure(max_retries=2, delay=0.5) async def get_issues(self, jql: str = None, max_results: int = 50) -> List[Dict[str, Any]]: """Get Jira issues using JQL query""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._get_issues_sync, jql, max_results) logger.info(f"Retrieved {len(result)} issues from Jira") return result except Exception as e: logger.error(f"Failed to get Jira issues: {e}") raise JiraError(f"Failed to fetch issues: {str(e)}") def _get_issues_sync(self, jql: str, max_results: int) -> List[Dict[str, Any]]: """Synchronous issue retrieval using working API endpoints""" try: # Use default JQL if none provided if jql is None: jql = f"project = {self.project_key} AND status = 'To Do'" # Try the deprecated search endpoint first (for backward compatibility) try: response = self.jira.jql(jql, limit=max_results) issues = response.get("issues", []) if issues: return self._process_issues(issues) except HTTPError as e: if e.response.status_code == 410: # Search endpoint is deprecated, use alternative method logger.info("Search endpoint deprecated (410), using issue picker API") return self._get_issues_via_picker(jql, max_results) else: raise return self._process_issues(issues) except HTTPError as e: if e.response.status_code == 400: raise JiraError(f"Invalid JQL query: {jql}", e.response.status_code) else: raise JiraError(f"HTTP error: {e.response.status_code}", e.response.status_code) except Exception as e: raise JiraError(f"Unexpected error fetching issues: {str(e)}") def _get_issues_via_picker(self, jql: str, max_results: int) -> List[Dict[str, Any]]: """Get issues using the issue picker API (fallback for deprecated search)""" import requests # Extract project key from JQL for picker query project_query = self.project_key if "project" in jql.lower(): # Try to extract more specific query terms jql_lower = jql.lower() if "status" in jql_lower: # For now, just use project key - picker doesn't support full JQL project_query = self.project_key url = f"{self.config['base_url']}/rest/api/2/issue/picker" auth = (self.config["email"], self.config["api_token"]) params = { "query": project_query, "showSubTasks": "true", "showSubTaskParent": "true" } response = requests.get(url, auth=auth, params=params, timeout=30) response.raise_for_status() data = response.json() # Extract issues from picker response all_issues = [] for section in data.get("sections", []): issues = section.get("issues", []) all_issues.extend(issues) # Convert picker format to standard issue format converted_issues = [] for issue in all_issues[:max_results]: # Limit results # Get full issue details for each issue full_issue = self._get_issue_sync(issue["key"]) if full_issue: converted_issues.append(full_issue) logger.info(f"Retrieved {len(converted_issues)} issues via picker API") return converted_issues def _process_issues(self, issues: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Process and clean up issue data""" cleaned_issues = [] for issue in issues: cleaned_issue = { "key": issue.get("key", ""), "fields": { "summary": safe_get_nested(issue, "fields.summary", "No summary"), "status": { "name": safe_get_nested(issue, "fields.status.name", "Unknown") }, "assignee": issue["fields"].get("assignee"), "priority": issue["fields"].get("priority"), "created": safe_get_nested(issue, "fields.created", ""), "updated": safe_get_nested(issue, "fields.updated", "") } } cleaned_issues.append(cleaned_issue) return cleaned_issues @retry_on_failure(max_retries=2, delay=0.5) async def comment_issue(self, issue_key: str, comment: str) -> bool: """Add a comment to a Jira issue""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._comment_issue_sync, issue_key, comment) logger.info(f"Added comment to issue {issue_key}") return result except Exception as e: logger.error(f"Failed to comment on issue {issue_key}: {e}") raise JiraError(f"Failed to add comment to {issue_key}: {str(e)}") def _comment_issue_sync(self, issue_key: str, comment: str) -> bool: """Synchronous comment addition""" try: self.jira.issue_add_comment(issue_key, comment) return True except HTTPError as e: if e.response.status_code == 404: raise JiraError(f"Issue {issue_key} not found", e.response.status_code) elif e.response.status_code == 403: raise JiraError(f"No permission to comment on {issue_key}", e.response.status_code) else: raise JiraError(f"HTTP error: {e.response.status_code}", e.response.status_code) except Exception as e: raise JiraError(f"Unexpected error commenting on issue: {str(e)}") async def get_issue(self, issue_key: str) -> Optional[Dict[str, Any]]: """Get a specific Jira issue by key""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._get_issue_sync, issue_key) logger.info(f"Retrieved issue {issue_key}") return result except Exception as e: logger.error(f"Failed to get issue {issue_key}: {e}") raise JiraError(f"Failed to fetch issue {issue_key}: {str(e)}") def _get_issue_sync(self, issue_key: str) -> Optional[Dict[str, Any]]: """Synchronous single issue retrieval""" try: issue = self.jira.issue(issue_key) if not issue: return None return { "key": issue.get("key", ""), "fields": { "summary": safe_get_nested(issue, "fields.summary", "No summary"), "description": safe_get_nested(issue, "fields.description", ""), "status": { "name": safe_get_nested(issue, "fields.status.name", "Unknown") }, "assignee": issue["fields"].get("assignee"), "priority": issue["fields"].get("priority"), "created": safe_get_nested(issue, "fields.created", ""), "updated": safe_get_nested(issue, "fields.updated", "") } } except HTTPError as e: if e.response.status_code == 404: raise JiraError(f"Issue {issue_key} not found", e.response.status_code) else: raise JiraError(f"HTTP error: {e.response.status_code}", e.response.status_code) except Exception as e: raise JiraError(f"Unexpected error fetching issue: {str(e)}") def _sanitize_jql_value(self, value: str) -> str: """Sanitize values for JQL queries to prevent injection""" if not isinstance(value, str): value = str(value) # Remove potentially dangerous characters clean_value = re.sub(r'["\\\';]', '', value.strip()) # Ensure it's not empty after cleaning if not clean_value: raise JiraError("Invalid tag value after sanitization") # Wrap in quotes for exact match return f'"{clean_value}"' @retry_on_failure(max_retries=2, delay=0.5) async def get_issues_by_tags(self, project_key: str, tags: List[str], max_results: int = 50) -> List[Dict[str, Any]]: """Get Jira issues by project and tags (labels)""" try: # Validate input if not project_key or not project_key.strip(): raise JiraError("Project key is required") if not tags or len(tags) == 0: raise JiraError("At least one tag is required") if len(tags) > MAX_TAGS_PER_REQUEST: raise JiraError(f"Maximum {MAX_TAGS_PER_REQUEST} tags allowed per request") # Sanitize project key and tags clean_project = re.sub(r'[^A-Z0-9_-]', '', project_key.upper()) if not clean_project: raise JiraError("Invalid project key") clean_tags = [self._sanitize_jql_value(tag) for tag in tags] # Build JQL query tags_clause = ", ".join(clean_tags) jql = f'project = {clean_project} AND labels IN ({tags_clause})' logger.info(f"Fetching issues with JQL: {jql}") # Use existing get_issues method result = await self.get_issues(jql, max_results) logger.info(f"Retrieved {len(result)} issues with tags {tags} from project {project_key}") return result except JiraError: # Re-raise JiraError as-is raise except Exception as e: logger.error(f"Failed to get issues by tags: {e}") raise JiraError(f"Failed to fetch issues by tags: {str(e)}") @retry_on_failure(max_retries=2, delay=0.5) async def get_issues_by_key_pattern(self, project_key: str, key_pattern: str, max_results: int = 50) -> List[Dict[str, Any]]: """Get Jira issues by project and issue key pattern""" try: # Validate input if not project_key or not project_key.strip(): raise JiraError("Project key is required") if not key_pattern or not key_pattern.strip(): raise JiraError("Key pattern is required") # Sanitize inputs clean_project = re.sub(r'[^A-Z0-9_-]', '', project_key.upper()) clean_pattern = re.sub(r'[^A-Z0-9_*-]', '', key_pattern.upper()) if not clean_project or not clean_pattern: raise JiraError("Invalid project key or pattern") # Build JQL query jql = f'project = {clean_project} AND key ~ "{clean_pattern}"' logger.info(f"Fetching issues with key pattern: {jql}") result = await self.get_issues(jql, max_results) logger.info(f"Retrieved {len(result)} issues matching pattern {key_pattern} from project {project_key}") return result except JiraError: raise except Exception as e: logger.error(f"Failed to get issues by key pattern: {e}") raise JiraError(f"Failed to fetch issues by key pattern: {str(e)}") @retry_on_failure(max_retries=2, delay=0.5) async def update_issue_status(self, issue_key: str, status: str, comment: Optional[str] = None) -> bool: """Update Jira issue status (transition)""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._update_issue_status_sync, issue_key, status, comment) logger.info(f"Updated issue {issue_key} status to '{status}'") return result except Exception as e: logger.error(f"Failed to update issue {issue_key} status: {e}") raise JiraError(f"Failed to update issue status: {str(e)}") def _update_issue_status_sync(self, issue_key: str, status: str, comment: Optional[str] = None) -> bool: """Synchronous issue status update""" try: # Get available transitions for the issue transitions = self.jira.get_issue_transitions(issue_key) # Find the transition that matches the target status target_transition = None for transition in transitions['transitions']: if transition['to']['name'].lower() == status.lower(): target_transition = transition break if not target_transition: # List available transitions for better error message available = [t['to']['name'] for t in transitions['transitions']] raise JiraError(f"Status '{status}' not available for issue {issue_key}. Available: {available}") # Prepare transition data transition_data = { 'transition': { 'id': target_transition['id'] } } # Add comment if provided if comment: transition_data['update'] = { 'comment': [{ 'add': { 'body': comment } }] } # Execute the transition self.jira.transition_issue(issue_key, transition_data) return True except HTTPError as e: if e.response.status_code == 404: raise JiraError(f"Issue {issue_key} not found", e.response.status_code) elif e.response.status_code == 400: raise JiraError(f"Invalid transition for issue {issue_key}", e.response.status_code) else: raise JiraError(f"HTTP error: {e.response.status_code}", e.response.status_code) except Exception as e: raise JiraError(f"Unexpected error updating issue status: {str(e)}") @retry_on_failure(max_retries=2, delay=0.5) async def get_issue_transitions(self, issue_key: str) -> List[Dict[str, Any]]: """Get available transitions for a Jira issue""" try: loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self._get_issue_transitions_sync, issue_key) logger.info(f"Retrieved transitions for issue {issue_key}") return result except Exception as e: logger.error(f"Failed to get transitions for issue {issue_key}: {e}") raise JiraError(f"Failed to get issue transitions: {str(e)}") def _get_issue_transitions_sync(self, issue_key: str) -> List[Dict[str, Any]]: """Synchronous issue transitions retrieval""" try: transitions = self.jira.get_issue_transitions(issue_key) transition_list = [] for transition in transitions['transitions']: transition_data = { 'id': transition['id'], 'name': transition['name'], 'to_status': transition['to']['name'], 'to_status_id': transition['to']['id'] } transition_list.append(transition_data) return transition_list except HTTPError as e: if e.response.status_code == 404: raise JiraError(f"Issue {issue_key} not found", e.response.status_code) else: raise JiraError(f"HTTP error: {e.response.status_code}", e.response.status_code) except Exception as e: raise JiraError(f"Unexpected error getting issue transitions: {str(e)}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gabbar910/MCPJiraGitlab'

If you have feedback or need assistance with the MCP directory API, please join our Discord server