Skip to main content
Glama

MCP Project Orchestrator

printing.py19.8 kB
""" Print server integration for PrintCast Agent. Handles printing operations using CUPS and PDF generation. """ import asyncio import os import tempfile from typing import Any, Dict, List, Optional from datetime import datetime from pathlib import Path import subprocess import base64 import structlog from pydantic import BaseModel, Field from reportlab.lib.pagesizes import A4 from reportlab.lib.styles import getSampleStyleSheet from reportlab.lib.units import mm from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak, Table, TableStyle from reportlab.lib import colors from jinja2 import Template logger = structlog.get_logger(__name__) class PrintJob(BaseModel): """Represents a print job.""" job_id: str session_id: str document_path: str printer_name: str status: str = "pending" pages: int = 0 copies: int = 1 created_at: datetime = Field(default_factory=datetime.now) completed_at: Optional[datetime] = None metadata: Dict[str, Any] = Field(default_factory=dict) class PrintManager: """ Manages printing operations. Features: - CUPS integration for local/network printers - PDF generation from content - Print job queue management - Print preview generation """ def __init__(self, config: Dict[str, Any]): """ Initialize print manager. Args: config: Configuration including: - default_printer: Default printer name - cups_server: CUPS server address - temp_dir: Temporary directory for print files - pdf_settings: PDF generation settings """ self.config = config self.default_printer = config.get("default_printer", "default") self.cups_server = config.get("cups_server", "localhost:631") self.temp_dir = Path(config.get("temp_dir", "/tmp/printcast")) self.pdf_settings = config.get("pdf_settings", {}) # Create temp directory self.temp_dir.mkdir(parents=True, exist_ok=True) # Print job tracking self.jobs: Dict[str, PrintJob] = {} self.job_counter = 0 # Check if CUPS is available self.cups_available = False logger.info( "Print manager initialized", default_printer=self.default_printer, temp_dir=str(self.temp_dir) ) async def initialize(self): """Initialize print manager and check CUPS availability.""" try: # Check if CUPS is available result = await asyncio.create_subprocess_exec( "lpstat", "-p", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await result.communicate() if result.returncode == 0: self.cups_available = True logger.info("CUPS is available") # Parse available printers printers = [] for line in stdout.decode().split("\n"): if line.startswith("printer"): parts = line.split() if len(parts) >= 2: printers.append(parts[1]) logger.info( "Available printers", count=len(printers), printers=printers ) else: logger.warning("CUPS not available", stderr=stderr.decode()) except Exception as e: logger.warning("Could not check CUPS availability", error=str(e)) self.cups_available = False async def shutdown(self): """Cleanup resources.""" # Cancel pending jobs for job in self.jobs.values(): if job.status == "pending": job.status = "cancelled" logger.info("Print manager shutdown") def is_available(self) -> bool: """Check if printing is available.""" return self.cups_available async def generate_pdf( self, content: str, title: str = "PrintCast Document", format: str = "A4", output_path: Optional[str] = None ) -> str: """ Generate PDF from content. Args: content: Content to print (text, HTML, or markdown) title: Document title format: Page format output_path: Optional output path Returns: Path to generated PDF """ try: # Generate output path if not provided if not output_path: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_path = self.temp_dir / f"document_{timestamp}.pdf" else: output_path = Path(output_path) # Create PDF document doc = SimpleDocTemplate( str(output_path), pagesize=A4, rightMargin=20*mm, leftMargin=20*mm, topMargin=20*mm, bottomMargin=20*mm ) # Container for the 'Flowable' objects elements = [] # Define styles styles = getSampleStyleSheet() title_style = styles['Title'] heading_style = styles['Heading1'] normal_style = styles['Normal'] # Add title elements.append(Paragraph(title, title_style)) elements.append(Spacer(1, 12)) # Add timestamp timestamp_text = f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" elements.append(Paragraph(timestamp_text, normal_style)) elements.append(Spacer(1, 20)) # Parse and add content if content.startswith("<html>"): # HTML content - parse and convert from bs4 import BeautifulSoup soup = BeautifulSoup(content, "html.parser") for elem in soup.find_all(["h1", "h2", "h3", "p", "ul", "ol"]): if elem.name.startswith("h"): elements.append(Paragraph(elem.text, heading_style)) else: elements.append(Paragraph(elem.text, normal_style)) elements.append(Spacer(1, 6)) elif content.startswith("#"): # Markdown content - convert to PDF elements lines = content.split("\n") for line in lines: if line.startswith("##"): elements.append(Paragraph(line[2:].strip(), heading_style)) elif line.startswith("#"): elements.append(Paragraph(line[1:].strip(), title_style)) elif line.strip(): elements.append(Paragraph(line, normal_style)) elements.append(Spacer(1, 6)) else: # Plain text - split by paragraphs paragraphs = content.split("\n\n") for para in paragraphs: if para.strip(): elements.append(Paragraph(para, normal_style)) elements.append(Spacer(1, 12)) # Build PDF doc.build(elements) logger.info( "PDF generated", path=str(output_path), size=output_path.stat().st_size ) return str(output_path) except Exception as e: logger.error("Failed to generate PDF", error=str(e)) raise async def generate_preview( self, items: List[str], format: str = "pdf" ) -> Dict[str, Any]: """ Generate print preview. Args: items: Content items to preview format: Preview format Returns: Preview information """ try: # Generate preview document timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") preview_path = self.temp_dir / f"preview_{timestamp}.{format}" # Create preview content content = "# Print Preview\n\n" for i, item in enumerate(items, 1): content += f"## Item {i}\n{item}\n\n" # Generate document if format == "pdf": doc_path = await self.generate_pdf( content, title="Print Preview", output_path=str(preview_path) ) else: # HTML preview html_content = f""" <!DOCTYPE html> <html> <head><title>Print Preview</title></head> <body> <h1>Print Preview</h1> {''.join(f'<div>{item}</div>' for item in items)} </body> </html> """ preview_path.write_text(html_content) doc_path = str(preview_path) # Get file info file_stat = preview_path.stat() # Estimate page count (rough) page_count = max(1, len(items) // 3) return { "url": f"file://{doc_path}", "path": doc_path, "pages": page_count, "size": file_stat.st_size, "format": format } except Exception as e: logger.error("Failed to generate preview", error=str(e)) raise async def print_document( self, document_path: str, printer_name: Optional[str] = None, copies: int = 1, options: Optional[Dict[str, str]] = None ) -> str: """ Print a document. Args: document_path: Path to document printer_name: Printer to use copies: Number of copies options: Print options Returns: Print job ID """ if not self.cups_available: logger.warning("CUPS not available, simulating print") return await self._simulate_print(document_path) try: printer = printer_name or self.default_printer # Create print job self.job_counter += 1 job_id = f"job_{self.job_counter}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" job = PrintJob( job_id=job_id, session_id="", document_path=document_path, printer_name=printer, copies=copies, status="pending" ) self.jobs[job_id] = job # Build lpr command cmd = ["lpr", "-P", printer] if copies > 1: cmd.extend(["-#", str(copies)]) if options: for key, value in options.items(): cmd.extend(["-o", f"{key}={value}"]) cmd.append(document_path) # Execute print command result = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await result.communicate() if result.returncode == 0: job.status = "printing" logger.info( "Document sent to printer", job_id=job_id, printer=printer, document=document_path ) # Start monitoring job asyncio.create_task(self._monitor_print_job(job_id)) else: job.status = "failed" logger.error( "Failed to print document", job_id=job_id, error=stderr.decode() ) raise RuntimeError(f"Print failed: {stderr.decode()}") return job_id except Exception as e: logger.error("Failed to print document", error=str(e)) raise async def _simulate_print(self, document_path: str) -> str: """Simulate printing when CUPS is not available.""" self.job_counter += 1 job_id = f"sim_job_{self.job_counter}" job = PrintJob( job_id=job_id, session_id="", document_path=document_path, printer_name="simulated", status="simulated" ) self.jobs[job_id] = job logger.info( "Print simulated", job_id=job_id, document=document_path ) # Simulate processing delay await asyncio.sleep(2) job.status = "completed" job.completed_at = datetime.now() return job_id async def _monitor_print_job(self, job_id: str): """Monitor print job status.""" job = self.jobs.get(job_id) if not job: return try: # Poll job status for _ in range(60): # Monitor for up to 60 seconds await asyncio.sleep(1) # Check job status using lpstat result = await asyncio.create_subprocess_exec( "lpstat", "-W", "completed", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, _ = await result.communicate() # Simple check - in production would parse lpstat output properly if job_id in stdout.decode(): job.status = "completed" job.completed_at = datetime.now() logger.info("Print job completed", job_id=job_id) break except Exception as e: logger.error( "Error monitoring print job", job_id=job_id, error=str(e) ) job.status = "error" async def cancel_print_job(self, job_id: str) -> bool: """ Cancel a print job. Args: job_id: Job ID to cancel Returns: True if cancelled successfully """ job = self.jobs.get(job_id) if not job: return False if job.status in ["completed", "cancelled", "failed"]: return False try: if self.cups_available: # Cancel using lprm result = await asyncio.create_subprocess_exec( "lprm", "-P", job.printer_name, job_id, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await result.communicate() job.status = "cancelled" logger.info("Print job cancelled", job_id=job_id) return True except Exception as e: logger.error( "Failed to cancel print job", job_id=job_id, error=str(e) ) return False def get_job_status(self, job_id: str) -> Optional[Dict[str, Any]]: """Get print job status.""" job = self.jobs.get(job_id) if not job: return None return { "job_id": job.job_id, "status": job.status, "printer": job.printer_name, "document": job.document_path, "created": job.created_at.isoformat(), "completed": job.completed_at.isoformat() if job.completed_at else None } async def get_printer_list(self) -> List[Dict[str, Any]]: """Get list of available printers.""" printers = [] if not self.cups_available: return [{ "name": "simulated", "status": "ready", "default": True }] try: # Get printer list using lpstat result = await asyncio.create_subprocess_exec( "lpstat", "-p", "-d", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, _ = await result.communicate() output = stdout.decode() # Parse printer list default_printer = None for line in output.split("\n"): if line.startswith("printer"): parts = line.split() if len(parts) >= 2: printer_name = parts[1] status = "ready" if "enabled" in line else "offline" printers.append({ "name": printer_name, "status": status, "default": False }) elif line.startswith("system default"): parts = line.split(":") if len(parts) >= 2: default_printer = parts[1].strip() # Mark default printer for printer in printers: if printer["name"] == default_printer: printer["default"] = True except Exception as e: logger.error("Failed to get printer list", error=str(e)) return printers async def create_print_batch( self, documents: List[Dict[str, Any]], printer_name: Optional[str] = None ) -> List[str]: """ Create a batch print job. Args: documents: List of documents to print printer_name: Printer to use Returns: List of job IDs """ job_ids = [] for doc in documents: try: # Generate PDF if needed if "content" in doc: doc_path = await self.generate_pdf( doc["content"], title=doc.get("title", "Document") ) else: doc_path = doc["path"] # Print document job_id = await self.print_document( doc_path, printer_name=printer_name, copies=doc.get("copies", 1) ) job_ids.append(job_id) except Exception as e: logger.error( "Failed to print document in batch", document=doc.get("title", "Unknown"), error=str(e) ) return job_ids

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sparesparrow/mcp-project-orchestrator'

If you have feedback or need assistance with the MCP directory API, please join our Discord server