Skip to main content
Glama

JIRA MCP Server

by klauseduard
""" Core JIRA client implementation. """ import logging from typing import Dict, Any, Optional, List from jira import JIRA from jira.exceptions import JIRAError from .config import JiraConfig from ..models.comment import CommentArgs, GetCommentsArgs from ..models.worklog import LogWorkArgs from ..models.issue import CloneIssueArgs, IssueArgs, IssueTransitionArgs logger = logging.getLogger("simple_jira") class JiraError(Exception): """Error raised by JIRA operations.""" pass class JiraClient: """Simple JIRA client.""" def __init__(self, config: JiraConfig): """Initialize the JIRA client with configuration.""" self.config = config self._client = None self._verify_config() def _verify_config(self): """Verify the configuration is valid.""" if not self.config.jira_url or not self.config.jira_username or not self.config.jira_api_token: logger.error("JIRA configuration is incomplete") raise ValueError("JIRA configuration is incomplete") def connect(self) -> bool: """Connect to the JIRA instance.""" try: self._client = JIRA( server=self.config.jira_url, basic_auth=(self.config.jira_username, self.config.jira_api_token), options={ 'verify': True, 'headers': { 'Accept': 'application/json' } } ) logger.info("Successfully connected to JIRA") return True except Exception as e: logger.error(f"Failed to connect to JIRA: {str(e)}") raise JiraError(f"Failed to connect to JIRA: {str(e)}") @property def client(self) -> Optional[JIRA]: """Get the JIRA client, connecting if necessary.""" if self._client is None: self.connect() return self._client def get_issue(self, issue_key: str) -> Optional[Dict[str, Any]]: """Get a JIRA issue by key.""" try: if not self._client: if not self.connect(): return {"error": "Not connected to JIRA"} issue = self.client.issue(issue_key) return { "key": issue.key, "summary": issue.fields.summary, "description": issue.fields.description, "status": issue.fields.status.name, "assignee": issue.fields.assignee.displayName if issue.fields.assignee else None, "reporter": issue.fields.reporter.displayName if issue.fields.reporter else None, "created": issue.fields.created, "updated": issue.fields.updated, "issue_type": issue.fields.issuetype.name, "priority": issue.fields.priority.name if issue.fields.priority else None, } except Exception as e: logger.error(f"Error getting issue {issue_key}: {str(e)}") return {"error": f"Error getting issue: {str(e)}"} def search_issues(self, jql: str, max_results: int = 50, start_at: int = 0, fields: List[str] = None) -> Dict[str, Any]: """Search for issues using JQL.""" try: if not self._client: if not self.connect(): return {"error": "Not connected to JIRA"} # Default fields if none specified if not fields: fields = ["key", "summary", "status", "assignee", "issuetype", "priority"] # Execute search issues = self.client.search_issues( jql_str=jql, maxResults=max_results, startAt=start_at, fields=",".join(fields) ) # Format results results = [] for issue in issues: issue_dict = {"key": issue.key} for field in fields: if field == "key": continue try: value = getattr(issue.fields, field) if field == "assignee": issue_dict[field] = value.displayName if value else None elif field == "status": issue_dict[field] = value.name if value else None elif field == "issuetype": issue_dict[field] = value.name if value else None elif field == "priority": issue_dict[field] = value.name if value else None else: issue_dict[field] = value except AttributeError: issue_dict[field] = None results.append(issue_dict) return { "total": issues.total, "start_at": start_at, "max_results": max_results, "issues": results } except Exception as e: logger.error(f"Error searching issues with JQL '{jql}': {str(e)}") return {"error": f"Error searching issues: {str(e)}"} def create_issue(self, project_key: str, summary: str, description: Optional[str] = None, issue_type: str = "Task", priority: Optional[str] = None, assignee: Optional[str] = None, labels: List[str] = None, custom_fields: Dict[str, Any] = None) -> Dict[str, Any]: """Create a new JIRA issue.""" try: if not self._client: if not self.connect(): return {"error": "Not connected to JIRA"} # Prepare issue fields issue_dict = { 'project': project_key, 'summary': summary, 'issuetype': {'name': issue_type} } # Add optional fields if description: issue_dict['description'] = description if priority: issue_dict['priority'] = {'name': priority} if assignee: issue_dict['assignee'] = {'name': assignee} if labels: issue_dict['labels'] = labels if custom_fields: issue_dict.update(custom_fields) # Create the issue issue = self.client.create_issue(fields=issue_dict) # Return the created issue details return { "key": issue.key, "summary": issue.fields.summary, "description": issue.fields.description, "status": issue.fields.status.name, "assignee": issue.fields.assignee.displayName if issue.fields.assignee else None, "reporter": issue.fields.reporter.displayName if issue.fields.reporter else None, "created": issue.fields.created, "updated": issue.fields.updated, "issue_type": issue.fields.issuetype.name, "priority": issue.fields.priority.name if issue.fields.priority else None, "labels": issue.fields.labels } except Exception as e: logger.error(f"Error creating issue: {str(e)}") return {"error": f"Error creating issue: {str(e)}"} def update_issue(self, issue_key: str, summary: Optional[str] = None, description: Optional[str] = None, priority: Optional[str] = None, assignee: Optional[str] = None, labels: Optional[List[str]] = None, comment: Optional[str] = None, custom_fields: Dict[str, Any] = None) -> Dict[str, Any]: """Update a JIRA issue.""" try: if not self._client: if not self.connect(): return {"error": "Not connected to JIRA"} # Get the issue first issue = self.client.issue(issue_key) # Prepare update fields update_dict = {} # Handle standard fields if summary is not None: update_dict['summary'] = summary if description is not None: update_dict['description'] = description if priority is not None: update_dict['priority'] = {'name': priority} if assignee is not None: update_dict['assignee'] = {'name': assignee} # Update the issue fields if update_dict: issue.update(fields=update_dict) # Handle labels separately as they need special treatment if labels is not None: issue.update(fields={'labels': labels}) # Add comment if provided if comment: issue.add_comment(comment) # Handle custom fields if custom_fields: issue.update(fields=custom_fields) # Return the updated issue details updated_issue = self.client.issue(issue_key) return { "key": updated_issue.key, "summary": updated_issue.fields.summary, "description": updated_issue.fields.description, "status": updated_issue.fields.status.name, "assignee": updated_issue.fields.assignee.displayName if updated_issue.fields.assignee else None, "reporter": updated_issue.fields.reporter.displayName if updated_issue.fields.reporter else None, "created": updated_issue.fields.created, "updated": updated_issue.fields.updated, "issue_type": updated_issue.fields.issuetype.name, "priority": updated_issue.fields.priority.name if updated_issue.fields.priority else None, "labels": updated_issue.fields.labels, "comment_added": bool(comment) } except Exception as e: logger.error(f"Error updating issue {issue_key}: {str(e)}") return {"error": f"Error updating issue: {str(e)}"} def get_projects(self, include_archived: bool = False, max_results: int = 50, start_at: int = 0) -> Dict[str, Any]: """Get list of JIRA projects.""" try: if not self._client: if not self.connect(): return {"error": "Not connected to JIRA"} # Get all projects logger.debug("Fetching projects from JIRA...") projects = self.client.projects() logger.debug(f"Got projects response type: {type(projects)}") if projects: logger.debug(f"First project type: {type(projects[0])}") logger.debug(f"First project dir: {dir(projects[0])}") # Apply pagination total = len(projects) projects = projects[start_at:start_at + max_results] # Format results results = [] for project in projects: # Get the basic project info that's always available try: project_dict = { "key": project.key, "name": project.name, "id": str(project.id) } results.append(project_dict) except Exception as e: logger.error(f"Error processing project: {str(e)}") logger.error(f"Project object: {project}") continue return { "total": total, "start_at": start_at, "max_results": max_results, "projects": results } except Exception as e: logger.error(f"Error getting projects: {str(e)}") return {"error": f"Error getting projects: {str(e)}"} def add_comment(self, args: CommentArgs) -> Dict[str, Any]: """Add a comment to a JIRA issue.""" logger.info(f"Adding comment to issue {args.issue_key}") try: if not self._client: if not self.connect(): return {"error": "Not connected to JIRA"} # Get the issue to verify it exists issue = self.client.issue(args.issue_key) # Add the comment using the client's add_comment method if args.visibility: comment = self.client.add_comment( issue=args.issue_key, body=args.comment, visibility=args.visibility ) else: comment = self.client.add_comment( issue=args.issue_key, body=args.comment ) logger.info(f"Successfully added comment to {args.issue_key}") return { "id": comment.id, "issue_key": args.issue_key, "body": comment.body, "author": comment.author.displayName if hasattr(comment.author, 'displayName') else comment.author.name, "created": str(comment.created), "updated": str(comment.updated) if hasattr(comment, 'updated') else None } except Exception as e: logger.error(f"Error adding comment: {str(e)}") return {"error": f"Error adding comment: {str(e)}"} def log_work(self, args: LogWorkArgs) -> Dict[str, Any]: """Log work on a JIRA issue.""" logger.info(f"Logging work on issue {args.issue_key}: {args.time_spent}") try: if not self._client: if not self.connect(): return {"error": "Not connected to JIRA"} # Create worklog entry worklog = self.client.add_worklog( issue=args.issue_key, timeSpent=args.time_spent, comment=args.comment if args.comment else None, started=args.started_at if args.started_at else None ) logger.info(f"Successfully logged work: {worklog.id}") return { "id": worklog.id, "issue_key": args.issue_key, "time_spent": args.time_spent, "author": worklog.author.displayName, "created": str(worklog.created) } except Exception as e: logger.error(f"Error logging work: {str(e)}") return {"error": f"Error logging work: {str(e)}"} def get_comments(self, args: GetCommentsArgs) -> Dict[str, Any]: """Get comments for a JIRA issue.""" logger.info(f"Getting comments for issue {args.issue_key}") try: if not self._client: if not self.connect(): return {"error": "Not connected to JIRA"} # Get the issue issue = self.client.issue(args.issue_key) # Get comments comments = self.client.comments(issue) # Apply pagination total = len(comments) paginated_comments = comments[args.start_at:args.start_at + args.max_results] # Format the comments formatted_comments = [] for comment in paginated_comments: formatted_comment = { "id": comment.id, "author": comment.author.displayName if hasattr(comment.author, 'displayName') else str(comment.author), "body": comment.body, "created": str(comment.created), "updated": str(comment.updated) if hasattr(comment, 'updated') else None, } # Add any additional fields that might be useful if hasattr(comment, 'visibility') and comment.visibility: formatted_comment["visibility"] = comment.visibility formatted_comments.append(formatted_comment) return { "issue_key": args.issue_key, "total": total, "start_at": args.start_at, "max_results": args.max_results, "comments": formatted_comments } except Exception as e: logger.error(f"Error getting comments: {str(e)}") return {"error": f"Error getting comments: {str(e)}"} def clone_issue(self, args: CloneIssueArgs) -> Dict[str, Any]: """Clone a JIRA issue.""" logger.info(f"Cloning issue {args.source_issue_key}") try: if not self._client: if not self.connect(): return {"error": "Not connected to JIRA"} # Get the source issue source_issue = self.client.issue(args.source_issue_key) # Extract data from source issue source_project = source_issue.fields.project.key target_project = args.project_key or source_project # Prepare issue fields issue_dict = { 'project': target_project, 'summary': args.summary or f"Clone of {source_issue.fields.summary}", 'issuetype': {'name': args.issue_type or source_issue.fields.issuetype.name} } # Add description if args.description is not None: issue_dict['description'] = args.description else: issue_dict['description'] = source_issue.fields.description # Add priority if available if args.priority is not None: issue_dict['priority'] = {'name': args.priority} elif hasattr(source_issue.fields, 'priority') and source_issue.fields.priority: issue_dict['priority'] = {'name': source_issue.fields.priority.name} # Add assignee if available if args.assignee is not None: issue_dict['assignee'] = {'name': args.assignee} elif hasattr(source_issue.fields, 'assignee') and source_issue.fields.assignee: # Handle different ways JIRA might represent users if hasattr(source_issue.fields.assignee, 'accountId'): issue_dict['assignee'] = {'accountId': source_issue.fields.assignee.accountId} elif hasattr(source_issue.fields.assignee, 'key'): issue_dict['assignee'] = {'key': source_issue.fields.assignee.key} elif hasattr(source_issue.fields.assignee, 'name'): issue_dict['assignee'] = {'name': source_issue.fields.assignee.name} # Add labels if available if args.labels is not None: issue_dict['labels'] = args.labels elif hasattr(source_issue.fields, 'labels') and source_issue.fields.labels: issue_dict['labels'] = source_issue.fields.labels # Handle custom fields - copy over from source issue custom_field_prefixes = ['customfield_'] source_custom_fields = {} # Extract custom fields from source issue for field_name in dir(source_issue.fields): if any(field_name.startswith(prefix) for prefix in custom_field_prefixes): field_value = getattr(source_issue.fields, field_name) if field_value is not None: # Handle complex field values that might be objects if hasattr(field_value, 'id'): source_custom_fields[field_name] = {'id': field_value.id} elif hasattr(field_value, 'value'): source_custom_fields[field_name] = {'value': field_value.value} elif hasattr(field_value, 'name'): source_custom_fields[field_name] = {'name': field_value.name} else: source_custom_fields[field_name] = field_value # Use custom fields from source issue, overridden by any explicitly set fields issue_dict.update(source_custom_fields) # Override with user-specified custom fields if args.custom_fields: issue_dict.update(args.custom_fields) # Collect information about source issue for reference source_info = { "key": source_issue.key, "summary": source_issue.fields.summary, "project": source_project, "issue_type": source_issue.fields.issuetype.name, "status": source_issue.fields.status.name, "priority": source_issue.fields.priority.name if hasattr(source_issue.fields, 'priority') and source_issue.fields.priority else None, "assignee": source_issue.fields.assignee.displayName if hasattr(source_issue.fields, 'assignee') and source_issue.fields.assignee else None, "reporter": source_issue.fields.reporter.displayName if hasattr(source_issue.fields, 'reporter') and source_issue.fields.reporter else None, "created": source_issue.fields.created, "updated": source_issue.fields.updated, "custom_fields": source_custom_fields } # Create the new issue new_issue = self.client.create_issue(fields=issue_dict) # Add link to source issue if requested if args.add_link_to_source: try: self.client.create_issue_link( type="Cloned", inwardIssue=new_issue.key, outwardIssue=source_issue.key, comment={ "body": f"This issue was cloned from {source_issue.key}." } ) logger.info(f"Added link from {new_issue.key} to source issue {source_issue.key}") except Exception as e: logger.warning(f"Failed to create issue link: {str(e)}") # Copy attachments if requested if args.copy_attachments: try: attachments = source_issue.fields.attachment if attachments: for attachment in attachments: # Download the attachment attachment_data = self.client.attachment(attachment.id) # Upload to the new issue self.client.add_attachment( issue=new_issue.key, attachment=attachment_data.get() ) logger.info(f"Copied {len(attachments)} attachments to {new_issue.key}") except Exception as e: logger.warning(f"Failed to copy attachments: {str(e)}") # Return the created issue details along with source info return { "key": new_issue.key, "summary": new_issue.fields.summary, "description": new_issue.fields.description, "status": new_issue.fields.status.name, "assignee": new_issue.fields.assignee.displayName if hasattr(new_issue.fields, 'assignee') and new_issue.fields.assignee else None, "reporter": new_issue.fields.reporter.displayName if hasattr(new_issue.fields, 'reporter') and new_issue.fields.reporter else None, "created": new_issue.fields.created, "updated": new_issue.fields.updated, "issue_type": new_issue.fields.issuetype.name, "priority": new_issue.fields.priority.name if hasattr(new_issue.fields, 'priority') and new_issue.fields.priority else None, "labels": new_issue.fields.labels if hasattr(new_issue.fields, 'labels') else [], "source_issue": source_info, "attachments_copied": args.copy_attachments, "link_added": args.add_link_to_source } except Exception as e: logger.error(f"Error cloning issue: {str(e)}") return {"error": f"Error cloning issue: {str(e)}"}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/klauseduard/vibe-coded-jira-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server