Skip to main content
Glama

Claude Slack

test_simple_streaming.py6.43 kB
#!/usr/bin/env python3 """ Test for the simple streaming event system with auto-event proxy. Verifies that events are automatically emitted when API methods are called. """ import asyncio import json import sys import os from datetime import datetime # Add parent directory to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from api.unified_api import ClaudeSlackAPI from api.events import EventTopic async def event_consumer(api, client_id, topics=None): """Consumer that subscribes to events""" print(f"\n[{client_id}] Starting consumer (topics: {topics or 'all'})...") events_received = [] try: # Subscribe with SSE format event_count = 0 async for sse_data in api.subscribe_sse(client_id, topics): # Parse SSE format lines = sse_data.strip().split('\n') for line in lines: if line.startswith('event:'): event_type = line[6:].strip() elif line.startswith('data:'): data = json.loads(line[5:].strip()) event_count += 1 events_received.append(data) print(f"\n[{client_id}] Event #{event_count}:") print(f" Type: {event_type}") print(f" Topic: {data.get('topic')}") print(f" Payload: {json.dumps(data.get('payload', {}), indent=2)}") # Stop after 5 events if event_count >= 5: print(f"\n[{client_id}] Received 5 events, stopping") break elif line.startswith(':'): # Heartbeat continue if event_count >= 5: break except asyncio.CancelledError: print(f"\n[{client_id}] Consumer cancelled") except Exception as e: print(f"\n[{client_id}] Error: {e}") return events_received async def test_auto_events(api): """Test that events are automatically emitted""" print("\n=== Testing Auto Event Emission ===") # Start a consumer for message events consumer_task = asyncio.create_task( event_consumer(api, "test-consumer", [EventTopic.MESSAGES]) ) # Give consumer time to start await asyncio.sleep(0.5) # Create test data print("\n[Test] Creating test agent...") await api.register_agent( name="test-agent", project_id=None, description="Test agent for streaming" ) print("\n[Test] Creating test channel...") channel_id = await api.create_channel( name="test-channel", description="Test channel", scope="global", created_by="test-agent" ) print("\n[Test] Joining channel...") await api.join_channel( agent_name="test-agent", agent_project_id=None, channel_id=channel_id ) # Send messages - these should auto-emit events! print("\n[Test] Sending messages (should auto-emit events)...") for i in range(3): msg_id = await api.send_message( channel_id=channel_id, sender_id="test-agent", content=f"Test message {i+1} - auto event test" ) print(f" Sent message ID: {msg_id}") await asyncio.sleep(0.1) # Wait for consumer to receive events await asyncio.sleep(1) # Cancel consumer consumer_task.cancel() events = await consumer_task print(f"\n[Test] Consumer received {len(events)} events") return events async def test_multiple_topics(api): """Test subscribing to multiple topics""" print("\n=== Testing Multiple Topics ===") # Start consumers for different topics all_consumer = asyncio.create_task( event_consumer(api, "all-consumer", None) # All topics ) channel_consumer = asyncio.create_task( event_consumer(api, "channel-consumer", [EventTopic.CHANNELS]) ) # Give consumers time to start await asyncio.sleep(0.5) # Create events of different types print("\n[Test] Creating mixed events...") # Channel event channel_id = await api.create_channel( name="multi-test-channel", description="Multi-topic test", scope="global", created_by="test-agent" ) # Member event await api.join_channel( agent_name="test-agent", agent_project_id=None, channel_id=channel_id ) # Message event await api.send_message( channel_id=channel_id, sender_id="test-agent", content="Multi-topic test message" ) # Wait and cancel await asyncio.sleep(2) all_consumer.cancel() channel_consumer.cancel() all_events = await all_consumer channel_events = await channel_consumer print(f"\n[Test] All-consumer got {len(all_events)} events") print(f"[Test] Channel-consumer got {len(channel_events)} events") async def test_statistics(api): """Test event statistics""" print("\n=== Testing Statistics ===") stats = api.get_event_stats() print("\nEvent System Statistics:") for key, value in stats.items(): print(f" {key}: {value}") async def main(): """Main test function""" print("=" * 60) print("Simple Streaming Event System Test") print("=" * 60) # Initialize API api = ClaudeSlackAPI( db_path="/tmp/test_simple_streaming.db", enable_semantic_search=False ) print("\nInitializing API...") await api.initialize() try: # Test auto event emission events = await test_auto_events(api) # Verify events were emitted assert len(events) > 0, "No events received!" print("\n✅ Auto event emission working!") # Test multiple topics await test_multiple_topics(api) # Show statistics await test_statistics(api) finally: # Cleanup print("\n\nCleaning up...") await api.close() print("\n" + "=" * 60) print("Test completed successfully!") print("=" * 60) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/theo-nash/claude-slack'

If you have feedback or need assistance with the MCP directory API, please join our Discord server