Glama
aywengo

MCP Kafka Schema Reg

clear_context_batch

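Remove all subjects within a specified context using application-level batching: the server issues one request per subject instead of a JSON-RPC batch, which keeps it compliant with MCP 2025-06-18. Optionally request deletion of the context after clearing, or simulate the operation with a dry run.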

Instructions

Clear all subjects in a context using application-level batch operations.

⚠️ APPLICATION-LEVEL BATCHING: Uses individual requests per MCP 2025-06-18 compliance.

Input Schema

Name                  Required  Description  Default
context               Yes
delete_context_after  No
dry_run               No
registry              No
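
The parameters above could be passed in an MCP `tools/call` request, and the returned task then polled until it finishes. The following is a minimal sketch, not the server's own client code. The argument defaults mirror the handler signature shown in the Implementation Reference. The `get_status` callable is a hypothetical stand-in for however your client invokes `get_task_status`, and the `"status"` field with its terminal values is an assumption, since the page does not show that response.

```python
import time
from typing import Any, Callable, Dict, Optional

# Assumed terminal states; the real get_task_status response is not shown on this page.
TERMINAL_STATES = ("completed", "failed", "cancelled")


def build_clear_context_request(
    context: str,
    registry: Optional[str] = None,
    delete_context_after: bool = True,
    dry_run: bool = True,
    request_id: int = 1,
) -> Dict[str, Any]:
    """Build a JSON-RPC `tools/call` request body for clear_context_batch."""
    arguments: Dict[str, Any] = {
        "context": context,
        "delete_context_after": delete_context_after,
        "dry_run": dry_run,
    }
    # Omit registry so the server can fall back to its default registry.
    if registry is not None:
        arguments["registry"] = registry
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "clear_context_batch", "arguments": arguments},
    }


def wait_for_task(
    get_status: Callable[[str], Dict[str, Any]],  # hypothetical get_task_status wrapper
    task_id: str,
    poll_seconds: float = 1.0,
    timeout_seconds: float = 60.0,
) -> Dict[str, Any]:
    """Poll a task until it reaches an assumed terminal state or times out."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status(task_id)
        if status.get("status") in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task {task_id} did not finish in {timeout_seconds}s")
        time.sleep(poll_seconds)
```

Keeping `dry_run=True` as the default in the sketch matches the handler, so a caller has to opt in explicitly before anything is deleted.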

Implementation Reference

  • Primary MCP tool handler for 'clear_context_batch'. Decorated with @structured_output specifying the tool name. Validates inputs, resolves registry, creates async task using task_manager, schedules execution of _execute_clear_context_batch, returns task_id and metadata for monitoring.
    @structured_output("clear_context_batch", fallback_on_error=True)
    def clear_context_batch_tool(
        context: str,
        registry_manager,
        registry_mode: str,
        registry: Optional[str] = None,
        delete_context_after: bool = True,
        dry_run: bool = True,
    ) -> Dict[str, Any]:
        """Clear all subjects in a context using application-level batch operations.
    
        ⚠️  APPLICATION-LEVEL BATCHING: This performs application-level batching by
            making individual JSON-RPC requests for each operation. JSON-RPC batching
            has been disabled per MCP 2025-06-18 specification compliance.
    
        **MEDIUM-DURATION OPERATION** - Uses task queue pattern.
        This operation runs asynchronously and returns a task_id immediately.
        Use get_task_status(task_id) to monitor progress and get results.
    
        Performance Notes:
        - Uses parallel processing with ThreadPoolExecutor for efficiency
        - Individual requests maintain protocol compliance
        - Client-side request coordination replaces JSON-RPC batching
    
        Args:
            context: The context to clear
            registry: The registry to operate on (uses default if not specified)
            delete_context_after: Whether to delete the context after clearing subjects
            dry_run: If True, only simulate the operation without making changes
    
        Returns:
            Task information with task_id for monitoring progress with structured validation
        """
        try:
            # Resolve registry name BEFORE creating task (critical for task lookup)
            if registry is None:
                # Get first available registry for single-registry compatibility
                available_registries = registry_manager.list_registries()
                if available_registries:
                    registry = available_registries[0]  # list_registries() returns list of strings
                else:
                    return create_error_response(
                        "No registries available",
                        error_code="NO_REGISTRIES_AVAILABLE",
                        registry_mode=registry_mode,
                    )
    
            # Validate registry exists
            registry_client = registry_manager.get_registry(registry)
            if not registry_client:
                return create_error_response(
                    f"Registry '{registry}' not found",
                    error_code="REGISTRY_NOT_FOUND",
                    registry_mode=registry_mode,
                )
    
            # Create async task with resolved registry name
            task = task_manager.create_task(
                TaskType.CLEANUP,
                metadata={
                    "operation": "clear_context_batch",
                    "context": context,
                    "registry": registry,  # Now always a resolved string, never None
                    "delete_context_after": delete_context_after,
                    "dry_run": dry_run,
                    "batching_type": "application_level",  # For clarity
                    "jsonrpc_batching": False,  # Explicitly disabled
                },
            )
    
            # Start async execution
            try:
                # Check if there's a running event loop
                asyncio.get_running_loop()
                asyncio.create_task(
                    task_manager.execute_task(
                        task,
                        _execute_clear_context_batch,
                        context=context,
                        registry=registry,
                        registry_manager=registry_manager,
                        delete_context_after=delete_context_after,
                        dry_run=dry_run,
                    )
                )
            except RuntimeError:
                # No running event loop, use thread pool to run the task
                def run_task():
                    asyncio.run(
                        task_manager.execute_task(
                            task,
                            _execute_clear_context_batch,
                            context=context,
                            registry=registry,
                            registry_manager=registry_manager,
                            delete_context_after=delete_context_after,
                            dry_run=dry_run,
                        )
                    )
    
                thread = threading.Thread(target=run_task)
                thread.start()
    
            # Return structured response
            result = {
                "message": "Context cleanup started as async task (application-level batching)",
                "task_id": task.id,
                "task": task.to_dict(),
                "operation_info": {
                    "operation": "clear_context_batch",
                    "expected_duration": "medium",
                    "async_pattern": "task_queue",
                    "batching_type": "application_level",
                    "jsonrpc_batching_disabled": True,
                    "guidance": "Long-running operation using individual requests. Returns task_id immediately. Use get_task_status() to monitor progress.",
                    "registry_mode": registry_mode,
                    "performance_note": "Uses parallel individual requests instead of JSON-RPC batching for MCP 2025-06-18 compliance",
                },
                "operation": "clear_context_batch",
                "dry_run": dry_run,
                "total_items": 1,  # One context
                "successful": 0,  # Will be updated by task
                "failed": 0,  # Will be updated by task
                "registry_mode": registry_mode,
                "mcp_protocol_version": "2025-06-18",
            }
    
            return result
    
        except Exception as e:
            return create_error_response(str(e), error_code="BATCH_OPERATION_FAILED", registry_mode=registry_mode)
  • Core execution logic for clear_context_batch tool. Fetches subjects from context, performs parallel deletions using ThreadPoolExecutor, handles dry_run and viewonly modes, computes performance metrics, updates task progress, returns comprehensive results.
    def _execute_clear_context_batch(
        context: str,
        registry: str,
        registry_manager,
        delete_context_after: bool = True,
        dry_run: bool = True,
    ) -> Dict[str, Any]:
        """Execute the actual context cleanup logic using individual requests.
    
        Performance Implementation:
        - Uses ThreadPoolExecutor for parallel individual requests
        - Replaces previous JSON-RPC batching with application-level coordination
        - Maintains efficiency while ensuring MCP 2025-06-18 compliance
        """
        start_time = time.time()
        subjects_found = 0
        subjects_deleted = 0
        context_deleted = False
        errors = []
    
        try:
            # Get the current task ID from task manager for progress updates
            current_task = None
            for task in task_manager.list_tasks(status=TaskStatus.RUNNING):
                if (
                    task.metadata
                    and task.metadata.get("operation") == "clear_context_batch"
                    and task.metadata.get("context") == context
                    and task.metadata.get("registry") == registry
                ):
                    current_task = task
                    break
    
            def update_progress(progress: float, message: str = ""):
                if current_task:
                    task_manager.update_progress(current_task.id, progress)
                if message:
                    logger.info(f"Clear Context Progress {progress:.1f}%: {message}")
    
            # Get registry client (registry is already resolved, never None here)
            registry_client = registry_manager.get_registry(registry)
    
            update_progress(
                5.0,
                f"Starting cleanup of context '{context}' in registry '{registry}' (individual requests)",
            )
    
            if not registry_client:
                return {
                    "subjects_found": 0,
                    "subjects_deleted": 0,
                    "context_deleted": False,
                    "dry_run": dry_run,
                    "duration_seconds": time.time() - start_time,
                    "success_rate": 0.0,
                    "performance": 0.0,
                    "message": f"Registry '{registry}' not found",
                    "error": f"Registry '{registry}' not found",
                    "registry": registry,
                    "batching_method": "application_level",
                }
    
            update_progress(10.0, "Registry client connected")
    
            # Check viewonly mode
            viewonly_check = registry_manager.is_viewonly(registry)
            if viewonly_check:
                return {
                    "subjects_found": 0,
                    "subjects_deleted": 0,
                    "context_deleted": False,
                    "dry_run": dry_run,
                    "duration_seconds": time.time() - start_time,
                    "success_rate": 0.0,
                    "performance": 0.0,
                    "message": f"Registry '{registry}' is in VIEWONLY mode",
                    "error": f"Registry '{registry}' is in VIEWONLY mode",
                    "registry": registry,
                    "batching_method": "application_level",
                }
    
            update_progress(20.0, "Fetching subjects from context")
    
            # Get all subjects in the context
            subjects = registry_client.get_subjects(context)
            if isinstance(subjects, dict) and "error" in subjects:
                subjects = []
            subjects_found = len(subjects)
    
            if subjects_found == 0:
                update_progress(100.0, "Context is already empty")
                return {
                    "subjects_found": 0,
                    "subjects_deleted": 0,
                    "context_deleted": False,
                    "dry_run": dry_run,
                    "duration_seconds": time.time() - start_time,
                    "success_rate": 100.0,
                    "performance": 0.0,
                    "message": f"Context '{context}' is already empty",
                    "registry": registry,
                    "batching_method": "application_level",
                }
    
            update_progress(
                30.0,
                f"Found {subjects_found} subjects to {'delete' if not dry_run else 'analyze'} (using individual requests)",
            )
    
            if dry_run:
                update_progress(
                    100.0,
                    f"DRY RUN: Would delete {subjects_found} subjects using individual requests",
                )
                return {
                    "subjects_found": subjects_found,
                    "subjects_deleted": 0,
                    "context_deleted": delete_context_after,
                    "dry_run": True,
                    "duration_seconds": time.time() - start_time,
                    "success_rate": 100.0,
                    "performance": 0.0,
                    "message": f"DRY RUN: Would delete {subjects_found} subjects from context '{context}' using individual requests",
                    "registry": registry,
                    "batching_method": "application_level",
                }
    
            update_progress(
                40.0,
                f"Starting deletion of {subjects_found} subjects using parallel individual requests",
            )
    
            # Delete subjects in parallel using individual requests (replaces JSON-RPC batching)
            with ThreadPoolExecutor(max_workers=10) as executor:
                futures = []
                for subject in subjects:
                    future = executor.submit(_delete_subject_from_context, registry_client, subject, context)
                    futures.append(future)
    
                total_futures = len(futures)
                for i, future in enumerate(as_completed(futures)):
                    try:
                        if future.result():
                            subjects_deleted += 1
                    except Exception as e:
                        errors.append(str(e))
    
                    # Update progress for deletions (40% to 85%)
                    deletion_progress = 40.0 + ((i + 1) / total_futures) * 45.0
                    update_progress(
                        deletion_progress,
                        f"Deleted {subjects_deleted} of {subjects_found} subjects (individual requests)",
                    )
    
            update_progress(90.0, "Computing cleanup results")
    
            # Calculate metrics
            duration = time.time() - start_time
            success_rate = (subjects_deleted / subjects_found * 100) if subjects_found > 0 else 100.0
            performance = subjects_deleted / duration if duration > 0 else 0.0
    
            # Delete context if requested (not supported by Schema Registry API)
            if delete_context_after and subjects_deleted == subjects_found:
                context_deleted = False  # Context deletion not supported by API
                update_progress(95.0, "Context deletion not supported by API")
    
            update_progress(
                100.0,
                f"Cleanup completed - deleted {subjects_deleted} subjects using individual requests",
            )
    
            return {
                "subjects_found": subjects_found,
                "subjects_deleted": subjects_deleted,
                "context_deleted": context_deleted,
                "dry_run": False,
                "duration_seconds": duration,
                "success_rate": success_rate,
                "performance": performance,
                "message": f"Successfully cleared context '{context}' - deleted {subjects_deleted} subjects using individual requests",
                "errors": errors if errors else None,
                "registry": registry,
                "batching_method": "application_level",
                "compliance_note": "Uses individual requests per MCP 2025-06-18 specification (JSON-RPC batching disabled)",
            }
    
        except Exception as e:
            return {
                "subjects_found": subjects_found,
                "subjects_deleted": subjects_deleted,
                "context_deleted": False,
                "dry_run": dry_run,
                "duration_seconds": time.time() - start_time,
                "success_rate": 0.0,
                "performance": 0.0,
                "message": f"Batch cleanup failed: {str(e)}",
                "error": str(e),
                "registry": registry,
                "batching_method": "application_level",
            }
  • BATCH_OPERATION_SCHEMA: JSON Schema defining the structured output format for batch operations including clear_context_batch.
    # Batch operation response
    BATCH_OPERATION_SCHEMA = {
        "type": "object",
        "properties": {
            "operation": {"type": "string", "description": "Type of batch operation"},
            "dry_run": {"type": "boolean", "description": "Whether this was a dry run"},
            "total_items": {
                "type": "integer",
                "minimum": 0,
                "description": "Total items processed",
            },
            "successful": {
                "type": "integer",
                "minimum": 0,
                "description": "Number of successful operations",
            },
            "failed": {
                "type": "integer",
                "minimum": 0,
                "description": "Number of failed operations",
            },
            "errors": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Error messages from failed operations",
            },
            "details": {"type": "object", "description": "Detailed operation results"},
            **METADATA_FIELDS,
        },
        "required": ["operation", "dry_run", "total_items", "successful", "failed"],
        "additionalProperties": True,
    }
  • TOOL_OUTPUT_SCHEMAS mapping: Associates 'clear_context_batch' tool name with BATCH_OPERATION_SCHEMA for output validation.
    "clear_context_batch": BATCH_OPERATION_SCHEMA,
    "clear_multiple_contexts_batch": BATCH_OPERATION_SCHEMA,
    # Task Management
  • OPERATION_METADATA registration: Defines expected duration (MEDIUM) and async pattern (TASK_QUEUE) for the clear_context_batch tool, used for client guidance.
    "clear_context_batch": {
        "duration": OperationDuration.MEDIUM,
        "pattern": AsyncPattern.TASK_QUEUE,
    },
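
The core of the execution logic above is a fan-out of one deletion per subject, with progress mapped onto the 40% to 85% range as futures complete. The pattern can be sketched in isolation as follows. This is a simplified illustration, not the repository's code: `delete_one` and `on_progress` are hypothetical callables standing in for `_delete_subject_from_context` and the task manager's progress update.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List, Optional


def delete_all(
    subjects: List[str],
    delete_one: Callable[[str], bool],  # hypothetical: returns True when a subject is deleted
    on_progress: Optional[Callable[[float], None]] = None,  # hypothetical progress hook
    max_workers: int = 10,
) -> Dict[str, Any]:
    """Delete subjects in parallel, one individual request per subject."""
    deleted = 0
    errors: List[str] = []
    if not subjects:
        # An empty context counts as fully successful, as in the code above.
        return {"subjects_found": 0, "subjects_deleted": 0,
                "success_rate": 100.0, "errors": errors}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(delete_one, subject) for subject in subjects]
        total = len(futures)
        for done, future in enumerate(as_completed(futures), start=1):
            try:
                if future.result():
                    deleted += 1
            except Exception as exc:  # collect the failure and keep going
                errors.append(str(exc))
            if on_progress is not None:
                # Scale completed futures onto the 40%..85% progress window.
                on_progress(40.0 + (done / total) * 45.0)

    return {
        "subjects_found": len(subjects),
        "subjects_deleted": deleted,
        "success_rate": deleted / len(subjects) * 100,
        "errors": errors,
    }
```

Because results are consumed with `as_completed`, one slow or failing deletion does not block progress reporting for the others; failures are collected and surfaced in `errors` rather than aborting the batch.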
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description must fully disclose behavioral traits. It mentions 'application-level batching' and MCP compliance, hinting at operational constraints, but lacks details on permissions, rate limits, side effects (e.g., data loss), or response behavior. The warning icon suggests caution but without elaboration, leaving gaps in understanding the tool's impact.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is brief and front-loaded with the main action, followed by a compliance note. Both sentences are relevant, with no wasted words. However, the lack of detail on parameters and usage slightly undermines efficiency, as it could be more informative without sacrificing conciseness.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity of a batch operation with 4 parameters, no annotations, and no output schema, the description is incomplete. It misses key details: parameter meanings, behavioral risks (e.g., data deletion), and expected outcomes. The MCP compliance note adds some context but does not suffice for safe and effective use.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, so the description must compensate for undocumented parameters. It does not explain any of the 4 parameters (context, delete_context_after, dry_run, registry), their purposes, or how they affect the operation. This leaves critical input semantics unclear, failing to add value beyond the bare schema.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the action ('Clear all subjects') and target resource ('in a context'), specifying it uses 'application-level batch operations'. It distinguishes from the sibling 'clear_multiple_contexts_batch' by focusing on a single context, though not explicitly named. The purpose is specific but could be more precise about what 'subjects' are.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No explicit guidance on when to use this tool versus alternatives like 'delete_context' or 'clear_multiple_contexts_batch' is provided. The description mentions 'application-level batching' and MCP compliance, which implies a specific technical context, but does not state when this approach is preferred or required over other methods.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'
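
The same request can be made from Python with the standard library. This is a minimal sketch under one assumption: that the endpoint returns a JSON body, which the page does not state. The `opener` parameter is an illustrative addition so the function can be exercised without network access.

```python
import json
import urllib.request
from typing import Any, Callable

SERVER_URL = "https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp"


def fetch_server_info(
    url: str = SERVER_URL,
    opener: Callable[..., Any] = urllib.request.urlopen,  # injectable for offline use
    timeout: float = 10.0,
) -> Any:
    """GET the server record and decode it, assuming the response is JSON."""
    request = urllib.request.Request(
        url, method="GET", headers={"Accept": "application/json"}
    )
    with opener(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling `fetch_server_info()` with no arguments issues the same GET as the curl command above.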

If you have feedback or need assistance with the MCP directory API, please join our Discord server.