printing.py•19.8 kB
"""
Print server integration for PrintCast Agent.
Handles printing operations using CUPS and PDF generation.
"""
import asyncio
import os
import tempfile
from typing import Any, Dict, List, Optional
from datetime import datetime
from pathlib import Path
import subprocess
import base64
import structlog
from pydantic import BaseModel, Field
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.units import mm
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak, Table, TableStyle
from reportlab.lib import colors
from jinja2 import Template
logger = structlog.get_logger(__name__)
class PrintJob(BaseModel):
"""Represents a print job."""
job_id: str
session_id: str
document_path: str
printer_name: str
status: str = "pending"
pages: int = 0
copies: int = 1
created_at: datetime = Field(default_factory=datetime.now)
completed_at: Optional[datetime] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
class PrintManager:
"""
Manages printing operations.
Features:
- CUPS integration for local/network printers
- PDF generation from content
- Print job queue management
- Print preview generation
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize print manager.
Args:
config: Configuration including:
- default_printer: Default printer name
- cups_server: CUPS server address
- temp_dir: Temporary directory for print files
- pdf_settings: PDF generation settings
"""
self.config = config
self.default_printer = config.get("default_printer", "default")
self.cups_server = config.get("cups_server", "localhost:631")
self.temp_dir = Path(config.get("temp_dir", "/tmp/printcast"))
self.pdf_settings = config.get("pdf_settings", {})
# Create temp directory
self.temp_dir.mkdir(parents=True, exist_ok=True)
# Print job tracking
self.jobs: Dict[str, PrintJob] = {}
self.job_counter = 0
# Check if CUPS is available
self.cups_available = False
logger.info(
"Print manager initialized",
default_printer=self.default_printer,
temp_dir=str(self.temp_dir)
)
async def initialize(self):
"""Initialize print manager and check CUPS availability."""
try:
# Check if CUPS is available
result = await asyncio.create_subprocess_exec(
"lpstat", "-p",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await result.communicate()
if result.returncode == 0:
self.cups_available = True
logger.info("CUPS is available")
# Parse available printers
printers = []
for line in stdout.decode().split("\n"):
if line.startswith("printer"):
parts = line.split()
if len(parts) >= 2:
printers.append(parts[1])
logger.info(
"Available printers",
count=len(printers),
printers=printers
)
else:
logger.warning("CUPS not available", stderr=stderr.decode())
except Exception as e:
logger.warning("Could not check CUPS availability", error=str(e))
self.cups_available = False
async def shutdown(self):
"""Cleanup resources."""
# Cancel pending jobs
for job in self.jobs.values():
if job.status == "pending":
job.status = "cancelled"
logger.info("Print manager shutdown")
def is_available(self) -> bool:
"""Check if printing is available."""
return self.cups_available
async def generate_pdf(
self,
content: str,
title: str = "PrintCast Document",
format: str = "A4",
output_path: Optional[str] = None
) -> str:
"""
Generate PDF from content.
Args:
content: Content to print (text, HTML, or markdown)
title: Document title
format: Page format
output_path: Optional output path
Returns:
Path to generated PDF
"""
try:
# Generate output path if not provided
if not output_path:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = self.temp_dir / f"document_{timestamp}.pdf"
else:
output_path = Path(output_path)
# Create PDF document
doc = SimpleDocTemplate(
str(output_path),
pagesize=A4,
rightMargin=20*mm,
leftMargin=20*mm,
topMargin=20*mm,
bottomMargin=20*mm
)
# Container for the 'Flowable' objects
elements = []
# Define styles
styles = getSampleStyleSheet()
title_style = styles['Title']
heading_style = styles['Heading1']
normal_style = styles['Normal']
# Add title
elements.append(Paragraph(title, title_style))
elements.append(Spacer(1, 12))
# Add timestamp
timestamp_text = f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
elements.append(Paragraph(timestamp_text, normal_style))
elements.append(Spacer(1, 20))
# Parse and add content
if content.startswith("<html>"):
# HTML content - parse and convert
from bs4 import BeautifulSoup
soup = BeautifulSoup(content, "html.parser")
for elem in soup.find_all(["h1", "h2", "h3", "p", "ul", "ol"]):
if elem.name.startswith("h"):
elements.append(Paragraph(elem.text, heading_style))
else:
elements.append(Paragraph(elem.text, normal_style))
elements.append(Spacer(1, 6))
elif content.startswith("#"):
# Markdown content - convert to PDF elements
lines = content.split("\n")
for line in lines:
if line.startswith("##"):
elements.append(Paragraph(line[2:].strip(), heading_style))
elif line.startswith("#"):
elements.append(Paragraph(line[1:].strip(), title_style))
elif line.strip():
elements.append(Paragraph(line, normal_style))
elements.append(Spacer(1, 6))
else:
# Plain text - split by paragraphs
paragraphs = content.split("\n\n")
for para in paragraphs:
if para.strip():
elements.append(Paragraph(para, normal_style))
elements.append(Spacer(1, 12))
# Build PDF
doc.build(elements)
logger.info(
"PDF generated",
path=str(output_path),
size=output_path.stat().st_size
)
return str(output_path)
except Exception as e:
logger.error("Failed to generate PDF", error=str(e))
raise
async def generate_preview(
self,
items: List[str],
format: str = "pdf"
) -> Dict[str, Any]:
"""
Generate print preview.
Args:
items: Content items to preview
format: Preview format
Returns:
Preview information
"""
try:
# Generate preview document
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
preview_path = self.temp_dir / f"preview_{timestamp}.{format}"
# Create preview content
content = "# Print Preview\n\n"
for i, item in enumerate(items, 1):
content += f"## Item {i}\n{item}\n\n"
# Generate document
if format == "pdf":
doc_path = await self.generate_pdf(
content,
title="Print Preview",
output_path=str(preview_path)
)
else:
# HTML preview
html_content = f"""
<!DOCTYPE html>
<html>
<head><title>Print Preview</title></head>
<body>
<h1>Print Preview</h1>
{''.join(f'<div>{item}</div>' for item in items)}
</body>
</html>
"""
preview_path.write_text(html_content)
doc_path = str(preview_path)
# Get file info
file_stat = preview_path.stat()
# Estimate page count (rough)
page_count = max(1, len(items) // 3)
return {
"url": f"file://{doc_path}",
"path": doc_path,
"pages": page_count,
"size": file_stat.st_size,
"format": format
}
except Exception as e:
logger.error("Failed to generate preview", error=str(e))
raise
async def print_document(
self,
document_path: str,
printer_name: Optional[str] = None,
copies: int = 1,
options: Optional[Dict[str, str]] = None
) -> str:
"""
Print a document.
Args:
document_path: Path to document
printer_name: Printer to use
copies: Number of copies
options: Print options
Returns:
Print job ID
"""
if not self.cups_available:
logger.warning("CUPS not available, simulating print")
return await self._simulate_print(document_path)
try:
printer = printer_name or self.default_printer
# Create print job
self.job_counter += 1
job_id = f"job_{self.job_counter}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
job = PrintJob(
job_id=job_id,
session_id="",
document_path=document_path,
printer_name=printer,
copies=copies,
status="pending"
)
self.jobs[job_id] = job
# Build lpr command
cmd = ["lpr", "-P", printer]
if copies > 1:
cmd.extend(["-#", str(copies)])
if options:
for key, value in options.items():
cmd.extend(["-o", f"{key}={value}"])
cmd.append(document_path)
# Execute print command
result = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await result.communicate()
if result.returncode == 0:
job.status = "printing"
logger.info(
"Document sent to printer",
job_id=job_id,
printer=printer,
document=document_path
)
# Start monitoring job
asyncio.create_task(self._monitor_print_job(job_id))
else:
job.status = "failed"
logger.error(
"Failed to print document",
job_id=job_id,
error=stderr.decode()
)
raise RuntimeError(f"Print failed: {stderr.decode()}")
return job_id
except Exception as e:
logger.error("Failed to print document", error=str(e))
raise
async def _simulate_print(self, document_path: str) -> str:
"""Simulate printing when CUPS is not available."""
self.job_counter += 1
job_id = f"sim_job_{self.job_counter}"
job = PrintJob(
job_id=job_id,
session_id="",
document_path=document_path,
printer_name="simulated",
status="simulated"
)
self.jobs[job_id] = job
logger.info(
"Print simulated",
job_id=job_id,
document=document_path
)
# Simulate processing delay
await asyncio.sleep(2)
job.status = "completed"
job.completed_at = datetime.now()
return job_id
async def _monitor_print_job(self, job_id: str):
"""Monitor print job status."""
job = self.jobs.get(job_id)
if not job:
return
try:
# Poll job status
for _ in range(60): # Monitor for up to 60 seconds
await asyncio.sleep(1)
# Check job status using lpstat
result = await asyncio.create_subprocess_exec(
"lpstat", "-W", "completed",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, _ = await result.communicate()
# Simple check - in production would parse lpstat output properly
if job_id in stdout.decode():
job.status = "completed"
job.completed_at = datetime.now()
logger.info("Print job completed", job_id=job_id)
break
except Exception as e:
logger.error(
"Error monitoring print job",
job_id=job_id,
error=str(e)
)
job.status = "error"
async def cancel_print_job(self, job_id: str) -> bool:
"""
Cancel a print job.
Args:
job_id: Job ID to cancel
Returns:
True if cancelled successfully
"""
job = self.jobs.get(job_id)
if not job:
return False
if job.status in ["completed", "cancelled", "failed"]:
return False
try:
if self.cups_available:
# Cancel using lprm
result = await asyncio.create_subprocess_exec(
"lprm", "-P", job.printer_name, job_id,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
await result.communicate()
job.status = "cancelled"
logger.info("Print job cancelled", job_id=job_id)
return True
except Exception as e:
logger.error(
"Failed to cancel print job",
job_id=job_id,
error=str(e)
)
return False
def get_job_status(self, job_id: str) -> Optional[Dict[str, Any]]:
"""Get print job status."""
job = self.jobs.get(job_id)
if not job:
return None
return {
"job_id": job.job_id,
"status": job.status,
"printer": job.printer_name,
"document": job.document_path,
"created": job.created_at.isoformat(),
"completed": job.completed_at.isoformat() if job.completed_at else None
}
async def get_printer_list(self) -> List[Dict[str, Any]]:
"""Get list of available printers."""
printers = []
if not self.cups_available:
return [{
"name": "simulated",
"status": "ready",
"default": True
}]
try:
# Get printer list using lpstat
result = await asyncio.create_subprocess_exec(
"lpstat", "-p", "-d",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, _ = await result.communicate()
output = stdout.decode()
# Parse printer list
default_printer = None
for line in output.split("\n"):
if line.startswith("printer"):
parts = line.split()
if len(parts) >= 2:
printer_name = parts[1]
status = "ready" if "enabled" in line else "offline"
printers.append({
"name": printer_name,
"status": status,
"default": False
})
elif line.startswith("system default"):
parts = line.split(":")
if len(parts) >= 2:
default_printer = parts[1].strip()
# Mark default printer
for printer in printers:
if printer["name"] == default_printer:
printer["default"] = True
except Exception as e:
logger.error("Failed to get printer list", error=str(e))
return printers
async def create_print_batch(
self,
documents: List[Dict[str, Any]],
printer_name: Optional[str] = None
) -> List[str]:
"""
Create a batch print job.
Args:
documents: List of documents to print
printer_name: Printer to use
Returns:
List of job IDs
"""
job_ids = []
for doc in documents:
try:
# Generate PDF if needed
if "content" in doc:
doc_path = await self.generate_pdf(
doc["content"],
title=doc.get("title", "Document")
)
else:
doc_path = doc["path"]
# Print document
job_id = await self.print_document(
doc_path,
printer_name=printer_name,
copies=doc.get("copies", 1)
)
job_ids.append(job_id)
except Exception as e:
logger.error(
"Failed to print document in batch",
document=doc.get("title", "Unknown"),
error=str(e)
)
return job_ids