Skip to main content
Glama
juanqui
by juanqui
background_queue.py13.6 kB
"""Background processing queue for PDF tasks. Provides asynchronous job handling with priority, concurrency limits, status tracking, retry logic, and graceful shutdown. Other components can import :class:`BackgroundProcessingQueue`, :class:`JobType`, :class:`JobStatus`, and :class:`Priority`. """ import asyncio import logging import uuid from collections import defaultdict from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from enum import Enum, auto from typing import Any, Awaitable, Callable, Dict, Optional logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) class JobType(Enum): FILE_WATCHER = auto() WEB_UPLOAD = auto() class JobStatus(Enum): QUEUED = auto() PROCESSING = auto() COMPLETED = auto() FAILED = auto() CANCELED = auto() class Priority(Enum): HIGH = 0 NORMAL = 1 LOW = 2 @dataclass class Job: job_id: str job_type: JobType metadata: Dict[str, Any] priority: Priority status: JobStatus = JobStatus.QUEUED attempts: int = 0 max_retries: int = 3 # Optional callback that actually processes the job. # It must be an async callable accepting the Job instance. processor: Optional[Callable[["Job"], Awaitable[None]]] = None class BackgroundProcessingQueue: """Async job queue with priority and worker management. Example ------- >>> async def dummy_processor(job: Job): ... await asyncio.sleep(0.1) # simulate work ... >>> queue = BackgroundProcessingQueue(concurrency=2, default_processor=dummy_processor) >>> job_id = await queue.add_job( ... job_type=JobType.WEB_UPLOAD, ... metadata={"filename": "example.pdf"}, ... priority=Priority.HIGH, ... ) >>> status = await queue.get_status(job_id) """ def __init__( self, concurrency: int = 4, default_processor: Optional[Callable[[Job], Awaitable[None]]] = None, max_retries: int = 3, thread_pool_size: int = 2, ) -> None: self._queue: asyncio.PriorityQueue[Job] = asyncio.PriorityQueue() self._jobs: Dict[str, Job] = {} self._concurrency = concurrency self._workers: list[asyncio.Task] = [] self._shutdown_event = asyncio.Event() self._default_processor = default_processor self._max_retries = max_retries self._stats: Dict[JobStatus, int] = defaultdict(int) self._counter = 0 # Counter to avoid Job comparison issues # Thread pool for CPU-intensive operations self._thread_pool = ThreadPoolExecutor(max_workers=thread_pool_size) self._use_thread_pool_for_types = {JobType.WEB_UPLOAD, JobType.FILE_WATCHER} for _ in range(self._concurrency): worker = asyncio.create_task(self._worker_loop()) self._workers.append(worker) async def _worker_loop(self) -> None: while not self._shutdown_event.is_set(): try: # Wait for a job; timeout allows checking shutdown flag. priority_job_tuple = await asyncio.wait_for(self._queue.get(), timeout=1.0) _, _, job = priority_job_tuple # Extract job from (priority, counter, job) tuple except asyncio.TimeoutError: continue except asyncio.CancelledError: # Mark any pending job as done when cancelled try: self._queue.task_done() except ValueError: pass break # Check shutdown flag again before processing if self._shutdown_event.is_set() or job.status == JobStatus.CANCELED: self._queue.task_done() continue job.status = JobStatus.PROCESSING self._update_stats() processor = job.processor or self._default_processor try: if processor is None: raise RuntimeError("No processor defined for job %s" % job.job_id) # Check shutdown flag before starting processor if self._shutdown_event.is_set(): job.status = JobStatus.CANCELED self._queue.task_done() continue # Run CPU-intensive jobs (PDF processing) in thread pool if job.job_type in self._use_thread_pool_for_types: logger.info( f"🧵 BACKGROUND QUEUE: Running job {job.job_id} ({job.job_type.name}) " f"in thread pool to avoid blocking web server" ) await asyncio.get_event_loop().run_in_executor( self._thread_pool, self._run_processor_sync, processor, job ) logger.info(f"🧵 BACKGROUND QUEUE: Job {job.job_id} completed in thread pool") else: # Run other jobs directly in event loop logger.info( f"⚡ BACKGROUND QUEUE: Running job {job.job_id} ({job.job_type.name}) in main event loop" ) await processor(job) # Check if we should still mark as completed (might have been cancelled) if not self._shutdown_event.is_set(): job.status = JobStatus.COMPLETED logger.info("Job %s completed", job.job_id) else: job.status = JobStatus.CANCELED logger.info("Job %s cancelled during processing", job.job_id) except asyncio.CancelledError: job.status = JobStatus.CANCELED logger.info("Job %s cancelled", job.job_id) except Exception as exc: # noqa: BLE001 job.attempts += 1 logger.exception("Job %s failed on attempt %s: %s", job.job_id, job.attempts, exc) # Don't retry if shutting down if not self._shutdown_event.is_set() and job.attempts < (job.max_retries or self._max_retries): job.status = JobStatus.QUEUED self._counter += 1 await self._queue.put((job.priority.value, self._counter, job)) logger.info("Re‑queued job %s (attempt %s)", job.job_id, job.attempts) else: job.status = JobStatus.FAILED logger.error("Job %s marked as FAILED after %s attempts", job.job_id, job.attempts) self._update_stats() self._queue.task_done() def _update_stats(self) -> None: # Reset and recount self._stats = defaultdict(int) for job in self._jobs.values(): self._stats[job.status] += 1 async def add_job( self, job_type: JobType, metadata: Dict[str, Any], priority: Priority = Priority.NORMAL, processor: Optional[Callable[[Job], Awaitable[None]]] = None, max_retries: Optional[int] = None, ) -> str: """Enqueue a new job. Returns ------- str Unique job identifier. """ job_id = str(uuid.uuid4()) job = Job( job_id=job_id, job_type=job_type, metadata=metadata, priority=priority, processor=processor, max_retries=max_retries or self._max_retries, ) self._jobs[job_id] = job self._counter += 1 await self._queue.put((priority.value, self._counter, job)) self._update_stats() logger.info("Enqueued job %s with priority %s", job_id, priority.name) return job_id async def get_status(self, job_id: str) -> Optional[JobStatus]: """Return the current status of a job, or ``None`` if unknown.""" job = self._jobs.get(job_id) return job.status if job else None async def cancel_job(self, job_id: str) -> bool: """Attempt to cancel a queued job. Returns ``True`` if the job was found and cancelled, ``False`` otherwise. """ job = self._jobs.get(job_id) if not job: return False if job.status in {JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELED}: return False job.status = JobStatus.CANCELED self._update_stats() logger.info("Cancelled job %s", job_id) return True async def get_statistics(self) -> Dict[JobStatus, int]: """Snapshot of job counts per status.""" # Return a copy to avoid external mutation. return dict(self._stats) def _run_processor_sync(self, processor: Callable[[Job], Awaitable[None]], job: Job) -> None: """Run an async processor function synchronously in a thread. This method is called in the thread pool to execute CPU-intensive processors. """ try: # Create a new event loop for this thread loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(processor(job)) finally: loop.close() except Exception as e: logger.error(f"Error running processor in thread for job {job.job_id}: {e}") raise async def shutdown(self, wait: bool = True, timeout: float = 5.0) -> None: """Signal workers to stop and optionally wait for the queue to drain. Parameters ---------- wait: bool, default True If ``True``, the method will wait for all queued jobs to be processed (or cancelled) before returning. timeout: float, default 5.0 Maximum time to wait for jobs to complete before forcing shutdown. """ logger.info("Initiating background queue shutdown...") self._shutdown_event.set() if wait: try: # Wait for queue to drain with timeout logger.info(f"Waiting for {self._queue.qsize()} queued jobs to complete (timeout: {timeout}s)...") await asyncio.wait_for(self._queue.join(), timeout=timeout) logger.info("All queued jobs completed") except asyncio.TimeoutError: logger.warning( f"Timeout ({timeout}s) reached, forcing shutdown with {self._queue.qsize()} jobs remaining" ) # Cancel any remaining jobs await self._cancel_remaining_jobs() # Cancel all worker tasks logger.info(f"Cancelling {len(self._workers)} worker tasks...") for w in self._workers: if not w.done(): w.cancel() # Wait for workers to finish with a timeout try: await asyncio.wait_for(asyncio.gather(*self._workers, return_exceptions=True), timeout=2.0) logger.info("All worker tasks cancelled successfully") except asyncio.TimeoutError: logger.warning("Worker task cancellation timeout, proceeding with shutdown") # Shutdown thread pool with shorter wait time for responsiveness logger.info("Shutting down thread pool...") try: # ThreadPoolExecutor.shutdown() doesn't have a timeout parameter # So we run it with asyncio timeout wrapper await asyncio.wait_for( asyncio.get_event_loop().run_in_executor(None, lambda: self._thread_pool.shutdown(wait=True)), timeout=3.0, ) logger.info("Thread pool shut down successfully") except asyncio.TimeoutError: logger.warning("Thread pool shutdown timeout, forcing termination") # Force shutdown without waiting self._thread_pool.shutdown(wait=False) except Exception as e: logger.warning(f"Thread pool shutdown error (continuing): {e}") logger.info("BackgroundProcessingQueue shut down complete") async def _cancel_remaining_jobs(self) -> None: """Cancel all remaining jobs in the queue.""" cancelled_count = 0 # Cancel jobs that are still queued temp_jobs = [] try: while True: try: priority_job_tuple = self._queue.get_nowait() _, _, job = priority_job_tuple if job.status == JobStatus.QUEUED: job.status = JobStatus.CANCELED cancelled_count += 1 temp_jobs.append(priority_job_tuple) except asyncio.QueueEmpty: break except Exception as e: logger.warning(f"Error during job cancellation: {e}") # Put non-cancelled jobs back (though we're shutting down) for job_tuple in temp_jobs: try: self._queue.task_done() # Mark each extracted job as done except ValueError: pass # Queue might be empty or task_done called too many times if cancelled_count > 0: logger.info(f"Cancelled {cancelled_count} remaining jobs") self._update_stats() # Convenience singleton for the application (optional) # from .config import settings # Example import if you have a settings module # default_queue = BackgroundProcessingQueue(concurrency=settings.max_workers) __all__ = [ "BackgroundProcessingQueue", "JobType", "JobStatus", "Priority", "Job", ]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/juanqui/pdfkb-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server