jira_client.py•18.7 kB
import asyncio
import logging
import re
from typing import Dict, List, Any, Optional
from atlassian import Jira
from requests.exceptions import HTTPError, ConnectionError, Timeout
from utils.error_handler import JiraError, AuthenticationError, retry_on_failure, safe_get_nested
logger = logging.getLogger(__name__)
# Constants for tag filtering
MAX_TAGS_PER_REQUEST = 10
class JiraClient:
def __init__(self, config: Dict[str, str]):
self.config = config
self.project_key = config.get("project_key", "DEMO") # Default fallback
self.jira = Jira(
url=config["base_url"],
username=config["email"],
password=config["api_token"]
)
logger.info(f"Jira client initialized for {config['base_url']} with project {self.project_key}")
@retry_on_failure(max_retries=3, delay=1.0)
async def test_connection(self) -> bool:
"""Test the Jira connection"""
try:
# Run in thread pool since atlassian library is synchronous
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, self._test_connection_sync)
logger.info("Jira connection test successful")
return result
except Exception as e:
logger.error(f"Jira connection test failed: {e}")
raise AuthenticationError(f"Failed to connect to Jira: {str(e)}", "jira")
def _test_connection_sync(self) -> bool:
"""Synchronous connection test"""
try:
# Try to get current user info to test authentication
user = self.jira.myself()
return user is not None
except HTTPError as e:
if e.response.status_code == 401:
raise AuthenticationError("Invalid Jira credentials", "jira")
elif e.response.status_code == 403:
raise AuthenticationError("Insufficient Jira permissions", "jira")
else:
raise JiraError(f"HTTP error: {e.response.status_code}", e.response.status_code)
except ConnectionError:
raise JiraError("Failed to connect to Jira server")
except Exception as e:
raise JiraError(f"Unexpected error: {str(e)}")
@retry_on_failure(max_retries=2, delay=0.5)
async def get_issues(self, jql: str = None, max_results: int = 50) -> List[Dict[str, Any]]:
"""Get Jira issues using JQL query"""
try:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, self._get_issues_sync, jql, max_results)
logger.info(f"Retrieved {len(result)} issues from Jira")
return result
except Exception as e:
logger.error(f"Failed to get Jira issues: {e}")
raise JiraError(f"Failed to fetch issues: {str(e)}")
def _get_issues_sync(self, jql: str, max_results: int) -> List[Dict[str, Any]]:
"""Synchronous issue retrieval using working API endpoints"""
try:
# Use default JQL if none provided
if jql is None:
jql = f"project = {self.project_key} AND status = 'To Do'"
# Try the deprecated search endpoint first (for backward compatibility)
try:
response = self.jira.jql(jql, limit=max_results)
issues = response.get("issues", [])
if issues:
return self._process_issues(issues)
except HTTPError as e:
if e.response.status_code == 410:
# Search endpoint is deprecated, use alternative method
logger.info("Search endpoint deprecated (410), using issue picker API")
return self._get_issues_via_picker(jql, max_results)
else:
raise
return self._process_issues(issues)
except HTTPError as e:
if e.response.status_code == 400:
raise JiraError(f"Invalid JQL query: {jql}", e.response.status_code)
else:
raise JiraError(f"HTTP error: {e.response.status_code}", e.response.status_code)
except Exception as e:
raise JiraError(f"Unexpected error fetching issues: {str(e)}")
def _get_issues_via_picker(self, jql: str, max_results: int) -> List[Dict[str, Any]]:
"""Get issues using the issue picker API (fallback for deprecated search)"""
import requests
# Extract project key from JQL for picker query
project_query = self.project_key
if "project" in jql.lower():
# Try to extract more specific query terms
jql_lower = jql.lower()
if "status" in jql_lower:
# For now, just use project key - picker doesn't support full JQL
project_query = self.project_key
url = f"{self.config['base_url']}/rest/api/2/issue/picker"
auth = (self.config["email"], self.config["api_token"])
params = {
"query": project_query,
"showSubTasks": "true",
"showSubTaskParent": "true"
}
response = requests.get(url, auth=auth, params=params, timeout=30)
response.raise_for_status()
data = response.json()
# Extract issues from picker response
all_issues = []
for section in data.get("sections", []):
issues = section.get("issues", [])
all_issues.extend(issues)
# Convert picker format to standard issue format
converted_issues = []
for issue in all_issues[:max_results]: # Limit results
# Get full issue details for each issue
full_issue = self._get_issue_sync(issue["key"])
if full_issue:
converted_issues.append(full_issue)
logger.info(f"Retrieved {len(converted_issues)} issues via picker API")
return converted_issues
def _process_issues(self, issues: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Process and clean up issue data"""
cleaned_issues = []
for issue in issues:
cleaned_issue = {
"key": issue.get("key", ""),
"fields": {
"summary": safe_get_nested(issue, "fields.summary", "No summary"),
"status": {
"name": safe_get_nested(issue, "fields.status.name", "Unknown")
},
"assignee": issue["fields"].get("assignee"),
"priority": issue["fields"].get("priority"),
"created": safe_get_nested(issue, "fields.created", ""),
"updated": safe_get_nested(issue, "fields.updated", "")
}
}
cleaned_issues.append(cleaned_issue)
return cleaned_issues
@retry_on_failure(max_retries=2, delay=0.5)
async def comment_issue(self, issue_key: str, comment: str) -> bool:
"""Add a comment to a Jira issue"""
try:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, self._comment_issue_sync, issue_key, comment)
logger.info(f"Added comment to issue {issue_key}")
return result
except Exception as e:
logger.error(f"Failed to comment on issue {issue_key}: {e}")
raise JiraError(f"Failed to add comment to {issue_key}: {str(e)}")
def _comment_issue_sync(self, issue_key: str, comment: str) -> bool:
"""Synchronous comment addition"""
try:
self.jira.issue_add_comment(issue_key, comment)
return True
except HTTPError as e:
if e.response.status_code == 404:
raise JiraError(f"Issue {issue_key} not found", e.response.status_code)
elif e.response.status_code == 403:
raise JiraError(f"No permission to comment on {issue_key}", e.response.status_code)
else:
raise JiraError(f"HTTP error: {e.response.status_code}", e.response.status_code)
except Exception as e:
raise JiraError(f"Unexpected error commenting on issue: {str(e)}")
async def get_issue(self, issue_key: str) -> Optional[Dict[str, Any]]:
"""Get a specific Jira issue by key"""
try:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, self._get_issue_sync, issue_key)
logger.info(f"Retrieved issue {issue_key}")
return result
except Exception as e:
logger.error(f"Failed to get issue {issue_key}: {e}")
raise JiraError(f"Failed to fetch issue {issue_key}: {str(e)}")
def _get_issue_sync(self, issue_key: str) -> Optional[Dict[str, Any]]:
"""Synchronous single issue retrieval"""
try:
issue = self.jira.issue(issue_key)
if not issue:
return None
return {
"key": issue.get("key", ""),
"fields": {
"summary": safe_get_nested(issue, "fields.summary", "No summary"),
"description": safe_get_nested(issue, "fields.description", ""),
"status": {
"name": safe_get_nested(issue, "fields.status.name", "Unknown")
},
"assignee": issue["fields"].get("assignee"),
"priority": issue["fields"].get("priority"),
"created": safe_get_nested(issue, "fields.created", ""),
"updated": safe_get_nested(issue, "fields.updated", "")
}
}
except HTTPError as e:
if e.response.status_code == 404:
raise JiraError(f"Issue {issue_key} not found", e.response.status_code)
else:
raise JiraError(f"HTTP error: {e.response.status_code}", e.response.status_code)
except Exception as e:
raise JiraError(f"Unexpected error fetching issue: {str(e)}")
def _sanitize_jql_value(self, value: str) -> str:
"""Sanitize values for JQL queries to prevent injection"""
if not isinstance(value, str):
value = str(value)
# Remove potentially dangerous characters
clean_value = re.sub(r'["\\\';]', '', value.strip())
# Ensure it's not empty after cleaning
if not clean_value:
raise JiraError("Invalid tag value after sanitization")
# Wrap in quotes for exact match
return f'"{clean_value}"'
@retry_on_failure(max_retries=2, delay=0.5)
async def get_issues_by_tags(self, project_key: str, tags: List[str], max_results: int = 50) -> List[Dict[str, Any]]:
"""Get Jira issues by project and tags (labels)"""
try:
# Validate input
if not project_key or not project_key.strip():
raise JiraError("Project key is required")
if not tags or len(tags) == 0:
raise JiraError("At least one tag is required")
if len(tags) > MAX_TAGS_PER_REQUEST:
raise JiraError(f"Maximum {MAX_TAGS_PER_REQUEST} tags allowed per request")
# Sanitize project key and tags
clean_project = re.sub(r'[^A-Z0-9_-]', '', project_key.upper())
if not clean_project:
raise JiraError("Invalid project key")
clean_tags = [self._sanitize_jql_value(tag) for tag in tags]
# Build JQL query
tags_clause = ", ".join(clean_tags)
jql = f'project = {clean_project} AND labels IN ({tags_clause})'
logger.info(f"Fetching issues with JQL: {jql}")
# Use existing get_issues method
result = await self.get_issues(jql, max_results)
logger.info(f"Retrieved {len(result)} issues with tags {tags} from project {project_key}")
return result
except JiraError:
# Re-raise JiraError as-is
raise
except Exception as e:
logger.error(f"Failed to get issues by tags: {e}")
raise JiraError(f"Failed to fetch issues by tags: {str(e)}")
@retry_on_failure(max_retries=2, delay=0.5)
async def get_issues_by_key_pattern(self, project_key: str, key_pattern: str, max_results: int = 50) -> List[Dict[str, Any]]:
"""Get Jira issues by project and issue key pattern"""
try:
# Validate input
if not project_key or not project_key.strip():
raise JiraError("Project key is required")
if not key_pattern or not key_pattern.strip():
raise JiraError("Key pattern is required")
# Sanitize inputs
clean_project = re.sub(r'[^A-Z0-9_-]', '', project_key.upper())
clean_pattern = re.sub(r'[^A-Z0-9_*-]', '', key_pattern.upper())
if not clean_project or not clean_pattern:
raise JiraError("Invalid project key or pattern")
# Build JQL query
jql = f'project = {clean_project} AND key ~ "{clean_pattern}"'
logger.info(f"Fetching issues with key pattern: {jql}")
result = await self.get_issues(jql, max_results)
logger.info(f"Retrieved {len(result)} issues matching pattern {key_pattern} from project {project_key}")
return result
except JiraError:
raise
except Exception as e:
logger.error(f"Failed to get issues by key pattern: {e}")
raise JiraError(f"Failed to fetch issues by key pattern: {str(e)}")
@retry_on_failure(max_retries=2, delay=0.5)
async def update_issue_status(self, issue_key: str, status: str, comment: Optional[str] = None) -> bool:
"""Update Jira issue status (transition)"""
try:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, self._update_issue_status_sync, issue_key, status, comment)
logger.info(f"Updated issue {issue_key} status to '{status}'")
return result
except Exception as e:
logger.error(f"Failed to update issue {issue_key} status: {e}")
raise JiraError(f"Failed to update issue status: {str(e)}")
def _update_issue_status_sync(self, issue_key: str, status: str, comment: Optional[str] = None) -> bool:
"""Synchronous issue status update"""
try:
# Get available transitions for the issue
transitions = self.jira.get_issue_transitions(issue_key)
# Find the transition that matches the target status
target_transition = None
for transition in transitions['transitions']:
if transition['to']['name'].lower() == status.lower():
target_transition = transition
break
if not target_transition:
# List available transitions for better error message
available = [t['to']['name'] for t in transitions['transitions']]
raise JiraError(f"Status '{status}' not available for issue {issue_key}. Available: {available}")
# Prepare transition data
transition_data = {
'transition': {
'id': target_transition['id']
}
}
# Add comment if provided
if comment:
transition_data['update'] = {
'comment': [{
'add': {
'body': comment
}
}]
}
# Execute the transition
self.jira.transition_issue(issue_key, transition_data)
return True
except HTTPError as e:
if e.response.status_code == 404:
raise JiraError(f"Issue {issue_key} not found", e.response.status_code)
elif e.response.status_code == 400:
raise JiraError(f"Invalid transition for issue {issue_key}", e.response.status_code)
else:
raise JiraError(f"HTTP error: {e.response.status_code}", e.response.status_code)
except Exception as e:
raise JiraError(f"Unexpected error updating issue status: {str(e)}")
@retry_on_failure(max_retries=2, delay=0.5)
async def get_issue_transitions(self, issue_key: str) -> List[Dict[str, Any]]:
"""Get available transitions for a Jira issue"""
try:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, self._get_issue_transitions_sync, issue_key)
logger.info(f"Retrieved transitions for issue {issue_key}")
return result
except Exception as e:
logger.error(f"Failed to get transitions for issue {issue_key}: {e}")
raise JiraError(f"Failed to get issue transitions: {str(e)}")
def _get_issue_transitions_sync(self, issue_key: str) -> List[Dict[str, Any]]:
"""Synchronous issue transitions retrieval"""
try:
transitions = self.jira.get_issue_transitions(issue_key)
transition_list = []
for transition in transitions['transitions']:
transition_data = {
'id': transition['id'],
'name': transition['name'],
'to_status': transition['to']['name'],
'to_status_id': transition['to']['id']
}
transition_list.append(transition_data)
return transition_list
except HTTPError as e:
if e.response.status_code == 404:
raise JiraError(f"Issue {issue_key} not found", e.response.status_code)
else:
raise JiraError(f"HTTP error: {e.response.status_code}", e.response.status_code)
except Exception as e:
raise JiraError(f"Unexpected error getting issue transitions: {str(e)}")