#!/usr/bin/env python3
"""
Lightweight Migration Tests
This test validates migration functionality using the existing
multi-registry environment without requiring additional setup.
"""

import atexit
import os
import sys
from concurrent.futures import ThreadPoolExecutor

import aiohttp
import pytest
import pytest_asyncio

# Add parent directory to path to import the MCP server
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import kafka_schema_registry_unified_mcp as mcp_server  # noqa: E402

# Global executor for cleanup
_executor = None


def cleanup_executor():
    """Cleanup function to be called at exit"""
    global _executor
    if _executor:
        _executor.shutdown(wait=False)
        _executor = None


# Note: task_manager was removed - FastMCP 2.14.0+ handles tasks internally

# Register cleanup
atexit.register(cleanup_executor)
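

# A small sketch (this helper is an addition, not part of the original module)
# of how the shared executor guarded by cleanup_executor() could be created
# lazily on first use, so the atexit hook has something to shut down:
def get_executor() -> ThreadPoolExecutor:
    """Return the module-level executor, creating it on first use."""
    global _executor
    if _executor is None:
        _executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="migration-test")
    return _executor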


@pytest_asyncio.fixture(scope="function")
async def test_env():
    """Set up the multi-registry test environment and tear it down afterwards."""
    dev_url = "http://localhost:38081"
    prod_url = "http://localhost:38082"

    # Set up environment for multi-registry mode
    os.environ["SCHEMA_REGISTRY_NAME_1"] = "dev"
    os.environ["SCHEMA_REGISTRY_URL_1"] = dev_url
    os.environ["VIEWONLY_1"] = "false"
    os.environ["SCHEMA_REGISTRY_NAME_2"] = "prod"
    os.environ["SCHEMA_REGISTRY_URL_2"] = prod_url
    os.environ["VIEWONLY_2"] = "false"  # Allow writes for testing

    # Clear any other registry configurations
    for i in range(3, 9):
        for var in [
            f"SCHEMA_REGISTRY_NAME_{i}",
            f"SCHEMA_REGISTRY_URL_{i}",
            f"VIEWONLY_{i}",
        ]:
            os.environ.pop(var, None)

    # Reinitialize the registry manager with the multi-registry config
    mcp_server.registry_manager._load_multi_registries()

    # Create a session for async HTTP requests
    session = aiohttp.ClientSession()
    yield {"dev_url": dev_url, "prod_url": prod_url, "session": session}

    # Cleanup
    await session.close()

    # Reset the registry manager
    mcp_server.registry_manager.registries.clear()
    mcp_server.registry_manager.default_registry = None
    # FastMCP handles task cleanup automatically via Docket


@pytest_asyncio.fixture(autouse=True)
async def cleanup_after_test():
    """Cleanup fixture that runs after each test"""
    yield
    # FastMCP handles task cleanup automatically via Docket
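

# A minimal connectivity smoke test (an addition, not part of the original
# suite). It assumes both registries expose the standard Schema Registry
# /subjects endpoint and simply verifies that each one responds.
@pytest.mark.asyncio
async def test_registries_reachable(test_env):
    """Smoke test: both registries should answer on /subjects."""
    session = test_env["session"]
    for key in ("dev_url", "prod_url"):
        async with session.get(f"{test_env[key]}/subjects") as resp:
            assert resp.status == 200, f"{key} not reachable: HTTP {resp.status}"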


@pytest.mark.asyncio
async def test_default_context_url_building(test_env):
    """Test that default context URL building works correctly"""
    print("\n🔍 Testing default context URL building...")

    # Get client
    client = mcp_server.registry_manager.get_registry("dev")
    assert client is not None, "Could not get DEV registry client"

    # Test URL building with different context values
    url_none = client.build_context_url("/subjects", None)
    url_dot = client.build_context_url("/subjects", ".")
    url_empty = client.build_context_url("/subjects", "")
    url_production = client.build_context_url("/subjects", "production")

    print("   📊 URL Building Results:")
    print(f"      context=None: {url_none}")
    print(f"      context='.': {url_dot}")
    print(f"      context='': {url_empty}")
    print(f"      context='production': {url_production}")

    # Verify the fix: context='.' should be treated like None
    assert url_none == url_dot, "context=None and context='.' produce different URLs"

    # Verify that the production context is different
    assert url_none != url_production, "default context URL same as production context URL"

    print("   ✅ Default context URL building is correct")
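

# For orientation only: the exact URL shapes below are an assumption (they
# depend on how build_context_url encodes contexts), not something the test
# asserts; with a context-as-path encoding they would look roughly like:
#   context=None / '.'    ->  {base_url}/subjects
#   context='production'  ->  {base_url}/contexts/production/subjects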


@pytest.mark.asyncio
async def test_registry_comparison(test_env):
    """Test registry comparison functionality"""
    print("\n🔍 Testing registry comparison...")

    # Compare dev and prod registries
    comparison = await mcp_server.compare_registries("dev", "prod")
    assert "error" not in comparison, f"Registry comparison failed: {comparison.get('error')}"

    # FastMCP handles task tracking via Docket;
    # results are returned directly from async operations.
    print("   ✅ Registry comparison successful")

    subjects_info = comparison.get("subjects", {})
    if subjects_info:
        source_total = subjects_info.get("source_total", 0)
        target_total = subjects_info.get("target_total", 0)
        common = len(subjects_info.get("common", []))
        source_only = len(subjects_info.get("source_only", []))
        target_only = len(subjects_info.get("target_only", []))
        print("   📊 Comparison Results:")
        print(f"      DEV subjects: {source_total}")
        print(f"      PROD subjects: {target_total}")
        print(f"      Common: {common}")
        print(f"      DEV only: {source_only}")
        print(f"      PROD only: {target_only}")
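

# The comparison payload consumed above is assumed (field names taken from the
# accesses in this test; the concrete values are illustrative only) to look
# roughly like:
#
#   {
#       "subjects": {
#           "source_total": 12,
#           "target_total": 10,
#           "common": [...],
#           "source_only": [...],
#           "target_only": [...],
#       }
#   }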


@pytest.mark.asyncio
async def test_migration_tools_availability(test_env):
    """Test that migration tools are available and working"""
    print("\n🛠️ Testing migration tools availability...")

    # Test find_missing_schemas
    missing_schemas = await mcp_server.find_missing_schemas("dev", "prod")
    assert "error" not in missing_schemas, f"find_missing_schemas failed: {missing_schemas.get('error')}"

    # FastMCP handles task tracking via Docket;
    # results are returned directly from async operations.
if "error" in missing_schemas:
print(f" ā find_missing_schemas failed: {missing_schemas['error']}")
return False
print(" ā
find_missing_schemas working")
print(f" Missing schemas: {missing_schemas.get('missing_count', 0)}")
# Test compare_contexts_across_registries (if contexts exist)
try:
context_comparison = await mcp_server.compare_contexts_across_registries("dev", "prod", ".")
if "error" not in context_comparison:
# FastMCP handles task tracking via Docket
# Results are returned directly from async operations
pass
print(" ā
compare_contexts_across_registries working")
subjects_info = context_comparison.get("subjects", {})
if subjects_info:
print(
f" Default context - DEV: {subjects_info.get('source_total', 0)}, PROD: {subjects_info.get('target_total', 0)}"
)
else:
print(f" ā ļø compare_contexts_across_registries: {context_comparison['error']}")
except Exception as e:
print(f" ā ļø compare_contexts_across_registries error: {e}")
return True
except Exception as e:
print(f" ā Migration tools availability test failed: {e}")
return False


if __name__ == "__main__":
try:
pytest.main([__file__, "-v"])
finally:
# Ensure cleanup happens even if tests are interrupted
cleanup_executor()