#!/usr/bin/env python3
"""
Test script for Schema Registry metadata endpoints.
This script demonstrates how to use the new metadata endpoints to gather
information about schema registries including version, commit ID, and
cluster information.
Usage:
python test-registry-metadata.py
"""
import json
import requests
# Configuration
SCHEMA_REGISTRY_URL = "http://localhost:8081"
MCP_SERVER_URL = "http://localhost:38000"
def test_direct_metadata_endpoints():
"""Test the metadata endpoints directly against the Schema Registry."""
print("š Testing direct Schema Registry metadata endpoints...")
# Test /v1/metadata/id endpoint
print("\nš Testing /v1/metadata/id endpoint:")
try:
response = requests.get(f"{SCHEMA_REGISTRY_URL}/v1/metadata/id", timeout=10)
if response.status_code == 200:
metadata_id = response.json()
print(f" ā
Metadata ID: {json.dumps(metadata_id, indent=2)}")
# Extract cluster information
scope = metadata_id.get("scope", {})
clusters = scope.get("clusters", {})
kafka_cluster = clusters.get("kafka-cluster")
schema_registry_cluster = clusters.get("schema-registry-cluster")
print(f" š Kafka Cluster ID: {kafka_cluster}")
print(f" š Schema Registry Cluster ID: {schema_registry_cluster}")
else:
print(f" ā Failed to get metadata ID: HTTP {response.status_code}")
except Exception as e:
print(f" ā Error getting metadata ID: {e}")
# Test /v1/metadata/version endpoint
print("\nš Testing /v1/metadata/version endpoint:")
try:
response = requests.get(f"{SCHEMA_REGISTRY_URL}/v1/metadata/version", timeout=10)
if response.status_code == 200:
metadata_version = response.json()
print(f" ā
Metadata Version: {json.dumps(metadata_version, indent=2)}")
# Extract version information
version = metadata_version.get("version")
commit_id = metadata_version.get("commitId")
print(f" š Schema Registry Version: {version}")
print(f" š Commit ID: {commit_id}")
else:
print(f" ā Failed to get metadata version: HTTP {response.status_code}")
except Exception as e:
print(f" ā Error getting metadata version: {e}")
def test_mcp_metadata_tools():
"""Test the enhanced MCP tools that now include metadata."""
print("\nš§ Testing enhanced MCP tools with metadata...")
# Test enhanced connection test tool
print("\nš Testing test_registry_connection tool (now enhanced with metadata):")
try:
response = requests.post(f"{MCP_SERVER_URL}/mcp/tools/test_registry_connection", json={}, timeout=10)
if response.status_code == 200:
connection_info = response.json()
print(f" ā
Enhanced connection test: {json.dumps(connection_info, indent=2)}")
else:
print(f" ā Failed to get enhanced connection test: HTTP {response.status_code}")
except Exception as e:
print(f" ā Error getting enhanced connection test: {e}")
# Test enhanced statistics tools
print("\nš Testing count_schemas tool (now enhanced with metadata):")
try:
response = requests.post(f"{MCP_SERVER_URL}/mcp/tools/count_schemas", json={}, timeout=10)
if response.status_code == 200:
stats_info = response.json()
print(f" ā
Enhanced schema count: {json.dumps(stats_info, indent=2)}")
else:
print(f" ā Failed to get enhanced schema count: HTTP {response.status_code}")
except Exception as e:
print(f" ā Error getting enhanced schema count: {e}")
def test_enhanced_registry_info():
"""Test the enhanced get_registry_info tool."""
print("\nš§ Testing enhanced get_registry_info tool...")
try:
response = requests.post(f"{MCP_SERVER_URL}/mcp/tools/get_registry_info", json={}, timeout=10)
if response.status_code == 200:
registry_info = response.json()
print(f" ā
Enhanced registry info: {json.dumps(registry_info, indent=2)}")
# Highlight the new metadata fields
if "version" in registry_info:
print(f" š Registry Version: {registry_info['version']}")
if "commit_id" in registry_info:
print(f" š Commit ID: {registry_info['commit_id']}")
if "kafka_cluster_id" in registry_info:
print(f" š Kafka Cluster ID: {registry_info['kafka_cluster_id']}")
if "schema_registry_cluster_id" in registry_info:
print(f" š Schema Registry Cluster ID: {registry_info['schema_registry_cluster_id']}")
else:
print(f" ā Failed to get enhanced registry info: HTTP {response.status_code}")
except Exception as e:
print(f" ā Error getting enhanced registry info: {e}")
def test_enhanced_statistics():
"""Test the enhanced statistics tools that now include metadata."""
print("\nš§ Testing enhanced statistics tools...")
# Test enhanced registry statistics
print("\nš Testing get_registry_statistics tool (now enhanced with metadata):")
try:
response = requests.post(f"{MCP_SERVER_URL}/mcp/tools/get_registry_statistics", json={}, timeout=10)
if response.status_code == 200:
stats_info = response.json()
print(f" ā
Enhanced registry statistics: {json.dumps(stats_info, indent=2)}")
# Highlight the new metadata fields
if "version" in stats_info:
print(f" š Registry Version: {stats_info['version']}")
if "commit_id" in stats_info:
print(f" š Commit ID: {stats_info['commit_id']}")
else:
print(f" ā Failed to get enhanced registry statistics: HTTP {response.status_code}")
except Exception as e:
print(f" ā Error getting enhanced registry statistics: {e}")
# Test enhanced contexts count
print("\nš Testing count_contexts tool (now enhanced with metadata):")
try:
response = requests.post(f"{MCP_SERVER_URL}/mcp/tools/count_contexts", json={}, timeout=10)
if response.status_code == 200:
contexts_info = response.json()
print(f" ā
Enhanced contexts count: {json.dumps(contexts_info, indent=2)}")
else:
print(f" ā Failed to get enhanced contexts count: HTTP {response.status_code}")
except Exception as e:
print(f" ā Error getting enhanced contexts count: {e}")
def main():
"""Run all metadata tests."""
print("š Schema Registry Metadata Testing")
print("=" * 50)
# Test direct endpoints
test_direct_metadata_endpoints()
# Test enhanced MCP tools
test_mcp_metadata_tools()
# Test enhanced registry info
test_enhanced_registry_info()
# Test enhanced statistics
test_enhanced_statistics()
print("\nā
Metadata testing completed!")
print("\nš” Usage Examples:")
print(" - Use get_registry_info() for comprehensive registry information with metadata")
print(" - Use test_registry_connection() for connection testing with metadata")
print(" - Use count_schemas(), count_contexts(), etc. for statistics with metadata")
print(" - Use get_registry_statistics() for comprehensive stats with metadata")
if __name__ == "__main__":
main()