# issue.py (30.3 kB)
"""
Jira issue models.
This module provides Pydantic models for Jira issues.
"""
import logging
import re
from typing import Any, Literal
from pydantic import Field
from ..base import ApiModel, TimestampMixin
from ..constants import (
EMPTY_STRING,
JIRA_DEFAULT_ID,
JIRA_DEFAULT_KEY,
)
from .comment import JiraComment
from .common import (
JiraAttachment,
JiraChangelog,
JiraIssueType,
JiraPriority,
JiraResolution,
JiraStatus,
JiraTimetracking,
JiraUser,
)
from .link import JiraIssueLink
from .project import JiraProject
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Extended epic field name patterns to support more variations.
# JiraIssue._get_epic_name tries these in list order and hands each one to
# JiraIssue._find_custom_field_in_issue with pattern=True, where it is used as
# a case-insensitive regular expression.
EPIC_NAME_PATTERNS = [
    r"epic\s*name",
    r"epic[._-]?name",
    r"epicname",
]

# Same idea for the epic link; consumed in list order by
# JiraIssue._get_epic_link. "Parent Link" / parent\s*link are included
# alongside the epic-link spellings.
EPIC_LINK_PATTERNS = [
    r"epic\s*link",
    r"epic[._-]?link",
    r"Parent Link",
    r"parent\s*link",
    r"epiclink",
]
class JiraIssue(ApiModel, TimestampMixin):
    """
    Model representing a Jira issue.

    This is a comprehensive model containing all the common fields
    for Jira issues and related metadata. Instances are normally built by
    ``from_api_response`` and rendered by ``to_simplified_dict``.
    """

    # Identity and free-text fields. from_api_response stringifies id, key,
    # summary, created and updated; description is stored as received.
    id: str = JIRA_DEFAULT_ID
    key: str = JIRA_DEFAULT_KEY
    summary: str = EMPTY_STRING
    description: str | None = None
    created: str = EMPTY_STRING
    updated: str = EMPTY_STRING

    # Nested models, each parsed with the model's own from_api_response.
    status: JiraStatus | None = None
    issue_type: JiraIssueType | None = None  # read from the "issuetype" field
    priority: JiraPriority | None = None
    assignee: JiraUser | None = None
    reporter: JiraUser | None = None

    # Plain string lists; components holds each component's "name".
    labels: list[str] = Field(default_factory=list)
    components: list[str] = Field(default_factory=list)

    # Read from fields["comment"]["comments"] and fields["attachment"].
    comments: list[JiraComment] = Field(default_factory=list)
    attachments: list[JiraAttachment] = Field(default_factory=list)
    timetracking: JiraTimetracking | None = None

    # Taken from the response's top-level "self" entry.
    url: str | None = None

    # Filled from custom fields located by name heuristics; only string
    # values are accepted by from_api_response.
    epic_key: str | None = None
    epic_name: str | None = None

    # Each entry of fields["fixVersions"] reduced to its "name".
    fix_versions: list[str] = Field(default_factory=list)

    # from_api_response stores every "customfield_*" entry here as
    # {"value": <raw value>} plus "name" when a display name is known.
    custom_fields: dict[str, Any] = Field(default_factory=dict)

    # Controls which fields to_simplified_dict emits: "*all", an explicit
    # list of names, or None (no filtering of standard fields).
    requested_fields: Literal["*all"] | list[str] | None = None

    project: JiraProject | None = None
    resolution: JiraResolution | None = None
    duedate: str | None = None
    resolutiondate: str | None = None

    # Kept as the raw dictionaries from the API response.
    parent: dict | None = None
    subtasks: list[dict] = Field(default_factory=list)
    security: dict | None = None
    worklog: dict | None = None

    # Parsed from the top-level "changelog" -> "histories" list.
    changelogs: list[JiraChangelog] = Field(default_factory=list)
    issuelinks: list[JiraIssueLink] = Field(default_factory=list)
def __getattribute__(self, name: str) -> Any:
    """
    Resolve attributes normally, falling back to custom fields.

    Regular attribute lookup always wins. Only when it fails is ``name``
    looked up as a key of ``custom_fields``, which lets callers read a
    custom field as if it were an ordinary attribute of the issue.

    Args:
        name: The attribute name to access

    Returns:
        The attribute value, or the matching ``custom_fields`` entry

    Raises:
        AttributeError: The original lookup error, when ``name`` is neither
            an attribute nor a custom field key
    """
    base_lookup = super().__getattribute__
    try:
        return base_lookup(name)
    except AttributeError:
        try:
            extras = base_lookup("custom_fields")
            if name in extras:
                return extras[name]
        except AttributeError:
            # custom_fields itself could not be read; fall through so the
            # caller sees the error for the attribute it actually asked for.
            pass
        # Bare raise re-raises the outer (original) AttributeError.
        raise
@property
def page_content(self) -> str | None:
"""
Get the page content from the description.
This is a convenience property for treating Jira issues as documentation pages.
Returns:
The description text or None
"""
# Return description without modification for now
# In the future, we could parse ADF content here
return self.description
@staticmethod
def _find_custom_field_in_api_response(
fields: dict[str, Any], name_patterns: list[str]
) -> Any:
"""
Find a custom field by name patterns in the raw API response.
Used during object creation from API response to extract fields
before the JiraIssue object is instantiated.
Args:
fields: The fields dictionary from the Jira API
name_patterns: List of field name patterns to search for
Returns:
The custom field value or None
"""
if not fields or not isinstance(fields, dict):
return None
# Normalize all patterns for easier matching
normalized_patterns = []
for pattern in name_patterns:
norm_pattern = pattern.lower()
norm_pattern = re.sub(r"[_\-\s]", "", norm_pattern)
normalized_patterns.append(norm_pattern)
custom_field_id = None
# Check if fields has a names fields
names_dict = fields.get("names", {})
if isinstance(names_dict, dict):
for field_id, field_name in names_dict.items():
field_name_norm = re.sub(r"[_\-\s]", "", field_name.lower())
for norm_pattern in normalized_patterns:
if norm_pattern in field_name_norm:
custom_field_id = field_id
break
if custom_field_id:
break
else:
logger.debug("No names dict found in fields", exc_info=True)
# Look at field metadata if name method didn't work
if not custom_field_id:
schema = fields.get("schema", {})
if schema and isinstance(schema, dict) and "fields" in schema:
schema_fields = schema["fields"]
if isinstance(schema_fields, dict):
for field_id, field_info in schema_fields.items():
if not field_id.startswith("customfield_"):
continue
if isinstance(field_info, dict) and "name" in field_info:
field_name = field_info["name"].lower()
field_name_norm = re.sub(r"[_\-\s]", "", field_name)
for norm_pattern in normalized_patterns:
if norm_pattern in field_name_norm:
custom_field_id = field_id
break
if custom_field_id:
break
# Try direct matching of field IDs for common epic fields
if not custom_field_id:
has_epic_link_pattern = any("epiclink" in p for p in normalized_patterns)
has_epic_name_pattern = any("epicname" in p for p in normalized_patterns)
if has_epic_link_pattern:
for field_id in fields:
if field_id.startswith("customfield_") and field_id.endswith("14"):
custom_field_id = field_id
break
elif has_epic_name_pattern:
for field_id in fields:
if field_id.startswith("customfield_") and field_id.endswith("11"):
custom_field_id = field_id
break
# Last attempt - look through all custom fields for names in their values
if not custom_field_id:
for field_id, field_value in fields.items():
if not field_id.startswith("customfield_"):
continue
field_name = None
if isinstance(field_value, dict) and "name" in field_value:
field_name = field_value.get("name", "").lower()
elif isinstance(field_value, dict) and "key" in field_value:
field_name = field_value.get("key", "").lower()
if not field_name:
continue
field_name_norm = re.sub(r"[_\-\s]", "", field_name)
for norm_pattern in normalized_patterns:
if norm_pattern in field_name_norm:
custom_field_id = field_id
break
if custom_field_id:
break
if custom_field_id and custom_field_id in fields:
return fields[custom_field_id]
return None
@classmethod
def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraIssue":
    """
    Create a JiraIssue from a Jira API response.

    Missing or malformed sections of the payload are tolerated: the affected
    attribute falls back to its default instead of raising.

    Args:
        data: The issue data from the Jira API. Issue attributes are read
            from ``data["fields"]``; ``id``, ``key``, ``self``, ``changelog``
            and ``names`` are read from the top level.
        **kwargs: Only ``requested_fields`` is consulted. It may be
            ``"*all"``, a list of field names, or a comma-separated string,
            which is split into a list of stripped, non-empty names.

    Returns:
        A JiraIssue instance (a default instance when ``data`` is empty or
        not a dictionary)
    """
    if not data:
        return cls()
    # Handle non-dictionary data by returning a default instance
    if not isinstance(data, dict):
        logger.debug("Received non-dictionary data, returning default instance")
        return cls()

    fields = data.get("fields", {})
    if not isinstance(fields, dict):
        fields = {}

    def _text(value: Any, default: str) -> str:
        # A JSON null must become the default, not the literal string "None".
        return default if value is None else str(value)

    def _typed(field_name: str, expected: type) -> Any:
        # Keep a raw field only when it has the expected Python type.
        value = fields.get(field_name)
        return value if isinstance(value, expected) else None

    def _names(field_name: str) -> list[str]:
        # Reduce a list of {"name": ...} dicts (or plain values) to strings.
        raw = fields.get(field_name)
        if not raw or not isinstance(raw, list):
            return []
        return [
            str(item.get("name", "")) if isinstance(item, dict) else str(item)
            for item in raw
            if item
        ]

    # Simple scalar fields and timestamps
    issue_id = _text(data.get("id"), JIRA_DEFAULT_ID)
    key = _text(data.get("key"), JIRA_DEFAULT_KEY)
    summary = _text(fields.get("summary"), EMPTY_STRING)
    description = fields.get("description")
    created = _text(fields.get("created"), EMPTY_STRING)
    updated = _text(fields.get("updated"), EMPTY_STRING)

    # Nested models that are parsed whenever the raw value is truthy
    assignee_data = fields.get("assignee")
    assignee = JiraUser.from_api_response(assignee_data) if assignee_data else None

    reporter_data = fields.get("reporter")
    reporter = JiraUser.from_api_response(reporter_data) if reporter_data else None

    status_data = fields.get("status")
    status = JiraStatus.from_api_response(status_data) if status_data else None

    issue_type_data = fields.get("issuetype")
    issue_type = (
        JiraIssueType.from_api_response(issue_type_data) if issue_type_data else None
    )

    priority_data = fields.get("priority")
    priority = (
        JiraPriority.from_api_response(priority_data) if priority_data else None
    )

    timetracking_data = fields.get("timetracking")
    timetracking = (
        JiraTimetracking.from_api_response(timetracking_data)
        if timetracking_data
        else None
    )

    # Nested models that are parsed for any dict, including an empty one
    project_data = fields.get("project")
    project = (
        JiraProject.from_api_response(project_data)
        if isinstance(project_data, dict)
        else None
    )

    resolution_data = fields.get("resolution")
    resolution = (
        JiraResolution.from_api_response(resolution_data)
        if isinstance(resolution_data, dict)
        else None
    )

    # Raw values kept only when they have the expected type
    duedate = _typed("duedate", str)
    resolutiondate = _typed("resolutiondate", str)
    parent = _typed("parent", dict)
    security = _typed("security", dict)
    worklog = _typed("worklog", dict)

    # Ensure subtasks is a list of dicts
    subtasks_raw = fields.get("subtasks", [])
    subtasks = (
        [st for st in subtasks_raw if isinstance(st, dict)]
        if isinstance(subtasks_raw, list)
        else []
    )

    # Lists of strings
    labels_data = fields.get("labels")
    labels = (
        [str(label) for label in labels_data if label]
        if labels_data and isinstance(labels_data, list)
        else []
    )
    components = _names("components")
    fix_versions = _names("fixVersions")

    # Comments live under fields["comment"]["comments"]
    comments = []
    comments_field = fields.get("comment", {})
    if isinstance(comments_field, dict) and "comments" in comments_field:
        comments_data = comments_field["comments"]
        if isinstance(comments_data, list):
            comments = [
                JiraComment.from_api_response(comment)
                for comment in comments_data
                if comment
            ]

    # Changelogs live at the top level; a null/non-list "histories" is ignored
    changelogs = []
    changelogs_data = data.get("changelog", {})
    if isinstance(changelogs_data, dict):
        histories = changelogs_data.get("histories")
        if isinstance(histories, list):
            changelogs = [
                JiraChangelog.from_api_response(history) for history in histories
            ]

    # Attachments
    attachments = []
    attachments_data = fields.get("attachment", [])
    if isinstance(attachments_data, list):
        attachments = [
            JiraAttachment.from_api_response(attachment)
            for attachment in attachments_data
            if attachment
        ]

    # URL
    url = data.get("self")  # API URL for the issue

    # Try to find epic fields (varies by Jira instance); only plain strings
    # are accepted as epic key / epic name.
    # NOTE(review): the lookup receives `fields`, while the display-name map
    # used below comes from the top-level data["names"] - confirm whether the
    # name-based strategy is meant to see that map.
    epic_link = cls._find_custom_field_in_api_response(
        fields, ["epic link", "parent epic"]
    )
    epic_key = epic_link if isinstance(epic_link, str) else None

    epic_name_value = cls._find_custom_field_in_api_response(fields, ["epic name"])
    epic_name = epic_name_value if isinstance(epic_name_value, str) else None

    # Store custom fields as {"value": raw} plus "name" when it is known
    fields_name_map = data.get("names")
    if not isinstance(fields_name_map, dict):
        fields_name_map = {}
    custom_fields = {}
    for orig_field_id, orig_field_value in fields.items():
        if not (
            isinstance(orig_field_id, str)
            and orig_field_id.startswith("customfield_")
        ):
            continue
        value_obj_to_store = {"value": orig_field_value}
        human_readable_name = fields_name_map.get(orig_field_id)
        if human_readable_name:
            value_obj_to_store["name"] = human_readable_name
        custom_fields[orig_field_id] = value_obj_to_store

    # Convert string requested_fields to a list (except "*all"), dropping
    # blanks produced by stray commas such as "summary,".
    requested_fields_param = kwargs.get("requested_fields")
    if isinstance(requested_fields_param, str) and requested_fields_param != "*all":
        requested_fields_param = [
            part.strip()
            for part in requested_fields_param.split(",")
            if part.strip()
        ]

    # Create the issue instance with all the extracted data
    return cls(
        id=issue_id,
        key=key,
        summary=summary,
        description=description,
        created=created,
        updated=updated,
        status=status,
        issue_type=issue_type,
        priority=priority,
        assignee=assignee,
        reporter=reporter,
        project=project,
        resolution=resolution,
        duedate=duedate,
        resolutiondate=resolutiondate,
        parent=parent,
        subtasks=subtasks,
        security=security,
        worklog=worklog,
        labels=labels,
        components=components,
        comments=comments,
        attachments=attachments,
        timetracking=timetracking,
        url=url,
        epic_key=epic_key,
        epic_name=epic_name,
        fix_versions=fix_versions,
        custom_fields=custom_fields,
        requested_fields=requested_fields_param,
        changelogs=changelogs,
        issuelinks=cls._extract_issue_links(fields),
    )
def to_simplified_dict(self) -> dict[str, Any]:
    """
    Convert to simplified dictionary for API response.

    ``id`` and ``key`` are always present. Every other standard field is
    emitted only when it has a value and passes the ``requested_fields``
    filter: everything passes when ``requested_fields`` is ``"*all"`` or is
    not a list, otherwise the field's request name must be in the list.
    Exceptions: ``summary`` is emitted even when empty, ``assignee`` falls
    back to an "Unassigned" placeholder, and ``changelogs`` ignores the
    filter. Comments and attachments are requested as ``comment`` /
    ``attachment`` but emitted as ``comments`` / ``attachments``.

    Custom fields are emitted only for ``"*all"`` (all of them) or for a list
    (entries matched by ``customfield_*`` id, by case-insensitive display
    name, or by the ``cf_<n>`` shorthand), keyed by their ``customfield_*``
    id. Blank requested names are ignored and fields stored without a usable
    display name simply never match by name.

    Returns:
        The simplified dictionary, with None values removed
    """
    result: dict[str, Any] = {
        "id": self.id,
        "key": self.key,
    }

    def should_include_field(field_name: str) -> bool:
        return (
            self.requested_fields == "*all"
            or not isinstance(self.requested_fields, list)
            or field_name in self.requested_fields
        )

    def add_plain(output_key: str, value: Any) -> None:
        # Emit a raw value when it is truthy and requested.
        if value and should_include_field(output_key):
            result[output_key] = value

    def add_model(output_key: str, model: Any) -> None:
        # Emit a nested model through its own to_simplified_dict().
        if model and should_include_field(output_key):
            result[output_key] = model.to_simplified_dict()

    def add_models(output_key: str, models: Any, request_name: str) -> None:
        # Emit a list of nested models; request name may differ from the key.
        if models and should_include_field(request_name):
            result[output_key] = [item.to_simplified_dict() for item in models]

    def render_custom_field(entry: Any) -> dict[str, Any]:
        # Stored entries look like {"value": raw, "name": label}; anything
        # else is treated as a bare raw value.
        if not isinstance(entry, dict):
            return {"value": self._process_custom_field_value(entry)}
        rendered = {"value": self._process_custom_field_value(entry.get("value"))}
        if "name" in entry:
            rendered["name"] = entry["name"]
        return rendered

    # Standard fields, in output order
    if should_include_field("summary"):
        result["summary"] = self.summary
    add_plain("url", self.url)
    add_plain("description", self.description)
    add_model("status", self.status)
    add_model("issue_type", self.issue_type)
    add_model("priority", self.priority)
    add_model("project", self.project)
    add_model("resolution", self.resolution)
    add_plain("duedate", self.duedate)
    add_plain("resolutiondate", self.resolutiondate)
    add_plain("parent", self.parent)
    add_plain("subtasks", self.subtasks)
    add_plain("security", self.security)
    add_plain("worklog", self.worklog)

    # Assignee always appears when requested, with a placeholder if unset
    if should_include_field("assignee"):
        if self.assignee:
            result["assignee"] = self.assignee.to_simplified_dict()
        else:
            result["assignee"] = {"display_name": "Unassigned"}

    add_model("reporter", self.reporter)
    add_plain("labels", self.labels)
    add_plain("components", self.components)
    add_plain("fix_versions", self.fix_versions)
    add_plain("epic_key", self.epic_key)
    add_plain("epic_name", self.epic_name)
    add_model("timetracking", self.timetracking)
    add_plain("created", self.created)
    add_plain("updated", self.updated)
    add_models("comments", self.comments, "comment")
    add_models("attachments", self.attachments, "attachment")

    # Not use should_include_field since you won't get changelogs
    # if you don't ask for them
    if self.changelogs:
        result["changelogs"] = [
            changelog.to_simplified_dict() for changelog in self.changelogs
        ]

    add_models("issuelinks", self.issuelinks, "issuelinks")

    # Custom fields
    if self.custom_fields:
        if self.requested_fields == "*all":
            for internal_id, field_data_obj in self.custom_fields.items():
                result[internal_id] = render_custom_field(field_data_obj)
        elif isinstance(self.requested_fields, list):
            for requested_key_or_name in self.requested_fields:
                # A blank name used to match any field stored without a
                # display name and then fail on the missing "name" key.
                if not isinstance(requested_key_or_name, str) or not requested_key_or_name:
                    continue

                # 1. Exact customfield_* id
                if (
                    requested_key_or_name.startswith("customfield_")
                    and requested_key_or_name in self.custom_fields
                ):
                    result[requested_key_or_name] = render_custom_field(
                        self.custom_fields[requested_key_or_name]
                    )
                    continue

                # 2. Case-insensitive display name (first match wins)
                wanted = requested_key_or_name.lower()
                matched_id = None
                for internal_id, field_data_obj in self.custom_fields.items():
                    if not isinstance(field_data_obj, dict):
                        continue
                    label = field_data_obj.get("name")
                    if isinstance(label, str) and label.lower() == wanted:
                        matched_id = internal_id
                        break
                if matched_id is not None:
                    result[matched_id] = render_custom_field(
                        self.custom_fields[matched_id]
                    )
                    continue

                # 3. "cf_<n>" shorthand for "customfield_<n>"
                if requested_key_or_name.startswith("cf_"):
                    full_id = "customfield_" + requested_key_or_name[3:]
                    if full_id in self.custom_fields:
                        result[full_id] = render_custom_field(
                            self.custom_fields[full_id]
                        )

    return {k: v for k, v in result.items() if v is not None}
def _process_custom_field_value(self, field_value: Any) -> Any:
"""
Process a custom field value for simplified dict output.
Args:
field_value: The value to process
Returns:
Processed value suitable for API response
"""
if field_value is None or isinstance(field_value, str | int | float | bool):
return field_value
if isinstance(field_value, dict):
# For single-select, user pickers, etc., try to extract 'value' or 'name'
if "value" in field_value:
return field_value["value"]
elif "name" in field_value:
return field_value["name"]
return field_value
if isinstance(field_value, list):
return [self._process_custom_field_value(item) for item in field_value]
return str(field_value)
def _find_custom_field_in_issue(
self, name: str, pattern: bool = False
) -> tuple[str | None, Any]:
"""
Find a custom field by name or pattern in an instantiated JiraIssue.
Used by instance methods like _get_epic_name and _get_epic_link
to search through the custom_fields dictionary of an existing issue.
Args:
name: The name to search for
pattern: If True, use regex pattern matching
Returns:
A tuple of (field_id, field_value) or (None, None) if not found
"""
if not self.custom_fields:
return None, None
# Check if fields has a names() method (some implementations have this)
names_dict = self.custom_fields.get("names", {})
if isinstance(names_dict, dict):
for field_id, field_name in names_dict.items():
if (pattern and re.search(name, field_name, re.IGNORECASE)) or (
not pattern and field_name.lower() == name.lower()
):
return field_id, self.custom_fields.get(field_id)
else:
logger.debug("No names dict found in custom fields", exc_info=True)
# Check field metadata for name (custom fields usually have a name)
for field_id, field_value in self.custom_fields.items():
if not field_id.startswith("customfield_"):
continue
# Custom fields can have a schema with a name
if isinstance(field_value, dict) and field_value.get("name"):
field_name = field_value.get("name")
if field_name and (
(pattern and re.search(name, field_name, re.IGNORECASE))
or (not pattern and field_name.lower() == name.lower())
):
return field_id, field_value
# Fallback: Directly look for keys that match the pattern
if pattern:
for field_id, field_value in self.custom_fields.items():
if re.search(name, field_id, re.IGNORECASE):
return field_id, field_value
return None, None
def _get_epic_name(self) -> str | None:
"""Get the epic name from custom fields if available."""
# Try each pattern in order
for pattern in EPIC_NAME_PATTERNS:
field_id, field_value = self._find_custom_field_in_issue(
pattern, pattern=True
)
if field_id and field_value:
if isinstance(field_value, dict):
return field_value.get("value") or field_value.get("name")
return str(field_value)
return None
def _get_epic_link(self) -> str | None:
"""Get the epic link from custom fields if available."""
# Try each pattern in order
for pattern in EPIC_LINK_PATTERNS:
field_id, field_value = self._find_custom_field_in_issue(
pattern, pattern=True
)
if field_id and field_value:
# Handle different possible value types
if isinstance(field_value, dict):
return field_value.get("key") or field_value.get("value")
return str(field_value)
return None
@staticmethod
def _extract_issue_links(fields: dict[str, Any]) -> list[JiraIssueLink]:
"""
Extract issue links from fields.
Args:
fields: The fields dictionary from the Jira API
Returns:
List of JiraIssueLink objects
"""
if not fields or not isinstance(fields, dict):
return []
issuelinks_data = fields.get("issuelinks", [])
if not isinstance(issuelinks_data, list):
return []
return [
JiraIssueLink.from_api_response(link_data)
for link_data in issuelinks_data
if link_data
]