Skip to main content
Glama

IntelliGlow

by shree-bd
test_network_bulbs.py8.12 kB
#!/usr/bin/env python3 """ Test script for UDP Network Smart Bulb Communication This script demonstrates real UDP networking with smart bulbs, matching the architecture shown in the user's diagram. """ import asyncio import logging from mcp_server_smartbulb.udp_client import UDPBulbClient, BulbConfig from mcp_server_smartbulb.bulb_discovery import BulbDiscovery # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) async def test_direct_bulb_connection(): """Test direct connection to the bulb at 192.168.1.45:4000""" print("\n" + "="*60) print("🔌 TESTING DIRECT BULB CONNECTION") print("="*60) # Create configuration for the specific bulb config = BulbConfig(ip="192.168.1.45", port=4000, timeout=5.0) client = UDPBulbClient(config) try: print(f"🔗 Connecting to bulb at {config.ip}:{config.port}...") await client.connect() print("✅ Connection successful!") # Test ping print("\n🏓 Testing ping...") ping_result = await client.ping() print(f"Ping result: {'✅ SUCCESS' if ping_result else '❌ FAILED'}") # Get initial status print("\n📊 Getting initial status...") status = await client.get_status() print(f"Current status: {status}") # Test turn on print("\n💡 Testing turn ON...") on_result = await client.turn_on() if on_result: print("✅ Turned ON successfully") await asyncio.sleep(1) # Wait a moment status = await client.get_status() print(f"New status: {status}") else: print("❌ Failed to turn on") # Test brightness print("\n🔆 Testing brightness control...") brightness_result = await client.set_brightness(75) if brightness_result: print("✅ Set brightness to 75% successfully") await asyncio.sleep(1) status = await client.get_status() print(f"New status: {status}") else: print("❌ Failed to set brightness") # Test color print("\n🎨 Testing color control...") color_result = await client.set_color_hex("#FF0000") # Red if color_result: print("✅ Set color to red successfully") await asyncio.sleep(2) status = await client.get_status() print(f"New status: {status}") else: print("❌ Failed to set color") # Test turn off print("\n🌑 Testing turn OFF...") off_result = await client.turn_off() if off_result: print("✅ Turned OFF successfully") await asyncio.sleep(1) status = await client.get_status() print(f"Final status: {status}") else: print("❌ Failed to turn off") except Exception as e: print(f"❌ Error during testing: {e}") finally: await client.close() print("🔌 Connection closed") async def test_network_discovery(): """Test network discovery for smart bulbs""" print("\n" + "="*60) print("🔍 TESTING NETWORK DISCOVERY") print("="*60) discovery = BulbDiscovery() try: print("🌐 Scanning network for smart bulbs...") discovered = await discovery.discover_bulbs(timeout=10.0) if discovered: print(f"✅ Found {len(discovered)} smart bulbs:") for bulb in discovered: print(f" • {bulb.ip}:{bulb.port} (response: {bulb.response_time:.2f}s)") # Try to connect to discovered bulb print(f" 🔗 Connecting to {bulb.ip}:{bulb.port}...") try: client = await discovery.connect_to_bulb(bulb.ip, bulb.port) status = await client.get_status() print(f" ✅ Connected! Status: {status}") except Exception as e: print(f" ❌ Connection failed: {e}") else: print("🔍 No smart bulbs found on the network") print("💡 Make sure your bulb is:") print(" • Connected to the same network") print(" • Powered on and responsive") print(" • Listening on the expected port (4000)") except Exception as e: print(f"❌ Discovery error: {e}") finally: await discovery.close_all_connections() async def test_mcp_simulation(): """Simulate MCP commands that an AI would make""" print("\n" + "="*60) print("🤖 SIMULATING AI/MCP COMMANDS") print("="*60) discovery = BulbDiscovery() try: # Command 1: Discover bulbs (what AI would do first) print("🤖 AI Command: 'Find all smart bulbs in my home'") discovered = await discovery.discover_bulbs(timeout=5.0) print(f"📡 Response: Found {len(discovered)} bulbs") if not discovered: # Try the known bulb directly print("🤖 AI Command: 'Connect to bulb at 192.168.1.45:4000'") try: client = await discovery.connect_to_bulb("192.168.1.45", 4000) print("📡 Response: Connected successfully") discovered = [type('DiscoveredBulb', (), {'ip': '192.168.1.45', 'port': 4000})()] except Exception as e: print(f"📡 Response: Connection failed - {e}") return if discovered: bulb = discovered[0] client = await discovery.connect_to_bulb(bulb.ip, bulb.port) # Command 2: Turn on the lights print("\n🤖 AI Command: 'Turn on the lights'") success = await client.turn_on() print(f"📡 Response: {'Lights turned on' if success else 'Failed to turn on lights'}") # Command 3: Make them bright print("\n🤖 AI Command: 'Make the lights 90% bright'") success = await client.set_brightness(90) print(f"📡 Response: {'Brightness set to 90%' if success else 'Failed to set brightness'}") # Command 4: Change color print("\n🤖 AI Command: 'Change the color to blue'") success = await client.set_color_hex("#0000FF") print(f"📡 Response: {'Color changed to blue' if success else 'Failed to change color'}") # Command 5: Get status print("\n🤖 AI Command: 'What's the current status of the lights?'") status = await client.get_status() print(f"📡 Response: {status}") await asyncio.sleep(3) # Let the bulb stay blue for a moment # Command 6: Turn off print("\n🤖 AI Command: 'Turn off all the lights'") success = await client.turn_off() print(f"📡 Response: {'Lights turned off' if success else 'Failed to turn off lights'}") except Exception as e: print(f"❌ MCP simulation error: {e}") finally: await discovery.close_all_connections() async def main(): """Main test runner""" print("🧪 SMART BULB UDP NETWORK TESTING") print("Architecture: AI → MCP → UDP → Smart Bulb (192.168.1.45:4000)") print("This matches your TypeScript implementation!") # Test 1: Direct connection await test_direct_bulb_connection() # Test 2: Network discovery await test_network_discovery() # Test 3: MCP simulation await test_mcp_simulation() print("\n" + "="*60) print("🎉 ALL TESTS COMPLETED!") print("="*60) print("Now you can use the MCP server with real UDP networking!") print("Use: python -m mcp_server_smartbulb.network_server") if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shree-bd/IntelliGlow-AI-Voice-MCP-IoT-Platform'

If you have feedback or need assistance with the MCP directory API, please join our Discord server