# issues.py
"""Module for Jira issue operations."""
import logging
from collections import defaultdict
from typing import Any
from requests.exceptions import HTTPError
from ..exceptions import MCPAtlassianAuthenticationError
from ..models.jira import JiraIssue
from ..models.jira.common import JiraChangelog
from ..utils import parse_date
from .client import JiraClient
from .constants import DEFAULT_READ_JIRA_FIELDS
from .protocols import (
AttachmentsOperationsProto,
EpicOperationsProto,
FieldsOperationsProto,
IssueOperationsProto,
ProjectsOperationsProto,
UsersOperationsProto,
)
logger = logging.getLogger("mcp-jira")
class IssuesMixin(
JiraClient,
AttachmentsOperationsProto,
EpicOperationsProto,
FieldsOperationsProto,
IssueOperationsProto,
ProjectsOperationsProto,
UsersOperationsProto,
):
"""Mixin for Jira issue operations."""
def get_issue(
    self,
    issue_key: str,
    expand: str | None = None,
    comment_limit: int | str | None = 10,
    fields: str | list[str] | tuple[str, ...] | set[str] | None = None,
    properties: str | list[str] | None = None,
    update_history: bool = True,
) -> JiraIssue:
    """
    Get a Jira issue by key.

    Args:
        issue_key: The issue key (e.g., PROJECT-123)
        expand: Fields to expand in the response
        comment_limit: Maximum number of comments to include, or "all"
        fields: Fields to return (comma-separated string, list, tuple, set, or "*all")
        properties: Issue properties to return (comma-separated string or list)
        update_history: Whether to update the issue view history

    Returns:
        JiraIssue model with issue data and metadata

    Raises:
        MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
        Exception: If there is an error retrieving the issue
    """
    try:
        # Obtain the projects filter from the config.
        # These should NOT be overridden by the request.
        filter_to_use = self.config.projects_filter
        # Apply projects filter if present
        if filter_to_use:
            # Split projects filter by commas and handle possible whitespace
            projects = [p.strip() for p in filter_to_use.split(",")]
            # Obtain the project key from issue_key (text before the first dash)
            issue_key_project = issue_key.split("-")[0]
            if issue_key_project not in projects:
                # Project is outside the configured allow-list: refuse access
                msg = (
                    "Issue with project prefix "
                    f"'{issue_key_project}' are restricted by configuration"
                )
                raise ValueError(msg)
        # Determine fields_param: use provided fields or default from constant
        fields_param = fields
        if fields_param is None:
            fields_param = ",".join(DEFAULT_READ_JIRA_FIELDS)
        elif isinstance(fields_param, list | tuple | set):
            # Collections are serialized to the comma-separated form the API expects
            fields_param = ",".join(fields_param)
        # Ensure necessary fields are included based on special parameters.
        # Only done when the caller kept the defaults (or asked for "*all"),
        # so an explicit field selection is never silently widened.
        if (
            fields_param == ",".join(DEFAULT_READ_JIRA_FIELDS)
            or fields_param == "*all"
        ):
            # Default fields are being used - preserve the order
            default_fields_list = (
                fields_param.split(",")
                if fields_param != "*all"
                else list(DEFAULT_READ_JIRA_FIELDS)
            )
            additional_fields = []
            # Add appropriate fields based on expand parameter
            if expand:
                expand_params = expand.split(",")
                if (
                    "changelog" in expand_params
                    and "changelog" not in default_fields_list
                    and "changelog" not in additional_fields
                ):
                    additional_fields.append("changelog")
                if (
                    "renderedFields" in expand_params
                    and "rendered" not in default_fields_list
                    and "rendered" not in additional_fields
                ):
                    additional_fields.append("rendered")
            # Add appropriate fields based on properties parameter
            if (
                properties
                and "properties" not in default_fields_list
                and "properties" not in additional_fields
            ):
                additional_fields.append("properties")
            # Combine default fields with additional fields, preserving order
            if additional_fields:
                fields_param = ",".join(default_fields_list + additional_fields)
        # Handle non-default fields string
        # Build expand parameter if provided
        expand_param = expand
        # Convert properties to proper format if it's a list
        properties_param = properties
        if properties and isinstance(properties, list | tuple | set):
            properties_param = ",".join(properties)
        # Get the issue data with all parameters
        issue = self.jira.get_issue(
            issue_key,
            expand=expand_param,
            fields=fields_param,
            properties=properties_param,
            update_history=update_history,
        )
        if not issue:
            msg = f"Issue {issue_key} not found"
            raise ValueError(msg)
        if not isinstance(issue, dict):
            msg = (
                f"Unexpected return value type from `jira.get_issue`: {type(issue)}"
            )
            logger.error(msg)
            raise TypeError(msg)
        # Extract fields data, safely handling None
        fields_data = issue.get("fields", {}) or {}
        # Get comments if needed
        if "comment" in fields_data:
            comment_limit_int = self._normalize_comment_limit(comment_limit)
            comments = self._get_issue_comments_if_needed(
                issue_key, comment_limit_int
            )
            # Add comments to the issue data for processing by the model.
            # NOTE(review): assumes fields_data["comment"] is a dict when the
            # key is present — confirm the API never returns null here.
            fields_data["comment"]["comments"] = comments
        # Extract epic information (best-effort: failures degrade to no-epic)
        try:
            epic_info = self._extract_epic_information(issue)
        except Exception as e:
            logger.warning(f"Error extracting epic information: {str(e)}")
            epic_info = {"epic_key": None, "epic_name": None}
        # If this is linked to an epic, add the epic information to the fields
        if epic_info.get("epic_key"):
            try:
                # Get field IDs for epic fields
                field_ids = self.get_field_ids_to_epic()
                # Add epic link field if it doesn't exist
                if (
                    "epic_link" in field_ids
                    and field_ids["epic_link"] not in fields_data
                ):
                    fields_data[field_ids["epic_link"]] = epic_info["epic_key"]
                # Add epic name field if it doesn't exist
                if (
                    epic_info.get("epic_name")
                    and "epic_name" in field_ids
                    and field_ids["epic_name"] not in fields_data
                ):
                    fields_data[field_ids["epic_name"]] = epic_info["epic_name"]
            except Exception as e:
                logger.warning(f"Error setting epic fields: {str(e)}")
        # Update the issue data with the fields
        issue["fields"] = fields_data
        # Create and return the JiraIssue model, passing requested_fields
        # (the caller's original value, not the augmented fields_param)
        return JiraIssue.from_api_response(
            issue,
            base_url=self.config.url if hasattr(self, "config") else None,
            requested_fields=fields,
        )
    except HTTPError as http_err:
        # 401/403 are surfaced as a dedicated auth error so callers can react
        if http_err.response is not None and http_err.response.status_code in [
            401,
            403,
        ]:
            error_msg = (
                f"Authentication failed for Jira API ({http_err.response.status_code}). "
                "Token may be expired or invalid. Please verify credentials."
            )
            logger.error(error_msg)
            raise MCPAtlassianAuthenticationError(error_msg) from http_err
        else:
            logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
            raise
    except Exception as e:
        # All other failures are wrapped with the issue key for context
        error_msg = str(e)
        logger.error(f"Error retrieving issue {issue_key}: {error_msg}")
        raise Exception(f"Error retrieving issue {issue_key}: {error_msg}") from e
def _normalize_comment_limit(self, comment_limit: int | str | None) -> int | None:
"""
Normalize the comment limit to an integer or None.
Args:
comment_limit: The comment limit as int, string, or None
Returns:
Normalized comment limit as int or None
"""
if comment_limit is None:
return None
if isinstance(comment_limit, int):
return comment_limit
if comment_limit == "all":
return None # No limit
# Try to convert to int
try:
return int(comment_limit)
except ValueError:
# If conversion fails, default to 10
return 10
def _get_issue_comments_if_needed(
self, issue_key: str, comment_limit: int | None
) -> list[dict]:
"""
Get comments for an issue if needed.
Args:
issue_key: The issue key
comment_limit: Maximum number of comments to include
Returns:
List of comments
"""
if comment_limit is None or comment_limit > 0:
try:
response = self.jira.issue_get_comments(issue_key)
if not isinstance(response, dict):
msg = f"Unexpected return value type from `jira.issue_get_comments`: {type(response)}"
logger.error(msg)
raise TypeError(msg)
comments = response["comments"]
# Limit comments if needed
if comment_limit is not None:
comments = comments[:comment_limit]
return comments
except Exception as e:
logger.warning(f"Error getting comments for {issue_key}: {str(e)}")
return []
return []
def _extract_epic_information(self, issue: dict) -> dict[str, str | None]:
    """
    Extract epic information from an issue.

    Best-effort: every lookup is wrapped so a failure at any stage degrades
    to the default (all-None) result rather than raising.

    Args:
        issue: The raw issue data dict from the Jira API

    Returns:
        Dictionary with keys "epic_key", "epic_name", "epic_summary",
        and "is_epic" (bool); values are None/False when not applicable
    """
    # Initialize with default values; partially-filled results are possible
    # when a later lookup (e.g. fetching the linked epic) fails.
    epic_info = {
        "epic_key": None,
        "epic_name": None,
        "epic_summary": None,
        "is_epic": False,
    }
    try:
        fields = issue.get("fields", {}) or {}
        issue_type = fields.get("issuetype", {}).get("name", "").lower()
        # Get field IDs for epic fields (custom field IDs vary per instance)
        try:
            field_ids = self.get_field_ids_to_epic()
        except Exception as e:
            logger.warning(f"Error getting Jira fields: {str(e)}")
            field_ids = {}
        # Check if this is an epic
        if issue_type == "epic":
            epic_info["is_epic"] = True
            # Use the discovered field ID for epic name
            if "epic_name" in field_ids and field_ids["epic_name"] in fields:
                epic_info["epic_name"] = fields.get(field_ids["epic_name"], "")
        # If not an epic, check for epic link
        elif "epic_link" in field_ids:
            epic_link_field = field_ids["epic_link"]
            if epic_link_field in fields and fields[epic_link_field]:
                epic_key = fields[epic_link_field]
                epic_info["epic_key"] = epic_key
                # Try to get epic details via a second API round-trip
                try:
                    epic = self.jira.get_issue(
                        epic_key,
                        expand=None,
                        fields=None,
                        properties=None,
                        update_history=True,
                    )
                    if not isinstance(epic, dict):
                        msg = f"Unexpected return value type from `jira.get_issue`: {type(epic)}"
                        logger.error(msg)
                        raise TypeError(msg)
                    epic_fields = epic.get("fields", {}) or {}
                    # Get epic name using the discovered field ID
                    if "epic_name" in field_ids:
                        epic_info["epic_name"] = epic_fields.get(
                            field_ids["epic_name"], ""
                        )
                    epic_info["epic_summary"] = epic_fields.get("summary", "")
                except Exception as e:
                    # Keep the epic_key we already found; only details are lost
                    logger.warning(
                        f"Error getting epic details for {epic_key}: {str(e)}"
                    )
    except Exception as e:
        logger.warning(f"Error extracting epic information: {str(e)}")
    return epic_info
def _format_issue_content(
self,
issue_key: str,
issue: dict,
description: str,
comments: list[dict],
created_date: str,
epic_info: dict[str, str | None],
) -> str:
"""
Format issue content for display.
Args:
issue_key: The issue key
issue: The issue data
description: The issue description
comments: The issue comments
created_date: The formatted creation date
epic_info: Epic information
Returns:
Formatted issue content
"""
fields = issue.get("fields", {})
# Basic issue information
summary = fields.get("summary", "")
status = fields.get("status", {}).get("name", "")
issue_type = fields.get("issuetype", {}).get("name", "")
# Format content
content = [f"# {issue_key}: {summary}"]
content.append(f"**Type**: {issue_type}")
content.append(f"**Status**: {status}")
content.append(f"**Created**: {created_date}")
# Add reporter
reporter = fields.get("reporter", {})
reporter_name = reporter.get("displayName", "") or reporter.get("name", "")
if reporter_name:
content.append(f"**Reporter**: {reporter_name}")
# Add assignee
assignee = fields.get("assignee", {})
assignee_name = assignee.get("displayName", "") or assignee.get("name", "")
if assignee_name:
content.append(f"**Assignee**: {assignee_name}")
# Add epic information
if epic_info["is_epic"]:
content.append(f"**Epic Name**: {epic_info['epic_name']}")
elif epic_info["epic_key"]:
content.append(
f"**Epic**: [{epic_info['epic_key']}] {epic_info['epic_summary']}"
)
# Add description
if description:
content.append("\n## Description\n")
content.append(description)
# Add comments
if comments:
content.append("\n## Comments\n")
for comment in comments:
author = comment.get("author", {})
author_name = author.get("displayName", "") or author.get("name", "")
comment_body = self._clean_text(comment.get("body", ""))
if author_name and comment_body:
comment_date = comment.get("created", "")
if comment_date:
comment_date = parse_date(comment_date)
content.append(f"**{author_name}** ({comment_date}):")
else:
content.append(f"**{author_name}**:")
content.append(f"{comment_body}\n")
return "\n".join(content)
def _create_issue_metadata(
self,
issue_key: str,
issue: dict,
comments: list[dict],
created_date: str,
epic_info: dict[str, str | None],
) -> dict[str, Any]:
"""
Create metadata for a Jira issue.
Args:
issue_key: The issue key
issue: The issue data
comments: The issue comments
created_date: The formatted creation date
epic_info: Epic information
Returns:
Metadata dictionary
"""
fields = issue.get("fields", {})
# Initialize metadata
metadata = {
"key": issue_key,
"title": fields.get("summary", ""),
"status": fields.get("status", {}).get("name", ""),
"type": fields.get("issuetype", {}).get("name", ""),
"created": created_date,
"url": f"{self.config.url}/browse/{issue_key}",
}
# Add assignee if available
assignee = fields.get("assignee", {})
if assignee:
metadata["assignee"] = assignee.get("displayName", "") or assignee.get(
"name", ""
)
# Add epic information
if epic_info["is_epic"]:
metadata["is_epic"] = True
metadata["epic_name"] = epic_info["epic_name"]
elif epic_info["epic_key"]:
metadata["epic_key"] = epic_info["epic_key"]
metadata["epic_name"] = epic_info["epic_name"]
metadata["epic_summary"] = epic_info["epic_summary"]
# Add comment count
metadata["comment_count"] = len(comments)
return metadata
def create_issue(
    self,
    project_key: str,
    summary: str,
    issue_type: str,
    description: str = "",
    assignee: str | None = None,
    components: list[str] | None = None,
    **kwargs: Any,  # noqa: ANN401 - Dynamic field types are necessary for Jira API
) -> JiraIssue:
    """
    Create a new Jira issue.

    Epics are created in two steps: the base issue first, then a follow-up
    update with the Epic-specific custom fields (stored in kwargs under
    "__epic_"-prefixed keys by _prepare_epic_fields).

    Args:
        project_key: The key of the project
        summary: The issue summary
        issue_type: The type of issue to create
        description: The issue description (Markdown; converted to Jira format)
        assignee: The username or account ID of the assignee
        components: List of component names to assign (e.g., ["Frontend", "API"])
        **kwargs: Additional fields to set on the issue

    Returns:
        JiraIssue model representing the created issue

    Raises:
        Exception: If there is an error creating the issue
    """
    try:
        # Validate required fields
        if not project_key:
            raise ValueError("Project key is required")
        if not summary:
            raise ValueError("Summary is required")
        if not issue_type:
            raise ValueError("Issue type is required")
        # Handle Epic and Subtask issue type names across different languages:
        # the caller may pass the English name while the instance uses a
        # localized one, so resolve the project's actual type name.
        actual_issue_type = issue_type
        if self._is_epic_issue_type(issue_type) and issue_type.lower() == "epic":
            # If the user provided "Epic" but we need to find the localized name
            epic_type_name = self._find_epic_issue_type_name(project_key)
            if epic_type_name:
                actual_issue_type = epic_type_name
                logger.info(
                    f"Using localized Epic issue type name: {actual_issue_type}"
                )
        elif issue_type.lower() in ["subtask", "sub-task"]:
            # If the user provided "Subtask" but we need to find the localized name
            subtask_type_name = self._find_subtask_issue_type_name(project_key)
            if subtask_type_name:
                actual_issue_type = subtask_type_name
                logger.info(
                    f"Using localized Subtask issue type name: {actual_issue_type}"
                )
        # Prepare fields
        fields: dict[str, Any] = {
            "project": {"key": project_key},
            "summary": summary,
            "issuetype": {"name": actual_issue_type},
        }
        # Add description if provided (convert from Markdown to Jira format)
        if description:
            fields["description"] = self._markdown_to_jira(description)
        # Add assignee if provided
        if assignee:
            try:
                # _get_account_id now returns the correct identifier (accountId for cloud, name for server)
                assignee_identifier = self._get_account_id(assignee)
                self._add_assignee_to_fields(fields, assignee_identifier)
            except ValueError as e:
                # Assignment failures are non-fatal: create unassigned instead
                logger.warning(f"Could not assign issue: {str(e)}")
        # Add components if provided
        if components:
            if isinstance(components, list):
                # Filter out any None or empty/whitespace-only strings
                valid_components = [
                    comp_name.strip()
                    for comp_name in components
                    if isinstance(comp_name, str) and comp_name.strip()
                ]
                if valid_components:
                    # Format as list of {"name": ...} dicts for the API
                    fields["components"] = [
                        {"name": comp_name} for comp_name in valid_components
                    ]
        # Make a copy of kwargs to preserve original values for two-step Epic
        # creation: the prepare steps below mutate `kwargs` (pop/store keys),
        # while `kwargs_copy` keeps the caller's originals for field mapping.
        kwargs_copy = kwargs.copy()
        # Prepare epic fields if this is an epic
        # This step now stores epic-specific fields in kwargs for post-creation update
        if self._is_epic_issue_type(issue_type):
            self._prepare_epic_fields(fields, summary, kwargs)
        # Prepare parent field if this is a subtask
        if issue_type.lower() == "subtask" or issue_type.lower() == "sub-task":
            self._prepare_parent_fields(fields, kwargs)
        # Allow parent field for all issue types when explicitly provided
        elif "parent" in kwargs:
            self._prepare_parent_fields(fields, kwargs)
        # Process **kwargs using the dynamic field map
        self._process_additional_fields(fields, kwargs_copy)
        # Create the issue
        response = self.jira.create_issue(fields=fields)
        if not isinstance(response, dict):
            msg = f"Unexpected return value type from `jira.create_issue`: {type(response)}"
            logger.error(msg)
            raise TypeError(msg)
        # Get the created issue key
        issue_key = response.get("key")
        if not issue_key:
            error_msg = "No issue key in response"
            raise ValueError(error_msg)
        # For Epics, perform the second step: update Epic-specific fields
        if self._is_epic_issue_type(issue_type):
            # Check if we have any stored Epic fields to update
            has_epic_fields = any(k.startswith("__epic_") for k in kwargs)
            if has_epic_fields:
                logger.info(
                    f"Performing post-creation update for Epic {issue_key} with Epic-specific fields"
                )
                try:
                    return self.update_epic_fields(issue_key, kwargs)
                except Exception as update_error:
                    # The Epic itself exists; losing the extra fields is
                    # preferable to failing the whole creation.
                    logger.error(
                        f"Error during post-creation update of Epic {issue_key}: {str(update_error)}"
                    )
                    logger.info(
                        "Continuing with the original Epic that was successfully created"
                    )
        # Get the full issue data and convert to JiraIssue model
        issue_data = self.jira.get_issue(issue_key)
        if not isinstance(issue_data, dict):
            msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
            logger.error(msg)
            raise TypeError(msg)
        return JiraIssue.from_api_response(issue_data)
    except Exception as e:
        self._handle_create_issue_error(e, issue_type)
        raise  # Re-raise after logging
def _is_epic_issue_type(self, issue_type: str) -> bool:
"""
Check if an issue type is an Epic, handling localized names.
Args:
issue_type: The issue type name to check
Returns:
True if the issue type is an Epic, False otherwise
"""
# Common Epic names in different languages
epic_names = {
"epic", # English
"에픽", # Korean
"エピック", # Japanese
"史诗", # Chinese (Simplified)
"史詩", # Chinese (Traditional)
"épica", # Spanish/Portuguese
"épique", # French
"epik", # Turkish
"эпик", # Russian
"епік", # Ukrainian
}
return issue_type.lower() in epic_names or "epic" in issue_type.lower()
def _find_epic_issue_type_name(self, project_key: str) -> str | None:
"""
Find the actual Epic issue type name for a project.
Args:
project_key: The project key
Returns:
The Epic issue type name if found, None otherwise
"""
try:
issue_types = self.get_project_issue_types(project_key)
for issue_type in issue_types:
type_name = issue_type.get("name", "")
if self._is_epic_issue_type(type_name):
return type_name
return None
except Exception as e:
logger.warning(f"Could not get issue types for project {project_key}: {e}")
return None
def _find_subtask_issue_type_name(self, project_key: str) -> str | None:
"""
Find the actual Subtask issue type name for a project.
Args:
project_key: The project key
Returns:
The Subtask issue type name if found, None otherwise
"""
try:
issue_types = self.get_project_issue_types(project_key)
for issue_type in issue_types:
# Check the subtask field - this is the most reliable way
if issue_type.get("subtask", False):
return issue_type.get("name")
return None
except Exception as e:
logger.warning(f"Could not get issue types for project {project_key}: {e}")
return None
def _prepare_epic_fields(
self, fields: dict[str, Any], summary: str, kwargs: dict[str, Any]
) -> None:
"""
Prepare fields for epic creation.
This method delegates to the prepare_epic_fields method in EpicsMixin.
Args:
fields: The fields dictionary to update
summary: The epic summary
kwargs: Additional fields from the user
"""
# Extract project_key from fields if available
project_key = None
if "project" in fields:
if isinstance(fields["project"], dict):
project_key = fields["project"].get("key")
elif isinstance(fields["project"], str):
project_key = fields["project"]
# Delegate to EpicsMixin.prepare_epic_fields with project_key
# Since JiraFetcher inherits from both IssuesMixin and EpicsMixin,
# this will correctly use the prepare_epic_fields method from EpicsMixin
# which implements the two-step Epic creation approach
self.prepare_epic_fields(fields, summary, kwargs, project_key)
def _prepare_parent_fields(
self, fields: dict[str, Any], kwargs: dict[str, Any]
) -> None:
"""
Prepare fields for parent relationship.
Args:
fields: The fields dictionary to update
kwargs: Additional fields from the user
Raises:
ValueError: If parent issue key is not specified for a subtask
"""
if "parent" in kwargs:
parent_key = kwargs.get("parent")
if parent_key:
fields["parent"] = {"key": parent_key}
# Remove parent from kwargs to avoid double processing
kwargs.pop("parent", None)
elif "issuetype" in fields and fields["issuetype"]["name"].lower() in (
"subtask",
"sub-task",
):
# Only raise error if issue type is subtask and parent is missing
raise ValueError(
"Issue type is a sub-task but parent issue key or id not specified. Please provide a 'parent' parameter with the parent issue key."
)
def _add_assignee_to_fields(self, fields: dict[str, Any], assignee: str) -> None:
"""
Add assignee to issue fields.
Args:
fields: The fields dictionary to update
assignee: The assignee account ID
"""
# Cloud instance uses accountId
if self.config.is_cloud:
fields["assignee"] = {"accountId": assignee}
else:
# Server/DC might use name instead of accountId
fields["assignee"] = {"name": assignee}
def _process_additional_fields(
    self, fields: dict[str, Any], kwargs: dict[str, Any]
) -> None:
    """
    Resolve extra keyword arguments into Jira field IDs and add them to fields.

    Uses the dynamic field map from FieldsMixin to identify field IDs.

    Args:
        fields: The fields dictionary to update
        kwargs: Additional fields provided via **kwargs
    """
    # Ensure the field map is loaded/cached (method from FieldsMixin)
    field_map = self._generate_field_map()
    if not field_map:
        logger.error(
            "Could not generate field map. Cannot process additional fields."
        )
        return
    # Iterate a snapshot so the caller's kwargs may be mutated elsewhere
    for key, value in kwargs.copy().items():
        # Internal epic markers and explicitly-handled args are resolved elsewhere
        if key.startswith("__epic_") or key in ("parent", "assignee", "components"):
            continue
        api_field_id = self._resolve_field_id(key, field_map)
        if not api_field_id:
            # Unrecognized key - log a warning and skip
            logger.warning(
                f"Ignoring unrecognized field '{key}' passed via kwargs."
            )
            continue
        # Full field definition gives formatting context (schema type etc.)
        field_definition = self.get_field_by_id(api_field_id)
        formatted_value = self._format_field_value_for_write(
            api_field_id, value, field_definition
        )
        if formatted_value is None:
            logger.warning(
                f"Skipping field '{key}' due to formatting error or invalid value."
            )
            continue
        fields[api_field_id] = formatted_value
        logger.debug(
            f"Added field '{api_field_id}' from kwarg '{key}': {formatted_value}"
        )

def _resolve_field_id(self, key: str, field_map: dict[str, Any]) -> str | None:
    """
    Map a kwarg name to a Jira API field ID, or None when unrecognized.

    Resolution order: case-insensitive name map, direct custom field ID,
    then exact-case system field ID.
    """
    normalized_key = key.lower()
    # 1. Known field name in the (lowercased) map
    if normalized_key in field_map:
        api_field_id = field_map[normalized_key]
        logger.debug(
            f"Identified field '{key}' as '{api_field_id}' via name map."
        )
        return api_field_id
    # 2. Direct custom field ID (e.g. "customfield_10010")
    if key.startswith("customfield_"):
        logger.debug(f"Identified field '{key}' as direct custom field ID.")
        return key
    # 3. Standard system field ID in its original case
    if key in field_map:
        logger.debug(f"Identified field '{key}' as standard system field ID.")
        return field_map[key]
    return None
def _format_field_value_for_write(
    self, field_id: str, value: Any, field_definition: dict | None
) -> Any:
    """Formats field values for the Jira API.

    Args:
        field_id: The API field ID (e.g. "priority" or "customfield_10011")
        value: The raw value supplied by the caller
        field_definition: The field's definition from FieldsMixin, if known

    Returns:
        The API-ready value, or None when the value could not be formatted
        (callers treat None as "skip this field")
    """
    # Get schema type if definition is available
    schema_type = (
        field_definition.get("schema", {}).get("type") if field_definition else None
    )
    # Prefer name from definition if available, else use ID for logging/lookup
    field_name_for_format = (
        field_definition.get("name", field_id) if field_definition else field_id
    )
    # Formatting rules keyed on the lowercased standard field name
    normalized_name = field_name_for_format.lower()
    if normalized_name == "priority":
        # API expects {"name": ...} or {"id": ...}
        if isinstance(value, str):
            return {"name": value}
        elif isinstance(value, dict) and ("name" in value or "id" in value):
            return value  # Assume pre-formatted
        else:
            logger.warning(
                f"Invalid format for priority field: {value}. Expected string name or dict."
            )
            return None  # Or raise error
    elif normalized_name == "labels":
        if isinstance(value, list) and all(isinstance(item, str) for item in value):
            return value
        # Allow comma-separated string if passed via additional_fields JSON string
        elif isinstance(value, str):
            return [label.strip() for label in value.split(",") if label.strip()]
        else:
            logger.warning(
                f"Invalid format for labels field: {value}. Expected list of strings or comma-separated string."
            )
            return None
    elif normalized_name in ["fixversions", "versions", "components"]:
        # These expect lists of objects, typically {"name": "..."} or {"id": "..."}
        if isinstance(value, list):
            formatted_list = []
            for item in value:
                if isinstance(item, str):
                    formatted_list.append({"name": item})  # Convert simple strings
                elif isinstance(item, dict) and ("name" in item or "id" in item):
                    formatted_list.append(item)  # Keep pre-formatted dicts
                else:
                    # Invalid entries are dropped, not fatal
                    logger.warning(
                        f"Invalid item format in {normalized_name} list: {item}"
                    )
            return formatted_list
        else:
            logger.warning(
                f"Invalid format for {normalized_name} field: {value}. Expected list."
            )
            return None
    elif normalized_name == "reporter":
        if isinstance(value, str):
            try:
                # Resolve to accountId (Cloud) or name (Server/DC)
                reporter_identifier = self._get_account_id(value)
                if self.config.is_cloud:
                    return {"accountId": reporter_identifier}
                else:
                    return {"name": reporter_identifier}
            except ValueError as e:
                logger.warning(f"Could not format reporter field: {str(e)}")
                return None
        elif isinstance(value, dict) and ("name" in value or "accountId" in value):
            return value  # Assume pre-formatted
        else:
            logger.warning(f"Invalid format for reporter field: {value}")
            return None
    # Add more formatting rules for other standard fields based on schema_type or field_id
    elif normalized_name == "duedate":
        if isinstance(value, str):  # Basic check, could add date validation
            return value
        else:
            logger.warning(
                f"Invalid format for duedate field: {value}. Expected YYYY-MM-DD string."
            )
            return None
    elif schema_type == "datetime" and isinstance(value, str):
        # Example: Ensure datetime fields are in ISO format if needed by API
        try:
            dt = parse_date(value)  # Assuming parse_date handles various inputs
            return (
                dt.isoformat() if dt else value
            )  # Return ISO or original if parse fails
        except Exception:
            logger.warning(
                f"Could not parse datetime for field {field_id}: {value}"
            )
            return value  # Return original on error
    # Default: return value as is if no specific formatting needed/identified
    return value
def _handle_create_issue_error(self, exception: Exception, issue_type: str) -> None:
    """
    Log a create-issue failure, adding a hint tailored to the error text.

    Args:
        exception: The exception that occurred
        issue_type: The type of issue being created
    """
    error_msg = str(exception)
    lowered = error_msg.lower()
    # Epic-name problems and required custom fields get actionable hints
    if "epic name" in lowered or "epicname" in lowered:
        logger.error(
            f"Error creating {issue_type}: {error_msg}. "
            "Try specifying an epic_name in the additional fields"
        )
    elif "customfield" in lowered:
        logger.error(
            f"Error creating {issue_type}: {error_msg}. "
            "This may be due to a required custom field"
        )
    else:
        logger.error(f"Error creating {issue_type}: {error_msg}")
def update_issue(
    self,
    issue_key: str,
    fields: dict[str, Any] | None = None,
    **kwargs: Any,  # noqa: ANN401 - Dynamic field types are necessary for Jira API
) -> JiraIssue:
    """
    Update a Jira issue.

    Args:
        issue_key: The key of the issue to update
        fields: Dictionary of fields to update
        **kwargs: Additional fields to update. Special fields include:
            - attachments: List of file paths to upload as attachments
            - status: New status for the issue (handled via transitions)
            - assignee: New assignee for the issue

    Returns:
        JiraIssue model representing the updated issue

    Raises:
        Exception: If there is an error updating the issue
    """
    try:
        # Validate required fields
        if not issue_key:
            raise ValueError("Issue key is required")
        update_fields = fields or {}
        attachments_result = None
        # Convert description from Markdown to Jira format if present
        if "description" in update_fields:
            update_fields["description"] = self._markdown_to_jira(
                update_fields["description"]
            )
        # Process kwargs
        for key, value in kwargs.items():
            if key == "status":
                # Status changes are handled separately via transitions.
                # Add status to fields so _update_issue_with_status can find it.
                # NOTE(review): this returns immediately, so any kwargs not
                # yet processed (including attachments) are skipped when a
                # status change is requested — confirm this is intended.
                update_fields["status"] = value
                return self._update_issue_with_status(issue_key, update_fields)
            elif key == "attachments":
                # Handle attachments separately - they're not part of fields update
                if value and isinstance(value, list | tuple):
                    # We'll process attachments after updating fields
                    pass
                else:
                    logger.warning(f"Invalid attachments value: {value}")
            elif key == "assignee":
                # Handle assignee updates, allow unassignment with None or empty string
                if value is None or value == "":
                    update_fields["assignee"] = None
                else:
                    try:
                        account_id = self._get_account_id(value)
                        self._add_assignee_to_fields(update_fields, account_id)
                    except ValueError as e:
                        # Non-fatal: the rest of the update still proceeds
                        logger.warning(f"Could not update assignee: {str(e)}")
            elif key == "description":
                # Handle description with markdown conversion
                update_fields["description"] = self._markdown_to_jira(value)
            else:
                # Process regular fields using _process_additional_fields
                # Create a temporary dict with just this field
                field_kwargs = {key: value}
                self._process_additional_fields(update_fields, field_kwargs)
        # Update the issue fields (skip the API call when nothing changed)
        if update_fields:
            self.jira.update_issue(
                issue_key=issue_key, update={"fields": update_fields}
            )
        # Handle attachments if provided
        if "attachments" in kwargs and kwargs["attachments"]:
            try:
                attachments_result = self.upload_attachments(
                    issue_key, kwargs["attachments"]
                )
                logger.info(
                    f"Uploaded attachments to {issue_key}: {attachments_result}"
                )
            except Exception as e:
                logger.error(
                    f"Error uploading attachments to {issue_key}: {str(e)}"
                )
                # Continue with the update even if attachments fail
        # Get the updated issue data and convert to JiraIssue model
        issue_data = self.jira.get_issue(issue_key)
        if not isinstance(issue_data, dict):
            msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
            logger.error(msg)
            raise TypeError(msg)
        issue = JiraIssue.from_api_response(issue_data)
        # Add attachment results to the response if available
        if attachments_result:
            issue.custom_fields["attachment_results"] = attachments_result
        return issue
    except Exception as e:
        error_msg = str(e)
        logger.error(f"Error updating issue {issue_key}: {error_msg}")
        raise ValueError(f"Failed to update issue {issue_key}: {error_msg}") from e
def _update_issue_with_status(
    self, issue_key: str, fields: dict[str, Any]
) -> JiraIssue:
    """
    Update an issue with a status change.

    Args:
        issue_key: The key of the issue to update
        fields: Dictionary of fields to update; a "status" entry (name string,
            numeric ID, int, or {"name"/"id": ...} dict) triggers a workflow
            transition and is removed before the regular field update

    Returns:
        JiraIssue model representing the updated issue

    Raises:
        TypeError: If the Jira API returns an unexpected payload type
        ValueError: If no available workflow transition matches the status
        Exception: If there is an error updating the issue
    """
    # Extract status from fields and remove it for the standard update
    status = fields.pop("status", None)
    # First update any fields if needed
    if fields:
        self.jira.update_issue(issue_key=issue_key, fields=fields)  # type: ignore[call-arg]
    # If no status change is requested, return the issue
    if not status:
        issue_data = self.jira.get_issue(issue_key)
        if not isinstance(issue_data, dict):
            msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
            logger.error(msg)
            raise TypeError(msg)
        return JiraIssue.from_api_response(issue_data)
    # Get available transitions (uses TransitionsMixin's normalized implementation)
    transitions = self.get_available_transitions(issue_key)  # type: ignore[attr-defined]
    # Extract status name or ID depending on what we received
    status_name = None
    status_id = None
    # Handle different input formats for status
    if isinstance(status, dict):
        # Dictionary format: {"name": "In Progress"} or {"id": "123"}
        status_name = status.get("name")
        status_id = status.get("id")
    elif isinstance(status, str):
        # String format: could be a name or an ID
        if status.isdigit():
            status_id = status
        else:
            status_name = status
    elif isinstance(status, int):
        # Integer format: must be an ID
        status_id = str(status)
    else:
        # Unknown format: fall back to the string form and match by name
        logger.warning(
            f"Unrecognized status format: {status} (type: {type(status)})"
        )
        status_name = str(status)
    # Log what we're searching for
    if status_name:
        logger.info(f"Looking for transition to status name: '{status_name}'")
    if status_id:
        logger.info(f"Looking for transition with ID: '{status_id}'")
    # Find the appropriate transition. For each transition, a target-status
    # name match is tried first, then a direct transition-ID match; the first
    # hit wins (break).
    transition_id = None
    for transition in transitions:
        # TransitionsMixin returns normalized transitions with 'to_status' field
        transition_status_name = transition.get("to_status", "")
        # Match by name (case-insensitive)
        if (
            status_name
            and transition_status_name
            and transition_status_name.lower() == status_name.lower()
        ):
            transition_id = transition.get("id")
            logger.info(
                f"Found transition ID {transition_id} matching status name '{status_name}'"
            )
            break
        # Direct transition ID match (if status is actually a transition ID)
        if status_id and str(transition.get("id", "")) == str(status_id):
            transition_id = transition.get("id")
            logger.info(f"Using direct transition ID {transition_id}")
            break
    if not transition_id:
        # Build list of available statuses from normalized transitions
        # for a helpful error message
        available_statuses = []
        for t in transitions:
            # Include transition name and target status if available
            transition_name = t.get("name", "")
            to_status = t.get("to_status", "")
            if to_status:
                available_statuses.append(f"{transition_name} -> {to_status}")
            elif transition_name:
                available_statuses.append(transition_name)
        available_statuses_str = (
            ", ".join(available_statuses) if available_statuses else "None found"
        )
        error_msg = (
            f"Could not find transition to status '{status}'. "
            f"Available transitions: {available_statuses_str}"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)
    # Perform the transition; digit-only string IDs are sent as ints
    logger.info(f"Performing transition with ID {transition_id}")
    self.jira.set_issue_status_by_transition_id(
        issue_key=issue_key,
        transition_id=(
            int(transition_id)
            if isinstance(transition_id, str) and transition_id.isdigit()
            else transition_id
        ),
    )
    # Get the updated issue data
    issue_data = self.jira.get_issue(issue_key)
    if not isinstance(issue_data, dict):
        msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
        logger.error(msg)
        raise TypeError(msg)
    return JiraIssue.from_api_response(issue_data)
def delete_issue(self, issue_key: str) -> bool:
    """
    Delete a Jira issue.

    Args:
        issue_key: The key of the issue to delete

    Returns:
        True if the issue was deleted successfully

    Raises:
        Exception: If there is an error deleting the issue
    """
    try:
        self.jira.delete_issue(issue_key)
    except Exception as exc:
        error_message = f"Error deleting issue {issue_key}: {str(exc)}"
        logger.error(error_message)
        raise Exception(error_message) from exc
    return True
def _log_available_fields(self, fields: list[dict]) -> None:
    """
    Log available fields for debugging.

    Args:
        fields: List of field definitions
    """
    logger.debug("Available Jira fields:")
    for field in fields:
        # Lazy %-style args: the message is only formatted when DEBUG
        # logging is actually enabled (rendered output is unchanged).
        logger.debug(
            "%s: %s (%s)",
            field.get("id"),
            field.get("name"),
            field.get("schema", {}).get("type"),
        )
def _process_field_for_epic_data(
self, field: dict, field_ids: dict[str, str]
) -> None:
"""
Process a field for epic-related data.
Args:
field: The field data to process
field_ids: Dictionary of field IDs to update
"""
try:
field_id = field.get("id")
if not field_id:
return
# Skip non-custom fields
if not field_id.startswith("customfield_"):
return
name = field.get("name", "").lower()
# Look for field names related to epics
if "epic" in name:
if "link" in name:
field_ids["epic_link"] = field_id
field_ids["Epic Link"] = field_id
elif "name" in name:
field_ids["epic_name"] = field_id
field_ids["Epic Name"] = field_id
except Exception as e:
logger.warning(f"Error processing field for epic data: {str(e)}")
def _get_raw_transitions(self, issue_key: str) -> list[dict]:
"""
Get raw transition data from the Jira API.
This is an internal method that returns unprocessed transition data.
For normalized transitions with proper structure, use get_available_transitions()
from TransitionsMixin instead.
Args:
issue_key: The key of the issue
Returns:
List of raw transition data from the API
Raises:
Exception: If there is an error getting transitions
"""
try:
transitions = self.jira.get_issue_transitions(issue_key)
return transitions
except Exception as e:
logger.error(f"Error getting transitions for issue {issue_key}: {str(e)}")
raise Exception(
f"Error getting transitions for issue {issue_key}: {str(e)}"
) from e
def transition_issue(self, issue_key: str, transition_id: str) -> JiraIssue:
    """
    Transition an issue to a new status using a transition ID.

    Args:
        issue_key: The key of the issue
        transition_id: The ID of the transition to perform

    Returns:
        JiraIssue model with the updated issue data

    Raises:
        Exception: If there is an error transitioning the issue
    """
    try:
        # Use the transition-ID endpoint: `set_issue_status` expects a status
        # *name*, so passing an ID through it would target the wrong thing.
        # Digit-only string IDs are sent as ints, consistent with
        # _update_issue_with_status.
        self.jira.set_issue_status_by_transition_id(
            issue_key=issue_key,
            transition_id=(
                int(transition_id)
                if isinstance(transition_id, str) and transition_id.isdigit()
                else transition_id
            ),
        )
        return self.get_issue(issue_key)
    except Exception as e:
        logger.error(f"Error transitioning issue {issue_key}: {str(e)}")
        raise
def batch_create_issues(
    self,
    issues: list[dict[str, Any]],
    validate_only: bool = False,
) -> list[JiraIssue]:
    """Create multiple Jira issues in a batch.

    Args:
        issues: List of issue dictionaries, each containing:
            - project_key (str): Key of the project
            - summary (str): Issue summary
            - issue_type (str): Type of issue
            - description (str, optional): Issue description
            - assignee (str, optional): Username of assignee
            - components (list[str], optional): List of component names
            - **kwargs: Additional fields specific to your Jira instance
        validate_only: If True, only validates the issues without creating them

    Returns:
        List of created JiraIssue objects

    Raises:
        ValueError: If any required fields are missing or invalid
        MCPAtlassianAuthenticationError: If authentication fails
    """
    if not issues:
        return []
    # Prepare issues for bulk creation
    issue_updates = []
    for raw_issue in issues:
        try:
            # Work on a shallow copy so the pops below do not mutate the
            # caller's dictionaries.
            issue_data = dict(raw_issue)
            # Extract and validate required fields
            project_key = issue_data.pop("project_key", None)
            summary = issue_data.pop("summary", None)
            issue_type = issue_data.pop("issue_type", None)
            description = issue_data.pop("description", "")
            assignee = issue_data.pop("assignee", None)
            components = issue_data.pop("components", None)
            # Validate required fields
            if not all([project_key, summary, issue_type]):
                raise ValueError(
                    f"Missing required fields for issue: {project_key=}, {summary=}, {issue_type=}"
                )
            # Prepare fields dictionary
            fields = {
                "project": {"key": project_key},
                "summary": summary,
                "issuetype": {"name": issue_type},
            }
            # Add optional fields
            if description:
                fields["description"] = description
            # Add assignee if provided
            if assignee:
                try:
                    # _get_account_id returns the correct identifier
                    # (accountId for cloud, name for server)
                    assignee_identifier = self._get_account_id(assignee)
                    self._add_assignee_to_fields(fields, assignee_identifier)
                except ValueError as e:
                    logger.warning(f"Could not assign issue: {str(e)}")
            # Add components if provided (non-empty strings only)
            if components:
                if isinstance(components, list):
                    valid_components = [
                        comp_name.strip()
                        for comp_name in components
                        if isinstance(comp_name, str) and comp_name.strip()
                    ]
                    if valid_components:
                        fields["components"] = [
                            {"name": comp_name} for comp_name in valid_components
                        ]
            # Add any remaining custom fields
            self._process_additional_fields(fields, issue_data)
            if validate_only:
                # For validation, just log the issue that would be created
                logger.info(
                    f"Validated issue creation: {project_key} - {summary} ({issue_type})"
                )
                continue
            # Add to bulk creation list
            issue_updates.append({"fields": fields})
        except Exception as e:
            logger.error(f"Failed to prepare issue for creation: {str(e)}")
            # Best-effort semantics: only abort when nothing could be
            # prepared at all.
            if not issue_updates:
                raise
    if validate_only:
        return []
    try:
        # Call Jira's bulk create endpoint
        response = self.jira.create_issues(issue_updates)
        if not isinstance(response, dict):
            msg = f"Unexpected return value type from `jira.create_issues`: {type(response)}"
            logger.error(msg)
            raise TypeError(msg)
        # Process results
        created_issues = []
        for issue_info in response.get("issues", []):
            issue_key = issue_info.get("key")
            if issue_key:
                try:
                    # Fetch the full issue data for the model conversion
                    issue_data = self.jira.get_issue(issue_key)
                    if not isinstance(issue_data, dict):
                        msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
                        logger.error(msg)
                        raise TypeError(msg)
                    created_issues.append(
                        JiraIssue.from_api_response(
                            issue_data,
                            base_url=self.config.url
                            if hasattr(self, "config")
                            else None,
                        )
                    )
                except Exception as e:
                    logger.error(
                        f"Error fetching created issue {issue_key}: {str(e)}"
                    )
        # Log any errors from the bulk creation
        errors = response.get("errors", [])
        if errors:
            for error in errors:
                logger.error(f"Bulk creation error: {error}")
        return created_issues
    except Exception as e:
        logger.error(f"Error in bulk issue creation: {str(e)}")
        raise
def batch_get_changelogs(
    self, issue_ids_or_keys: list[str], fields: list[str] | None = None
) -> list[JiraIssue]:
    """
    Get changelogs for multiple issues in a batch, paging through results as needed.

    Warning:
        This function is only available on Jira Cloud.

    Args:
        issue_ids_or_keys: List of issue IDs or keys
        fields: Filter the changelogs by fields, e.g. ['status', 'assignee'].
            Default to None for all fields.

    Returns:
        List of JiraIssue objects that only contain changelogs and id
    """
    if not self.config.is_cloud:
        error_msg = "Batch get issue changelogs is only available on Jira Cloud."
        logger.error(error_msg)
        raise NotImplementedError(error_msg)
    # Group changelogs by issue id while paging through the bulk-fetch endpoint.
    grouped: defaultdict[str, list[JiraChangelog]] = defaultdict(list)
    pages = self.get_paged(
        method="post",
        url=self.jira.resource_url("changelog/bulkfetch"),
        params_or_json={
            "fieldIds": fields,
            "issueIdsOrKeys": issue_ids_or_keys,
        },
    )
    for page in pages:
        for entry in page.get("issueChangeLogs", []):
            histories = [
                JiraChangelog.from_api_response(raw)
                for raw in entry.get("changeHistories", [])
            ]
            grouped[entry.get("issueId", "")].extend(histories)
    # Wrap each issue's accumulated changelogs in a minimal JiraIssue model.
    return [
        JiraIssue(id=issue_id, changelogs=changelogs)
        for issue_id, changelogs in grouped.items()
    ]