rlm_sub_query_batch
Batch process sub-queries across multiple chunks of a large context using parallel requests. Configure LLM provider, model, and concurrency limit to manage system resources.
Instructions
Process multiple chunks in parallel. Respects concurrency limit to manage system resources.
Args:
- query: Question/instruction for each sub-call
- context_name: Context identifier
- chunk_indices: List of chunk indices to process
- provider: LLM provider - 'auto', 'ollama', or 'claude-sdk'
- model: Model to use (provider-specific defaults apply)
- concurrency: Max parallel requests (default 4, max 8)
Input Schema
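| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | Question/instruction for each sub-call | |
| context_name | Yes | Context identifier | |
| chunk_indices | Yes | List of chunk indices to process | |
| provider | No | LLM provider: 'auto', 'ollama', or 'claude-sdk' | auto |
| model | No | Model to use (provider-specific defaults apply) | |
| concurrency | No | Max parallel requests (max 8) | 4 |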
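As a rough illustration of the parameters above, the sketch below assembles an arguments payload for the tool. `build_batch_args` and its `total_chunks` parameter are hypothetical client-side names, not part of the server. The cap of 8 on concurrency mirrors the documented limit; the floor of 1 and the rejection of negative indices are assumptions about what a caller probably wants (the helper listed under Implementation Reference only checks the upper bound of each index).

```python
from typing import Optional

MAX_CONCURRENCY = 8  # documented cap on parallel requests
KNOWN_PROVIDERS = ("auto", "ollama", "claude-sdk")


def build_batch_args(
    query: str,
    context_name: str,
    chunk_indices: list[int],
    total_chunks: int,
    provider: str = "auto",
    model: Optional[str] = None,
    concurrency: int = 4,
) -> dict:
    """Hypothetical helper: assemble arguments for rlm_sub_query_batch."""
    if provider not in KNOWN_PROVIDERS:
        raise ValueError(f"unknown provider: {provider!r}")

    # Assumption: reject negative indices as well as indices past the end.
    bad = [i for i in chunk_indices if not 0 <= i < total_chunks]
    if bad:
        raise ValueError(f"invalid chunk indices: {bad} (max: {total_chunks - 1})")

    return {
        "query": query,
        "context_name": context_name,
        "chunk_indices": list(chunk_indices),
        "provider": provider,
        "model": model,
        # Clamp to the documented maximum; the floor of 1 is an assumption.
        "concurrency": max(1, min(concurrency, MAX_CONCURRENCY)),
    }


args = build_batch_args("Summarize this chunk", "docs", [0, 2], total_chunks=3, concurrency=20)
print(args["concurrency"])
```

Validating on the client side avoids a round trip for requests the server would reject anyway, but the server remains the source of truth for what is accepted.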
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
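Although no output schema is declared, the helper listed under Implementation Reference returns a dictionary: on success it carries `status`, `total_chunks`, `successful`, `failed`, and a `results` list, and on a batch-level failure it carries `error` and `message`. The sketch below shows one way a caller might separate per-chunk responses from per-chunk errors. `split_results` and the sample payload are illustrative only and are not produced by the server.

```python
def split_results(batch: dict) -> tuple[dict[int, str], dict[int, str]]:
    """Return (responses, errors), each keyed by chunk index."""
    if "error" in batch:
        # Batch-level failure such as context_not_found or invalid_chunk_indices.
        raise RuntimeError(f"{batch['error']}: {batch.get('message', '')}")

    responses: dict[int, str] = {}
    errors: dict[int, str] = {}
    for item in batch.get("results", []):
        if "response" in item:
            responses[item["chunk_index"]] = item["response"]
        else:
            errors[item["chunk_index"]] = item.get("message", "unknown error")
    return responses, errors


# Hand-written sample shaped like the helper's return value.
sample = {
    "status": "completed",
    "total_chunks": 2,
    "successful": 1,
    "failed": 1,
    "results": [
        {"chunk_index": 0, "response": "alpha", "provider": "ollama", "model": "example-model"},
        {"chunk_index": 3, "error": "provider_error", "message": "timeout"},
    ],
}

ok, failed = split_results(sample)
```

Note that `status` is `"completed"` even when individual chunks fail, so a caller should inspect `failed` or the per-chunk entries rather than rely on `status` alone.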
Implementation Reference
- src/rlm_mcp_server.py:1739-1758 (handler) The FastMCP tool registration and handler for rlm_sub_query_batch. It delegates to the private implementation _sub_query_batch_impl.

  ```python
  @mcp.tool()
  async def rlm_sub_query_batch(
      query: str,
      context_name: str,
      chunk_indices: list[int],
      provider: str = "auto",
      model: Optional[str] = None,
      concurrency: int = 4,
  ) -> dict:
      """Process multiple chunks in parallel. Respects concurrency limit to manage system resources.

      Args:
          query: Question/instruction for each sub-call
          context_name: Context identifier
          chunk_indices: List of chunk indices to process
          provider: LLM provider - 'auto', 'ollama', or 'claude-sdk'
          model: Model to use (provider-specific defaults apply)
          concurrency: Max parallel requests (default 4, max 8)
      """
      return await _sub_query_batch_impl(query, context_name, chunk_indices, provider, model, concurrency)
  ```

- src/rlm_mcp_server.py:1671-1736 (helper) Core implementation logic for rlm_sub_query_batch. Validates inputs, then processes multiple chunks in parallel with a concurrency-limited semaphore, calling the LLM provider for each chunk.
  ```python
  async def _sub_query_batch_impl(
      query: str,
      context_name: str,
      chunk_indices: list[int],
      provider: str = "auto",
      model: Optional[str] = None,
      concurrency: int = 4,
  ) -> dict:
      """Implementation of batch sub-query processing."""
      concurrency = min(concurrency, 8)

      # Resolve auto provider and model once for the entire batch
      resolved_provider, resolved_model = await _resolve_provider_and_model(provider, model)

      error = _ensure_context_loaded(context_name)
      if error:
          return {"error": "context_not_found", "message": error}

      chunks = contexts[context_name].get("chunks")
      if not chunks:
          return {"error": "context_not_chunked", "message": f"Context '{context_name}' has not been chunked yet"}

      invalid_indices = [idx for idx in chunk_indices if idx >= len(chunks)]
      if invalid_indices:
          return {
              "error": "invalid_chunk_indices",
              "message": f"Invalid chunk indices: {invalid_indices} (max: {len(chunks) - 1})",
          }

      semaphore = asyncio.Semaphore(concurrency)

      async def process_chunk(chunk_idx: int) -> dict:
          async with semaphore:
              chunk_content = chunks[chunk_idx]
              result, call_error = await _make_provider_call(resolved_provider, resolved_model, query, chunk_content)

              if call_error:
                  return {
                      "chunk_index": chunk_idx,
                      "error": "provider_error",
                      "message": call_error,
                  }

              return {
                  "chunk_index": chunk_idx,
                  "response": result,
                  "provider": resolved_provider,
                  "model": resolved_model,
              }

      results = await asyncio.gather(*[process_chunk(idx) for idx in chunk_indices])

      successful = sum(1 for r in results if "response" in r)
      failed = len(results) - successful

      return {
          "status": "completed",
          "total_chunks": len(chunk_indices),
          "successful": successful,
          "failed": failed,
          "concurrency": concurrency,
          "provider": resolved_provider,
          "model": resolved_model,
          "requested_provider": provider if provider == "auto" else None,
          "results": results,
      }
  ```

- src/rlm_mcp_server.py:1739-1758 (registration) Registered as a FastMCP tool via the @mcp.tool() decorator on the async function rlm_sub_query_batch.
- src/rlm_mcp_server.py:1740-1747 (schema) Input schema/parameters: query (str), context_name (str), chunk_indices (list[int]), provider (str, default 'auto'), model (Optional[str]), concurrency (int, default 4). Returns a dict containing status, total_chunks, successful, failed, and results, among other keys.
  ```python
  async def rlm_sub_query_batch(
      query: str,
      context_name: str,
      chunk_indices: list[int],
      provider: str = "auto",
      model: Optional[str] = None,
      concurrency: int = 4,
  ) -> dict:
  ```
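The concurrency-limited fan-out described for the helper can be tried in isolation. The following is a minimal sketch, not the server's code: `fake_provider_call` is a hypothetical stand-in for the real LLM call, and the `peak` counter exists only to show that the semaphore bounds the number of calls in flight.

```python
import asyncio


async def fake_provider_call(query: str, chunk: str) -> str:
    """Hypothetical stand-in for a real LLM provider call."""
    await asyncio.sleep(0.01)  # simulate network latency
    return f"{query}: {len(chunk)} chars"


async def run_batch(query: str, chunks: list[str], indices: list[int], concurrency: int = 4):
    # Same cap as the documented tool: at most 8 parallel requests.
    semaphore = asyncio.Semaphore(min(concurrency, 8))
    in_flight = 0
    peak = 0

    async def process(idx: int) -> dict:
        nonlocal in_flight, peak
        async with semaphore:  # waits here once the limit is reached
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                response = await fake_provider_call(query, chunks[idx])
            finally:
                in_flight -= 1
            return {"chunk_index": idx, "response": response}

    # gather returns results in the order the awaitables were passed in.
    results = await asyncio.gather(*(process(i) for i in indices))
    return results, peak


chunks = ["a" * n for n in range(1, 11)]
results, peak = asyncio.run(run_batch("count", chunks, list(range(10)), concurrency=3))
print(f"{len(results)} results, peak concurrency {peak}")
```

All tasks are created up front and the semaphore gates only the provider call, so result order follows `indices` regardless of which call finishes first.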