Skip to main content
Glama
task_manager.py (5.9 kB)
"""Async task manager: creates, tracks and cancels background scan tasks."""

import asyncio
import uuid
import sys
from datetime import datetime
from typing import Dict, Any, Optional
from enum import Enum


class TaskStatus(Enum):
    """Lifecycle states of a scan task."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


class ScanTask:
    """State container for a single scan: status, logs, progress and result."""

    def __init__(self, task_id: str, url: str, token: str):
        """Create a task in the PENDING state.

        Args:
            task_id: Unique identifier of the task.
            url: Target URL stored on the task.
            token: Credential stored on the task (not exported by to_dict).
        """
        self.task_id = task_id
        self.url = url
        self.token = token
        self.status = TaskStatus.PENDING
        self.progress = 0
        self.current_step = ""
        self.current_phase = "初始化"  # current phase label (runtime string)
        self.logs = []
        self.result = None
        self.error = None
        self.created_at = datetime.now()
        self.started_at = None
        self.completed_at = None

        # Per-phase data, updated while the scan is running.
        self.recon_data = None  # filled in by set_recon_data()
        self.attack_progress = {
            "total_tools": 0,
            "tested_tools": 0,
            "total_attacks": 0,
            "completed_attacks": 0,
            "vulnerabilities_found": 0,
        }

    def log(self, message: str):
        """Append a timestamped entry to the task log and echo it to stderr."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.logs.append(f"[{timestamp}] {message}")
        print(f"[Task:{self.task_id[:8]}] {message}", file=sys.stderr)

    def set_recon_data(self, tools: list, attack_surface: list):
        """Store a summary of the recon phase.

        Args:
            tools: Dict-like tool descriptors; "name" and "description"
                are read from each.
            attack_surface: Dict-like entries; "tool_name",
                "injectable_params" and "priority" are read from each.
        """
        self.recon_data = {
            "tools_count": len(tools),
            "tools": [
                {
                    "name": t.get("name"),
                    # `or ""` also covers an explicit None description,
                    # which would otherwise fail on slicing.
                    "description": (t.get("description") or "")[:100],
                }
                for t in tools
            ],
            "injectable_count": len(attack_surface),
            "attack_surface": [
                {
                    "tool_name": s.get("tool_name"),
                    "injectable_params": s.get("injectable_params", []),
                    "priority": s.get("priority", "medium"),
                }
                for s in attack_surface
            ],
        }
        self.attack_progress["total_tools"] = len(attack_surface)

    def update_attack_progress(
        self,
        tested_tools: Optional[int] = None,
        completed_attacks: Optional[int] = None,
        vulns: Optional[int] = None,
    ):
        """Update attack counters; arguments left as None are not touched."""
        if tested_tools is not None:
            self.attack_progress["tested_tools"] = tested_tools
        if completed_attacks is not None:
            self.attack_progress["completed_attacks"] = completed_attacks
        if vulns is not None:
            self.attack_progress["vulnerabilities_found"] = vulns

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict snapshot of the task.

        Only the last 20 log entries are included, timestamps are ISO
        strings (or None when unset), and the token is not exported.
        """
        return {
            "task_id": self.task_id,
            "url": self.url,
            "status": self.status.value,
            "progress": self.progress,
            "current_step": self.current_step,
            "current_phase": self.current_phase,
            "logs": self.logs[-20:],
            "recon_data": self.recon_data,
            "attack_progress": self.attack_progress,
            "result": self.result,
            "error": self.error,
            "created_at": self.created_at.isoformat(),
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
        }


class TaskManager:
    """Task registry implemented as a singleton.

    Every call to TaskManager() returns the same instance, and the
    registries below are class attributes shared by it.
    """

    _instance = None
    _tasks: Dict[str, ScanTask] = {}
    _running_tasks: Dict[str, asyncio.Task] = {}

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def create_task(self, url: str, token: str) -> ScanTask:
        """Create and register a new task with a random UUID4 id."""
        task_id = str(uuid.uuid4())
        task = ScanTask(task_id, url, token)
        self._tasks[task_id] = task
        return task

    def get_task(self, task_id: str) -> Optional[ScanTask]:
        """Return the task registered under task_id, or None."""
        return self._tasks.get(task_id)

    def list_tasks(self) -> list:
        """Return the to_dict() snapshot of every registered task."""
        return [t.to_dict() for t in self._tasks.values()]

    def clear_task(self, task_id: str) -> bool:
        """Remove a task and cancel its background job if one is tracked.

        Returns:
            True if the task was registered, False otherwise.
        """
        if task_id not in self._tasks:
            return False
        del self._tasks[task_id]
        running = self._running_tasks.pop(task_id, None)
        if running is not None:
            running.cancel()
        return True

    def clear_all_tasks(self):
        """Cancel every tracked background job and empty both registries."""
        for running in list(self._running_tasks.values()):
            running.cancel()
        self._tasks.clear()
        self._running_tasks.clear()

    async def start_task(self, task: ScanTask, scan_func):
        """Run scan_func(task) as a background asyncio task.

        Args:
            task: The task whose status, result and error are updated.
            scan_func: Callable taking the task and returning an awaitable;
                its result is stored in task.result.

        Returns:
            The same task object, immediately (the scan is not awaited).
        """

        async def run_scan():
            try:
                task.status = TaskStatus.RUNNING
                task.started_at = datetime.now()
                task.log("扫描任务开始")

                task.result = await scan_func(task)
                task.status = TaskStatus.COMPLETED
                task.progress = 100
                task.completed_at = datetime.now()
                task.log("扫描任务完成")
            except asyncio.CancelledError:
                # CancelledError is not an Exception subclass on 3.8+, so
                # without this branch a cancelled scan would stay RUNNING.
                task.error = "任务已取消"
                task.status = TaskStatus.FAILED
                task.completed_at = datetime.now()
                task.log("扫描任务已取消")
                raise  # let asyncio mark the task as cancelled
            except Exception as e:
                # Top-level boundary: record the failure on the task
                # instead of leaving an unretrieved exception.
                task.error = str(e)
                task.status = TaskStatus.FAILED
                task.completed_at = datetime.now()
                task.log(f"扫描任务失败: {e}")

        asyncio_task = asyncio.create_task(run_scan())
        self._running_tasks[task.task_id] = asyncio_task

        def _forget(finished: asyncio.Task) -> None:
            # Drop the finished job so _running_tasks does not grow forever;
            # the identity check avoids removing a newer job with the same id.
            if self._running_tasks.get(task.task_id) is finished:
                del self._running_tasks[task.task_id]

        asyncio_task.add_done_callback(_forget)
        return task


# Global task manager instance.
task_manager = TaskManager()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/anntsmart/MCP-Security-Scanner'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.