"""Attachment operations for Confluence API."""
import logging
import os
from pathlib import Path
from typing import Any
from ..models.confluence import ConfluenceAttachment
from .client import ConfluenceClient
from .protocols import AttachmentsOperationsProto
from .v2_adapter import ConfluenceV2Adapter
# Configure logging
logger = logging.getLogger("mcp-confluence")
class AttachmentsMixin(ConfluenceClient, AttachmentsOperationsProto):
"""Mixin for Confluence attachment operations."""
@property
def _v2_adapter(self) -> ConfluenceV2Adapter | None:
    """Build a v2 API adapter when the client uses OAuth against Cloud.

    A new adapter is constructed on every access; it wraps the existing
    Confluence session and base URL.

    Returns:
        A ConfluenceV2Adapter when ``config.auth_type`` is ``"oauth"`` and
        ``config.is_cloud`` is truthy, None otherwise
    """
    uses_cloud_oauth = self.config.auth_type == "oauth" and self.config.is_cloud
    if not uses_cloud_oauth:
        return None
    return ConfluenceV2Adapter(
        session=self.confluence._session,
        base_url=self.confluence.url,
    )
def upload_attachment(
    self,
    content_id: str,
    file_path: str,
    comment: str | None = None,
    minor_edit: bool = True,
) -> dict[str, Any]:
    """
    Upload a single attachment to Confluence content.

    The upload goes through ``_upload_attachment_direct`` so that the
    ``minor_edit`` flag can be sent along. This method never raises for
    upload problems: every failure is logged and reported in the returned
    dictionary.

    Args:
        content_id: The Confluence content ID
        file_path: The path to the file to upload; relative paths are
            resolved against the current working directory
        comment: Optional comment for the attachment
        minor_edit: Whether this is a minor edit (default: True)

    Returns:
        On success, a dictionary with ``success`` True plus ``content_id``,
        ``filename``, ``size`` (bytes on disk) and ``id`` (taken from the
        upload response when it is a dict, else None). On failure, a
        dictionary with ``success`` False and an ``error`` message.
    """
    if not content_id:
        logger.error("No content ID provided for attachment upload")
        return {"success": False, "error": "No content ID provided"}
    if not file_path:
        logger.error("No file path provided for attachment upload")
        return {"success": False, "error": "No file path provided"}
    try:
        # Convert to absolute path if relative
        if not os.path.isabs(file_path):
            file_path = os.path.abspath(file_path)
        # Check if file exists
        if not os.path.exists(file_path):
            logger.error(f"File not found: {file_path}")
            return {"success": False, "error": f"File not found: {file_path}"}
        # A directory passes the existence check but cannot be opened for
        # upload; reject it here with a precise message instead of letting
        # it surface later as a generic upload failure.
        if os.path.isdir(file_path):
            error_msg = f"Path is a directory, not a file: {file_path}"
            logger.error(error_msg)
            return {"success": False, "error": error_msg}
        logger.info(
            f"Uploading attachment from {file_path} to content {content_id} (minor_edit={minor_edit})"
        )
        # Use direct REST API call to support minorEdit parameter
        filename = os.path.basename(file_path)
        attachment = self._upload_attachment_direct(
            content_id, file_path, filename, comment, minor_edit
        )
        if not attachment:
            error_msg = (
                f"Failed to upload attachment {filename} to content {content_id}"
            )
            logger.error(error_msg)
            return {"success": False, "error": error_msg}
        file_size = os.path.getsize(file_path)
        logger.info(
            f"Successfully uploaded attachment {filename} to content {content_id} (size: {file_size} bytes)"
        )
        return {
            "success": True,
            "content_id": content_id,
            "filename": filename,
            "size": file_size,
            "id": attachment.get("id") if isinstance(attachment, dict) else None,
        }
    except Exception as e:
        # Boundary handler: callers expect a result dictionary, not an exception
        error_msg = str(e)
        logger.error(f"Error uploading attachment: {error_msg}")
        return {"success": False, "error": error_msg}
def upload_attachments(
    self,
    content_id: str,
    file_paths: list[str],
    comment: str | None = None,
    minor_edit: bool = True,
) -> dict[str, Any]:
    """
    Upload several files as attachments to one piece of Confluence content.

    Each path is handed to ``upload_attachment`` in turn; a failure on one
    file is recorded and does not stop the remaining uploads.

    Args:
        content_id: The Confluence content ID
        file_paths: Paths of the files to upload
        comment: Optional comment passed along with every upload
        minor_edit: Whether each upload is a minor edit (default: True)

    Returns:
        When an argument is missing, a dictionary with ``success`` False
        and an ``error`` message. Otherwise a dictionary with ``success``
        True, the ``content_id``, the ``total`` number of paths, and the
        ``uploaded`` and ``failed`` lists. ``success`` is True even when
        individual uploads failed; inspect ``failed`` for those.
    """
    if not content_id:
        logger.error("No content ID provided for attachment upload")
        return {"success": False, "error": "No content ID provided"}
    if not file_paths:
        logger.error("No file paths provided for attachment upload")
        return {"success": False, "error": "No file paths provided"}

    logger.info(f"Uploading {len(file_paths)} attachments to content {content_id}")

    succeeded: list[dict[str, Any]] = []
    errored: list[dict[str, Any]] = []
    for path in file_paths:
        outcome = self.upload_attachment(content_id, path, comment, minor_edit)
        if not outcome.get("success"):
            # Only the base name is reported, matching the success entries
            errored.append(
                {
                    "filename": os.path.basename(path),
                    "error": outcome.get("error"),
                }
            )
            continue
        succeeded.append(
            {field: outcome.get(field) for field in ("filename", "size", "id")}
        )

    return {
        "success": True,
        "content_id": content_id,
        "total": len(file_paths),
        "uploaded": succeeded,
        "failed": errored,
    }
def download_attachment(self, url: str, target_path: str) -> bool:
    """
    Download a Confluence attachment to the specified path.

    The body is streamed to disk in 8192-byte chunks through the Confluence
    session. Missing parent directories are created. Any exception raised
    along the way is logged and reported as False rather than propagated.

    Args:
        url: The URL of the attachment to download
        target_path: The path where the attachment should be saved;
            relative paths are resolved against the current working
            directory

    Returns:
        True if successful, False otherwise
    """
    if not url:
        logger.error("No URL provided for attachment download")
        return False
    if not target_path:
        # An empty path would resolve to the working directory and only
        # fail at open(), after the HTTP request had already been issued.
        logger.error("No target path provided for attachment download")
        return False
    try:
        # Convert to absolute path if relative
        if not os.path.isabs(target_path):
            target_path = os.path.abspath(target_path)
        logger.info(f"Downloading attachment from {url} to {target_path}")
        # Create the directory if it doesn't exist
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        # Use the Confluence session to download the file
        response = self.confluence._session.get(url, stream=True)
        try:
            response.raise_for_status()
            # Write the file to disk
            with open(target_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        finally:
            # A streamed response keeps its connection checked out until it
            # is closed, so release it on both the success and error paths.
            response.close()
        # Verify the file was created
        if not os.path.exists(target_path):
            logger.error(f"File was not created at {target_path}")
            return False
        file_size = os.path.getsize(target_path)
        logger.info(
            f"Successfully downloaded attachment to {target_path} (size: {file_size} bytes)"
        )
        return True
    except Exception as e:
        # Boundary handler: callers only distinguish success from failure
        logger.error(f"Error downloading attachment: {str(e)}")
        return False
def download_content_attachments(
    self, content_id: str, target_dir: str
) -> dict[str, Any]:
    """
    Download the attachments of a piece of Confluence content into a directory.

    The attachment list comes from ``get_content_attachments`` called with
    its default paging arguments, so only that first page is considered.
    NOTE(review): content with more attachments than one page holds is
    presumably not downloaded in full; confirm whether paging is needed.

    Args:
        content_id: The Confluence content ID
        target_dir: The directory where attachments should be saved; it is
            created if missing, and relative paths are resolved against the
            current working directory

    Returns:
        The failed listing result unchanged when the attachments cannot be
        fetched. Otherwise a dictionary with ``success`` True and the
        ``downloaded`` and ``failed`` lists, plus either a ``message`` (no
        attachments found) or ``content_id`` and ``total``. Individual
        download failures are reported in ``failed`` and do not change
        ``success``.
    """
    if not os.path.isabs(target_dir):
        target_dir = os.path.abspath(target_dir)
    logger.info(
        f"Downloading attachments for content {content_id} to directory: {target_dir}"
    )

    destination = Path(target_dir)
    destination.mkdir(parents=True, exist_ok=True)

    logger.info(f"Fetching attachments for content {content_id}")
    listing = self.get_content_attachments(content_id)
    if not listing.get("success"):
        return listing

    raw_items = listing.get("attachments", [])
    if not raw_items:
        return {
            "success": True,
            "message": f"No attachments found for content {content_id}",
            "downloaded": [],
            "failed": [],
        }

    # Entries that are not dictionaries are silently skipped
    parsed = [
        ConfluenceAttachment.from_api_response(item)
        for item in raw_items
        if isinstance(item, dict)
    ]

    saved: list[dict[str, Any]] = []
    errored: list[dict[str, Any]] = []
    for item in parsed:
        source_url = item.download_url
        if not source_url:
            logger.warning(f"No download URL for attachment {item.title}")
            errored.append(
                {"filename": item.title, "error": "No download URL available"}
            )
            continue

        # Keep only the final path component of the title as the file name
        local_file = destination / Path(item.title).name

        # Relative download links are resolved against the configured base URL
        if source_url.startswith("/"):
            source_url = self.config.url.rstrip("/") + source_url

        if self.download_attachment(source_url, str(local_file)):
            saved.append(
                {
                    "filename": item.title,
                    "path": str(local_file),
                    "size": item.file_size,
                }
            )
        else:
            errored.append({"filename": item.title, "error": "Download failed"})

    return {
        "success": True,
        "content_id": content_id,
        "total": len(parsed),
        "downloaded": saved,
        "failed": errored,
    }
def get_content_attachments(
    self,
    content_id: str,
    start: int = 0,
    limit: int = 50,
    filename: str | None = None,
    media_type: str | None = None,
) -> dict[str, Any]:
    """
    List one page of attachments for a piece of Confluence content.

    When a v2 adapter is available the filters are passed to it with the
    request. Otherwise the v1 client is called without filters and the
    returned page is filtered here, so a match that lies outside the
    requested ``start``/``limit`` window is not found.

    Args:
        content_id: The Confluence content ID
        start: Starting index for pagination
        limit: Maximum number of results to return
        filename: Optional filename filter (exact match)
        media_type: Optional MIME type filter (exact match)

    Returns:
        On success, a dictionary with ``success`` True, ``content_id``,
        ``attachments``, ``total``, ``start`` and ``limit``. ``total`` is
        the response's ``size`` field, or the number of matches when the
        page was filtered here. On failure, a dictionary with ``success``
        False and an ``error`` message.
    """
    if not content_id:
        logger.error("No content ID provided for getting attachments")
        return {"success": False, "error": "No content ID provided"}

    try:
        logger.info(f"Fetching attachments for content {content_id}")
        adapter = self._v2_adapter
        if not adapter:
            logger.debug(
                f"Using v1 API for token/basic authentication to get attachments for '{content_id}'"
            )
            # No filters are sent on this path; they are applied below
            response = self.confluence.get_attachments_from_content(
                content_id, start=start, limit=limit
            )
        else:
            logger.debug(
                f"Using v2 API for OAuth authentication to get attachments for '{content_id}'"
            )
            response = adapter.get_page_attachments(
                page_id=content_id,
                start=start,
                limit=limit,
                filename=filename,
                media_type=media_type,
            )

        attachments = response.get("results", [])
        total = response.get("size", 0)

        filter_locally = not adapter and (filename or media_type)
        if filter_locally:
            # Keep entries that satisfy every filter that was supplied
            attachments = [
                entry
                for entry in attachments
                if (not filename or entry.get("title") == filename)
                and (not media_type or entry.get("mediaType") == media_type)
            ]
            total = len(attachments)
            logger.debug(
                f"Client-side filtering: {len(attachments)} of {response.get('size', 0)} attachments matched"
            )

        logger.info(
            f"Retrieved {len(attachments)} attachments for content {content_id}"
        )
        return {
            "success": True,
            "content_id": content_id,
            "attachments": attachments,
            "total": total,
            "start": start,
            "limit": limit,
        }
    except Exception as exc:
        message = str(exc)
        logger.error(f"Error getting attachments: {message}")
        return {"success": False, "error": message}
def _upload_attachment_direct(
    self,
    content_id: str,
    file_path: str,
    filename: str,
    comment: str | None,
    minor_edit: bool,
) -> dict[str, Any] | None:
    """
    Upload attachment using direct REST API call.

    This method uses the Confluence REST API directly to support
    the minorEdit parameter, which is not available in the
    atlassian-python-api library's attach_file() method.

    Args:
        content_id: The Confluence content ID
        file_path: Full path to the file
        filename: Name of the file
        comment: Optional comment for the attachment
        minor_edit: Whether this is a minor edit

    Returns:
        Attachment metadata dict if successful, None otherwise. When the
        response is a dict with a non-empty ``results`` list, its first
        entry is returned; otherwise the parsed response itself.
    """
    try:
        # Build the API endpoint URL
        base_url = self.config.url.rstrip("/")
        url = f"{base_url}/rest/api/content/{content_id}/child/attachment"
        # Prepare headers (X-Atlassian-Token required for file uploads)
        headers = {"X-Atlassian-Token": "nocheck"}
        data: dict[str, str] = {}
        if minor_edit is not None:
            data["minorEdit"] = str(minor_edit).lower()
        # The context manager guarantees the handle is closed on every
        # path, including when the request itself raises.
        with open(file_path, "rb") as file_handle:
            # Prepare multipart form data
            files: dict[str, tuple] = {"file": (filename, file_handle)}
            # Comment must be sent with text/plain content-type for proper encoding
            if comment:
                files["comment"] = (None, comment, "text/plain; charset=utf-8")
            # Use PUT to support creating new versions of existing attachments
            # PUT will create a new attachment if it doesn't exist, OR create a new
            # version if an attachment with the same filename already exists
            response = self.confluence._session.put(
                url, headers=headers, files=files, data=data
            )
        response.raise_for_status()
        # Parse response
        result = response.json()
        # Return first result if it's a list
        if isinstance(result, dict) and "results" in result:
            results = result.get("results", [])
            return results[0] if results else result
        return result
    except Exception as e:
        # Best-effort helper: the caller treats None as "upload failed"
        logger.error(f"Direct API upload failed: {e}")
        return None
def delete_attachment(self, attachment_id: str) -> dict[str, Any]:
    """
    Remove a Confluence attachment identified by its ID.

    The v2 adapter performs the deletion when one is available; otherwise
    a DELETE request is sent to the v1 content endpoint through the
    Confluence session.

    Args:
        attachment_id: The Confluence attachment ID

    Returns:
        On success, a dictionary with ``success`` True, the
        ``attachment_id`` and a ``message``. On failure, a dictionary with
        ``success`` False and an ``error`` message.
    """
    if not attachment_id:
        logger.error("No attachment ID provided for deletion")
        return {"success": False, "error": "No attachment ID provided"}

    try:
        logger.info(f"Deleting attachment {attachment_id}")
        adapter = self._v2_adapter
        if not adapter:
            logger.debug(
                f"Using v1 API for token/basic authentication to delete attachment '{attachment_id}'"
            )
            endpoint = (
                f"{self.config.url.rstrip('/')}/rest/api/content/{attachment_id}"
            )
            reply = self.confluence._session.delete(endpoint)
            # Turn an HTTP error status into an exception handled below
            reply.raise_for_status()
        else:
            logger.debug(
                f"Using v2 API for OAuth authentication to delete attachment '{attachment_id}'"
            )
            adapter.delete_attachment(attachment_id)
    except Exception as exc:
        message = str(exc)
        logger.error(f"Error deleting attachment: {message}")
        return {"success": False, "error": message}
    else:
        logger.info(f"Successfully deleted attachment {attachment_id}")
        return {
            "success": True,
            "attachment_id": attachment_id,
            "message": "Attachment deleted successfully",
        }