Skip to main content
Glama
aywengo

MCP Kafka Schema Reg

delete_subject

Remove a subject and all its versions from the Kafka Schema Registry. Supports hard delete to permanently erase metadata and schema IDs for complete cleanup.

Instructions

Delete a subject and all its versions.

Args: subject: The subject name to delete; context: Optional schema context; registry: Optional registry name; permanent: If True, perform a hard delete (removes all metadata including schema ID).

Input Schema

Table | JSON Schema
Name | Required | Description | Default
context | No | |
permanent | No | |
registry | No | |
subject | Yes | |

Implementation Reference

  • Main handler function for the 'delete_subject' tool. Performs HTTP DELETE to /subjects/{subject} (optionally with permanent=true), handles single/multi registry modes, adds metadata and HATEOAS links, returns deleted versions.
    @structured_output("delete_subject", fallback_on_error=True)
    async def delete_subject_tool(
        subject: str,
        registry_manager,
        registry_mode: str,
        context: Optional[str] = None,
        registry: Optional[str] = None,
        permanent: bool = False,
        auth=None,
        headers=None,
        schema_registry_url: str = "",
    ) -> Dict[str, Any]:
        """
        Delete a subject and all its versions.

        Issues ``DELETE /subjects/{subject}`` against the selected registry and
        wraps the list of deleted versions in a response dictionary.

        Args:
            subject: The subject name to delete. It is percent-encoded before
                being placed in the URL path, so names containing ``/``, ``?``,
                ``#`` or spaces address the intended subject.
            registry_manager: Object providing ``get_registry()`` to look up
                the registry client.
            registry_mode: ``"single"`` selects the default registry and a
                synchronous session request; any other value is treated as
                multi-registry mode and uses aiohttp.
            context: Optional schema context
            registry: Optional registry name (ignored in single-registry mode)
            permanent: If True, perform a hard delete (removes all metadata including schema ID)
            auth: Not referenced by this function; kept for interface compatibility.
            headers: Not referenced by this function; kept for interface compatibility.
            schema_registry_url: Not referenced by this function; kept for
                interface compatibility.

        Returns:
            Dictionary containing deleted version numbers with structured validation
            and resource links. On failure an error response dictionary is returned
            instead: ``REGISTRY_NOT_FOUND`` when no client can be resolved,
            ``SUBJECT_DELETE_FAILED`` for any exception raised while performing
            the deletion.
        """
        # Function-scope import so the block does not depend on a file-level import.
        from urllib.parse import quote

        # Check viewonly mode
        viewonly_check = _check_viewonly_mode(registry_manager, registry)
        if viewonly_check:
            return validate_registry_response(viewonly_check, registry_mode)

        try:
            single_mode = registry_mode == "single"

            # Resolve the client: default registry in single mode, named one otherwise.
            if single_mode:
                client = registry_manager.get_registry()
                not_found_message = "No default registry configured"
            else:
                client = registry_manager.get_registry(registry)
                not_found_message = f"Registry '{registry}' not found"

            if client is None:
                return create_error_response(
                    not_found_message,
                    error_code="REGISTRY_NOT_FOUND",
                    registry_mode="single" if single_mode else "multi",
                )

            # Encode the subject as a single path segment (safe="" also encodes "/"),
            # otherwise special characters would change the path or start a query.
            url = client.build_context_url(f"/subjects/{quote(subject, safe='')}", context)

            # Add permanent parameter if specified
            if permanent:
                url += "?permanent=true"

            if single_mode:
                # Single-registry mode: use the client's own session.
                # NOTE(review): this is a blocking call inside a coroutine — confirm
                # that is acceptable for the event loop this tool runs on.
                response = client.session.delete(url, auth=client.auth, headers=client.headers)
                response.raise_for_status()
                deleted_versions = response.json()

                result = {
                    "subject": subject,
                    "deleted_versions": deleted_versions,
                    "permanent": permanent,
                    "context": context,
                    "registry_mode": "single",
                    "mcp_protocol_version": "2025-06-18",
                }

                # Add links to subjects list since the specific subject is now deleted
                registry_name = _get_registry_name(registry_mode, registry)
                return add_links_to_response(result, "subjects_list", registry_name, context=context)

            # Multi-registry mode: use aiohttp for async HTTP requests.
            # NOTE(review): only client.headers is forwarded here (no auth argument,
            # unlike the single-registry branch) — confirm headers carry credentials.
            async with aiohttp.ClientSession() as session:
                async with session.delete(url, headers=client.headers) as response:
                    response.raise_for_status()
                    deleted_versions = await response.json()

            result = {
                "subject": subject,
                "deleted_versions": deleted_versions,
                "permanent": permanent,
                "context": context,
                "registry": client.config.name,
                "registry_mode": "multi",
                "mcp_protocol_version": "2025-06-18",
            }

            # Add links to subjects list since the specific subject is now deleted
            return add_links_to_response(result, "subjects_list", client.config.name, context=context)
        except Exception as e:
            # Tool boundary: report the failure as a structured error instead of raising.
            return create_error_response(str(e), error_code="SUBJECT_DELETE_FAILED", registry_mode=registry_mode)
  • JSON Schema definition for the output of the 'delete_subject' tool, including deleted_versions list and metadata fields.
    # Output schema for the 'delete_subject' tool: the payload must match exactly
    # one of the two alternatives below (success object or error response).
    "delete_subject": {
        "oneOf": [
            {
                # Success shape.
                "type": "object",
                "properties": {
                    "subject": {"type": "string"},
                    # Version numbers reported as deleted.
                    "deleted_versions": {
                        "type": "array",
                        "items": {"type": "integer"},
                    },
                    "permanent": {"type": "boolean"},
                    # Context may be absent/null.
                    "context": {"type": ["string", "null"]},
                    # Shared metadata properties merged in from METADATA_FIELDS
                    # (defined elsewhere in this module).
                    **METADATA_FIELDS,
                },
                # Only these two keys are mandatory; all other properties are optional.
                "required": ["subject", "deleted_versions"],
            },
            ERROR_RESPONSE_SCHEMA,  # Allow error responses
        ],
    },  # Returns object with deleted versions or error
  • Helper function _delete_subject_from_context used in batch operations to perform individual subject deletions via DELETE /subjects/{subject}.
    def _delete_subject_from_context(registry_client, subject: str, context: Optional[str] = None) -> bool:
        """Helper function to delete a subject from a context using individual request.

        Note: This makes a single HTTP request per subject, replacing previous
        JSON-RPC batching approach for MCP 2025-06-18 compliance.

        Args:
            registry_client: Client exposing ``build_context_url``, ``session``,
                ``auth`` and ``headers``.
            subject: Subject name to delete. It is percent-encoded before being
                placed in the URL path so that names containing ``/``, ``?``,
                ``#`` or spaces address the intended subject.
            context: Optional schema context passed through to
                ``build_context_url``.

        Returns:
            True when the registry answers 200 or 404 (404 means the subject is
            already gone), False for any other status or if the request raises.
        """
        # Function-scope import so the block does not depend on a file-level import.
        from urllib.parse import quote

        try:
            # safe="" encodes "/" too, keeping the subject a single path segment.
            encoded_subject = quote(subject, safe="")
            url = registry_client.build_context_url(f"/subjects/{encoded_subject}", context)
            response = registry_client.session.delete(url, auth=registry_client.auth, headers=registry_client.headers)
            return response.status_code in [200, 404]  # 404 is OK, subject already deleted
        except Exception:
            # Deliberate best-effort: batch callers only need a success flag.
            return False
  • Operation metadata entry for 'delete_subject' indicating it is a quick, direct async operation.
    "delete_subject": {
        "duration": OperationDuration.QUICK,
        "pattern": AsyncPattern.DIRECT,
Install Server

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server