get_queue_lengths
Identify queue backlogs and balance task distribution by fetching real-time lengths of all Celery queues.
Instructions
Get the current length of all Celery queues.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- source/tools/queues.py:11-15 (handler)The actual handler function for the 'get_queue_lengths' tool. It calls client.get('/api/queues/length'), logs the result, and returns the data as a JSON string.
async def get_queue_lengths() -> str: """Get the current length of all Celery queues.""" data = await client.get("/api/queues/length") logger.info("Fetched queue lengths") return json.dumps(data) - source/tools/queues.py:9-10 (registration)The 'register' function that decorates get_queue_lengths with @mcp.tool() to register it as an MCP tool.
def register(mcp: FastMCP, client: FlowerClient) -> None: @mcp.tool() - source/main.py:25-25 (registration)Queues registration call in main lifespan: queues.register(mcp, client) wires up get_queue_lengths as an MCP tool.
queues.register(mcp, client) - source/client.py:26-35 (helper)The FlowerClient.get() method used by the handler to make the HTTP GET request to the Flower API.
async def get(self, path: str, params: dict[str, Any] | None = None) -> Any: logger.debug("GET {} params={}", path, params) resp = await self._client.get(path, params=params) resp.raise_for_status() if "application/json" in resp.headers.get("content-type", ""): return resp.json() try: return resp.json() except Exception: return resp.text