run_task
Execute a scheduled task in Qinglong Panel and retrieve its execution logs, waiting up to 30 seconds for completion.
Instructions
Run the task and wait for it to finish, then return the execution log (waits up to 30 seconds).
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| task_id | Yes | Task ID | |
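
As a rough illustration of the schema above, a client request could be shaped like the sketch below. It assumes the server accepts JSON-RPC 2.0 messages using the MCP `tools/call` method, which is consistent with the `"jsonrpc": "2.0"` responses the handler builds but is not shown in the excerpts. The helper name `build_run_task_request` and the request id are hypothetical.

```python
import json


def build_run_task_request(task_id: int, request_id: int = 1) -> str:
    """Serialize a tools/call request for run_task as one JSON line.

    Hypothetical helper: the name and the default request id are illustrative.
    """
    # The registration declares task_id as a required integer. bool is a
    # subclass of int in Python, so it is rejected explicitly.
    if isinstance(task_id, bool) or not isinstance(task_id, int):
        raise TypeError("task_id must be an integer")
    request = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",  # assumed MCP method name
        "params": {"name": "run_task", "arguments": {"task_id": task_id}},
    }
    return json.dumps(request)


if __name__ == "__main__":
    print(build_run_task_request(42))
```

Validating the type on the client side is a design choice of this sketch; the handler excerpt itself passes `task_id` to the API without checking it.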
Implementation Reference
- server.py:91-100 (registration) Registration of the 'run_task' tool in the tools/list response, including name, description, and input schema.

  ```python
  {
      "name": "run_task",
      "description": "执行任务并等待完成,返回执行日志(最多等待30秒)",
      "inputSchema": {
          "type": "object",
          "properties": {
              "task_id": {"type": "integer", "description": "任务 ID"}
          },
          "required": ["task_id"]
      }
  },
  ```
- server.py:93-99 (schema) Input schema definition for the 'run_task' tool.

  ```python
  "inputSchema": {
      "type": "object",
      "properties": {
          "task_id": {"type": "integer", "description": "任务 ID"}
      },
      "required": ["task_id"]
  }
  ```
- server.py:337-412 (handler) Handler implementation for 'run_task': starts the task through the Qinglong API, waits 2 seconds, then polls the task status every 5 seconds up to six times (about 30 seconds). Once the task has been seen running and then idle, it fetches and returns the log; otherwise it returns a timeout message that points to get_task_logs.

  ```python
  elif tool_name == "run_task":
      task_id = arguments.get("task_id")
      headers = {"Authorization": f"Bearer {token}"}

      # Step 1: ask Qinglong to start the task.
      try:
          url = f"{QINGLONG_URL}/open/crons/run"
          resp = requests.put(url, headers=headers, json=[task_id], timeout=10)
          result = resp.json()
          if result.get("code") != 200:
              response = {
                  "jsonrpc": "2.0",
                  "id": request["id"],
                  # Message: "failed to start task"
                  "error": {"code": -32603, "message": f"启动任务失败: {result}"}
              }
              print(json.dumps(response), flush=True)
              continue
      except Exception as e:
          response = {
              "jsonrpc": "2.0",
              "id": request["id"],
              "error": {"code": -32603, "message": f"启动任务失败: {str(e)}"}
          }
          print(json.dumps(response), flush=True)
          continue

      time.sleep(2)

      response = None
      task_started = False

      # Step 2: poll the task status, 6 attempts at 5-second intervals.
      for _ in range(6):
          time.sleep(5)
          try:
              status_url = f"{QINGLONG_URL}/open/crons/{task_id}"
              status_resp = requests.get(status_url, headers=headers, timeout=10)
              status_result = status_resp.json()
              if status_result.get("code") == 200:
                  cron = status_result["data"]
                  task_status = cron.get("status")
                  # status: 0 = running, 1 = idle
                  if task_status == 0:
                      task_started = True
                  elif task_status == 1 and task_started:
                      # Step 3: the task ran and went idle again; fetch its log.
                      log_url = f"{QINGLONG_URL}/open/crons/{task_id}/log"
                      log_resp = requests.get(log_url, headers=headers, timeout=10)
                      log_result = log_resp.json()
                      if log_result.get("code") == 200:
                          response = {
                              "jsonrpc": "2.0",
                              "id": request["id"],
                              "result": {"content": [{"type": "text", "text": log_result["data"]}]}
                          }
                      else:
                          response = {
                              "jsonrpc": "2.0",
                              "id": request["id"],
                              # Message: "failed to fetch log"
                              "error": {"code": -32603, "message": f"获取日志失败: {log_result}"}
                          }
                      break
          except Exception as e:
              response = {
                  "jsonrpc": "2.0",
                  "id": request["id"],
                  # Message: "failed to check task"
                  "error": {"code": -32603, "message": f"检查任务失败: {str(e)}"}
              }
              break

      # No result after all attempts: report a timeout.
      if response is None:
          response = {
              "jsonrpc": "2.0",
              "id": request["id"],
              # Message: "task timed out (30 seconds), use get_task_logs to view the log"
              "result": {"content": [{"type": "text", "text": f"任务 {task_id} 超时(30秒),请使用 get_task_logs 查看日志"}]}
          }
  ```
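
The polling logic in the handler can be isolated into a small function, which makes its behavior easier to reason about and to test without a Qinglong instance. The following is a minimal sketch, not the server's code: `wait_until_finished`, `get_status`, and the injectable `sleep` parameter are hypothetical names, and the status values follow the handler's own comment (0 = running, 1 = idle).

```python
import time
from typing import Callable, Optional

RUNNING = 0  # per the handler's comment: 0 = running
IDLE = 1     # per the handler's comment: 1 = idle


def wait_until_finished(
    get_status: Callable[[], Optional[int]],
    attempts: int = 6,
    interval: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Return True once the task was seen running and then idle.

    get_status is a hypothetical callable that returns the task's current
    status, or None if the status could not be read on this attempt.
    """
    seen_running = False
    for _ in range(attempts):
        sleep(interval)
        status = get_status()
        if status == RUNNING:
            seen_running = True
        elif status == IDLE and seen_running:
            return True
    return False  # timed out; the caller can fall back to get_task_logs


if __name__ == "__main__":
    # Simulated status sequence: running twice, then idle.
    statuses = iter([RUNNING, RUNNING, IDLE])
    done = wait_until_finished(lambda: next(statuses), sleep=lambda s: None)
    print(done)
```

Two properties of this logic are worth noting. A task that finishes before the first poll is never observed in the running state, so the loop reports a timeout even though the task completed. In the handler as excerpted, the initial 2-second pause plus six 5-second intervals add up to 32 seconds of sleeping, not counting the time spent on HTTP requests.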