Skip to main content
Glama
operation_queue.py (9.22 kB)
"""Operation queue manager.

A small asyncio-based queue that invokes methods on registered handler
objects, highest priority first, with a bounded number of operations
running at the same time.
"""

import asyncio
import inspect
import time
import uuid
from collections import Counter
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class OperationStatus(Enum):
    """Lifecycle states of a queued operation."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class OperationType(Enum):
    """Category tag attached to an operation.

    The queue only stores and reports this value; it does not use it to
    decide how an operation is executed.
    """

    EXCEL = "excel"
    POWERPOINT = "powerpoint"
    WORD = "word"


# Statuses after which an operation will never change state again.
_TERMINAL_STATUSES = frozenset({
    OperationStatus.COMPLETED,
    OperationStatus.FAILED,
    OperationStatus.CANCELLED,
})


@dataclass
class QueuedOperation:
    """Bookkeeping record for one operation submitted to the queue."""

    id: str  # unique identifier (a UUID4 string when created by the queue)
    type: OperationType  # category tag, reported back in status dicts
    priority: int  # larger values are started first
    handler: str  # name under which the handler object was registered
    method: str  # attribute name looked up on the handler and called
    args: List[Any]  # positional arguments passed to the method
    kwargs: Dict[str, Any]  # keyword arguments passed to the method
    status: OperationStatus = OperationStatus.PENDING
    created_at: float = field(default_factory=time.time)  # epoch seconds
    started_at: Optional[float] = None  # epoch seconds, set when started
    completed_at: Optional[float] = None  # epoch seconds, set when finished
    result: Any = None  # return value of the method on success
    error: Optional[str] = None  # str() of the exception on failure


class OperationQueue:
    """Priority queue that runs handler methods with bounded concurrency."""

    def __init__(self, max_concurrent: int = 3):
        """Create an empty queue.

        Args:
            max_concurrent: Maximum number of operations allowed to be in
                the running state at the same time.
        """
        self.max_concurrent = max_concurrent
        self.operations: Dict[str, QueuedOperation] = {}
        self.pending_queue: List[str] = []
        self.running_operations: Dict[str, asyncio.Task] = {}
        self.handlers: Dict[str, Any] = {}
        # asyncio.Lock is NOT re-entrant: code that already holds it must
        # call _start_pending_locked() rather than _process_queue().
        self._lock = asyncio.Lock()

    def register_handler(self, name: str, handler: Any):
        """Register ``handler`` under ``name``, replacing any previous one."""
        self.handlers[name] = handler

    async def add_operation(self,
                            op_type: OperationType,
                            handler: str,
                            method: str,
                            args: Optional[List[Any]] = None,
                            kwargs: Optional[Dict[str, Any]] = None,
                            priority: int = 0) -> str:
        """Enqueue one operation and start it if a slot is free.

        Args:
            op_type: Category tag stored on the operation.
            handler: Name of a handler registered via ``register_handler``.
                It is resolved when the operation runs, not here.
            method: Name of the method to call on the handler.
            args: Positional arguments for the method (default: none).
            kwargs: Keyword arguments for the method (default: none).
            priority: Larger values run first; operations with equal
                priority keep their submission order.

        Returns:
            The generated operation id.
        """
        op_id = str(uuid.uuid4())
        operation = QueuedOperation(
            id=op_id,
            type=op_type,
            priority=priority,
            handler=handler,
            method=method,
            args=args or [],
            kwargs=kwargs or {},
        )

        async with self._lock:
            self.operations[op_id] = operation
            self.pending_queue.append(op_id)
            # list.sort is stable, so equal priorities stay first-in-first-out.
            self.pending_queue.sort(
                key=lambda queued_id: self.operations[queued_id].priority,
                reverse=True,
            )
            self._start_pending_locked()

        return op_id

    async def add_batch_operations(self, operations: List[Dict[str, Any]]) -> List[str]:
        """Enqueue several operations described by dictionaries.

        Each dictionary must contain ``'type'`` (an ``OperationType`` or its
        string value), ``'handler'`` and ``'method'``; ``'args'``,
        ``'kwargs'`` and ``'priority'`` are optional.

        Every entry is validated before anything is enqueued, so a malformed
        entry cannot leave a partially submitted batch behind.

        Returns:
            The operation ids, in the same order as ``operations``.

        Raises:
            KeyError: If a required key is missing from an entry.
            ValueError: If ``'type'`` is not a valid ``OperationType``.
        """
        specs = [
            {
                "op_type": OperationType(op_data['type']),
                "handler": op_data['handler'],
                "method": op_data['method'],
                "args": op_data.get('args', []),
                "kwargs": op_data.get('kwargs', {}),
                "priority": op_data.get('priority', 0),
            }
            for op_data in operations
        ]

        op_ids = []
        for spec in specs:
            op_ids.append(await self.add_operation(**spec))
        return op_ids

    def _start_pending_locked(self) -> None:
        """Start pending operations while slots are free.

        The caller must already hold ``self._lock``. This helper never
        awaits, so it is safe to call from inside a locked section.
        """
        while (len(self.running_operations) < self.max_concurrent
               and self.pending_queue):
            op_id = self.pending_queue.pop(0)
            operation = self.operations[op_id]
            operation.status = OperationStatus.RUNNING
            operation.started_at = time.time()
            self.running_operations[op_id] = asyncio.create_task(
                self._execute_operation(operation)
            )

    async def _process_queue(self):
        """Acquire the lock and start as many pending operations as allowed."""
        async with self._lock:
            self._start_pending_locked()

    async def _execute_operation(self, operation: QueuedOperation):
        """Run one operation and record its outcome on ``operation``.

        Exceptions raised by the handler are stored in ``operation.error``
        and are not propagated; cancellation is re-raised.
        """
        try:
            if operation.handler not in self.handlers:
                raise ValueError(f"Handler '{operation.handler}' not registered")

            handler = self.handlers[operation.handler]
            method = getattr(handler, operation.method)

            # Synchronous methods run directly on the event loop. Any
            # awaitable result is awaited, which also covers callables that
            # are not ``async def`` but return a coroutine (partials, lambdas,
            # decorated wrappers).
            result = method(*operation.args, **operation.kwargs)
            if inspect.isawaitable(result):
                result = await result
        except asyncio.CancelledError:
            # Listed before ``Exception`` because on Python 3.7
            # CancelledError still derives from Exception.
            operation.status = OperationStatus.CANCELLED
            if operation.completed_at is None:
                operation.completed_at = time.time()
            raise
        except Exception as e:
            # Do not overwrite a cancellation recorded by cancel_operation().
            if operation.status is not OperationStatus.CANCELLED:
                operation.error = str(e)
                operation.status = OperationStatus.FAILED
                operation.completed_at = time.time()
        else:
            if operation.status is not OperationStatus.CANCELLED:
                operation.result = result
                operation.status = OperationStatus.COMPLETED
                operation.completed_at = time.time()
        finally:
            # Free the slot and hand it to the next pending operation under a
            # single lock acquisition.
            async with self._lock:
                self.running_operations.pop(operation.id, None)
                self._start_pending_locked()

    async def wait_for_operation(self, op_id: str, timeout: Optional[float] = None) -> QueuedOperation:
        """Poll until the operation reaches a terminal status.

        Args:
            op_id: Id returned by ``add_operation``.
            timeout: Maximum number of seconds to wait; ``None`` waits
                indefinitely. ``0`` checks once and raises if the operation
                has not finished.

        Returns:
            The finished ``QueuedOperation``.

        Raises:
            ValueError: If the operation is unknown (never added, or
                removed by ``clear_completed``/``shutdown``).
            asyncio.TimeoutError: If the timeout expires first.
        """
        # A monotonic clock keeps the timeout immune to wall-clock changes.
        deadline = None if timeout is None else time.monotonic() + timeout

        while True:
            operation = self.operations.get(op_id)
            if operation is None:
                raise ValueError(f"Operation {op_id} not found")

            if operation.status in _TERMINAL_STATUSES:
                return operation

            delay = 0.1
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise asyncio.TimeoutError(f"Operation {op_id} timed out")
                delay = min(delay, remaining)

            await asyncio.sleep(delay)

    async def wait_for_all(self, op_ids: List[str], timeout: Optional[float] = None) -> List[QueuedOperation]:
        """Wait for several operations, in the order given.

        ``timeout`` is applied to each operation separately, so the total
        wait can be up to ``len(op_ids) * timeout`` seconds.

        Raises:
            ValueError: If any operation id is unknown.
            asyncio.TimeoutError: If any single wait times out.
        """
        results = []
        for op_id in op_ids:
            results.append(await self.wait_for_operation(op_id, timeout))
        return results

    async def cancel_operation(self, op_id: str) -> bool:
        """Cancel a pending or running operation.

        Returns:
            ``True`` if the operation was pending or running and is now
            cancelled; ``False`` if it is unknown or already finished.
        """
        async with self._lock:
            operation = self.operations.get(op_id)
            if operation is None:
                return False

            if operation.status is OperationStatus.PENDING:
                if op_id in self.pending_queue:
                    self.pending_queue.remove(op_id)
            elif operation.status is OperationStatus.RUNNING:
                task = self.running_operations.pop(op_id, None)
                if task is not None:
                    task.cancel()
            else:
                return False

            operation.status = OperationStatus.CANCELLED
            operation.completed_at = time.time()
            # A task cancelled before its first step never reaches its
            # ``finally`` block, so refill the freed slot here as well.
            self._start_pending_locked()
            return True

    def get_operation_status(self, op_id: str) -> Optional[Dict[str, Any]]:
        """Return a plain-dict snapshot of an operation, or ``None`` if unknown.

        ``duration`` is ``completed_at - started_at`` and is ``None`` unless
        both timestamps are set.
        """
        operation = self.operations.get(op_id)
        if operation is None:
            return None

        duration = None
        if operation.started_at is not None and operation.completed_at is not None:
            duration = operation.completed_at - operation.started_at

        return {
            "id": operation.id,
            "type": operation.type.value,
            "status": operation.status.value,
            "priority": operation.priority,
            "created_at": operation.created_at,
            "started_at": operation.started_at,
            "completed_at": operation.completed_at,
            "duration": duration,
            "result": operation.result,
            "error": operation.error,
        }

    def get_queue_stats(self) -> Dict[str, Any]:
        """Return per-status counts of tracked operations and queue settings."""
        # Single pass over the operations instead of one scan per status.
        counts = Counter(op.status for op in self.operations.values())

        return {
            "total_operations": len(self.operations),
            "pending": counts[OperationStatus.PENDING],
            "running": counts[OperationStatus.RUNNING],
            "completed": counts[OperationStatus.COMPLETED],
            "failed": counts[OperationStatus.FAILED],
            "cancelled": counts[OperationStatus.CANCELLED],
            "max_concurrent": self.max_concurrent,
            "queue_length": len(self.pending_queue),
        }

    async def clear_completed(self):
        """Forget every operation that is completed, failed or cancelled."""
        async with self._lock:
            finished_ids = [
                op_id for op_id, op in self.operations.items()
                if op.status in _TERMINAL_STATUSES
            ]
            for op_id in finished_ids:
                del self.operations[op_id]

    async def shutdown(self):
        """Cancel all running operations and discard all queue state."""
        async with self._lock:
            tasks = list(self.running_operations.values())
            for task in tasks:
                task.cancel()

            # Clear state before releasing the lock so that the cancelled
            # tasks' cleanup finds nothing left to start.
            self.operations.clear()
            self.pending_queue.clear()
            self.running_operations.clear()

        # Await the tasks only after releasing the lock: each task's cleanup
        # acquires the same non-re-entrant lock, so gathering while holding
        # it would deadlock.
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)


# Global queue instance
operation_queue = OperationQueue()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/walkingzzzy/office-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.