We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""
Tests for Bulk Operations Wizard
"""
import os
import sys
from datetime import datetime
from unittest.mock import AsyncMock, Mock
import pytest
# Add project root to Python path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from bulk_operations_wizard import BulkOperationPreview, BulkOperationsWizard, BulkOperationType
class TestBulkOperationsWizard:
"""Test suite for BulkOperationsWizard"""
@pytest.fixture
def mock_registry(self):
"""Create mock registry client"""
registry = Mock()
registry.list_subjects = AsyncMock(
return_value=["test-schema-1", "test-schema-2", "prod-schema-1", "deprecated-schema-1", "old-schema-1"]
)
return registry
@pytest.fixture
def mock_elicitation(self):
"""Create mock elicitation manager"""
elicitation = Mock()
elicitation.elicit = AsyncMock()
elicitation.show_info = AsyncMock()
return elicitation
@pytest.fixture
def mock_task_manager(self):
"""Create mock task manager"""
task_manager = Mock()
task_manager.create_task = AsyncMock(return_value="task-123")
task_manager.update_progress = AsyncMock()
task_manager.complete_task = AsyncMock()
task_manager.fail_task = AsyncMock()
task_manager.get_task_status = AsyncMock(return_value={"status": "completed", "processed": 5, "total_items": 5})
return task_manager
@pytest.fixture
def mock_batch_ops(self):
"""Create mock batch operations"""
return Mock()
@pytest.fixture
def wizard(self, mock_registry, mock_elicitation, mock_task_manager, mock_batch_ops):
"""Create wizard instance with mocks"""
return BulkOperationsWizard(mock_registry, mock_elicitation, mock_task_manager, mock_batch_ops)
@pytest.mark.asyncio
async def test_wizard_initialization(self, wizard):
"""Test wizard initialization"""
assert wizard.registry is not None
assert wizard.elicitation is not None
assert wizard.task_manager is not None
assert wizard.batch_ops is not None
assert len(wizard._operation_handlers) == 4
@pytest.mark.asyncio
async def test_start_wizard_with_operation_type(self, wizard):
"""Test starting wizard with pre-selected operation type"""
# Mock the handler
mock_handler = AsyncMock(return_value={"status": "success"})
wizard._operation_handlers[BulkOperationType.CLEANUP] = mock_handler
result = await wizard.start_wizard(BulkOperationType.CLEANUP)
assert result["status"] == "success"
mock_handler.assert_called_once()
@pytest.mark.asyncio
async def test_start_wizard_without_operation_type(self, wizard):
"""Test starting wizard without operation type (elicits from user)"""
# Mock elicitation response
wizard.elicitation.elicit.return_value = {"operation_type": "cleanup"}
# Mock the handler
mock_handler = AsyncMock(return_value={"status": "success"})
wizard._operation_handlers[BulkOperationType.CLEANUP] = mock_handler
result = await wizard.start_wizard()
assert result["status"] == "success"
wizard.elicitation.elicit.assert_called_once()
@pytest.mark.asyncio
async def test_schema_selection_simple(self, wizard):
"""Test simple schema selection"""
# Mock elicitation response
wizard.elicitation.elicit.return_value = {"selected_schemas": ["test-schema-1", "test-schema-2"]}
result = await wizard._elicit_schema_selection("Select schemas")
assert len(result) == 2
assert "test-schema-1" in result
assert "test-schema-2" in result
@pytest.mark.asyncio
async def test_schema_selection_with_patterns(self, wizard):
"""Test schema selection with pattern matching"""
# Mock elicitation response with pattern
wizard.elicitation.elicit.return_value = {"selected_schemas": ["test-*", "prod-schema-1"]}
result = await wizard._elicit_schema_selection("Select schemas", allow_patterns=True)
# Should expand test-* pattern
assert "test-schema-1" in result
assert "test-schema-2" in result
assert "prod-schema-1" in result
assert len(result) == 3
@pytest.mark.asyncio
async def test_preview_generation(self, wizard):
"""Test preview generation"""
items = ["schema1", "schema2", "schema3"]
params = {"update_type": "compatibility", "level": "BACKWARD"}
preview = await wizard._generate_preview(BulkOperationType.SCHEMA_UPDATE, items, params)
assert isinstance(preview, BulkOperationPreview)
assert preview.total_count == 3
assert preview.estimated_duration == 1.5 # 0.5 seconds per item
assert len(preview.affected_items) == 3
@pytest.mark.asyncio
async def test_preview_with_warnings(self, wizard):
"""Test preview generation with warnings"""
# Large batch
items = ["schema" + str(i) for i in range(150)]
params = {}
preview = await wizard._generate_preview(BulkOperationType.CLEANUP, items, params)
assert len(preview.warnings) >= 2 # Large batch warning + destructive op warning
assert any("Large batch size" in w for w in preview.warnings)
assert any("destructive" in w for w in preview.warnings)
@pytest.mark.asyncio
async def test_confirmation_flow(self, wizard):
"""Test confirmation workflow"""
preview = BulkOperationPreview(
affected_items=[{"name": "test"}], total_count=1, changes_summary={}, estimated_duration=0.5
)
# Mock elicitation responses
wizard.elicitation.elicit.side_effect = [
{"show_preview": False}, # Don't show preview
{"proceed": True}, # Confirm operation
]
result = await wizard._confirm_operation(preview)
assert result is True
assert wizard.elicitation.elicit.call_count == 2
@pytest.mark.asyncio
async def test_execute_operation_success(self, wizard):
"""Test successful operation execution"""
items = ["schema1", "schema2"]
params = {"batch_size": 2, "create_backup": False}
preview = Mock()
# Mock process_batch to return success
wizard._process_batch = AsyncMock(return_value={"batch_size": 2, "success": 2, "failed": 0, "errors": []})
result = await wizard._execute_bulk_operation(BulkOperationType.SCHEMA_UPDATE, items, params, preview)
assert result["status"] == "success"
assert result["processed"] == 2
assert "task_id" in result
wizard.task_manager.create_task.assert_called_once()
wizard.task_manager.complete_task.assert_called_once()
@pytest.mark.asyncio
async def test_execute_operation_with_error_and_rollback(self, wizard):
"""Test operation execution with error and rollback"""
items = ["schema1", "schema2"]
params = {"rollback_on_error": True}
preview = Mock()
# Mock process_batch to raise error
wizard._process_batch = AsyncMock(side_effect=Exception("Test error"))
wizard._rollback_operation = AsyncMock()
result = await wizard._execute_bulk_operation(BulkOperationType.SCHEMA_UPDATE, items, params, preview)
assert result["status"] == "failed"
assert "Test error" in result["error"]
wizard._rollback_operation.assert_called_once()
wizard.task_manager.fail_task.assert_called_once()
@pytest.mark.asyncio
async def test_consumer_impact_detection(self, wizard):
"""Test consumer impact detection for cleanup operations"""
items = ["schema1"]
params = {"cleanup_type": "deprecated"}
preview = await wizard._generate_preview(BulkOperationType.CLEANUP, items, params)
assert preview.consumer_impact is not None
assert "active_consumers" in preview.consumer_impact
assert "consumer_groups" in preview.consumer_impact
@pytest.mark.asyncio
async def test_batch_processing(self, wizard):
"""Test batch processing with delays"""
items = ["schema" + str(i) for i in range(5)]
params = {"batch_size": 2, "delay_between_batches": 0.1, "create_backup": False}
preview = Mock()
wizard._process_batch = AsyncMock(return_value={"success": 2})
start_time = datetime.now()
await wizard._execute_bulk_operation(BulkOperationType.SCHEMA_UPDATE, items, params, preview)
duration = (datetime.now() - start_time).total_seconds()
# Should have 3 batches (2+2+1) with 2 delays
assert wizard._process_batch.call_count == 3
assert duration >= 0.2 # At least 2 delays of 0.1s
@pytest.mark.asyncio
async def test_bulk_cleanup_workflow(self, wizard):
"""Test complete bulk cleanup workflow"""
# Mock all elicitation responses
wizard.elicitation.elicit.side_effect = [
{"cleanup_type": "deprecated"}, # Cleanup type
{"selected_schemas": ["deprecated-schema-1"]}, # Schema selection
{"check_consumers": True}, # Consumer check option
{"force": False}, # Force option
{"consumer_action": "skip"}, # Consumer impact action
{"show_preview": False}, # Don't show preview
{"proceed": True}, # Confirm operation
]
# Mock other methods
wizard._generate_preview = AsyncMock(
return_value=BulkOperationPreview(
affected_items=[{"name": "deprecated-schema-1"}],
total_count=1,
changes_summary={},
estimated_duration=0.5,
consumer_impact={"active_consumers": 1},
)
)
wizard._execute_bulk_operation = AsyncMock(return_value={"status": "success", "processed": 1})
result = await wizard._handle_bulk_cleanup()
assert result["status"] == "success"
assert wizard.elicitation.elicit.call_count == 7
@pytest.mark.asyncio
async def test_operation_cancellation(self, wizard):
"""Test operation cancellation"""
# Mock confirmation to return False
wizard.elicitation.elicit.side_effect = [
{"show_preview": False}, # Don't show preview
{"proceed": False}, # Don't proceed
]
wizard._elicit_schema_selection = AsyncMock(return_value=["test"])
wizard._elicit_update_type = AsyncMock(return_value="compatibility")
wizard._elicit_update_parameters = AsyncMock(return_value={})
wizard._generate_preview = AsyncMock(
return_value=BulkOperationPreview(
affected_items=[{"name": "test"}], total_count=1, changes_summary={}, estimated_duration=0.5
)
)
result = await wizard._handle_bulk_schema_update()
assert result["status"] == "cancelled"
assert result["reason"] == "User cancelled operation"
if __name__ == "__main__":
pytest.main([__file__, "-v"])