clear_context_batch
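Remove all subjects within a specified context using application-level batch operations (one individual request per subject, for MCP 2025-06-18 compliance). A dry run simulates the operation without making changes. The `delete_context_after` flag is accepted, but the handler shown below reports that context deletion is not supported by the Schema Registry API.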
Instructions
Clear all subjects in a context using application-level batch operations.
⚠️ APPLICATION-LEVEL BATCHING: Issues one individual request per operation, because JSON-RPC batching is disabled for MCP 2025-06-18 compliance.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
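| context | Yes | The context to clear. | |
| delete_context_after | No | Whether to delete the context after clearing subjects. | `True` |
| dry_run | No | If true, only simulate the operation without making changes. | `True` |
| registry | No | The registry to operate on. | First available registry |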
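As an illustration of how these inputs fit together, the sketch below builds an MCP `tools/call` JSON-RPC request for this tool. The helper name `build_clear_context_request` and the context name `staging` are hypothetical, and the defaults simply mirror the handler signature shown under Implementation Reference; the published tool schema may differ.

```python
import json


def build_clear_context_request(
    context,
    registry=None,
    delete_context_after=True,
    dry_run=True,
    request_id=1,
):
    """Build a single (non-batched) JSON-RPC request for clear_context_batch.

    Hypothetical helper: only the tool name and argument names come from
    the page above.
    """
    arguments = {
        "context": context,
        "delete_context_after": delete_context_after,
        "dry_run": dry_run,
    }
    # Omit the registry so the server can fall back to its default.
    if registry is not None:
        arguments["registry"] = registry
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "clear_context_batch", "arguments": arguments},
    }


# "staging" is a made-up context name for illustration.
request = build_clear_context_request("staging", dry_run=True)
print(json.dumps(request, indent=2))
```

Starting with `dry_run=True` reports how many subjects would be deleted before anything is changed.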
Implementation Reference
- batch_operations.py:36-167 (handler) Primary MCP tool handler for 'clear_context_batch'. Decorated with @structured_output specifying the tool name. Validates inputs, resolves registry, creates async task using task_manager, schedules execution of _execute_clear_context_batch, returns task_id and metadata for monitoring.

```python
@structured_output("clear_context_batch", fallback_on_error=True)
def clear_context_batch_tool(
    context: str,
    registry_manager,
    registry_mode: str,
    registry: Optional[str] = None,
    delete_context_after: bool = True,
    dry_run: bool = True,
) -> Dict[str, Any]:
    """Clear all subjects in a context using application-level batch operations.

    ⚠️ APPLICATION-LEVEL BATCHING: This performs application-level batching by
    making individual JSON-RPC requests for each operation. JSON-RPC batching
    has been disabled per MCP 2025-06-18 specification compliance.

    **MEDIUM-DURATION OPERATION** - Uses task queue pattern.
    This operation runs asynchronously and returns a task_id immediately.
    Use get_task_status(task_id) to monitor progress and get results.

    Performance Notes:
    - Uses parallel processing with ThreadPoolExecutor for efficiency
    - Individual requests maintain protocol compliance
    - Client-side request coordination replaces JSON-RPC batching

    Args:
        context: The context to clear
        registry: The registry to operate on (uses default if not specified)
        delete_context_after: Whether to delete the context after clearing subjects
        dry_run: If True, only simulate the operation without making changes

    Returns:
        Task information with task_id for monitoring progress with structured validation
    """
    try:
        # Resolve registry name BEFORE creating task (critical for task lookup)
        if registry is None:
            # Get first available registry for single-registry compatibility
            available_registries = registry_manager.list_registries()
            if available_registries:
                registry = available_registries[0]  # list_registries() returns list of strings
            else:
                return create_error_response(
                    "No registries available",
                    error_code="NO_REGISTRIES_AVAILABLE",
                    registry_mode=registry_mode,
                )

        # Validate registry exists
        registry_client = registry_manager.get_registry(registry)
        if not registry_client:
            return create_error_response(
                f"Registry '{registry}' not found",
                error_code="REGISTRY_NOT_FOUND",
                registry_mode=registry_mode,
            )

        # Create async task with resolved registry name
        task = task_manager.create_task(
            TaskType.CLEANUP,
            metadata={
                "operation": "clear_context_batch",
                "context": context,
                "registry": registry,  # Now always a resolved string, never None
                "delete_context_after": delete_context_after,
                "dry_run": dry_run,
                "batching_type": "application_level",  # For clarity
                "jsonrpc_batching": False,  # Explicitly disabled
            },
        )

        # Start async execution
        try:
            # Check if there's a running event loop
            asyncio.get_running_loop()
            asyncio.create_task(
                task_manager.execute_task(
                    task,
                    _execute_clear_context_batch,
                    context=context,
                    registry=registry,
                    registry_manager=registry_manager,
                    delete_context_after=delete_context_after,
                    dry_run=dry_run,
                )
            )
        except RuntimeError:
            # No running event loop, use thread pool to run the task
            def run_task():
                asyncio.run(
                    task_manager.execute_task(
                        task,
                        _execute_clear_context_batch,
                        context=context,
                        registry=registry,
                        registry_manager=registry_manager,
                        delete_context_after=delete_context_after,
                        dry_run=dry_run,
                    )
                )

            thread = threading.Thread(target=run_task)
            thread.start()

        # Return structured response
        result = {
            "message": "Context cleanup started as async task (application-level batching)",
            "task_id": task.id,
            "task": task.to_dict(),
            "operation_info": {
                "operation": "clear_context_batch",
                "expected_duration": "medium",
                "async_pattern": "task_queue",
                "batching_type": "application_level",
                "jsonrpc_batching_disabled": True,
                "guidance": "Long-running operation using individual requests. Returns task_id immediately. Use get_task_status() to monitor progress.",
                "registry_mode": registry_mode,
                "performance_note": "Uses parallel individual requests instead of JSON-RPC batching for MCP 2025-06-18 compliance",
            },
            "operation": "clear_context_batch",
            "dry_run": dry_run,
            "total_items": 1,  # One context
            "successful": 0,  # Will be updated by task
            "failed": 0,  # Will be updated by task
            "registry_mode": registry_mode,
            "mcp_protocol_version": "2025-06-18",
        }

        return result

    except Exception as e:
        return create_error_response(str(e), error_code="BATCH_OPERATION_FAILED", registry_mode=registry_mode)
```
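The handler above starts the work on the running event loop when there is one and falls back to a separate thread otherwise. A minimal, self-contained sketch of that dispatch pattern follows. The names `dispatch` and `work` are hypothetical and stand in for the `task_manager.execute_task(...)` call, whose internals are not shown on this page.

```python
import asyncio
import threading


def dispatch(coro_factory):
    """Start a coroutine without blocking the caller.

    Returns an asyncio.Task when called from inside a running event loop,
    otherwise a started threading.Thread that runs its own event loop.
    """
    try:
        # Raises RuntimeError when no event loop is running in this thread.
        asyncio.get_running_loop()
    except RuntimeError:
        thread = threading.Thread(target=lambda: asyncio.run(coro_factory()))
        thread.start()
        return thread
    return asyncio.create_task(coro_factory())


results = []


async def work():
    # Placeholder for the real cleanup coroutine.
    await asyncio.sleep(0)
    results.append("done")


# Called from synchronous code: no loop is running, so a thread is used.
handle = dispatch(work)
handle.join()
```

Passing a factory rather than an already-created coroutine object avoids leaving an un-awaited coroutine behind if dispatch fails. Returning the handle also lets callers join or await the work, which the handler above does not do because progress is reported through the task manager instead.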
- batch_operations.py:169-369 (handler) Core execution logic for clear_context_batch tool. Fetches subjects from context, performs parallel deletions using ThreadPoolExecutor, handles dry_run and viewonly modes, computes performance metrics, updates task progress, returns comprehensive results.

```python
def _execute_clear_context_batch(
    context: str,
    registry: str,
    registry_manager,
    delete_context_after: bool = True,
    dry_run: bool = True,
) -> Dict[str, Any]:
    """Execute the actual context cleanup logic using individual requests.

    Performance Implementation:
    - Uses ThreadPoolExecutor for parallel individual requests
    - Replaces previous JSON-RPC batching with application-level coordination
    - Maintains efficiency while ensuring MCP 2025-06-18 compliance
    """
    start_time = time.time()
    subjects_found = 0
    subjects_deleted = 0
    context_deleted = False
    errors = []

    try:
        # Get the current task ID from task manager for progress updates
        current_task = None
        for task in task_manager.list_tasks(status=TaskStatus.RUNNING):
            if (
                task.metadata
                and task.metadata.get("operation") == "clear_context_batch"
                and task.metadata.get("context") == context
                and task.metadata.get("registry") == registry
            ):
                current_task = task
                break

        def update_progress(progress: float, message: str = ""):
            if current_task:
                task_manager.update_progress(current_task.id, progress)
                if message:
                    logger.info(f"Clear Context Progress {progress:.1f}%: {message}")

        # Get registry client (registry is already resolved, never None here)
        registry_client = registry_manager.get_registry(registry)

        update_progress(
            5.0,
            f"Starting cleanup of context '{context}' in registry '{registry}' (individual requests)",
        )

        if not registry_client:
            return {
                "subjects_found": 0,
                "subjects_deleted": 0,
                "context_deleted": False,
                "dry_run": dry_run,
                "duration_seconds": time.time() - start_time,
                "success_rate": 0.0,
                "performance": 0.0,
                "message": f"Registry '{registry}' not found",
                "error": f"Registry '{registry}' not found",
                "registry": registry,
                "batching_method": "application_level",
            }

        update_progress(10.0, "Registry client connected")

        # Check viewonly mode
        viewonly_check = registry_manager.is_viewonly(registry)
        if viewonly_check:
            return {
                "subjects_found": 0,
                "subjects_deleted": 0,
                "context_deleted": False,
                "dry_run": dry_run,
                "duration_seconds": time.time() - start_time,
                "success_rate": 0.0,
                "performance": 0.0,
                "message": f"Registry '{registry}' is in VIEWONLY mode",
                "error": f"Registry '{registry}' is in VIEWONLY mode",
                "registry": registry,
                "batching_method": "application_level",
            }

        update_progress(20.0, "Fetching subjects from context")

        # Get all subjects in the context
        subjects = registry_client.get_subjects(context)
        if isinstance(subjects, dict) and "error" in subjects:
            subjects = []

        subjects_found = len(subjects)

        if subjects_found == 0:
            update_progress(100.0, "Context is already empty")
            return {
                "subjects_found": 0,
                "subjects_deleted": 0,
                "context_deleted": False,
                "dry_run": dry_run,
                "duration_seconds": time.time() - start_time,
                "success_rate": 100.0,
                "performance": 0.0,
                "message": f"Context '{context}' is already empty",
                "registry": registry,
                "batching_method": "application_level",
            }

        update_progress(
            30.0,
            f"Found {subjects_found} subjects to {'delete' if not dry_run else 'analyze'} (using individual requests)",
        )

        if dry_run:
            update_progress(
                100.0,
                f"DRY RUN: Would delete {subjects_found} subjects using individual requests",
            )
            return {
                "subjects_found": subjects_found,
                "subjects_deleted": 0,
                "context_deleted": delete_context_after,
                "dry_run": True,
                "duration_seconds": time.time() - start_time,
                "success_rate": 100.0,
                "performance": 0.0,
                "message": f"DRY RUN: Would delete {subjects_found} subjects from context '{context}' using individual requests",
                "registry": registry,
                "batching_method": "application_level",
            }

        update_progress(
            40.0,
            f"Starting deletion of {subjects_found} subjects using parallel individual requests",
        )

        # Delete subjects in parallel using individual requests (replaces JSON-RPC batching)
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            for subject in subjects:
                future = executor.submit(_delete_subject_from_context, registry_client, subject, context)
                futures.append(future)

            total_futures = len(futures)
            for i, future in enumerate(as_completed(futures)):
                try:
                    if future.result():
                        subjects_deleted += 1
                except Exception as e:
                    errors.append(str(e))

                # Update progress for deletions (40% to 85%)
                deletion_progress = 40.0 + ((i + 1) / total_futures) * 45.0
                update_progress(
                    deletion_progress,
                    f"Deleted {subjects_deleted} of {subjects_found} subjects (individual requests)",
                )

        update_progress(90.0, "Computing cleanup results")

        # Calculate metrics
        duration = time.time() - start_time
        success_rate = (subjects_deleted / subjects_found * 100) if subjects_found > 0 else 100.0
        performance = subjects_deleted / duration if duration > 0 else 0.0

        # Delete context if requested (not supported by Schema Registry API)
        if delete_context_after and subjects_deleted == subjects_found:
            context_deleted = False  # Context deletion not supported by API
            update_progress(95.0, "Context deletion not supported by API")

        update_progress(
            100.0,
            f"Cleanup completed - deleted {subjects_deleted} subjects using individual requests",
        )

        return {
            "subjects_found": subjects_found,
            "subjects_deleted": subjects_deleted,
            "context_deleted": context_deleted,
            "dry_run": False,
            "duration_seconds": duration,
            "success_rate": success_rate,
            "performance": performance,
            "message": f"Successfully cleared context '{context}' - deleted {subjects_deleted} subjects using individual requests",
            "errors": errors if errors else None,
            "registry": registry,
            "batching_method": "application_level",
            "compliance_note": "Uses individual requests per MCP 2025-06-18 specification (JSON-RPC batching disabled)",
        }

    except Exception as e:
        return {
            "subjects_found": subjects_found,
            "subjects_deleted": subjects_deleted,
            "context_deleted": False,
            "dry_run": dry_run,
            "duration_seconds": time.time() - start_time,
            "success_rate": 0.0,
            "performance": 0.0,
            "message": f"Batch cleanup failed: {str(e)}",
            "error": str(e),
            "registry": registry,
            "batching_method": "application_level",
        }
```
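The parallel deletion step and its 40% to 85% progress mapping can be isolated into a small runnable sketch. Here `delete_all`, `fake_delete`, and the subject names are hypothetical stand-ins; the real code calls `_delete_subject_from_context` against a registry client, which is not shown on this page.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def delete_all(subjects, delete_one, on_progress, max_workers=10):
    """Run one delete call per subject in parallel and report progress.

    Progress is mapped onto the 40.0 to 85.0 range as each future completes,
    as in the handler above. Returns (deleted_count, error_messages).
    """
    deleted = 0
    errors = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(delete_one, subject) for subject in subjects]
        total = len(futures)
        for i, future in enumerate(as_completed(futures)):
            try:
                # A truthy result counts as a successful deletion.
                if future.result():
                    deleted += 1
            except Exception as exc:
                # One failed request does not stop the others.
                errors.append(str(exc))
            on_progress(40.0 + ((i + 1) / total) * 45.0)
    return deleted, errors


def fake_delete(subject):
    """Stand-in for a real per-subject delete request."""
    if subject == "bad":
        raise RuntimeError("delete failed for bad")
    return True


progress = []
deleted, errors = delete_all(["a", "b", "bad", "c"], fake_delete, progress.append)
success_rate = deleted / 4 * 100
```

Because results are consumed with `as_completed`, the progress value tracks how many requests have finished rather than the order in which they were submitted.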
- schema_definitions.py:723-754 (schema) BATCH_OPERATION_SCHEMA: JSON Schema defining the structured output format for batch operations including clear_context_batch.

```python
# Batch operation response
BATCH_OPERATION_SCHEMA = {
    "type": "object",
    "properties": {
        "operation": {"type": "string", "description": "Type of batch operation"},
        "dry_run": {"type": "boolean", "description": "Whether this was a dry run"},
        "total_items": {
            "type": "integer",
            "minimum": 0,
            "description": "Total items processed",
        },
        "successful": {
            "type": "integer",
            "minimum": 0,
            "description": "Number of successful operations",
        },
        "failed": {
            "type": "integer",
            "minimum": 0,
            "description": "Number of failed operations",
        },
        "errors": {
            "type": "array",
            "items": {"type": "string"},
            "description": "Error messages from failed operations",
        },
        "details": {"type": "object", "description": "Detailed operation results"},
        **METADATA_FIELDS,
    },
    "required": ["operation", "dry_run", "total_items", "successful", "failed"],
    "additionalProperties": True,
}
```
- schema_definitions.py:1019-1021 (schema) TOOL_OUTPUT_SCHEMAS mapping: Associates 'clear_context_batch' tool name with BATCH_OPERATION_SCHEMA for output validation.

```python
"clear_context_batch": BATCH_OPERATION_SCHEMA,
"clear_multiple_contexts_batch": BATCH_OPERATION_SCHEMA,
# Task Management
```
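To make the output contract concrete, here is a hand-rolled check of the required fields and integer minimums listed in BATCH_OPERATION_SCHEMA. It is a sketch of a subset of the schema, not a JSON Schema validator, and it is not how `@structured_output` validates responses (that mechanism is not shown on this page). The function name `check_batch_response` is hypothetical.

```python
# Field names copied from the "required" list of BATCH_OPERATION_SCHEMA.
REQUIRED_FIELDS = ["operation", "dry_run", "total_items", "successful", "failed"]
# Fields declared as integers with "minimum": 0.
COUNTER_FIELDS = ["total_items", "successful", "failed"]


def check_batch_response(response):
    """Return a list of problems found in a batch operation response."""
    problems = []
    for key in REQUIRED_FIELDS:
        if key not in response:
            problems.append(f"missing required field: {key}")
    if "operation" in response and not isinstance(response["operation"], str):
        problems.append("operation must be a string")
    if "dry_run" in response and not isinstance(response["dry_run"], bool):
        problems.append("dry_run must be a boolean")
    for key in COUNTER_FIELDS:
        if key not in response:
            continue
        value = response[key]
        # bool is a subclass of int in Python, so exclude it explicitly.
        if not isinstance(value, int) or isinstance(value, bool) or value < 0:
            problems.append(f"{key} must be a non-negative integer")
    return problems


# The top-level fields returned immediately by the tool handler.
initial_response = {
    "operation": "clear_context_batch",
    "dry_run": True,
    "total_items": 1,
    "successful": 0,
    "failed": 0,
}
ok_problems = check_batch_response(initial_response)
bad_problems = check_batch_response({"operation": "x", "dry_run": True, "total_items": -1})
```

Extra keys such as `task_id` or `operation_info` pass this check, matching the schema's `"additionalProperties": True`.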
- task_management.py:154-157 (registration) OPERATION_METADATA registration: Defines expected duration (MEDIUM) and async pattern (TASK_QUEUE) for the clear_context_batch tool, used for client guidance.

```python
"clear_context_batch": {
    "duration": OperationDuration.MEDIUM,
    "pattern": AsyncPattern.TASK_QUEUE,
},
```
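Since the tool follows the task queue pattern, a client receives a `task_id` and is expected to poll `get_task_status`. The sketch below shows one way such a polling loop might look. The status payload shape (`status`, `progress`) and the terminal state names are assumptions, because the return format of `get_task_status` is not shown on this page; `wait_for_task` and `task-123` are hypothetical.

```python
import time

# Assumed terminal state names; the real task manager may use others.
TERMINAL_STATES = {"completed", "failed", "cancelled"}


def wait_for_task(
    get_task_status,
    task_id,
    poll_interval=1.0,
    timeout=300.0,
    sleep=time.sleep,
    clock=time.monotonic,
):
    """Poll a status callable until the task reaches a terminal state.

    `get_task_status` is any callable that takes a task_id and returns a
    dict with a "status" key (assumed shape).
    """
    deadline = clock() + timeout
    while True:
        status = get_task_status(task_id)
        if status.get("status") in TERMINAL_STATES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"task {task_id} did not finish within {timeout} seconds")
        sleep(poll_interval)


# Fake status source standing in for real get_task_status calls.
_fake_statuses = iter(
    [
        {"status": "running", "progress": 40.0},
        {"status": "running", "progress": 85.0},
        {"status": "completed", "progress": 100.0},
    ]
)
final = wait_for_task(lambda task_id: next(_fake_statuses), "task-123", poll_interval=0.0)
```

Injecting `sleep` and `clock` keeps the loop easy to test without real delays.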