streaming_validator_agent.py
#!/usr/bin/env python3
"""
Streaming Validator Agent for MCP Integration

Validates JSONL streaming command execution and response handling
between the MCP server and Claude Code with real-time verification.
"""

import sys
import json
import base64
import asyncio
import time
from pathlib import Path
from typing import Dict, Any, List, AsyncIterator
import logging

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))

from test_agent_base import TestAgentBase

logger = logging.getLogger(__name__)


class StreamingValidatorAgent(TestAgentBase):
    """Test agent for validating JSONL streaming functionality."""

    def __init__(self):
        super().__init__(
            name="StreamingValidatorAgent",
            description="Validates MCP JSONL streaming with real-time command execution"
        )
        self.streaming_dir = self.test_base_dir / "test-streaming"
        self.active_streams = {}
        self.stream_metrics = {}

    async def validate_prerequisites(self) -> bool:
        """Validate streaming test prerequisites."""
        try:
            # Ensure streaming directory exists
            self.streaming_dir.mkdir(parents=True, exist_ok=True)

            # Test basic streaming capability
            test_result = await self.execute_mcp_operation(
                "test_streaming", {}
            )
            if not test_result.get("success", False):
                logger.error("Streaming not supported by MCP server")
                return False

            return True
        except Exception as e:
            logger.error(f"Prerequisites validation failed: {e}")
            return False

    async def execute_test_scenarios(self) -> List[Dict[str, Any]]:
        """
        Execute comprehensive streaming test scenarios.

        Tests:
        1. Basic streaming output
        2. Large streaming data
        3. Real-time streaming validation
        4. Concurrent streams
        5. Stream interruption handling
        6. Backpressure management
        7. Binary streaming
        8. Error propagation in streams
        """
        test_results = []

        # Test 1: Basic Streaming
        logger.info("Test 1: Basic streaming output")
        result = await self._test_basic_streaming()
        test_results.append(result)

        # Test 2: Large Data Streaming
        logger.info("Test 2: Large streaming data")
        result = await self._test_large_streaming()
        test_results.append(result)

        # Test 3: Real-time Validation
        logger.info("Test 3: Real-time streaming validation")
        result = await self._test_realtime_streaming()
        test_results.append(result)

        # Test 4: Concurrent Streams
        logger.info("Test 4: Concurrent streams")
        result = await self._test_concurrent_streams()
        test_results.append(result)

        # Test 5: Stream Interruption
        logger.info("Test 5: Stream interruption handling")
        result = await self._test_stream_interruption()
        test_results.append(result)

        # Test 6: Backpressure
        logger.info("Test 6: Backpressure management")
        result = await self._test_backpressure()
        test_results.append(result)

        # Test 7: Binary Streaming
        logger.info("Test 7: Binary streaming")
        result = await self._test_binary_streaming()
        test_results.append(result)

        # Test 8: Error Propagation
        logger.info("Test 8: Error propagation in streams")
        result = await self._test_error_propagation()
        test_results.append(result)

        self.test_results = test_results
        return test_results

    async def _test_basic_streaming(self) -> Dict[str, Any]:
        """Test basic streaming output."""
        test_name = "basic_streaming"
        try:
            # Create session for streaming
            session_result = await self.execute_mcp_operation(
                "create_session",
                {
                    "name": "streaming-basic-test",
                    "model": "claude-3-opus-20240229",
                    "streaming": True
                }
            )
            if not session_result["success"]:
                return {
                    "test": test_name,
                    "passed": False,
                    "error": "Failed to create streaming session"
                }

            session_id = session_result.get("result", {}).get("session_id")

            # Execute streaming command
            stream_chunks = []
            chunk_times = []

            async for chunk in self._stream_command(
                session_id,
                "for i in {1..10}; do echo \"Line $i\"; sleep 0.1; done"
            ):
                stream_chunks.append(chunk)
                chunk_times.append(time.time())

            # Validate streaming characteristics
            chunks_received = len(stream_chunks)
            combined_output = ''.join(c.get("content", "") for c in stream_chunks)
            all_lines_received = all(
                f"Line {i}" in combined_output for i in range(1, 11)
            )

            # Check if chunks arrived over time (not all at once)
            streaming_validated = False
            if len(chunk_times) > 1:
                time_spread = chunk_times[-1] - chunk_times[0]
                streaming_validated = time_spread > 0.5  # Should take at least 0.5s

            # Terminate session
            await self.execute_mcp_operation(
                "terminate_session", {"session_id": session_id}
            )

            return {
                "test": test_name,
                "session_created": session_result["success"],
                "chunks_received": chunks_received,
                "all_lines_received": all_lines_received,
                "streaming_validated": streaming_validated,
                "passed": all([
                    session_result["success"],
                    chunks_received > 5,
                    all_lines_received,
                    streaming_validated
                ]),
                "details": {
                    "chunk_count": chunks_received,
                    "time_spread": (chunk_times[-1] - chunk_times[0]) if chunk_times else 0
                }
            }
        except Exception as e:
            logger.error(f"Test {test_name} failed: {e}")
            return {"test": test_name, "passed": False, "error": str(e)}

    async def _test_large_streaming(self) -> Dict[str, Any]:
        """Test streaming large amounts of data."""
        test_name = "large_streaming"
        try:
            # Create session
            session_result = await self.execute_mcp_operation(
                "create_session",
                {
                    "name": "streaming-large-test",
                    "model": "claude-3-opus-20240229",
                    "streaming": True,
                    "buffer_size": 1048576  # 1MB buffer
                }
            )
            if not session_result["success"]:
                return {
                    "test": test_name,
                    "passed": False,
                    "error": "Failed to create session"
                }

            session_id = session_result.get("result", {}).get("session_id")

            # Generate large output
            total_bytes = 0
            chunk_count = 0
            start_time = time.time()

            # Stream 10MB of data
            async for chunk in self._stream_command(
                session_id,
                "dd if=/dev/zero bs=1024 count=10240 2>/dev/null | base64"
            ):
                chunk_count += 1
                content = chunk.get("content", "")
                total_bytes += len(content)

                # Validate chunk structure
                if not isinstance(chunk, dict) or "type" not in chunk:
                    logger.error(f"Invalid chunk structure: {chunk}")

            elapsed_time = time.time() - start_time
            throughput_mbps = (
                (total_bytes / (1024 * 1024)) / elapsed_time if elapsed_time > 0 else 0
            )

            # Validate results
            size_correct = total_bytes > 10 * 1024 * 1024  # Should be > 10MB due to base64
            chunking_efficient = chunk_count < total_bytes / 1024  # Chunks should be reasonably sized
            throughput_acceptable = throughput_mbps > 1.0  # At least 1MB/s

            # Terminate session
            await self.execute_mcp_operation(
                "terminate_session", {"session_id": session_id}
            )

            return {
                "test": test_name,
                "session_created": session_result["success"],
                "total_bytes": total_bytes,
                "chunk_count": chunk_count,
                "size_correct": size_correct,
                "chunking_efficient": chunking_efficient,
                "throughput_acceptable": throughput_acceptable,
                "passed": all([
                    session_result["success"],
                    size_correct,
                    chunking_efficient,
                    throughput_acceptable
                ]),
                "details": {
                    "total_mb": total_bytes / (1024 * 1024),
                    "throughput_mbps": throughput_mbps,
                    "avg_chunk_size": total_bytes / chunk_count if chunk_count > 0 else 0
                }
            }
        except Exception as e:
            logger.error(f"Test {test_name} failed: {e}")
            return {"test": test_name, "passed": False, "error": str(e)}

    async def _test_realtime_streaming(self) -> Dict[str, Any]:
        """Test real-time streaming validation."""
        test_name = "realtime_streaming"
        try:
            # Create session
            session_result = await self.execute_mcp_operation(
                "create_session",
                {
                    "name": "streaming-realtime-test",
                    "model": "claude-3-opus-20240229",
                    "streaming": True
                }
            )
            if not session_result["success"]:
                return {
                    "test": test_name,
                    "passed": False,
                    "error": "Failed to create session"
                }

            session_id = session_result.get("result", {}).get("session_id")

            # Test real-time event streaming
            events = []
            event_times = {}

            # Monitor system events in real-time
            monitor_script = """
for i in {1..5}; do
    echo "EVENT:$(date +%s.%N):Event_$i"
    sleep 0.2
done
"""

            async for chunk in self._stream_command(session_id, monitor_script):
                content = chunk.get("content", "")
                if "EVENT:" in content:
                    for line in content.split('\n'):
                        if line.startswith("EVENT:"):
                            parts = line.split(':')
                            if len(parts) >= 3:
                                timestamp = float(parts[1])
                                event_name = parts[2]
                                events.append(event_name)
                                event_times[event_name] = {
                                    "emitted": timestamp,
                                    "received": time.time()
                                }

            # Validate real-time characteristics
            all_events_received = len(events) == 5

            # Check latency
            latencies = []
            for times in event_times.values():
                latency = times["received"] - times["emitted"]
                latencies.append(latency)

            avg_latency = sum(latencies) / len(latencies) if latencies else float('inf')
            low_latency = avg_latency < 0.5  # Should be under 500ms

            # Check ordering
            events_ordered = events == [f"Event_{i}" for i in range(1, 6)]

            # Terminate session
            await self.execute_mcp_operation(
                "terminate_session", {"session_id": session_id}
            )

            return {
                "test": test_name,
                "session_created": session_result["success"],
                "all_events_received": all_events_received,
                "low_latency": low_latency,
                "events_ordered": events_ordered,
                "passed": all([
                    session_result["success"],
                    all_events_received,
                    low_latency,
                    events_ordered
                ]),
                "details": {
                    "events_count": len(events),
                    "avg_latency_ms": avg_latency * 1000,
                    "events": events
                }
            }
        except Exception as e:
            logger.error(f"Test {test_name} failed: {e}")
            return {"test": test_name, "passed": False, "error": str(e)}

    async def _test_concurrent_streams(self) -> Dict[str, Any]:
        """Test concurrent streaming sessions."""
        test_name = "concurrent_streams"
        num_streams = 3
        try:
            # Create multiple streaming sessions
            sessions = []
            for i in range(num_streams):
                session_result = await self.execute_mcp_operation(
                    "create_session",
                    {
                        "name": f"concurrent-stream-{i}",
                        "model": "claude-3-opus-20240229",
                        "streaming": True
                    }
                )
                if session_result["success"]:
                    sessions.append(session_result.get("result", {}).get("session_id"))

            if len(sessions) != num_streams:
                return {
                    "test": test_name,
                    "passed": False,
                    "error": f"Failed to create {num_streams} sessions"
                }

            # Stream from all sessions concurrently
            async def stream_worker(session_id: str, stream_id: int) -> Dict[str, Any]:
                chunks = []
                start_time = time.time()
                async for chunk in self._stream_command(
                    session_id,
                    f"for i in {{1..5}}; do echo 'Stream {stream_id} message '$i; sleep 0.1; done"
                ):
                    chunks.append(chunk)
                return {
                    "stream_id": stream_id,
                    "chunks": len(chunks),
                    "duration": time.time() - start_time,
                    "complete": f"Stream {stream_id} message 5" in ''.join(
                        c.get("content", "") for c in chunks
                    )
                }

            # Run concurrent streams
            tasks = [
                stream_worker(session_id, i)
                for i, session_id in enumerate(sessions)
            ]
            results = await asyncio.gather(*tasks)

            # Validate results
            all_complete = all(r["complete"] for r in results)
            streams_independent = len(set(r["chunks"] for r in results)) > 1  # Different chunk counts

            # Check for interference. Note: active_streams is keyed by
            # "<session_id>-<timestamp>" and entries are removed when a stream
            # completes, so finished streams trivially pass this check.
            no_interference = all(
                f"Stream {r['stream_id']}" in ''.join(
                    c.get("content", "") for c in self.active_streams.get(r['stream_id'], [])
                ) if r['stream_id'] in self.active_streams else True
                for r in results
            )

            # Terminate all sessions
            for session_id in sessions:
                await self.execute_mcp_operation(
                    "terminate_session", {"session_id": session_id}
                )

            return {
                "test": test_name,
                "sessions_created": len(sessions),
                "all_streams_complete": all_complete,
                "streams_independent": streams_independent,
                "no_interference": no_interference,
                "passed": all([
                    len(sessions) == num_streams,
                    all_complete,
                    streams_independent
                ]),
                "details": {
                    "stream_results": [
                        {
                            "stream_id": r["stream_id"],
                            "chunks": r["chunks"],
                            "duration": r["duration"]
                        }
                        for r in results
                    ]
                }
            }
        except Exception as e:
            logger.error(f"Test {test_name} failed: {e}")
            return {"test": test_name, "passed": False, "error": str(e)}

    async def _test_stream_interruption(self) -> Dict[str, Any]:
        """Test stream interruption handling."""
        test_name = "stream_interruption"
        try:
            # Create session
            session_result = await self.execute_mcp_operation(
                "create_session",
                {
                    "name": "stream-interrupt-test",
                    "model": "claude-3-opus-20240229",
                    "streaming": True
                }
            )
            if not session_result["success"]:
                return {
                    "test": test_name,
                    "passed": False,
                    "error": "Failed to create session"
                }

            session_id = session_result.get("result", {}).get("session_id")

            # Start long-running stream
            chunks_before_interrupt = []
            interrupt_handled = False

            try:
                async for chunk in self._stream_command(
                    session_id,
                    "for i in {1..100}; do echo 'Message '$i; sleep 0.05; done"
                ):
                    chunks_before_interrupt.append(chunk)

                    # Interrupt after 10 chunks
                    if len(chunks_before_interrupt) >= 10:
                        # Cancel the stream
                        cancel_result = await self.execute_mcp_operation(
                            "cancel_stream", {"session_id": session_id}
                        )
                        interrupt_handled = cancel_result.get("success", False)
                        break
            except asyncio.CancelledError:
                interrupt_handled = True

            # Verify session still usable after interruption
            recovery_result = await self.execute_mcp_operation(
                "execute_prompt",
                {
                    "session_id": session_id,
                    "prompt": "echo 'Recovered after interrupt'",
                    "streaming": False
                }
            )
            session_recovered = (
                recovery_result["success"] and
                "Recovered after interrupt" in str(
                    recovery_result.get("result", {}).get("response", "")
                )
            )

            # Terminate session
            await self.execute_mcp_operation(
                "terminate_session", {"session_id": session_id}
            )

            return {
                "test": test_name,
                "session_created": session_result["success"],
                "chunks_before_interrupt": len(chunks_before_interrupt),
                "interrupt_handled": interrupt_handled,
                "session_recovered": session_recovered,
                "passed": all([
                    session_result["success"],
                    len(chunks_before_interrupt) >= 10,
                    interrupt_handled,
                    session_recovered
                ]),
                "details": {
                    "interrupted_at": len(chunks_before_interrupt),
                    "recovery_successful": session_recovered
                }
            }
        except Exception as e:
            logger.error(f"Test {test_name} failed: {e}")
            return {"test": test_name, "passed": False, "error": str(e)}

    async def _test_backpressure(self) -> Dict[str, Any]:
        """Test backpressure management in streaming."""
        test_name = "backpressure"
        try:
            # Create session with small buffer
            session_result = await self.execute_mcp_operation(
                "create_session",
                {
                    "name": "backpressure-test",
                    "model": "claude-3-opus-20240229",
                    "streaming": True,
                    "buffer_size": 4096  # Small buffer to trigger backpressure
                }
            )
            if not session_result["success"]:
                return {
                    "test": test_name,
                    "passed": False,
                    "error": "Failed to create session"
                }

            session_id = session_result.get("result", {}).get("session_id")

            # Generate fast output to trigger backpressure
            chunk_delays = []
            last_chunk_time = time.time()
            backpressure_events = 0

            async for chunk in self._stream_command(
                session_id,
                "for i in {1..1000}; do echo 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'; done"
            ):
                current_time = time.time()
                delay = current_time - last_chunk_time
                chunk_delays.append(delay)

                # Detect backpressure (significant delays)
                if delay > 0.1:  # More than 100ms delay
                    backpressure_events += 1

                last_chunk_time = current_time

                # Simulate slow consumer
                if len(chunk_delays) % 10 == 0:
                    await asyncio.sleep(0.05)

            # Validate backpressure handling
            backpressure_detected = backpressure_events > 0
            avg_delay = sum(chunk_delays) / len(chunk_delays) if chunk_delays else 0
            no_data_loss = len(chunk_delays) > 100  # Should receive many chunks

            # Terminate session
            await self.execute_mcp_operation(
                "terminate_session", {"session_id": session_id}
            )

            return {
                "test": test_name,
                "session_created": session_result["success"],
                "backpressure_detected": backpressure_detected,
                "no_data_loss": no_data_loss,
                "flow_controlled": avg_delay < 1.0,  # Average delay should be reasonable
                "passed": all([
                    session_result["success"],
                    no_data_loss,
                    avg_delay < 1.0
                ]),
                "details": {
                    "total_chunks": len(chunk_delays),
                    "backpressure_events": backpressure_events,
                    "avg_delay_ms": avg_delay * 1000
                }
            }
        except Exception as e:
            logger.error(f"Test {test_name} failed: {e}")
            return {"test": test_name, "passed": False, "error": str(e)}

    async def _test_binary_streaming(self) -> Dict[str, Any]:
        """Test binary data streaming."""
        test_name = "binary_streaming"
        try:
            # Create session
            session_result = await self.execute_mcp_operation(
                "create_session",
                {
                    "name": "binary-stream-test",
                    "model": "claude-3-opus-20240229",
                    "streaming": True
                }
            )
            if not session_result["success"]:
                return {
                    "test": test_name,
                    "passed": False,
                    "error": "Failed to create session"
                }

            session_id = session_result.get("result", {}).get("session_id")

            # Generate and stream binary data
            binary_file = self.streaming_dir / "test-binary.dat"

            # Create binary test file
            create_result = await self.execute_mcp_operation(
                "execute_prompt",
                {
                    "session_id": session_id,
                    "prompt": f"dd if=/dev/urandom of={binary_file} bs=1024 count=100 2>/dev/null",
                    "streaming": False
                }
            )

            # Stream binary file with base64 encoding
            chunks = []
            base64_valid = True

            async for chunk in self._stream_command(session_id, f"base64 {binary_file}"):
                chunks.append(chunk)
                content = chunk.get("content", "")

                # Validate base64 encoding on a sample of each chunk
                if content:
                    try:
                        sample = content[:100].strip()
                        if sample:
                            base64.b64decode(sample)
                    except Exception:
                        base64_valid = False

            # Verify complete transmission
            total_content = ''.join(c.get("content", "") for c in chunks)

            # Decode and verify size
            size_correct = False
            try:
                decoded = base64.b64decode(total_content.strip())
                size_correct = len(decoded) == 100 * 1024  # 100KB
            except Exception:
                pass

            # Clean up
            if binary_file.exists():
                binary_file.unlink()

            # Terminate session
            await self.execute_mcp_operation(
                "terminate_session", {"session_id": session_id}
            )

            return {
                "test": test_name,
                "session_created": session_result["success"],
                "binary_created": create_result["success"],
                "chunks_received": len(chunks),
                "base64_valid": base64_valid,
                "size_correct": size_correct,
                "passed": all([
                    session_result["success"],
                    create_result["success"],
                    len(chunks) > 0,
                    base64_valid,
                    size_correct
                ]),
                "details": {
                    "chunk_count": len(chunks),
                    "total_base64_size": len(total_content)
                }
            }
        except Exception as e:
            logger.error(f"Test {test_name} failed: {e}")
            return {"test": test_name, "passed": False, "error": str(e)}

    async def _test_error_propagation(self) -> Dict[str, Any]:
        """Test error propagation in streams."""
        test_name = "error_propagation"
        try:
            # Create session
            session_result = await self.execute_mcp_operation(
                "create_session",
                {
                    "name": "error-stream-test",
                    "model": "claude-3-opus-20240229",
                    "streaming": True
                }
            )
            if not session_result["success"]:
                return {
                    "test": test_name,
                    "passed": False,
                    "error": "Failed to create session"
                }

            session_id = session_result.get("result", {}).get("session_id")

            # Test 1: Command that fails mid-stream
            error_detected = False
            error_message = ""
            chunks_before_error = 0

            async for chunk in self._stream_command(
                session_id,
                "echo 'Starting...'; sleep 1; echo 'Working...'; false; echo 'Should not see this'"
            ):
                chunks_before_error += 1
                if chunk.get("type") == "error":
                    error_detected = True
                    error_message = chunk.get("error", "")
                    break

                # Check for error in content
                if "error" in str(chunk.get("content", "")).lower():
                    error_detected = True

            # Test 2: Invalid command
            invalid_error = False
            async for chunk in self._stream_command(
                session_id,
                "this_command_does_not_exist --invalid-flag"
            ):
                if chunk.get("type") == "error" or "not found" in str(chunk.get("content", "")):
                    invalid_error = True

            # Test 3: Permission denied
            permission_error = False
            async for chunk in self._stream_command(
                session_id,
                "cat /etc/shadow 2>&1"
            ):
                content = str(chunk.get("content", "")).lower()
                if "permission" in content or "denied" in content:
                    permission_error = True

            # Verify session still functional
            recovery_result = await self.execute_mcp_operation(
                "execute_prompt",
                {
                    "session_id": session_id,
                    "prompt": "echo 'Still working'",
                    "streaming": False
                }
            )
            session_functional = (
                recovery_result["success"] and
                "Still working" in str(
                    recovery_result.get("result", {}).get("response", "")
                )
            )

            # Terminate session
            await self.execute_mcp_operation(
                "terminate_session", {"session_id": session_id}
            )

            return {
                "test": test_name,
                "session_created": session_result["success"],
                "error_detected": error_detected,
                "invalid_command_error": invalid_error,
                "permission_error": permission_error,
                "session_functional": session_functional,
                "passed": all([
                    session_result["success"],
                    error_detected or chunks_before_error > 0,
                    invalid_error,
                    permission_error,
                    session_functional
                ]),
                "details": {
                    "chunks_before_error": chunks_before_error,
                    "error_message": error_message,
                    "error_types_detected": {
                        "command_failure": error_detected,
                        "invalid_command": invalid_error,
                        "permission_denied": permission_error
                    }
                }
            }
        except Exception as e:
            logger.error(f"Test {test_name} failed: {e}")
            return {"test": test_name, "passed": False, "error": str(e)}

    async def _stream_command(self, session_id: str, command: str) -> AsyncIterator[Dict[str, Any]]:
        """Stream command execution results."""
        stream_id = f"{session_id}-{time.time()}"
        self.active_streams[stream_id] = []

        try:
            # Start streaming execution
            result = await self.execute_mcp_operation(
                "execute_streaming_prompt",
                {
                    "session_id": session_id,
                    "prompt": command,
                    "stream_id": stream_id
                }
            )
            if not result["success"]:
                yield {"type": "error", "error": "Failed to start stream"}
                return

            # Poll for chunks until an end-of-stream marker arrives
            while True:
                chunk_result = await self.execute_mcp_operation(
                    "get_stream_chunk",
                    {
                        "session_id": session_id,
                        "stream_id": stream_id
                    }
                )
                if not chunk_result["success"]:
                    break

                chunk = chunk_result.get("result", {})
                if chunk.get("type") == "end":
                    break

                self.active_streams[stream_id].append(chunk)
                yield chunk

                # Small delay to prevent a tight polling loop
                await asyncio.sleep(0.01)
        finally:
            # Clean up stream
            if stream_id in self.active_streams:
                del self.active_streams[stream_id]

    async def validate_system_state(self) -> bool:
        """Validate system state after streaming tests."""
        try:
            # Check for active streams
            if self.active_streams:
                logger.warning(f"Found {len(self.active_streams)} active streams")
                return False

            # Check streaming directory
            if self.streaming_dir.exists():
                files = list(self.streaming_dir.glob("*"))
                if len(files) > 10:
                    logger.warning(f"Too many files in streaming directory: {len(files)}")
                    return False

            # Check for runaway processes
            import psutil
            streaming_processes = 0
            for proc in psutil.process_iter(['name', 'cmdline']):
                try:
                    # cmdline may be None for some processes, so guard the join
                    cmdline = ' '.join(proc.info.get('cmdline') or [])
                    if 'stream' in cmdline and 'shannon' in cmdline:
                        streaming_processes += 1
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue

            if streaming_processes > 2:
                logger.warning(f"Found {streaming_processes} streaming processes")
                return False

            return True
        except Exception as e:
            logger.error(f"System state validation failed: {e}")
            return False

    async def cleanup(self):
        """Clean up streaming test artifacts."""
        logger.info("Cleaning up streaming test artifacts")

        # Cancel any active streams
        for stream_id in list(self.active_streams.keys()):
            try:
                # Stream IDs are "<session_id>-<timestamp>"; strip the trailing
                # timestamp rather than splitting on the first dash, since
                # session IDs may themselves contain dashes.
                session_id = stream_id.rsplit('-', 1)[0]
                await self.execute_mcp_operation(
                    "cancel_stream",
                    {"session_id": session_id, "stream_id": stream_id}
                )
            except Exception:
                pass
        self.active_streams.clear()

        # Clean streaming directory
        if self.streaming_dir.exists():
            try:
                import shutil
                shutil.rmtree(self.streaming_dir)
            except Exception as e:
                logger.warning(f"Failed to clean streaming directory: {e}")


async def main():
    """Run the streaming validator agent."""
    agent = StreamingValidatorAgent()
    result = await agent.run()

    # Print summary
    print("\n" + "=" * 60)
    print("Streaming Validator Agent Results")
    print("=" * 60)
    print(json.dumps(result, indent=2))

    # Exit with appropriate code
    sys.exit(0 if result.get("summary", {}).get("status") == "PASSED" else 1)


if __name__ == "__main__":
    asyncio.run(main())
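The file imports TestAgentBase from test_agent_base, which is not shown on this page. Below is a minimal sketch of a compatible stub for exercising the agent standalone; it assumes the base class supplies test_base_dir, execute_mcp_operation, and a run() driver, with names and signatures inferred from the calls above rather than taken from the real module.

#!/usr/bin/env python3
"""Hypothetical stub of test_agent_base (the real one ships with shannon-mcp).

Only the members that StreamingValidatorAgent actually references are sketched.
"""

import tempfile
from pathlib import Path
from typing import Any, Dict, List


class TestAgentBase:
    """Minimal base class matching the calls made by StreamingValidatorAgent."""

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        # Scratch directory the agent builds its streaming dir under (assumed layout).
        self.test_base_dir = Path(tempfile.mkdtemp(prefix="mcp-agent-"))
        self.test_results: List[Dict[str, Any]] = []

    async def execute_mcp_operation(self, operation: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch one MCP operation; a real implementation would call the server."""
        raise NotImplementedError("wire this to your MCP client")

    async def run(self) -> Dict[str, Any]:
        """Drive the lifecycle main() relies on: prerequisites, scenarios,
        state validation, cleanup. The hook methods are defined by subclasses
        such as StreamingValidatorAgent."""
        if not await self.validate_prerequisites():
            return {"summary": {"status": "FAILED", "reason": "prerequisites"}}
        results = await self.execute_test_scenarios()
        state_ok = await self.validate_system_state()
        await self.cleanup()
        passed = all(r.get("passed") for r in results) and state_ok
        return {
            "agent": self.name,
            "results": results,
            "summary": {"status": "PASSED" if passed else "FAILED"},
        }

With such a stub in place (and execute_mcp_operation wired to a live server), the agent runs directly with python streaming_validator_agent.py and exits 0 only if every scenario passes.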
