"""Email operations for Microsoft Graph API."""
import base64
import os
from typing import Dict, List, Any
from .base import BaseOperation, OperationError
from ...attachment_helper import process_attachment_for_analysis
from ..graph_client import MicrosoftGraphClient
class EmailOperations(BaseOperation):
"""Operations for email management."""
def get_supported_actions(self) -> List[str]:
"""Return supported email actions."""
return [
"list", "get", "send", "reply", "reply_all",
"forward", "delete", "move", "list_attachments", "get_attachment",
"read_attachment_text", "search", "get_conversation"
]
def _validate_action_params(self, action: str, params: Dict) -> None:
"""Validate parameters for email actions."""
# access_token and microsoft_user are required for all actions (from TrustyVault)
self._require_param(params, "access_token", str)
self._require_param(params, "microsoft_user", str)
if action == "list":
folder = params.get("folder", "inbox")
self._validate_choice(folder, ["inbox", "sent", "drafts", "deleted"], "folder")
elif action == "get":
self._require_param(params, "message_id", str)
elif action == "send":
to = self._require_param(params, "to", list)
if not to:
raise OperationError(code="INVALID_PARAM", message="'to' must contain at least one recipient")
for email in to:
self._validate_email(email)
self._require_param(params, "subject", str)
self._require_param(params, "body", str)
body_type = params.get("body_type", "HTML")
self._validate_choice(body_type, ["Text", "HTML"], "body_type")
importance = params.get("importance", "normal")
self._validate_choice(importance, ["low", "normal", "high"], "importance")
# Validate cc/bcc if present
for field in ["cc", "bcc"]:
if field in params:
for email in params[field]:
self._validate_email(email)
elif action in ["reply", "reply_all"]:
self._require_param(params, "message_id", str)
self._require_param(params, "body", str)
elif action == "forward":
self._require_param(params, "message_id", str)
to = self._require_param(params, "to", list)
if not to:
raise OperationError(code="INVALID_PARAM", message="'to' must contain at least one recipient")
for email in to:
self._validate_email(email)
elif action in ["delete", "move"]:
self._require_param(params, "message_id", str)
if action == "move":
self._require_param(params, "folder_id", str)
elif action == "list_attachments":
self._require_param(params, "message_id", str)
elif action == "get_attachment":
self._require_param(params, "message_id", str)
self._require_param(params, "attachment_id", str)
elif action == "read_attachment_text":
self._require_param(params, "message_id", str)
self._require_param(params, "attachment_id", str)
elif action == "search":
# query is required for search
self._require_param(params, "query", str)
elif action == "get_conversation":
# Either conversation_id or message_id is required
if "conversation_id" not in params and "message_id" not in params:
raise OperationError(
code="INVALID_PARAM",
message="Either 'conversation_id' or 'message_id' is required"
)
def _execute_action(self, action: str, params: Dict) -> Any:
"""Execute email action."""
if action == "list":
return self._action_list(params)
elif action == "get":
return self._action_get(params)
elif action == "send":
return self._action_send(params)
elif action == "reply":
return self._action_reply(params, reply_all=False)
elif action == "reply_all":
return self._action_reply(params, reply_all=True)
elif action == "forward":
return self._action_forward(params)
elif action == "delete":
return self._action_delete(params)
elif action == "move":
return self._action_move(params)
elif action == "list_attachments":
return self._action_list_attachments(params)
elif action == "get_attachment":
return self._action_get_attachment(params)
elif action == "read_attachment_text":
return self._action_read_attachment_text(params)
elif action == "search":
return self._action_search(params)
elif action == "get_conversation":
return self._action_get_conversation(params)
def _action_list(self, params: Dict) -> Dict:
"""List emails with optional filters."""
access_token = params["access_token"]
microsoft_user = params["microsoft_user"]
folder = params.get("folder", "inbox")
max_results = params.get("max_results", 50)
# Build query
query_params = {
"$select": "id,subject,from,receivedDateTime,isRead,hasAttachments,bodyPreview",
"$top": max_results,
"$orderby": "receivedDateTime desc"
}
# Add filters
filter_parts = []
filters = params.get("filters", {})
if "from" in filters:
filter_parts.append(f"from/emailAddress/address eq '{filters['from']}'")
if "subject_contains" in filters:
filter_parts.append(f"contains(subject, '{filters['subject_contains']}')")
if "is_read" in filters:
filter_parts.append(f"isRead eq {str(filters['is_read']).lower()}")
if "received_after" in filters:
filter_parts.append(f"receivedDateTime ge {filters['received_after']}")
if "received_before" in filters:
filter_parts.append(f"receivedDateTime le {filters['received_before']}")
if "conversation_id" in filters:
filter_parts.append(f"conversationId eq '{filters['conversation_id']}'")
if "has_attachments" in filters:
filter_parts.append(f"hasAttachments eq {str(filters['has_attachments']).lower()}")
if filter_parts:
query_params["$filter"] = " and ".join(filter_parts)
try:
folder_map = {
"inbox": "inbox",
"sent": "sentitems",
"drafts": "drafts",
"deleted": "deleteditems"
}
folder_name = folder_map[folder]
# Create GraphClient with access_token from TrustyVault
client = MicrosoftGraphClient(access_token=access_token)
response = client.get(
f"/users/{microsoft_user}/mailFolders/{folder_name}/messages",
params=query_params
)
messages = response.get("value", [])
return {
"messages": messages,
"count": len(messages),
"folder": folder
}
except Exception as e:
raise OperationError(
code="LIST_FAILED",
message=f"Failed to list emails: {str(e)}"
)
def _action_get(self, params: Dict) -> Dict:
"""Get email details."""
access_token = params["access_token"]
microsoft_user = params["microsoft_user"]
client = MicrosoftGraphClient(access_token=access_token)
message_id = params["message_id"]
try:
message = client.get(
f"/users/{microsoft_user}/messages/{message_id}",
)
return {"message": message}
except Exception as e:
if "404" in str(e):
raise OperationError(
code="MESSAGE_NOT_FOUND",
message=f"Message not found: {message_id}",
details={"message_id": message_id}
)
raise OperationError(
code="GET_FAILED",
message=f"Failed to get message: {str(e)}",
details={"message_id": message_id}
)
def _action_send(self, params: Dict) -> Dict:
"""Send new email."""
access_token = params["access_token"]
microsoft_user = params["microsoft_user"]
client = MicrosoftGraphClient(access_token=access_token)
# Build message
message = {
"subject": params["subject"],
"body": {
"contentType": params.get("body_type", "HTML"),
"content": params["body"]
},
"toRecipients": [
{"emailAddress": {"address": email}}
for email in params["to"]
],
"importance": params.get("importance", "normal")
}
# Add cc/bcc
if "cc" in params and params["cc"]:
message["ccRecipients"] = [
{"emailAddress": {"address": email}}
for email in params["cc"]
]
if "bcc" in params and params["bcc"]:
message["bccRecipients"] = [
{"emailAddress": {"address": email}}
for email in params["bcc"]
]
# Add attachments if provided
if "attachments" in params and params["attachments"]:
message["attachments"] = []
for attachment in params["attachments"]:
# attachment can be either:
# 1. {"file_path": "/path/to/file.pdf"} - read from disk
# 2. {"name": "file.pdf", "content_bytes": "base64data", "content_type": "application/pdf"}
if "file_path" in attachment:
# Read file from disk
file_path = attachment["file_path"]
if not os.path.exists(file_path):
raise OperationError(
code="FILE_NOT_FOUND",
message=f"Attachment file not found: {file_path}"
)
with open(file_path, "rb") as f:
content_bytes = base64.b64encode(f.read()).decode("utf-8")
file_name = attachment.get("name") or os.path.basename(file_path)
content_type = attachment.get("content_type") or self._guess_content_type(file_name)
message["attachments"].append({
"@odata.type": "#microsoft.graph.fileAttachment",
"name": file_name,
"contentType": content_type,
"contentBytes": content_bytes
})
else:
# Use provided content
message["attachments"].append({
"@odata.type": "#microsoft.graph.fileAttachment",
"name": attachment["name"],
"contentType": attachment.get("content_type", "application/octet-stream"),
"contentBytes": attachment["content_bytes"]
})
try:
# Send directly
client.post(
f"/users/{microsoft_user}/sendMail",
json={"message": message}
)
return {
"sent": True,
"subject": params["subject"],
"to": params["to"],
"attachments_count": len(params.get("attachments", []))
}
except Exception as e:
raise OperationError(
code="SEND_FAILED",
message=f"Failed to send email: {str(e)}",
details={"subject": params["subject"]}
)
def _action_reply(self, params: Dict, reply_all: bool = False) -> Dict:
"""Reply to email."""
message_id = params["message_id"]
body = params["body"]
comment = params.get("comment", body)
action_url = "replyAll" if reply_all else "reply"
try:
client.post(
f"/users/{microsoft_user}/messages/{message_id}/{action_url}",
json={"comment": comment}
)
return {
"replied": True,
"message_id": message_id,
"reply_all": reply_all
}
except Exception as e:
raise OperationError(
code="REPLY_FAILED",
message=f"Failed to reply to message: {str(e)}",
details={"message_id": message_id}
)
def _action_forward(self, params: Dict) -> Dict:
"""Forward email."""
message_id = params["message_id"]
to = params["to"]
comment = params.get("comment", "")
try:
client.post(
f"/users/{microsoft_user}/messages/{message_id}/forward",
json={
"comment": comment,
"toRecipients": [
{"emailAddress": {"address": email}}
for email in to
]
}
)
return {
"forwarded": True,
"message_id": message_id,
"to": to
}
except Exception as e:
raise OperationError(
code="FORWARD_FAILED",
message=f"Failed to forward message: {str(e)}",
details={"message_id": message_id}
)
def _action_delete(self, params: Dict) -> Dict:
"""Delete email."""
message_id = params["message_id"]
try:
client.delete(f"/users/{microsoft_user}/messages/{message_id}")
return {
"deleted": True,
"message_id": message_id
}
except Exception as e:
raise OperationError(
code="DELETE_FAILED",
message=f"Failed to delete message: {str(e)}",
details={"message_id": message_id}
)
def _action_move(self, params: Dict) -> Dict:
"""Move email to folder."""
message_id = params["message_id"]
folder_id = params["folder_id"]
try:
moved_message = client.post(
f"/users/{microsoft_user}/messages/{message_id}/move",
json={"destinationId": folder_id}
)
return {
"moved": True,
"message_id": message_id,
"new_folder_id": folder_id
}
except Exception as e:
raise OperationError(
code="MOVE_FAILED",
message=f"Failed to move message: {str(e)}",
details={"message_id": message_id}
)
def _action_list_attachments(self, params: Dict) -> Dict:
"""List attachments for email."""
access_token = params["access_token"]
microsoft_user = params["microsoft_user"]
client = MicrosoftGraphClient(access_token=access_token)
message_id = params["message_id"]
try:
response = client.get(
f"/users/{microsoft_user}/messages/{message_id}/attachments",
)
attachments = response.get("value", [])
# Format attachment info
formatted_attachments = [
{
"id": att.get("id"),
"name": att.get("name"),
"contentType": att.get("contentType"),
"size": att.get("size"),
"isInline": att.get("isInline", False)
}
for att in attachments
]
return {
"attachments": formatted_attachments,
"count": len(formatted_attachments),
"message_id": message_id
}
except Exception as e:
raise OperationError(
code="LIST_ATTACHMENTS_FAILED",
message=f"Failed to list attachments: {str(e)}",
details={"message_id": message_id}
)
def _action_get_attachment(self, params: Dict) -> Dict:
"""Get attachment, save to disk and convert to markdown for analysis."""
access_token = params["access_token"]
microsoft_user = params["microsoft_user"]
client = MicrosoftGraphClient(access_token=access_token)
message_id = params["message_id"]
attachment_id = params["attachment_id"]
try:
import base64
# First get attachment metadata
attachment = client.get(
f"/users/{microsoft_user}/messages/{message_id}/attachments/{attachment_id}",
)
attachment_name = attachment.get("name", "attachment")
content_type = attachment.get("contentType")
content_bytes = attachment.get("contentBytes")
# Get raw content
raw_content = None
if content_bytes:
# Decode base64 from Graph API
raw_content = base64.b64decode(content_bytes)
else:
# Try $value endpoint for large files
raw_content = client.get(
f"/users/{microsoft_user}/messages/{message_id}/attachments/{attachment_id}/$value",
)
if not raw_content:
raise Exception("No content available for attachment")
# Ensure bytes
if isinstance(raw_content, str):
raw_content = raw_content.encode('utf-8')
# Process attachment: save + convert to markdown
processed = process_attachment_for_analysis(
content=raw_content,
filename=attachment_name,
content_type=content_type
)
return {
"attachment": processed
}
except Exception as e:
raise OperationError(
code="GET_ATTACHMENT_FAILED",
message=f"Failed to get attachment: {str(e)}",
details={"message_id": message_id, "attachment_id": attachment_id}
)
def _action_read_attachment_text(self, params: Dict) -> Dict:
"""Read text content from attachment (PDF, EML, TXT, etc.)."""
access_token = params["access_token"]
microsoft_user = params["microsoft_user"]
client = MicrosoftGraphClient(access_token=access_token)
message_id = params["message_id"]
attachment_id = params["attachment_id"]
try:
# First get attachment metadata
attachment = client.get(
f"/users/{microsoft_user}/messages/{message_id}/attachments/{attachment_id}",
)
attachment_name = attachment.get("name", "attachment")
content_type = attachment.get("contentType", "")
# Get raw content
raw_content = client.get(
f"/users/{microsoft_user}/messages/{message_id}/attachments/{attachment_id}/$value",
)
if not isinstance(raw_content, bytes):
raise OperationError(
code="INVALID_ATTACHMENT",
message="Could not retrieve attachment content as bytes"
)
extracted_text = None
file_type = None
extracted_files = [] # Track files extracted from EML
# Handle different file types
if attachment_name.lower().endswith('.pdf') or 'pdf' in content_type.lower():
file_type = "PDF"
extracted_text = self._extract_text_from_pdf(raw_content)
elif attachment_name.lower().endswith('.pptx') or 'presentationml' in content_type.lower():
file_type = "PPTX"
extracted_text = self._extract_text_from_pptx(raw_content)
elif attachment_name.lower().endswith('.eml') or content_type == 'message/rfc822':
file_type = "EML"
extracted_text, extracted_files = self._extract_text_from_eml(raw_content, attachment_name)
elif attachment_name.lower().endswith(('.txt', '.md', '.log', '.xml', '.json', '.csv')):
file_type = "TEXT"
try:
extracted_text = raw_content.decode('utf-8')
except:
try:
extracted_text = raw_content.decode('latin-1')
except:
extracted_text = "Error: Could not decode text file"
else:
raise OperationError(
code="UNSUPPORTED_FILE_TYPE",
message=f"File type not supported for text extraction: {attachment_name}",
details={"supported_types": ["PDF", "PPTX", "EML", "TXT", "MD", "LOG", "XML", "JSON", "CSV"]}
)
result = {
"attachment": {
"id": attachment_id,
"name": attachment_name,
"contentType": content_type,
"file_type": file_type,
"size": len(raw_content)
},
"extracted_text": extracted_text,
"text_length": len(extracted_text) if extracted_text else 0
}
# Add extracted files info if any (from EML)
if extracted_files:
result["extracted_files"] = extracted_files
return result
except OperationError:
raise
except Exception as e:
raise OperationError(
code="READ_ATTACHMENT_FAILED",
message=f"Failed to read attachment text: {str(e)}",
details={"message_id": message_id, "attachment_id": attachment_id}
)
def _extract_text_from_pdf(self, pdf_bytes: bytes) -> str:
"""Extract text from PDF bytes."""
try:
from PyPDF2 import PdfReader
import io
pdf_file = io.BytesIO(pdf_bytes)
reader = PdfReader(pdf_file)
text_parts = []
for i, page in enumerate(reader.pages):
page_text = page.extract_text()
text_parts.append(f"--- PAGE {i+1}/{len(reader.pages)} ---\n\n{page_text}")
return "\n\n".join(text_parts)
except ImportError:
return "Error: PyPDF2 library not installed. Install with: pip install PyPDF2"
except Exception as e:
return f"Error extracting PDF text: {str(e)}"
def _extract_text_from_pptx(self, pptx_bytes: bytes) -> str:
"""Extract text from PowerPoint (.pptx) bytes."""
try:
from pptx import Presentation
import io
pptx_file = io.BytesIO(pptx_bytes)
prs = Presentation(pptx_file)
text_parts = []
for i, slide in enumerate(prs.slides):
slide_text = []
slide_text.append(f"={'=' * 80}")
slide_text.append(f"SLIDE {i+1}/{len(prs.slides)}")
slide_text.append(f"{'=' * 80}\n")
# Extract text from all shapes in the slide
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text.strip():
slide_text.append(shape.text)
# Check for tables
if hasattr(shape, "table"):
slide_text.append("\n[TABLE]")
for row in shape.table.rows:
row_text = " | ".join([cell.text for cell in row.cells])
slide_text.append(row_text)
text_parts.append("\n".join(slide_text))
return "\n\n".join(text_parts)
except ImportError:
return "Error: python-pptx library not installed. Install with: pip install python-pptx"
except Exception as e:
return f"Error extracting PPTX text: {str(e)}"
def _extract_text_from_eml(self, eml_bytes: bytes, eml_name: str = "email.eml") -> tuple:
"""Extract text and attachments list from EML file. Returns (text, list_of_extracted_files)."""
try:
import email
from email import policy
import io
msg = email.message_from_binary_file(io.BytesIO(eml_bytes), policy=policy.default)
parts = []
parts.append(f"From: {msg.get('from', 'N/A')}")
parts.append(f"To: {msg.get('to', 'N/A')}")
parts.append(f"Subject: {msg.get('subject', 'N/A')}")
parts.append(f"Date: {msg.get('date', 'N/A')}")
parts.append("\n" + "=" * 80 + "\n")
# Extract body
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
try:
body = part.get_content()
parts.append("BODY (Plain Text):\n")
parts.append(body)
except:
pass
else:
try:
body = msg.get_content()
parts.append("BODY:\n")
parts.append(body)
except:
pass
# Extract attachments and process PDF files
attachments = []
pdf_texts = []
saved_files = [] # Track all saved files with metadata
for part in msg.walk():
filename = part.get_filename()
if filename:
content_type = part.get_content_type()
try:
attachment_content = part.get_content()
size = len(attachment_content)
attachments.append(f" • {filename} ({content_type}, {size} bytes)")
# If it's a PDF, extract text from it AND save to disk
if filename.lower().endswith('.pdf') or 'pdf' in content_type.lower():
parts.append(f"\n{'=' * 80}")
parts.append(f"\n📄 EXTRACTING PDF: {filename}\n")
parts.append("=" * 80 + "\n")
pdf_text = self._extract_text_from_pdf(attachment_content)
parts.append(pdf_text)
# Save PDF to disk for later use
try:
import os
from datetime import datetime
attachments_dir = "/app/attachments"
os.makedirs(attachments_dir, exist_ok=True)
# Use timestamp + original filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_filename = filename.replace('/', '_').replace('\\', '_')
pdf_path = os.path.join(attachments_dir, f"{timestamp}_{safe_filename}")
with open(pdf_path, 'wb') as f:
f.write(attachment_content)
saved_files.append({
"type": "PDF",
"original_name": filename,
"saved_path": pdf_path,
"size": size
})
parts.append(f"\n💾 PDF SAVED TO: {pdf_path}\n")
except Exception as save_err:
parts.append(f"\n⚠️ Warning: Could not save PDF to disk: {str(save_err)}\n")
except Exception as e:
attachments.append(f" • {filename} (error: {str(e)})")
if attachments:
parts.append("\n" + "=" * 80)
parts.append("\nATTACHMENTS FOUND:")
parts.extend(attachments)
return ("\n".join(parts), saved_files)
except Exception as e:
return (f"Error extracting EML content: {str(e)}", [])
def _action_search(self, params: Dict) -> Dict:
"""Search emails using Microsoft Search API.
Uses the /search/query endpoint with KQL (Keyword Query Language).
Supports full-text search across subject, body, and attachments.
Query examples:
- "belluzzo" - Search for keyword
- "from:giovanni.belluzzo@infocert.it" - Search by sender
- "subject:meeting" - Search in subject
- "hasattachments:true" - Only emails with attachments
"""
access_token = params["access_token"]
microsoft_user = params["microsoft_user"]
client = MicrosoftGraphClient(access_token=access_token)
query = params["query"]
max_results = params.get("max_results", 25)
enable_top_results = params.get("enable_top_results", False)
# Build search request
search_request = {
"requests": [
{
"entityTypes": ["message"],
"query": {
"queryString": query
},
"from": 0,
"size": max_results
}
]
}
# Enable relevance ranking if requested
if enable_top_results:
search_request["requests"][0]["enableTopResults"] = True
try:
response = client.post(
"/search/query",
json=search_request
)
# Extract results
results = []
if "value" in response and len(response["value"]) > 0:
hits_containers = response["value"][0].get("hitsContainers", [])
if hits_containers:
hits = hits_containers[0].get("hits", [])
total = hits_containers[0].get("total", 0)
more_available = hits_containers[0].get("moreResultsAvailable", False)
for hit in hits:
resource = hit.get("resource", {})
hit_id = hit.get("hitId")
conversation_id = resource.get("conversationId")
subject = resource.get("subject")
received_dt = resource.get("receivedDateTime")
# Try to get message ID from hitId or fetch it using conversationId
message_id = None
if hit_id:
# hitId format is often a URL, extract the ID part
if "/" in hit_id:
message_id = hit_id.split("/")[-1]
else:
message_id = hit_id
# If we still don't have an ID and have conversationId, fetch the first matching message
if not message_id and conversation_id:
try:
# Query for messages with this conversationId and subject
query_params = {
"$filter": f"conversationId eq '{conversation_id}'",
"$select": "id,subject,receivedDateTime",
"$top": 10
}
if subject:
query_params["$filter"] += f" and subject eq '{subject.replace("'", "''")}'"
messages_response = client.get(
f"/users/{microsoft_user}/messages",
params=query_params
)
# Find the message with matching receivedDateTime
for msg in messages_response.get("value", []):
if msg.get("receivedDateTime") == received_dt:
message_id = msg.get("id")
break
# If no exact match, use the first message in conversation
if not message_id and messages_response.get("value"):
message_id = messages_response["value"][0].get("id")
except Exception as e:
# If fetching fails, continue without ID
pass
results.append({
"id": message_id,
"subject": subject,
"from": resource.get("from", {}).get("emailAddress", {}),
"receivedDateTime": received_dt,
"hasAttachments": resource.get("hasAttachments", False),
"bodyPreview": resource.get("bodyPreview", ""),
"importance": resource.get("importance", "normal"),
"isRead": resource.get("isRead", False),
"conversationId": conversation_id,
"rank": hit.get("rank", 0),
"summary": hit.get("summary", "")
})
return {
"results": results,
"count": len(results),
"total": total,
"more_available": more_available,
"query": query
}
return {
"results": [],
"count": 0,
"total": 0,
"more_available": False,
"query": query
}
except Exception as e:
raise OperationError(
code="SEARCH_FAILED",
message=f"Failed to search emails: {str(e)}",
details={"query": query}
)
def _action_get_conversation(self, params: Dict) -> Dict:
"""Get all messages in a conversation thread.
Retrieves all emails that are part of the same conversation.
Can be called with either:
- conversation_id: Direct conversation ID
- message_id: Will fetch the message first to get its conversation_id
"""
access_token = params["access_token"]
microsoft_user = params["microsoft_user"]
client = MicrosoftGraphClient(access_token=access_token)
conversation_id = params.get("conversation_id")
message_id = params.get("message_id")
try:
# If message_id provided, get conversation_id from it
if not conversation_id and message_id:
message = client.get(
f"/users/{microsoft_user}/messages/{message_id}",
params={"$select": "conversationId"}
)
conversation_id = message.get("conversationId")
if not conversation_id:
raise OperationError(
code="NO_CONVERSATION_ID",
message="Message does not have a conversation ID",
details={"message_id": message_id}
)
# Get all messages in the conversation
# Note: We cannot combine $filter with $orderby on conversationId due to API limitations
# So we fetch all matching messages and sort them client-side
query_params = {
"$filter": f"conversationId eq '{conversation_id}'",
"$select": "id,subject,from,receivedDateTime,isRead,hasAttachments,bodyPreview,conversationId",
"$top": 100
}
response = client.get(
f"/users/{microsoft_user}/messages",
params=query_params
)
messages = response.get("value", [])
# Sort messages client-side by receivedDateTime
messages.sort(key=lambda x: x.get("receivedDateTime", ""))
return {
"conversation_id": conversation_id,
"messages": messages,
"count": len(messages)
}
except OperationError:
raise
except Exception as e:
raise OperationError(
code="GET_CONVERSATION_FAILED",
message=f"Failed to get conversation: {str(e)}",
details={
"conversation_id": conversation_id,
"message_id": message_id
}
)
def _guess_content_type(self, filename: str) -> str:
"""Guess content type from filename extension."""
ext = filename.lower().split('.')[-1]
content_types = {
'pdf': 'application/pdf',
'doc': 'application/msword',
'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'xls': 'application/vnd.ms-excel',
'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'ppt': 'application/vnd.ms-powerpoint',
'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
'txt': 'text/plain',
'html': 'text/html',
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'png': 'image/png',
'gif': 'image/gif',
'zip': 'application/zip',
'eml': 'message/rfc822'
}
return content_types.get(ext, 'application/octet-stream')