test_streaming_integration.py (9.88 kB)
""" Functional tests for JSONL streaming with real Claude Code output. """ import pytest import asyncio import json import time from typing import List, Dict, Any from shannon_mcp.managers.session import SessionManager from shannon_mcp.managers.binary import BinaryManager from shannon_mcp.streaming.jsonl import JSONLStreamReader, JSONLParser from shannon_mcp.streaming.handler import StreamHandler class TestStreamingIntegration: """Test real JSONL streaming from Claude Code.""" @pytest.fixture async def streaming_session(self): """Create a session configured for streaming.""" binary_manager = BinaryManager() binaries = await binary_manager.discover_binaries() if not binaries: pytest.skip("No Claude Code binary found") session_manager = SessionManager(binary_manager=binary_manager) session = await session_manager.create_session( "test-streaming", options={"stream": True} ) await session_manager.start_session(session.id) yield session_manager, session await session_manager.close_session(session.id) @pytest.mark.asyncio async def test_jsonl_stream_parsing(self, streaming_session): """Test parsing real JSONL stream from Claude Code.""" session_manager, session = streaming_session parser = JSONLParser() # Collect all JSONL messages messages = [] async for chunk in session_manager.stream_prompt( session.id, "Count from 1 to 3, explaining each number." ): if isinstance(chunk, bytes): chunk = chunk.decode('utf-8') if isinstance(chunk, str): # Parse JSONL try: message = json.loads(chunk) messages.append(message) print(f"\nParsed message: {message.get('type', 'unknown')}") except json.JSONDecodeError: # Try line-by-line parsing for line in chunk.split('\n'): if line.strip(): try: message = json.loads(line) messages.append(message) except json.JSONDecodeError: pass # Verify message types message_types = [msg.get('type') for msg in messages] print(f"\nMessage types received: {message_types}") # Should have various message types assert len(messages) > 0 assert any('content' in msg for msg in messages) @pytest.mark.asyncio async def test_stream_handler_processing(self, streaming_session): """Test StreamHandler processing real Claude Code streams.""" session_manager, session = streaming_session handler = StreamHandler() # Track different message types content_chunks = [] metadata_messages = [] def handle_content(message): if message.get('type') == 'content': content_chunks.append(message.get('content', '')) else: metadata_messages.append(message) handler.add_handler('content', handle_content) handler.add_handler('metadata', lambda m: metadata_messages.append(m)) # Process stream async for chunk in session_manager.stream_prompt( session.id, "Write a haiku about coding." 
): await handler.process_chunk(chunk) # Verify processing print(f"\nContent chunks: {len(content_chunks)}") print(f"Metadata messages: {len(metadata_messages)}") # Should have content full_content = "".join(content_chunks) print(f"Full content:\n{full_content}") assert len(full_content) > 0 assert len(content_chunks) > 0 @pytest.mark.asyncio async def test_backpressure_handling(self, streaming_session): """Test handling backpressure with slow consumers.""" session_manager, session = streaming_session # Simulate slow consumer chunks_received = [] processing_times = [] async def slow_consumer(chunk): start = time.time() chunks_received.append(chunk) await asyncio.sleep(0.1) # Simulate slow processing processing_times.append(time.time() - start) # Process stream with backpressure start_time = time.time() async for chunk in session_manager.stream_prompt( session.id, "List 10 programming languages with brief descriptions." ): await slow_consumer(chunk) total_time = time.time() - start_time print(f"\nBackpressure test:") print(f" Chunks received: {len(chunks_received)}") print(f" Total time: {total_time:.2f}s") print(f" Avg processing time: {sum(processing_times)/len(processing_times):.3f}s") # Should handle backpressure gracefully assert len(chunks_received) > 0 assert total_time > len(chunks_received) * 0.1 # Respected slow consumer @pytest.mark.asyncio async def test_stream_metrics_extraction(self, streaming_session): """Test extracting metrics from streaming responses.""" session_manager, session = streaming_session # Metrics collector metrics = { "tokens": 0, "chunks": 0, "content_length": 0, "latency_ms": [] } last_chunk_time = time.time() async for chunk in session_manager.stream_prompt( session.id, "Explain what an API is in exactly 3 sentences." ): current_time = time.time() metrics["latency_ms"].append((current_time - last_chunk_time) * 1000) last_chunk_time = current_time metrics["chunks"] += 1 if isinstance(chunk, dict): if 'content' in chunk: metrics["content_length"] += len(chunk['content']) if 'tokens' in chunk: metrics["tokens"] += chunk['tokens'] # Calculate statistics avg_latency = sum(metrics["latency_ms"]) / len(metrics["latency_ms"]) print(f"\nStream metrics:") print(f" Chunks: {metrics['chunks']}") print(f" Content length: {metrics['content_length']}") print(f" Tokens: {metrics['tokens']}") print(f" Avg chunk latency: {avg_latency:.2f}ms") assert metrics["chunks"] > 0 assert metrics["content_length"] > 0 @pytest.mark.asyncio async def test_stream_error_recovery(self, streaming_session): """Test error recovery in streaming.""" session_manager, session = streaming_session # Track successful chunks and errors successful_chunks = [] errors = [] # Use prompt that might cause issues async for chunk in session_manager.stream_prompt( session.id, "Parse this invalid JSON and explain the errors: {key: value, 'bad': }" ): try: if isinstance(chunk, dict): successful_chunks.append(chunk) elif isinstance(chunk, bytes): chunk_str = chunk.decode('utf-8') parsed = json.loads(chunk_str) successful_chunks.append(parsed) except Exception as e: errors.append(str(e)) # Continue processing despite errors print(f"\nError recovery test:") print(f" Successful chunks: {len(successful_chunks)}") print(f" Errors encountered: {len(errors)}") # Should still get some response despite potential parsing issues assert len(successful_chunks) > 0 or len(errors) > 0 @pytest.mark.asyncio async def test_concurrent_streams(self, streaming_session): """Test handling multiple concurrent streams.""" session_manager, 
session = streaming_session # Create additional sessions session2 = await session_manager.create_session("test-concurrent-2") await session_manager.start_session(session2.id) session3 = await session_manager.create_session("test-concurrent-3") await session_manager.start_session(session3.id) # Define prompts prompts = [ (session.id, "Count from 1 to 5"), (session2.id, "List 5 colors"), (session3.id, "Name 5 animals") ] # Collect results concurrently async def collect_stream(session_id, prompt): chunks = [] async for chunk in session_manager.stream_prompt(session_id, prompt): chunks.append(chunk) return session_id, chunks # Run streams concurrently tasks = [collect_stream(sid, prompt) for sid, prompt in prompts] results = await asyncio.gather(*tasks) print(f"\nConcurrent streaming results:") for session_id, chunks in results: print(f" Session {session_id}: {len(chunks)} chunks") # All streams should complete assert len(results) == 3 assert all(len(chunks) > 0 for _, chunks in results) # Cleanup await session_manager.close_session(session2.id) await session_manager.close_session(session3.id)
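
The line-by-line fallback in test_jsonl_stream_parsing assumes every chunk contains whole lines, but a transport may split a JSON object across chunk boundaries. Below is a minimal sketch of a buffered incremental JSONL reader that handles that case; the iter_jsonl name is illustrative and is not part of the shannon_mcp API.

import json
from typing import Any, AsyncIterator, Dict


async def iter_jsonl(chunks: AsyncIterator[bytes]) -> AsyncIterator[Dict[str, Any]]:
    """Yield one parsed object per complete JSONL line, buffering partial lines."""
    buffer = ""
    async for chunk in chunks:
        buffer += chunk.decode("utf-8")
        # Everything before the last newline is complete; keep the tail buffered.
        *complete, buffer = buffer.split("\n")
        for line in complete:
            if line.strip():
                yield json.loads(line)
    if buffer.strip():
        yield json.loads(buffer)  # flush a final line with no trailing newline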
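test_backpressure_handling awaits the slow consumer inline, so the producer is throttled to the consumer's pace. A common alternative is to decouple producer and consumer with a bounded asyncio.Queue, where queue.put blocks once maxsize items are waiting. The sketch below shows that pattern under the same assumptions (any async chunk iterator, any awaitable handler); the function names are illustrative, not shannon_mcp internals.

import asyncio


async def _pump(stream, queue: asyncio.Queue) -> None:
    async for chunk in stream:
        await queue.put(chunk)  # blocks when the queue is full: backpressure
    await queue.put(None)  # sentinel marking end of stream


async def _consume(queue: asyncio.Queue, handler) -> None:
    while (chunk := await queue.get()) is not None:
        await handler(chunk)


async def run_with_backpressure(stream, handler, maxsize: int = 8) -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    await asyncio.gather(_pump(stream, queue), _consume(queue, handler))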
