Skip to main content
Glama
progress_tracker.py5.5 kB
"""Progress tracking for OCR processing.""" import time import threading from typing import Optional, Callable, List, Dict from dataclasses import dataclass @dataclass class ProgressUpdate: """Progress update record.""" timestamp: float percentage: float stage: str message: str class ProgressTracker: """Track OCR processing progress with heartbeat support.""" def __init__( self, on_progress: Optional[Callable[[float, str, str], None]] = None, heartbeat_interval: float = 5.0 ): """Initialize progress tracker. Args: on_progress: Callback function called on progress updates. Signature: (percentage: float, stage: str, message: str) -> None heartbeat_interval: 心跳间隔(秒),用于在长时间操作中保持连接活跃 """ self.on_progress = on_progress self.history: List[ProgressUpdate] = [] self._current_percentage = 0.0 self._current_stage = "" self._current_message = "" self.heartbeat_interval = heartbeat_interval self.last_heartbeat = time.time() self._last_update_time = time.time() self._heartbeat_thread: Optional[threading.Thread] = None self._heartbeat_active = False def update( self, percentage: float, stage: str, message: str = "" ): """Update progress. Args: percentage: Progress percentage (0-100) stage: Current stage name message: Progress message """ self._current_percentage = max(0.0, min(100.0, percentage)) self._current_stage = stage self._current_message = message self._last_update_time = time.time() update = ProgressUpdate( timestamp=time.time(), percentage=self._current_percentage, stage=stage, message=message ) self.history.append(update) if self.on_progress: try: self.on_progress( self._current_percentage, stage, message ) self.last_heartbeat = time.time() except Exception: # Silently ignore callback errors pass def send_heartbeat(self): """发送心跳进度更新,用于在长时间操作中保持连接活跃。 即使进度没有变化,也会定期发送进度更新,防止客户端超时。 """ if not self.on_progress: return now = time.time() # 如果距离上次更新超过心跳间隔,发送心跳 if now - self.last_heartbeat >= self.heartbeat_interval: try: # 发送当前进度作为心跳 self.on_progress( self._current_percentage, self._current_stage, f"处理中... ({self._current_message})" ) self.last_heartbeat = now except Exception: # Silently ignore callback errors pass def start_heartbeat(self): """启动自动心跳线程,确保在长时间操作中持续发送心跳。 防止连接因长时间无响应而中断。 """ if self._heartbeat_active or not self.on_progress: return self._heartbeat_active = True def heartbeat_loop(): """心跳循环,定期发送进度更新。""" while self._heartbeat_active: try: time.sleep(self.heartbeat_interval) if self._heartbeat_active: self.send_heartbeat() except Exception: # 忽略心跳错误,继续运行 pass self._heartbeat_thread = threading.Thread( target=heartbeat_loop, daemon=True, # 守护线程,主线程退出时自动退出 name="ProgressHeartbeat" ) self._heartbeat_thread.start() def stop_heartbeat(self): """停止自动心跳线程。""" self._heartbeat_active = False if self._heartbeat_thread and self._heartbeat_thread.is_alive(): # 等待线程结束(最多等待1秒) self._heartbeat_thread.join(timeout=1.0) def get_history(self) -> List[Dict]: """Get progress history as list of dicts. Returns: List of progress update dictionaries """ return [ { "timestamp": update.timestamp, "percentage": update.percentage, "stage": update.stage, "message": update.message } for update in self.history ] def get_current(self) -> Dict: """Get current progress. Returns: Current progress dictionary """ return { "percentage": self._current_percentage, "stage": self._current_stage } def reset(self): """Reset progress tracker.""" self.stop_heartbeat() self.history.clear() self._current_percentage = 0.0 self._current_stage = "" self._current_message = ""

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qiao-925/ocr-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server