Streaming API Reference
=======================

This section documents the streaming system that handles real-time JSONL communication with Claude Code.

Stream Buffer
-------------

.. module:: shannon_mcp.streaming.buffer

.. autoclass:: StreamBuffer
   :members:
   :undoc-members:
   :show-inheritance:

Efficient line-based buffering with backpressure handling.

**Key Features:**

- Configurable buffer size limits
- Automatic line splitting
- Memory-efficient operation
- Backpressure support

**Example Usage:**

.. code-block:: python

   from shannon_mcp.streaming.buffer import StreamBuffer

   buffer = StreamBuffer(max_size=1024 * 1024)  # 1MB limit

   # Add data
   buffer.add_chunk(b"First line\nSecond ")
   buffer.add_chunk(b"line\nThird line")

   # Get complete lines
   while buffer.has_line():
       line = buffer.get_line()
       print(f"Line: {line}")

   # Check buffer status
   if buffer.is_full():
       print("Buffer full, apply backpressure")

JSONL Parser
------------

.. module:: shannon_mcp.streaming.parser

.. autoclass:: JSONLParser
   :members:
   :undoc-members:
   :show-inheritance:

Parses JSONL streams with validation and error recovery.

**Message Types:**

.. list-table::
   :header-rows: 1

   * - Type
     - Description
     - Fields
   * - content
     - Text content from Claude
     - type, content, token_count
   * - notification
     - Status updates
     - type, message, level
   * - error
     - Error messages
     - type, error, details
   * - metrics
     - Performance data
     - type, metrics, timestamp
   * - completion
     - Stream end marker
     - type, reason, summary

**Example Usage:**
.. code-block:: python

   from shannon_mcp.streaming.parser import JSONLParser, JSONLParseError

   parser = JSONLParser(strict=True)

   # Parse a single line
   try:
       message = parser.parse_line('{"type": "content", "content": "Hello"}')
       print(f"Parsed: {message}")
   except JSONLParseError as e:
       print(f"Parse error: {e}")

   # Validate a message
   if parser.validate_message(message):
       print("Message is valid")

   # Parse with a schema
   schema = {
       "type": "object",
       "required": ["type", "content"],
       "properties": {
           "type": {"enum": ["content", "error", "notification"]},
           "content": {"type": "string"}
       }
   }
   message = parser.parse_line(line, schema=schema)

Stream Handler
--------------

.. module:: shannon_mcp.streaming.handler

.. autoclass:: StreamHandler
   :members:
   :undoc-members:
   :show-inheritance:

Routes parsed messages to appropriate handlers.

**Handler Registration:**

.. code-block:: python

   import logging

   from shannon_mcp.streaming.handler import StreamHandler
   from shannon_mcp.streaming.errors import StreamError

   handler = StreamHandler()

   # Register type handlers
   @handler.register("content")
   async def handle_content(message):
       print(f"Content: {message['content']}")
       return {"processed": True}

   @handler.register("error")
   async def handle_error(message):
       logging.error(f"Stream error: {message['error']}")
       raise StreamError(message['error'])

   # Register a catch-all handler
   @handler.register_default()
   async def handle_unknown(message):
       logging.warning(f"Unknown message: {message}")

   # Process a message
   result = await handler.handle(message)

**Built-in Handlers:**

.. code-block:: python

   # Content accumulation
   handler.enable_content_accumulation()

   # Metrics collection
   handler.enable_metrics_collection()

   # Error recovery
   handler.enable_error_recovery(max_retries=3)

Stream Processor
----------------

.. module:: shannon_mcp.streaming.processor

.. autoclass:: StreamProcessor
   :members:
   :undoc-members:
   :show-inheritance:

High-level stream processing with the full pipeline.

**Pipeline Stages:**

1. **Reading** - Async reading from the subprocess
2. **Buffering** - Line-based buffering
3. **Parsing** - JSONL parsing and validation
4. **Handling** - Message type routing
5. **Output** - Result aggregation

**Example Usage:**

.. code-block:: python

   import logging

   from shannon_mcp.streaming.processor import StreamProcessor

   async def process_claude_stream():
       processor = StreamProcessor(
           buffer_size=1024 * 1024,
           parser_strict=True,
           enable_metrics=True
       )

       # Process the stream
       async with processor.process_stream(process.stdout) as stream:
           async for message in stream:
               if message.type == "content":
                   print(message.content, end="", flush=True)
               elif message.type == "notification":
                   logging.info(f"Status: {message.message}")

       # Get metrics
       metrics = processor.get_metrics()
       print(f"Total tokens: {metrics.total_tokens}")
       print(f"Messages: {metrics.message_count}")

Backpressure Manager
--------------------

.. module:: shannon_mcp.streaming.backpressure

.. autoclass:: BackpressureManager
   :members:
   :undoc-members:
   :show-inheritance:

Manages flow control for streaming operations.

**Strategies:**

- **Pause/Resume** - Temporarily pause reading
- **Buffering** - Use memory buffers
- **Dropping** - Drop old messages
- **Throttling** - Slow down processing

**Example Usage:**

.. code-block:: python

   from shannon_mcp.streaming.backpressure import BackpressureManager

   backpressure = BackpressureManager(
       strategy="adaptive",
       high_watermark=1000,
       low_watermark=100
   )

   async def read_with_backpressure():
       async for chunk in stream:
           # Check pressure
           if await backpressure.should_pause():
               await backpressure.wait_for_resume()

           # Process the chunk
           await process_chunk(chunk)

           # Update metrics
           await backpressure.record_processed(len(chunk))

Stream Metrics
--------------

.. module:: shannon_mcp.streaming.metrics

.. autoclass:: StreamMetrics
   :members:
   :undoc-members:
   :show-inheritance:

Collects and analyzes streaming performance metrics.

**Collected Metrics:**
.. list-table::
   :header-rows: 1

   * - Metric
     - Description
     - Unit
   * - bytes_read
     - Total bytes read from stream
     - bytes
   * - lines_processed
     - Number of lines parsed
     - count
   * - messages_handled
     - Messages successfully handled
     - count
   * - parse_errors
     - Failed parse attempts
     - count
   * - processing_time
     - Total processing duration
     - seconds
   * - throughput
     - Data processing rate
     - bytes/sec
   * - latency_p50
     - Median message latency
     - milliseconds
   * - latency_p99
     - 99th percentile latency
     - milliseconds

**Example Usage:**

.. code-block:: python

   from shannon_mcp.streaming.metrics import StreamMetrics

   metrics = StreamMetrics()

   # Record events
   metrics.record_bytes(1024)
   metrics.record_message("content", processing_time=0.005)
   metrics.record_error("parse_error")

   # Get a summary
   summary = metrics.get_summary()
   print(f"Throughput: {summary.throughput_mbps:.2f} MB/s")
   print(f"Success rate: {summary.success_rate:.1%}")
   print(f"P99 latency: {summary.latency_p99:.1f}ms")

Complete Streaming Example
--------------------------

Here's a complete example showing all streaming components working together:
.. code-block:: python

   import asyncio

   from shannon_mcp.streaming import (
       StreamProcessor,
       StreamHandler,
       BackpressureManager,
       StreamMetrics
   )

   async def stream_claude_response(prompt: str):
       # Initialize components
       processor = StreamProcessor()
       handler = StreamHandler()
       backpressure = BackpressureManager()
       metrics = StreamMetrics()

       # Set up handlers
       content_buffer = []

       @handler.register("content")
       async def handle_content(msg):
           content_buffer.append(msg["content"])
           metrics.record_message("content")

       @handler.register("notification")
       async def handle_notification(msg):
           print(f"\n[{msg['level']}] {msg['message']}\n")
           metrics.record_message("notification")

       @handler.register("error")
       async def handle_error(msg):
           metrics.record_error("stream_error")
           raise Exception(f"Stream error: {msg['error']}")

       # Start streaming (start_claude_process is defined elsewhere)
       process = await start_claude_process(prompt)

       try:
           async with processor.process_stream(
               process.stdout,
               handler=handler,
               backpressure=backpressure
           ) as stream:
               async for message in stream:
                   # Backpressure check
                   if backpressure.is_pressured():
                       await asyncio.sleep(0.1)

           # Return the complete response
           return {
               "content": "".join(content_buffer),
               "metrics": metrics.get_summary()
           }
       finally:
           process.terminate()
           await process.wait()

Error Handling
--------------

The streaming system provides comprehensive error handling:

.. code-block:: python

   from shannon_mcp.streaming.errors import (
       StreamError,
       ParseError,
       BackpressureError,
       StreamTimeoutError
   )

   try:
       async for message in stream:
           process_message(message)
   except ParseError as e:
       # Invalid JSONL format
       logger.error(f"Parse error at line {e.line_number}: {e.content}")
   except BackpressureError as e:
       # Buffer overflow
       logger.error(f"Backpressure limit reached: {e.buffer_size}")
   except StreamTimeoutError as e:
       # No data received within the timeout
       logger.error(f"Stream timeout after {e.timeout}s")
   except StreamError as e:
       # General streaming error
       logger.error(f"Stream error: {e}")

Best Practices
--------------
1. **Buffer Management**

   .. code-block:: python

      # Use appropriate buffer sizes
      buffer = StreamBuffer(
          max_size=1024 * 1024,  # 1MB for normal use
          max_lines=1000         # Prevent line accumulation
      )

2. **Error Recovery**

   .. code-block:: python

      # Implement retry logic
      async def stream_with_retry(process, max_retries=3):
          for attempt in range(max_retries):
              try:
                  return await stream_response(process)
              except StreamError as e:
                  if attempt == max_retries - 1:
                      raise
                  await asyncio.sleep(2 ** attempt)

3. **Resource Cleanup**

   .. code-block:: python

      # Always use context managers
      async with StreamProcessor() as processor:
          async with processor.process_stream(stdout) as stream:
              # Processing here
              pass
      # Automatic cleanup

4. **Performance Monitoring**

   .. code-block:: python

      # Monitor streaming performance
      if metrics.throughput < threshold:
          logger.warning(f"Low throughput: {metrics.throughput}")

      if metrics.error_rate > 0.01:  # 1% error rate
          logger.error(f"High error rate: {metrics.error_rate}")
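The line-splitting and watermark behavior described above can be sketched with stdlib-only Python. ``LineBuffer`` below is a hypothetical, simplified stand-in for ``StreamBuffer`` and the backpressure watermarks — its names and thresholds are illustrative, not the actual ``shannon_mcp`` API:

```python
class LineBuffer:
    """Hypothetical, simplified line buffer with watermark checks.

    Illustrative sketch only -- not the real shannon_mcp StreamBuffer.
    """

    def __init__(self, high_watermark: int = 1024, low_watermark: int = 256):
        self._data = bytearray()
        self.high_watermark = high_watermark
        self.low_watermark = low_watermark

    def add_chunk(self, chunk: bytes) -> None:
        # Append raw bytes; lines are split lazily on read
        self._data.extend(chunk)

    def has_line(self) -> bool:
        return b"\n" in self._data

    def get_line(self) -> str:
        # Remove and return the first complete line (newline stripped)
        line, _, rest = bytes(self._data).partition(b"\n")
        self._data = bytearray(rest)
        return line.decode("utf-8")

    def is_pressured(self) -> bool:
        # Above the high watermark: the producer should pause
        return len(self._data) >= self.high_watermark

    def is_relieved(self) -> bool:
        # At or below the low watermark: the producer may resume
        return len(self._data) <= self.low_watermark


buf = LineBuffer(high_watermark=8, low_watermark=2)
buf.add_chunk(b"First line\nSecond ")
buf.add_chunk(b"line\nThird line")

lines = []
while buf.has_line():
    lines.append(buf.get_line())
# lines == ["First line", "Second line"]; b"Third line" stays buffered
```

The high/low watermark pair gives hysteresis: a producer that pauses at the high mark and resumes only at the low mark avoids rapid pause/resume oscillation as the buffer hovers near a single threshold.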
