test_config_scenarios.pyโข5.7 kB
#!/usr/bin/env python3
"""
Test Elasticsearch configuration changes without external dependencies.
"""
import json
import socket
from pathlib import Path
def load_config():
"""Load current configuration."""
config_path = Path(__file__).parent.parent / "src" / "config.json"
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
def update_config(new_settings):
"""Update configuration with new settings."""
config_path = Path(__file__).parent.parent / "src" / "config.json"
# Load current config
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
# Update elasticsearch settings
config["elasticsearch"].update(new_settings)
# Save updated config
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
return config
def test_port_connection(host, port, timeout=3):
"""Test if a port is open using socket."""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
sock.close()
if result == 0:
return True, f"โ
Port {port} is open on {host}"
else:
return False, f"๐ Port {port} is closed on {host}"
except socket.gaierror:
return False, f"โ Cannot resolve hostname {host}"
except Exception as e:
return False, f"โ Error testing {host}:{port} - {str(e)}"
def test_configuration_scenarios():
"""Test various Elasticsearch configuration scenarios."""
print("๐งช Testing Elasticsearch Configuration Scenarios")
print("=" * 60)
print()
# Show current config
current_config = load_config()
es_config = current_config["elasticsearch"]
current_host = es_config["host"]
current_port = es_config["port"]
print("๐ Current Configuration:")
print(f" - Host: {current_host}")
print(f" - Port: {current_port}")
print(f" - Auto Setup: {es_config.get('auto_setup', False)}")
print()
# Test current configuration
print("๐ Test 1: Current configuration connectivity")
success, message = test_port_connection(current_host, current_port)
print(f" {message}")
if success:
print(" ๐ Elasticsearch port is reachable")
print(" โ
This means Elasticsearch is likely running")
else:
print(" ๐ง Port not reachable - demonstrates connection error scenario")
print(" ๐ก Our enhanced error messages would guide users to:")
print(" ๐ฏ Use 'setup_elasticsearch' tool")
print(" โ๏ธ Check configuration with 'get_config' tool")
print()
# Test common alternative ports
test_ports = [9201, 9202, 9300]
print("๐ Test 2: Testing alternative ports (likely to fail)")
for test_port in test_ports:
success, message = test_port_connection(current_host, test_port)
print(f" {message}")
if not success:
print(f" ๐ก Connection to {test_port} failed - example of config mismatch")
print()
# Test host variations
print("๐ Test 3: Testing host variations")
test_hosts = ["localhost", "127.0.0.1"]
for test_host in test_hosts:
success, message = test_port_connection(test_host, current_port)
print(f" {message}")
if success:
print(f" โ
{test_host} resolves and port is reachable")
else:
print(f" โน๏ธ {test_host} connection failed")
print()
# Demonstrate config change impact
print("๐ Test 4: Demonstrating configuration change impact")
wrong_port = 9999
print(f" Temporarily changing port from {current_port} to {wrong_port}")
# Save original for restoration
original_config = current_config.copy()
# Update to wrong port
update_config({"port": wrong_port})
updated_config = load_config()
print(f" โ
Config updated: port is now {updated_config['elasticsearch']['port']}")
# Test connection with wrong port
success, message = test_port_connection(current_host, wrong_port)
print(f" {message}")
if not success:
print(" ๐ This demonstrates how incorrect config causes connection failures")
print(" ๐ฏ Enhanced error handling helps identify such configuration issues")
print()
# Restore original config
print("๐ Test 5: Restoring original configuration")
update_config({"port": current_port})
restored_config = load_config()
print(f" โ
Restored: port back to {restored_config['elasticsearch']['port']}")
# Verify restoration
success, message = test_port_connection(current_host, current_port)
print(f" {message}")
print()
# Summary of error handling improvements
print("๐ฏ Configuration Testing Summary:")
print("๐ก Key benefits of enhanced error handling:")
print(" ๐ Clear error categorization (Connection, Timeout, Index, etc.)")
print(" ๐ Specific problem identification")
print(" ๐ก Actionable solutions with tool suggestions")
print(" ๐ง Guidance for configuration issues")
print(" โ๏ธ Help with Elasticsearch setup and troubleshooting")
print()
print("๐ง Error scenarios tested:")
print(" ๐ Connection refused (port closed)")
print(" โ๏ธ Configuration mismatch (wrong port)")
print(" ๐ Host resolution variations")
print(" ๐ Configuration persistence and restoration")
if __name__ == "__main__":
test_configuration_scenarios()