aywengo

MCP Kafka Schema Reg

clear_context_batch

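Remove all subjects within a specified context using application-level batch operations, which keep the tool compliant with MCP 2025-06-18. Optionally request deletion of the context once it has been cleared, or simulate the operation with a dry run. In the reference implementation below, dry_run defaults to True, and context deletion is reported as unsupported by the Schema Registry API, so context_deleted stays False after a real run.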

Instructions

Clear all subjects in a context using application-level batch operations.

⚠️ APPLICATION-LEVEL BATCHING: Issues an individual request for each operation instead of a JSON-RPC batch, in compliance with MCP 2025-06-18.
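
To make the idea of application-level batching concrete, here is a minimal sketch that issues one call per subject from a thread pool instead of packing the calls into a single JSON-RPC batch. `delete_one` is a hypothetical stand-in for a real per-subject request, and the subject names are made up for illustration; neither is part of the server's API.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def delete_one(subject: str) -> bool:
    """Hypothetical stand-in for one individual delete request."""
    # A real client would send a single request here; this stub only
    # pretends that subjects prefixed with "locked-" cannot be deleted.
    return not subject.startswith("locked-")


def clear_subjects(subjects: list[str], max_workers: int = 10) -> dict:
    """Run one request per subject in parallel and tally the outcome."""
    deleted = 0
    errors: list[str] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its subject so failures can be named.
        futures = {pool.submit(delete_one, s): s for s in subjects}
        for future in as_completed(futures):
            subject = futures[future]
            try:
                if future.result():
                    deleted += 1
                else:
                    errors.append(f"{subject}: delete returned False")
            except Exception as exc:
                errors.append(f"{subject}: {exc}")
    return {"found": len(subjects), "deleted": deleted, "errors": errors}


result = clear_subjects(["orders-value", "locked-users-value", "payments-value"])
```

Each request succeeds or fails on its own, so a single bad subject does not abort the rest of the run.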

Input Schema

Name                  Required  Description  Default
context               Yes
delete_context_after  No
dry_run               No
registry              No
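
A call to this tool might be shaped as follows. This is a sketch of an MCP `tools/call` request body built in Python; the context name `staging` and the request `id` are illustrative assumptions, and `registry` is omitted on the assumption that the server then falls back to its default registry.

```python
import json

# Arguments follow the input schema; only `context` is required.
arguments = {
    "context": "staging",            # illustrative context name
    "delete_context_after": False,
    "dry_run": True,                 # simulate first, change nothing
}

request = {
    "jsonrpc": "2.0",
    "id": 1,                         # illustrative request id
    "method": "tools/call",
    "params": {"name": "clear_context_batch", "arguments": arguments},
}

payload = json.dumps(request, indent=2)
print(payload)
```

Running with `dry_run` set to `True` first reports how many subjects would be removed before anything is deleted.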

Implementation Reference

  • Primary MCP tool handler for 'clear_context_batch'. Decorated with @structured_output specifying the tool name. Validates inputs, resolves registry, creates async task using task_manager, schedules execution of _execute_clear_context_batch, returns task_id and metadata for monitoring.
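    # Standard-library imports this excerpt relies on; structured_output, task_manager,
    # TaskType and create_error_response come from the server's own modules.
    import asyncio
    import threading
    from typing import Any, Dict, Optional

    @structured_output("clear_context_batch", fallback_on_error=True)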
    def clear_context_batch_tool(
        context: str,
        registry_manager,
        registry_mode: str,
        registry: Optional[str] = None,
        delete_context_after: bool = True,
        dry_run: bool = True,
    ) -> Dict[str, Any]:
        """Clear all subjects in a context using application-level batch operations.
    
        ⚠️  APPLICATION-LEVEL BATCHING: This performs application-level batching by
            making individual JSON-RPC requests for each operation. JSON-RPC batching
            has been disabled per MCP 2025-06-18 specification compliance.
    
        **MEDIUM-DURATION OPERATION** - Uses task queue pattern.
        This operation runs asynchronously and returns a task_id immediately.
        Use get_task_status(task_id) to monitor progress and get results.
    
        Performance Notes:
        - Uses parallel processing with ThreadPoolExecutor for efficiency
        - Individual requests maintain protocol compliance
        - Client-side request coordination replaces JSON-RPC batching
    
        Args:
            context: The context to clear
            registry: The registry to operate on (uses default if not specified)
            delete_context_after: Whether to delete the context after clearing subjects
            dry_run: If True, only simulate the operation without making changes
    
        Returns:
            Task information with task_id for monitoring progress with structured validation
        """
        try:
            # Resolve registry name BEFORE creating task (critical for task lookup)
            if registry is None:
                # Get first available registry for single-registry compatibility
                available_registries = registry_manager.list_registries()
                if available_registries:
                    registry = available_registries[0]  # list_registries() returns list of strings
                else:
                    return create_error_response(
                        "No registries available",
                        error_code="NO_REGISTRIES_AVAILABLE",
                        registry_mode=registry_mode,
                    )
    
            # Validate registry exists
            registry_client = registry_manager.get_registry(registry)
            if not registry_client:
                return create_error_response(
                    f"Registry '{registry}' not found",
                    error_code="REGISTRY_NOT_FOUND",
                    registry_mode=registry_mode,
                )
    
            # Create async task with resolved registry name
            task = task_manager.create_task(
                TaskType.CLEANUP,
                metadata={
                    "operation": "clear_context_batch",
                    "context": context,
                    "registry": registry,  # Now always a resolved string, never None
                    "delete_context_after": delete_context_after,
                    "dry_run": dry_run,
                    "batching_type": "application_level",  # For clarity
                    "jsonrpc_batching": False,  # Explicitly disabled
                },
            )
    
            # Start async execution
            try:
                # Check if there's a running event loop
                asyncio.get_running_loop()
                asyncio.create_task(
                    task_manager.execute_task(
                        task,
                        _execute_clear_context_batch,
                        context=context,
                        registry=registry,
                        registry_manager=registry_manager,
                        delete_context_after=delete_context_after,
                        dry_run=dry_run,
                    )
                )
            except RuntimeError:
                # No running event loop, use thread pool to run the task
                def run_task():
                    asyncio.run(
                        task_manager.execute_task(
                            task,
                            _execute_clear_context_batch,
                            context=context,
                            registry=registry,
                            registry_manager=registry_manager,
                            delete_context_after=delete_context_after,
                            dry_run=dry_run,
                        )
                    )
    
                thread = threading.Thread(target=run_task)
                thread.start()
    
            # Return structured response
            result = {
                "message": "Context cleanup started as async task (application-level batching)",
                "task_id": task.id,
                "task": task.to_dict(),
                "operation_info": {
                    "operation": "clear_context_batch",
                    "expected_duration": "medium",
                    "async_pattern": "task_queue",
                    "batching_type": "application_level",
                    "jsonrpc_batching_disabled": True,
                    "guidance": "Long-running operation using individual requests. Returns task_id immediately. Use get_task_status() to monitor progress.",
                    "registry_mode": registry_mode,
                    "performance_note": "Uses parallel individual requests instead of JSON-RPC batching for MCP 2025-06-18 compliance",
                },
                "operation": "clear_context_batch",
                "dry_run": dry_run,
                "total_items": 1,  # One context
                "successful": 0,  # Will be updated by task
                "failed": 0,  # Will be updated by task
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }
    
            return result
    
        except Exception as e:
            return create_error_response(str(e), error_code="BATCH_OPERATION_FAILED", registry_mode=registry_mode)
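
The handler's scheduling step follows a general pattern: use the running event loop when there is one, otherwise run the coroutine on a worker thread with its own loop. The following is a minimal standalone sketch of that pattern. `do_work` and `schedule` are illustrative names, not part of the server, and `do_work` merely records a label where the real code would run the cleanup.

```python
import asyncio
import threading
from typing import Any, Callable, Coroutine, Union


async def do_work(results: list, label: str) -> None:
    """Illustrative coroutine standing in for the real task body."""
    await asyncio.sleep(0)
    results.append(label)


def schedule(
    factory: Callable[[], Coroutine[Any, Any, None]],
) -> Union[asyncio.Task, threading.Thread]:
    """Run the coroutine on the current loop, or on a new thread if none is running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: give the coroutine its own loop on a worker thread.
        thread = threading.Thread(target=lambda: asyncio.run(factory()))
        thread.start()
        return thread
    # A loop is running: schedule on it and return immediately.
    return asyncio.create_task(factory())


results: list = []

# Called from plain synchronous code, so the thread fallback is used.
handle = schedule(lambda: do_work(results, "from-thread"))
handle.join()


async def main() -> None:
    # Called inside a running loop, so an asyncio.Task is created.
    task = schedule(lambda: do_work(results, "from-loop"))
    await task


asyncio.run(main())
```

Passing a factory rather than a coroutine object means the coroutine is only created on the branch that actually runs it. Keeping the returned task or thread also gives the caller something to wait on, which the sketch uses to make its outcome deterministic.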
  • Core execution logic for clear_context_batch tool. Fetches subjects from context, performs parallel deletions using ThreadPoolExecutor, handles dry_run and viewonly modes, computes performance metrics, updates task progress, returns comprehensive results.
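    # Standard-library imports this excerpt relies on; task_manager, TaskStatus, logger and
    # _delete_subject_from_context come from the server's own modules.
    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from typing import Any, Dict

    def _execute_clear_context_batch(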
        context: str,
        registry: str,
        registry_manager,
        delete_context_after: bool = True,
        dry_run: bool = True,
    ) -> Dict[str, Any]:
        """Execute the actual context cleanup logic using individual requests.
    
        Performance Implementation:
        - Uses ThreadPoolExecutor for parallel individual requests
        - Replaces previous JSON-RPC batching with application-level coordination
        - Maintains efficiency while ensuring MCP 2025-06-18 compliance
        """
        start_time = time.time()
        subjects_found = 0
        subjects_deleted = 0
        context_deleted = False
        errors = []
    
        try:
            # Get the current task ID from task manager for progress updates
            current_task = None
            for task in task_manager.list_tasks(status=TaskStatus.RUNNING):
                if (
                    task.metadata
                    and task.metadata.get("operation") == "clear_context_batch"
                    and task.metadata.get("context") == context
                    and task.metadata.get("registry") == registry
                ):
                    current_task = task
                    break
    
            def update_progress(progress: float, message: str = ""):
                if current_task:
                    task_manager.update_progress(current_task.id, progress)
                if message:
                    logger.info(f"Clear Context Progress {progress:.1f}%: {message}")
    
            # Get registry client (registry is already resolved, never None here)
            registry_client = registry_manager.get_registry(registry)
    
            update_progress(
                5.0,
                f"Starting cleanup of context '{context}' in registry '{registry}' (individual requests)",
            )
    
            if not registry_client:
                return {
                    "subjects_found": 0,
                    "subjects_deleted": 0,
                    "context_deleted": False,
                    "dry_run": dry_run,
                    "duration_seconds": time.time() - start_time,
                    "success_rate": 0.0,
                    "performance": 0.0,
                    "message": f"Registry '{registry}' not found",
                    "error": f"Registry '{registry}' not found",
                    "registry": registry,
                    "batching_method": "application_level",
                }
    
            update_progress(10.0, "Registry client connected")
    
            # Check viewonly mode
            viewonly_check = registry_manager.is_viewonly(registry)
            if viewonly_check:
                return {
                    "subjects_found": 0,
                    "subjects_deleted": 0,
                    "context_deleted": False,
                    "dry_run": dry_run,
                    "duration_seconds": time.time() - start_time,
                    "success_rate": 0.0,
                    "performance": 0.0,
                    "message": f"Registry '{registry}' is in VIEWONLY mode",
                    "error": f"Registry '{registry}' is in VIEWONLY mode",
                    "registry": registry,
                    "batching_method": "application_level",
                }
    
            update_progress(20.0, "Fetching subjects from context")
    
            # Get all subjects in the context
            subjects = registry_client.get_subjects(context)
            if isinstance(subjects, dict) and "error" in subjects:
                subjects = []
            subjects_found = len(subjects)
    
            if subjects_found == 0:
                update_progress(100.0, "Context is already empty")
                return {
                    "subjects_found": 0,
                    "subjects_deleted": 0,
                    "context_deleted": False,
                    "dry_run": dry_run,
                    "duration_seconds": time.time() - start_time,
                    "success_rate": 100.0,
                    "performance": 0.0,
                    "message": f"Context '{context}' is already empty",
                    "registry": registry,
                    "batching_method": "application_level",
                }
    
            update_progress(
                30.0,
                f"Found {subjects_found} subjects to {'delete' if not dry_run else 'analyze'} (using individual requests)",
            )
    
            if dry_run:
                update_progress(
                    100.0,
                    f"DRY RUN: Would delete {subjects_found} subjects using individual requests",
                )
                return {
                    "subjects_found": subjects_found,
                    "subjects_deleted": 0,
                    # A dry run changes nothing, and context deletion is not supported by the API
                    "context_deleted": False,
                    "dry_run": True,
                    "duration_seconds": time.time() - start_time,
                    "success_rate": 100.0,
                    "performance": 0.0,
                    "message": f"DRY RUN: Would delete {subjects_found} subjects from context '{context}' using individual requests",
                    "registry": registry,
                    "batching_method": "application_level",
                }
    
            update_progress(
                40.0,
                f"Starting deletion of {subjects_found} subjects using parallel individual requests",
            )
    
            # Delete subjects in parallel using individual requests (replaces JSON-RPC batching)
            with ThreadPoolExecutor(max_workers=10) as executor:
                futures = []
                for subject in subjects:
                    future = executor.submit(_delete_subject_from_context, registry_client, subject, context)
                    futures.append(future)
    
                total_futures = len(futures)
                for i, future in enumerate(as_completed(futures)):
                    try:
                        if future.result():
                            subjects_deleted += 1
                    except Exception as e:
                        errors.append(str(e))
    
                    # Update progress for deletions (40% to 85%)
                    deletion_progress = 40.0 + ((i + 1) / total_futures) * 45.0
                    update_progress(
                        deletion_progress,
                        f"Deleted {subjects_deleted} of {subjects_found} subjects (individual requests)",
                    )
    
            update_progress(90.0, "Computing cleanup results")
    
            # Calculate metrics
            duration = time.time() - start_time
            success_rate = (subjects_deleted / subjects_found * 100) if subjects_found > 0 else 100.0
            performance = subjects_deleted / duration if duration > 0 else 0.0
    
            # Delete context if requested (not supported by Schema Registry API)
            if delete_context_after and subjects_deleted == subjects_found:
                context_deleted = False  # Context deletion not supported by API
                update_progress(95.0, "Context deletion not supported by API")
    
            update_progress(
                100.0,
                f"Cleanup completed - deleted {subjects_deleted} subjects using individual requests",
            )
    
            return {
                "subjects_found": subjects_found,
                "subjects_deleted": subjects_deleted,
                "context_deleted": context_deleted,
                "dry_run": False,
                "duration_seconds": duration,
                "success_rate": success_rate,
                "performance": performance,
                "message": f"Successfully cleared context '{context}' - deleted {subjects_deleted} subjects using individual requests",
                "errors": errors if errors else None,
                "registry": registry,
                "batching_method": "application_level",
                "compliance_note": "Uses individual requests per MCP 2025-06-18 specification (JSON-RPC batching disabled)",
            }
    
        except Exception as e:
            return {
                "subjects_found": subjects_found,
                "subjects_deleted": subjects_deleted,
                "context_deleted": False,
                "dry_run": dry_run,
                "duration_seconds": time.time() - start_time,
                "success_rate": 0.0,
                "performance": 0.0,
                "message": f"Batch cleanup failed: {str(e)}",
                "error": str(e),
                "registry": registry,
                "batching_method": "application_level",
            }
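
The progress and metric arithmetic in the excerpt above can be checked in isolation. This minimal sketch reproduces the same formulas: deletions mapped onto the 40% to 85% progress band, success rate as a percentage, and throughput in subjects per second. The function names are illustrative and do not exist in the server.

```python
def deletion_progress(completed: int, total: int) -> float:
    """Map completed deletions onto the 40%..85% progress band."""
    return 40.0 + (completed / total) * 45.0


def cleanup_metrics(found: int, deleted: int, duration: float) -> dict:
    """Compute success rate (percent) and throughput (subjects per second)."""
    # An empty context counts as fully successful, as in the excerpt.
    success_rate = (deleted / found * 100) if found > 0 else 100.0
    # Guard against a zero duration to avoid dividing by zero.
    performance = deleted / duration if duration > 0 else 0.0
    return {"success_rate": success_rate, "performance": performance}


halfway = deletion_progress(5, 10)
finished = deletion_progress(10, 10)
metrics = cleanup_metrics(found=8, deleted=6, duration=2.0)
empty = cleanup_metrics(found=0, deleted=0, duration=0.0)
```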
  • BATCH_OPERATION_SCHEMA: JSON Schema defining the structured output format for batch operations including clear_context_batch.
    # Batch operation response
    BATCH_OPERATION_SCHEMA = {
        "type": "object",
        "properties": {
            "operation": {"type": "string", "description": "Type of batch operation"},
            "dry_run": {"type": "boolean", "description": "Whether this was a dry run"},
            "total_items": {
                "type": "integer",
                "minimum": 0,
                "description": "Total items processed",
            },
            "successful": {
                "type": "integer",
                "minimum": 0,
                "description": "Number of successful operations",
            },
            "failed": {
                "type": "integer",
                "minimum": 0,
                "description": "Number of failed operations",
            },
            "errors": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Error messages from failed operations",
            },
            "details": {"type": "object", "description": "Detailed operation results"},
            **METADATA_FIELDS,
        },
        "required": ["operation", "dry_run", "total_items", "successful", "failed"],
        "additionalProperties": True,
    }
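
As a rough illustration of what this schema requires of a response, the sketch below checks only the `required` list and the `minimum: 0` integer constraints using the standard library. It is not a full JSON Schema validator (a dedicated validation library would normally do this job), it does not check the types of `operation` or `dry_run`, and `check_batch_response` is a hypothetical helper name.

```python
# Trimmed copy of the constraints shown above; METADATA_FIELDS is left out
# because its contents are not shown in this excerpt.
REQUIRED = ["operation", "dry_run", "total_items", "successful", "failed"]
NON_NEGATIVE_INTEGERS = ["total_items", "successful", "failed"]


def check_batch_response(response: dict) -> list[str]:
    """Return a list of problems; an empty list means the checks passed."""
    problems = [
        f"missing required field: {name}" for name in REQUIRED if name not in response
    ]
    for name in NON_NEGATIVE_INTEGERS:
        if name not in response:
            continue
        value = response[name]
        # bool is a subclass of int in Python, so exclude it explicitly.
        if isinstance(value, bool) or not isinstance(value, int) or value < 0:
            problems.append(f"{name} must be an integer >= 0")
    return problems


ok = check_batch_response(
    {
        "operation": "clear_context_batch",
        "dry_run": True,
        "total_items": 1,
        "successful": 0,
        "failed": 0,
    }
)
bad = check_batch_response(
    {"operation": "clear_context_batch", "dry_run": True, "total_items": -1}
)
```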
  • TOOL_OUTPUT_SCHEMAS mapping: Associates 'clear_context_batch' tool name with BATCH_OPERATION_SCHEMA for output validation.
    "clear_context_batch": BATCH_OPERATION_SCHEMA,
    "clear_multiple_contexts_batch": BATCH_OPERATION_SCHEMA,
    # Task Management
  • OPERATION_METADATA registration: Defines expected duration (MEDIUM) and async pattern (TASK_QUEUE) for the clear_context_batch tool, used for client guidance.
    "clear_context_batch": {
        "duration": OperationDuration.MEDIUM,
        "pattern": AsyncPattern.TASK_QUEUE,
    },
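
One way a client-facing layer might use such a registry entry is sketched below. The two enum classes are illustrative stand-ins that define only the members shown above, and their string values are assumptions chosen to match the `"medium"` and `"task_queue"` strings in the handler's response. `operation_info` and the `poll_for_result` key are hypothetical.

```python
from enum import Enum


class OperationDuration(Enum):
    """Illustrative stand-in; only the member used above is defined."""
    MEDIUM = "medium"  # assumed string value


class AsyncPattern(Enum):
    """Illustrative stand-in; only the member used above is defined."""
    TASK_QUEUE = "task_queue"  # assumed string value


OPERATION_METADATA = {
    "clear_context_batch": {
        "duration": OperationDuration.MEDIUM,
        "pattern": AsyncPattern.TASK_QUEUE,
    },
}


def operation_info(tool_name: str) -> dict:
    """Turn a registry entry into plain values a client can act on."""
    entry = OPERATION_METADATA[tool_name]
    return {
        "operation": tool_name,
        "expected_duration": entry["duration"].value,
        "async_pattern": entry["pattern"].value,
        # Task-queue tools return a task_id first, so the result must be polled.
        "poll_for_result": entry["pattern"] is AsyncPattern.TASK_QUEUE,
    }


info = operation_info("clear_context_batch")
```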

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server