Skip to main content
Glama
test_job_manager.py28.9 kB
"""Tests for Job Manager initialization logic.""" import json from datetime import datetime from types import SimpleNamespace from typing import Dict, List import pytest from src import job_manager, models, storage def create_dummy_mcp_client() -> dict: def invoke_tool(*args, **kwargs): # pragma: no cover - dummy callable return None client: dict = {"name": "dummy-mcp", "invoke_tool": invoke_tool} return client def create_dummy_storage() -> dict: storage: dict = {"name": "dummy-storage"} return storage def test_create_job_manager_with_dependencies() -> None: mcp_client = create_dummy_mcp_client() storage = create_dummy_storage() manager = job_manager.create_job_manager(mcp_client, storage) assert manager["mcp_client"] is mcp_client assert manager["storage"] is storage def test_mcp_client_injection_is_validated() -> None: storage = create_dummy_storage() with pytest.raises(ValueError): job_manager.create_job_manager(None, storage) def test_storage_injection_is_validated() -> None: mcp_client = create_dummy_mcp_client() with pytest.raises(ValueError): job_manager.create_job_manager(mcp_client, None) def test_dependency_validation_rejects_invalid_inputs() -> None: invalid_client = {"name": "dummy", "type": "not-callable"} storage = create_dummy_storage() with pytest.raises(TypeError): job_manager.create_job_manager(invalid_client, storage) def create_stub_mcp_client(responder): calls: List[Dict[str, object]] = [] def invoke_tool(name: str, arguments: Dict[str, object]): calls.append({"name": name, "arguments": arguments}) return responder(name, arguments) client: Dict[str, object] = {"invoke_tool": invoke_tool, "calls": calls} return client def create_serialized_task(task_id: str, status: str) -> Dict[str, object]: timestamp = datetime.now().astimezone() task = models.create_jules_task( task_id, "Example Title", "Example Description", "owner/repo", "main", status, timestamp, timestamp, "https://example.com/task", ) serialized = models.jules_task_to_dict(task) return serialized def create_storage_manager_with_tasks(tmp_path, tasks: List[Dict[str, object]]): file_path = tmp_path / "tasks.json" storage_manager = storage.create_storage(str(file_path)) for entry in tasks: storage.save_task(storage_manager, entry) return storage_manager def create_manager_with_storage(storage_manager, client=None): mcp_client = client if client is not None else create_dummy_mcp_client() manager = job_manager.create_job_manager(mcp_client, storage_manager) return manager def test_list_tasks_returns_all_tasks(tmp_path) -> None: tasks: List[Dict[str, object]] = [] tasks.append(create_serialized_task("task-1", "pending")) tasks.append(create_serialized_task("task-2", "completed")) storage_manager = create_storage_manager_with_tasks(tmp_path, tasks) manager = create_manager_with_storage(storage_manager) result = job_manager.list_tasks(manager) assert len(result) == 2 found_ids: List[str] = [] for item in result: found_ids.append(item["id"]) assert "task-1" in found_ids assert "task-2" in found_ids def test_list_tasks_filters_pending(tmp_path) -> None: tasks: List[Dict[str, object]] = [] tasks.append(create_serialized_task("task-1", "pending")) tasks.append(create_serialized_task("task-2", "in_progress")) storage_manager = create_storage_manager_with_tasks(tmp_path, tasks) manager = create_manager_with_storage(storage_manager) result = job_manager.list_tasks(manager, status="pending") assert len(result) == 1 assert result[0]["id"] == "task-1" def test_list_tasks_filters_in_progress(tmp_path) -> None: tasks: List[Dict[str, object]] = [] tasks.append(create_serialized_task("task-1", "pending")) tasks.append(create_serialized_task("task-2", "in_progress")) storage_manager = create_storage_manager_with_tasks(tmp_path, tasks) manager = create_manager_with_storage(storage_manager) result = job_manager.list_tasks(manager, status="in_progress") assert len(result) == 1 assert result[0]["id"] == "task-2" def test_list_tasks_filters_completed(tmp_path) -> None: tasks: List[Dict[str, object]] = [] tasks.append(create_serialized_task("task-1", "completed")) tasks.append(create_serialized_task("task-2", "in_progress")) storage_manager = create_storage_manager_with_tasks(tmp_path, tasks) manager = create_manager_with_storage(storage_manager) result = job_manager.list_tasks(manager, status="completed") assert len(result) == 1 assert result[0]["id"] == "task-1" def test_list_tasks_returns_empty_list_for_no_tasks(tmp_path) -> None: tasks: List[Dict[str, object]] = [] storage_manager = create_storage_manager_with_tasks(tmp_path, tasks) manager = create_manager_with_storage(storage_manager) result = job_manager.list_tasks(manager) assert result == [] def test_list_tasks_uses_storage_data(tmp_path) -> None: tasks: List[Dict[str, object]] = [] tasks.append(create_serialized_task("task-1", "pending")) storage_manager = create_storage_manager_with_tasks(tmp_path, tasks) manager = create_manager_with_storage(storage_manager) new_task = create_serialized_task("task-2", "pending") storage.save_task(storage_manager, new_task) result = job_manager.list_tasks(manager) assert len(result) == 2 def test_list_tasks_returns_normalized_jules_tasks(tmp_path) -> None: tasks: List[Dict[str, object]] = [] tasks.append(create_serialized_task("task-1", "pending")) storage_manager = create_storage_manager_with_tasks(tmp_path, tasks) manager = create_manager_with_storage(storage_manager) result = job_manager.list_tasks(manager) assert len(result) == 1 task = result[0] assert isinstance(task["created_at"], datetime) assert isinstance(task["updated_at"], datetime) def test_get_task_invokes_mcp_tool(tmp_path) -> None: raw_task = create_serialized_task("task-10", "pending") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps(raw_task) response = {"content": [{"type": "text", "text": payload}]} return response stub_client = create_stub_mcp_client(responder) storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager, stub_client) result = job_manager.get_task(manager, "task-10") assert result["id"] == "task-10" assert len(stub_client["calls"]) == 1 call = stub_client["calls"][0] assert call["name"] == "jules_get_task" assert call["arguments"] == {"taskId": "task-10"} def test_get_task_updates_storage(tmp_path) -> None: raw_task = create_serialized_task("task-11", "completed") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps(raw_task) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager, stub_client) job_manager.get_task(manager, "task-11") saved = storage.get_task(storage_manager, "task-11") assert saved["status"] == "completed" def test_get_task_handles_not_found(tmp_path) -> None: def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"error": "not_found"}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager, stub_client) with pytest.raises(KeyError): job_manager.get_task(manager, "missing-id") def test_get_task_validates_identifier(tmp_path) -> None: storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager) with pytest.raises(ValueError): job_manager.get_task(manager, " ") def test_get_task_rejects_invalid_identifier_format(tmp_path) -> None: storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager) with pytest.raises(ValueError): job_manager.get_task(manager, "bad id") def test_get_task_handles_mcp_errors(tmp_path) -> None: def responder(name: str, arguments: Dict[str, object]): raise RuntimeError("mcp failure") stub_client = create_stub_mcp_client(responder) storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager, stub_client) with pytest.raises(RuntimeError): job_manager.get_task(manager, "task-12") def test_get_task_requires_valid_response(tmp_path) -> None: def responder(name: str, arguments: Dict[str, object]): return {"content": [{"type": "text", "text": "not-json"}]} stub_client = create_stub_mcp_client(responder) storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager, stub_client) with pytest.raises(ValueError): job_manager.get_task(manager, "task-13") def test_create_task_invokes_mcp_tool(tmp_path) -> None: raw_task = create_serialized_task("task-20", "pending") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps(raw_task) response = {"content": [{"type": "text", "text": payload}]} return response stub_client = create_stub_mcp_client(responder) storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager, stub_client) result = job_manager.create_task(manager, "Example task", "owner/repo", branch="dev") assert result["id"] == "task-20" assert len(stub_client["calls"]) == 1 call = stub_client["calls"][0] assert call["name"] == "jules_create_task" assert call["arguments"] == { "description": "Example task", "repository": "owner/repo", "branch": "dev", } def test_create_task_defaults_branch(tmp_path) -> None: raw_task = create_serialized_task("task-21", "pending") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps(raw_task) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager, stub_client) job_manager.create_task(manager, "Example", "owner/repo") call = stub_client["calls"][0] assert call["arguments"]["branch"] == "main" def test_create_task_updates_storage(tmp_path) -> None: raw_task = create_serialized_task("task-22", "in_progress") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps(raw_task) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager, stub_client) job_manager.create_task(manager, "Example", "owner/repo") saved = storage.get_task(storage_manager, "task-22") assert saved["status"] == "in_progress" def test_create_task_validates_inputs(tmp_path) -> None: storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager) with pytest.raises(ValueError): job_manager.create_task(manager, "", "owner/repo") with pytest.raises(ValueError): job_manager.create_task(manager, "Example", " ") def test_create_task_handles_mcp_failure(tmp_path) -> None: def responder(name: str, arguments: Dict[str, object]): raise RuntimeError("mcp failure") stub_client = create_stub_mcp_client(responder) storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager, stub_client) with pytest.raises(RuntimeError): job_manager.create_task(manager, "Example", "owner/repo") def create_storage_with_existing_task(tmp_path, task_id="task-30", status="in_progress"): task = create_serialized_task(task_id, status) storage_manager = create_storage_manager_with_tasks(tmp_path, [task]) return storage_manager def test_send_message_invokes_mcp_tool(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-30") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"success": True}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) result = job_manager.send_message(manager, "task-30", "Hello Jules") assert result assert len(stub_client["calls"]) == 1 call = stub_client["calls"][0] assert call["name"] == "jules_send_message" assert call["arguments"] == {"taskId": "task-30", "message": "Hello Jules"} def test_send_message_updates_chat_history(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-31") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"success": True}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) job_manager.send_message(manager, "task-31", "Progress update") stored = storage.get_task(storage_manager, "task-31") history = stored.get("chat_history", []) assert len(history) == 1 last = history[-1] assert last["type"] == "user" assert last["content"] == "Progress update" def test_send_message_validates_task_identifier(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-32") manager = create_manager_with_storage(storage_manager) with pytest.raises(ValueError): job_manager.send_message(manager, " ", "Hello") def test_send_message_validates_message(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-33") manager = create_manager_with_storage(storage_manager) with pytest.raises(ValueError): job_manager.send_message(manager, "task-33", " ") def test_send_message_requires_existing_task(tmp_path) -> None: storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager) with pytest.raises(KeyError): job_manager.send_message(manager, "missing", "Hello") def test_send_message_handles_failure_response(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-34") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"success": False, "error": "not sent"}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) result = job_manager.send_message(manager, "task-34", "Hello") assert result is False def test_send_message_handles_mcp_exception(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-35") def responder(name: str, arguments: Dict[str, object]): raise RuntimeError("transport error") stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) with pytest.raises(RuntimeError): job_manager.send_message(manager, "task-35", "Hello") def test_send_message_requires_valid_response(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-36") def responder(name: str, arguments: Dict[str, object]): return {"content": [{"type": "text", "text": "not-json"}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) with pytest.raises(ValueError): job_manager.send_message(manager, "task-36", "Hello") def test_send_message_handles_error_payload(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-37") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"error": "delivery_failed"}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) with pytest.raises(RuntimeError): job_manager.send_message(manager, "task-37", "Hello") def test_approve_plan_invokes_mcp_tool(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-40", status="waiting_approval") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"success": True}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) result = job_manager.approve_plan(manager, "task-40") assert result is True assert len(stub_client["calls"]) == 1 call = stub_client["calls"][0] assert call["name"] == "jules_approve_plan" assert call["arguments"] == {"taskId": "task-40"} def test_approve_plan_updates_status(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-41", status="waiting_approval") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"success": True}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) job_manager.approve_plan(manager, "task-41") stored = storage.get_task(storage_manager, "task-41") assert stored["status"] == "in_progress" def test_approve_plan_requires_waiting_status(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-42", status="pending") manager = create_manager_with_storage(storage_manager) with pytest.raises(ValueError): job_manager.approve_plan(manager, "task-42") def test_approve_plan_handles_failure_response(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-43", status="waiting_approval") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"success": False}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) result = job_manager.approve_plan(manager, "task-43") assert result is False stored = storage.get_task(storage_manager, "task-43") assert stored["status"] == "waiting_approval" def test_approve_plan_handles_error_payload(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-44", status="waiting_approval") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"error": "approval_failed"}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) with pytest.raises(RuntimeError): job_manager.approve_plan(manager, "task-44") def test_approve_plan_validates_task_identifier(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-45", status="waiting_approval") manager = create_manager_with_storage(storage_manager) with pytest.raises(ValueError): job_manager.approve_plan(manager, " ") def test_approve_plan_requires_existing_task(tmp_path) -> None: storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager) with pytest.raises(KeyError): job_manager.approve_plan(manager, "missing") def test_approve_plan_requires_valid_response(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-46", status="waiting_approval") def responder(name: str, arguments: Dict[str, object]): return {"content": [{"type": "text", "text": "not-json"}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) with pytest.raises(ValueError): job_manager.approve_plan(manager, "task-46") def test_resume_task_invokes_mcp_tool(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-50", status="paused") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"success": True}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) result = job_manager.resume_task(manager, "task-50") assert result is True assert len(stub_client["calls"]) == 1 call = stub_client["calls"][0] assert call["name"] == "jules_resume_task" assert call["arguments"] == {"taskId": "task-50"} def test_resume_task_updates_status(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-51", status="paused") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"success": True}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) job_manager.resume_task(manager, "task-51") stored = storage.get_task(storage_manager, "task-51") assert stored["status"] == "in_progress" def test_resume_task_requires_paused_status(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-52", status="pending") manager = create_manager_with_storage(storage_manager) with pytest.raises(ValueError): job_manager.resume_task(manager, "task-52") def test_resume_task_handles_failure_response(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-53", status="paused") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"success": False}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) result = job_manager.resume_task(manager, "task-53") assert result is False stored = storage.get_task(storage_manager, "task-53") assert stored["status"] == "paused" def test_resume_task_handles_error_payload(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-54", status="paused") def responder(name: str, arguments: Dict[str, object]): payload = json.dumps({"error": "resume_failed"}) return {"content": [{"type": "text", "text": payload}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) with pytest.raises(RuntimeError): job_manager.resume_task(manager, "task-54") def test_resume_task_validates_task_identifier(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-55", status="paused") manager = create_manager_with_storage(storage_manager) with pytest.raises(ValueError): job_manager.resume_task(manager, " ") def test_resume_task_requires_existing_task(tmp_path) -> None: storage_manager = create_storage_manager_with_tasks(tmp_path, []) manager = create_manager_with_storage(storage_manager) with pytest.raises(KeyError): job_manager.resume_task(manager, "missing") def test_resume_task_requires_valid_response(tmp_path) -> None: storage_manager = create_storage_with_existing_task(tmp_path, "task-56", status="paused") def responder(name: str, arguments: Dict[str, object]): return {"content": [{"type": "text", "text": "not-json"}]} stub_client = create_stub_mcp_client(responder) manager = create_manager_with_storage(storage_manager, stub_client) with pytest.raises(ValueError): job_manager.resume_task(manager, "task-56") def test_monitor_task_polls_until_completed(monkeypatch) -> None: statuses: list = [] statuses.append("pending") statuses.append("in_progress") statuses.append("completed") calls: list = [] def fake_get_task(manager, task_identifier): calls.append(task_identifier) if statuses: current_status = statuses.pop(0) else: current_status = "completed" timestamp = datetime.now().astimezone() task = models.create_jules_task( task_identifier, "Sample Title", "Sample Description", "owner/repo", "main", current_status, timestamp, timestamp, "https://example.com/task", ) return task sleeps: list = [] def fake_sleep(interval): sleeps.append(interval) messages: list = [] def create_console_stub(): def record_message(message): messages.append(message) console = SimpleNamespace(print=record_message) return console original_manager = job_manager.create_job_manager(create_dummy_mcp_client(), create_dummy_storage()) monkeypatch.setattr(job_manager, "get_task", fake_get_task) monkeypatch.setattr(job_manager, "_sleep", fake_sleep) monkeypatch.setattr(job_manager, "_create_console", create_console_stub) job_manager.monitor_task(original_manager, "task-200", interval=2) assert len(calls) == 3 assert len(sleeps) == 2 assert sleeps[0] == 2 assert sleeps[1] == 2 assert messages last_index = len(messages) - 1 assert "completed" in str(messages[last_index]) def test_monitor_task_handles_keyboard_interrupt(monkeypatch) -> None: call_counter: dict = {"count": 0} def fake_get_task(manager, task_identifier): call_counter["count"] = call_counter["count"] + 1 timestamp = datetime.now().astimezone() task = models.create_jules_task( task_identifier, "Sample Title", "Sample Description", "owner/repo", "main", "in_progress", timestamp, timestamp, "https://example.com/task", ) return task def fake_sleep(interval): raise KeyboardInterrupt() messages: list = [] def create_console_stub(): def record_message(message): messages.append(message) console = SimpleNamespace(print=record_message) return console original_manager = job_manager.create_job_manager(create_dummy_mcp_client(), create_dummy_storage()) monkeypatch.setattr(job_manager, "get_task", fake_get_task) monkeypatch.setattr(job_manager, "_sleep", fake_sleep) monkeypatch.setattr(job_manager, "_create_console", create_console_stub) job_manager.monitor_task(original_manager, "task-201", interval=3) assert call_counter["count"] == 1 assert messages last_index = len(messages) - 1 assert "Monitoring stopped" in str(messages[last_index]) def test_monitor_task_raises_for_missing_task(monkeypatch) -> None: def fake_get_task(manager, task_identifier): raise KeyError("missing") def fake_sleep(interval): return None def create_console_stub(): def record_message(message): return None console = SimpleNamespace(print=record_message) return console original_manager = job_manager.create_job_manager(create_dummy_mcp_client(), create_dummy_storage()) monkeypatch.setattr(job_manager, "get_task", fake_get_task) monkeypatch.setattr(job_manager, "_sleep", fake_sleep) monkeypatch.setattr(job_manager, "_create_console", create_console_stub) with pytest.raises(KeyError): job_manager.monitor_task(original_manager, "task-202", interval=4)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mberjans/google-jules-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server