"""Utility functions for file operations."""
import email
from email import policy
import io
import os
from typing import Optional, Tuple
def extract_pdf_from_eml(eml_file_path: str, output_dir: str = "/app/attachments") -> Optional[str]:
    """
    Extract the first PDF attachment from an EML file and save it to disk.

    A MIME part is treated as a PDF when it carries a filename and either
    that filename ends in ``.pdf`` (case-insensitive) or the part's content
    type contains ``pdf``. Only the first matching part is extracted.

    The attachment filename comes from the (untrusted) email, so it is
    reduced to its final path component before being joined to
    ``output_dir``; a name such as ``../../x.pdf`` cannot escape the
    output directory. Existing files are never overwritten: a ``_<n>``
    suffix is appended until an unused name is found.

    Args:
        eml_file_path: Path to EML file
        output_dir: Directory to save extracted PDF (created if missing)
    Returns:
        Path to extracted PDF file, or None if no PDF was found or any
        error occurred (the error is printed, not raised)
    """
    from email.message import Message

    try:
        with open(eml_file_path, 'rb') as f:
            msg = email.message_from_binary_file(f, policy=policy.default)

        for part in msg.walk():
            filename = part.get_filename()
            if not filename:
                continue
            if not (filename.lower().endswith('.pdf')
                    or 'pdf' in part.get_content_type().lower()):
                continue

            # Normalise the payload to bytes up front so the file is only
            # created once we know we have something writable.
            pdf_content = part.get_content()
            if isinstance(pdf_content, Message):
                pdf_content = pdf_content.as_bytes()
            elif isinstance(pdf_content, str):
                # Explicit encoding: the default for open() is locale-dependent.
                pdf_content = pdf_content.encode('utf-8')

            os.makedirs(output_dir, exist_ok=True)

            # Keep only the last path component (handling both '/' and '\\'
            # separators) so the write stays inside output_dir.
            safe_name = os.path.basename(filename.replace('\\', '/')).replace('\x00', '')
            if safe_name in ('', '.', '..'):
                safe_name = 'attachment.pdf'
            base_name, ext = os.path.splitext(safe_name)

            counter = 0
            output_path = os.path.join(output_dir, safe_name)
            while True:
                try:
                    # 'x' creates exclusively: no check-then-write race and
                    # no chance of clobbering an existing file.
                    with open(output_path, 'xb') as pdf_file:
                        pdf_file.write(pdf_content)
                    return output_path
                except FileExistsError:
                    counter += 1
                    output_path = os.path.join(output_dir, f"{base_name}_{counter}{ext}")

        return None
    except Exception as e:
        # Best-effort API: callers expect None rather than an exception.
        print(f"Error extracting PDF from EML: {str(e)}")
        return None
def extract_all_attachments_from_eml(eml_file_path: str, output_dir: str = "/app/attachments") -> list:
    """
    Extract all attachments from EML file.

    Every MIME part that carries a filename is written to ``output_dir``.
    Because the filename comes from the (untrusted) email, only its final
    path component is used on disk, so names such as ``../../x`` cannot
    escape the output directory. Existing files are never overwritten: a
    ``_<n>`` suffix is appended until an unused name is found.

    A failure on one attachment is printed and that attachment is skipped;
    the remaining attachments are still extracted and reported.

    Args:
        eml_file_path: Path to EML file
        output_dir: Directory to save attachments (created if missing)
    Returns:
        List of tuples (filename, file_path), where filename is the name as
        given in the email and file_path is where it was actually written.
        Empty list if the EML file cannot be read or parsed.
    """
    from email.message import Message

    extracted = []
    try:
        with open(eml_file_path, 'rb') as f:
            msg = email.message_from_binary_file(f, policy=policy.default)

        os.makedirs(output_dir, exist_ok=True)

        for part in msg.walk():
            filename = part.get_filename()
            if not filename:
                continue

            try:
                # Normalise the payload to bytes before touching the disk so
                # a bad part does not leave an empty file behind.
                content = part.get_content()
                if isinstance(content, Message):
                    # Attached emails (message/rfc822) come back as a
                    # message object, not bytes/str.
                    content = content.as_bytes()
                elif isinstance(content, str):
                    # Explicit encoding: open()'s default is locale-dependent.
                    content = content.encode('utf-8')

                # Keep only the last path component (handling both '/' and
                # '\\' separators) so the write stays inside output_dir.
                safe_name = os.path.basename(filename.replace('\\', '/')).replace('\x00', '')
                if safe_name in ('', '.', '..'):
                    safe_name = 'attachment'
                base_name, ext = os.path.splitext(safe_name)

                counter = 0
                output_path = os.path.join(output_dir, safe_name)
                while True:
                    try:
                        # 'x' creates exclusively: no check-then-write race
                        # and no chance of clobbering an existing file.
                        with open(output_path, 'xb') as out_file:
                            out_file.write(content)
                        break
                    except FileExistsError:
                        counter += 1
                        output_path = os.path.join(output_dir, f"{base_name}_{counter}{ext}")
            except Exception as e:
                # One bad attachment must not discard the ones already saved.
                print(f"Error extracting attachment {filename!r} from EML: {str(e)}")
                continue

            extracted.append((filename, output_path))

        return extracted
    except Exception as e:
        # Best-effort API: callers expect a list rather than an exception.
        print(f"Error extracting attachments from EML: {str(e)}")
        return []