Metadata-Version: 2.4
Name: sse-starlette
Version: 2.4.1
Summary: SSE plugin for Starlette
Author-email: sysid <sysid@gmx.de>
License-Expression: BSD-3-Clause
Project-URL: Source, https://github.com/sysid/sse-starlette
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Web Environment
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Internet :: WWW/HTTP
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
License-File: AUTHORS
Requires-Dist: anyio>=4.7.0
Provides-Extra: examples
Requires-Dist: uvicorn>=0.34.0; extra == "examples"
Requires-Dist: fastapi>=0.115.12; extra == "examples"
Requires-Dist: sqlalchemy[asyncio,examples]>=2.0.41; extra == "examples"
Requires-Dist: starlette>=0.41.3; extra == "examples"
Requires-Dist: aiosqlite>=0.21.0; extra == "examples"
Provides-Extra: uvicorn
Requires-Dist: uvicorn>=0.34.0; extra == "uvicorn"
Provides-Extra: granian
Requires-Dist: granian>=2.3.1; extra == "granian"
Provides-Extra: daphne
Requires-Dist: daphne>=4.2.0; extra == "daphne"
Dynamic: license-file

# Server-Sent Events for [Starlette](https://github.com/encode/starlette) and [FastAPI](https://fastapi.tiangolo.com/)

[![Downloads](https://static.pepy.tech/badge/sse-starlette/week)](https://pepy.tech/project/sse-starlette)
[![PyPI Version][pypi-image]][pypi-url]
[![Build Status][build-image]][build-url]
[![Code Coverage][coverage-image]][coverage-url]

> Background: https://sysid.github.io/server-sent-events/

Production ready Server-Sent Events implementation for Starlette and FastAPI following the [W3C SSE specification](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events).

## Installation

```bash
pip install sse-starlette
uv add sse-starlette

# To run the examples and demonstrations
uv add sse-starlette[examples]

# Recommended ASGI server
uv add sse-starlette[uvicorn,granian,daphne]
```

## Quick Start

```python
import asyncio
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette import EventSourceResponse

async def generate_events():
    for i in range(10):
        yield {"data": f"Event {i}"}
        await asyncio.sleep(1)

async def sse_endpoint(request):
    return EventSourceResponse(generate_events())

app = Starlette(routes=[Route("/events", sse_endpoint)])
```

## Core Features

- **Standards Compliant**: Full SSE specification implementation
- **Framework Integration**: Native Starlette and FastAPI support
- **Async/Await**: Built on modern Python async patterns
- **Connection Management**: Automatic client disconnect detection
- **Graceful Shutdown**: Proper cleanup on server termination

## Key Components

### EventSourceResponse

The main response class that handles SSE streaming:

```python
from sse_starlette import EventSourceResponse

# Basic usage: `data` stands for your own iterable of items with an `id` attribute
async def stream_data():
    for item in data:
        yield {"data": item, "event": "update", "id": str(item.id)}

async def endpoint(request):
    return EventSourceResponse(stream_data())
```

### ServerSentEvent

For structured event creation:

```python
from sse_starlette import ServerSentEvent

event = ServerSentEvent(
    data="Custom message",
    event="notification",
    id="msg-123",
    retry=5000
)
```

### JSONServerSentEvent

An easy way to send JSON data as SSE events:

```python
from sse_starlette import JSONServerSentEvent

event = JSONServerSentEvent(
    data={"field": "value"},  # Anything serializable with json.dumps
)
```

## Advanced Usage

### Custom Ping Configuration

```python
from sse_starlette import EventSourceResponse, ServerSentEvent

def custom_ping():
    return ServerSentEvent(comment="Custom ping message")

# generate_events() is the generator from the Quick Start
async def sse_endpoint(request):
    return EventSourceResponse(
        generate_events(),
        ping=10,  # Ping every 10 seconds
        ping_message_factory=custom_ping
    )
```

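For orientation, the fields used in the examples above (`data`, `event`, `id`, `retry`, and the `comment` used for pings) map onto the plain-text `text/event-stream` wire format. The following is a minimal sketch of that encoding based on the SSE specification. It is not sse-starlette's own encoder: the function name `encode_sse` and its parameters are hypothetical and exist only for illustration.

```python
from typing import List, Optional


def encode_sse(
    data: Optional[str] = None,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
    retry: Optional[int] = None,
    comment: Optional[str] = None,
    sep: str = "\r\n",
) -> str:
    """Encode one SSE frame (illustrative sketch, not the library's encoder)."""
    lines: List[str] = []
    if comment is not None:
        # Comment lines start with a colon; clients ignore them (used for pings).
        lines.extend(f": {part}" for part in comment.splitlines())
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    if data is not None:
        # A multi-line payload needs one "data:" field per line.
        lines.extend(f"data: {part}" for part in data.splitlines())
    if retry is not None:
        # Reconnection delay hint for the client, in milliseconds.
        lines.append(f"retry: {retry}")
    # A blank line terminates the frame.
    return sep.join(lines) + sep + sep


frame = encode_sse(
    data="Custom message", event="notification", event_id="msg-123", retry=5000
)
ping = encode_sse(comment="Custom ping message")
```

Here `frame` holds four field lines followed by a blank line, and `ping` holds a single comment line that a client silently discards. The order of different fields within a frame carries no meaning in the specification; the order chosen here is arbitrary.
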
### Database Streaming (Thread-Safe)

```python
from sqlalchemy import select
from sse_starlette import EventSourceResponse

# `AsyncSession` and `User` stand for your own configured session factory and mapped model
async def stream_database_results(request):
    # CORRECT: Create session within generator context
    async with AsyncSession() as session:
        results = await session.execute(select(User))
        # .scalars() yields User objects rather than one-element Row tuples
        for user in results.scalars():
            if await request.is_disconnected():
                break
            yield {"data": user.name, "id": str(user.id)}

async def database_endpoint(request):
    return EventSourceResponse(stream_database_results(request))
```

### Error Handling and Timeouts

```python
import asyncio
from sse_starlette import EventSourceResponse

async def robust_stream(request):
    try:
        for i in range(100):
            if await request.is_disconnected():
                break
            yield {"data": f"Item {i}"}
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        # Client disconnected - perform cleanup
        raise

async def robust_endpoint(request):
    return EventSourceResponse(
        robust_stream(request),
        send_timeout=30,  # Timeout hanging sends
        headers={"Cache-Control": "no-cache"}
    )
```

### Memory Channels Alternative

For complex data flows, use memory channels instead of generators:

```python
import anyio
from functools import partial
from sse_starlette import EventSourceResponse

async def data_producer(send_channel):
    async with send_channel:
        for i in range(10):
            await send_channel.send({"data": f"Item {i}"})
            await anyio.sleep(1)

async def channel_endpoint(request):
    send_channel, receive_channel = anyio.create_memory_object_stream(10)
    return EventSourceResponse(
        receive_channel,
        data_sender_callable=partial(data_producer, send_channel)
    )
```

## Configuration Options

### EventSourceResponse Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `content` | `ContentStream` | Required | Async generator or iterable |
| `ping` | `int` | 15 | Ping interval in seconds |
| `sep` | `str` | `"\r\n"` | Line separator (`\r\n`, `\r`, `\n`) |
| `send_timeout` | `float` | `None` | Send operation timeout |
| `headers` | `dict` | `None` | Additional HTTP headers |
| `ping_message_factory` | `Callable` | `None` | Custom ping message creator |

### Client Disconnection

```python
import asyncio

async def monitored_stream(request):
    events_sent = 0
    try:
        while events_sent < 100:
            if await request.is_disconnected():
                print(f"Client disconnected after {events_sent} events")
                break

            yield {"data": f"Event {events_sent}"}
            events_sent += 1
            await asyncio.sleep(1)

    except asyncio.CancelledError:
        print("Stream cancelled")
        raise
```

## Testing

When testing SSE endpoints with pytest, reset the global event state:

```python
import pytest
from sse_starlette.sse import AppStatus

@pytest.fixture
def reset_sse_app_status():
    AppStatus.should_exit_event = None
    yield
    AppStatus.should_exit_event = None
```

## Production Considerations

### Performance Limits

- **Memory**: Each connection maintains a buffer. Monitor memory usage.
- **Connections**: Limited by system file descriptors and application design.
- **Network**: High-frequency events can saturate bandwidth.

### Error Recovery

Implement client-side reconnection logic:

```javascript
function createEventSource(url) {
    const eventSource = new EventSource(url);

    eventSource.onerror = function() {
        // Close the failed source first so the browser's built-in retry
        // does not run alongside the manual reconnect
        eventSource.close();
        setTimeout(() => {
            createEventSource(url); // Reconnect after delay
        }, 5000);
    };

    return eventSource;
}
```

## Learning Resources

### Examples Directory

The `examples/` directory contains production-ready patterns:

- **`01_basic_sse.py`**: Fundamental SSE concepts
- **`02_message_broadcasting.py`**: Multi-client message distribution
- **`03_database_streaming.py`**: Thread-safe database integration
- **`04_advanced_features.py`**: Custom protocols and error handling

### Demonstrations Directory

The `examples/demonstrations/` directory provides educational scenarios:

**Basic Patterns** (`basic_patterns/`):
- Client disconnect detection and cleanup
- Graceful server shutdown behavior

**Production Scenarios** (`production_scenarios/`):
- Load testing with concurrent clients
- Network interruption handling

**Advanced Patterns** (`advanced_patterns/`):
- Memory channels vs generators
- Error recovery and circuit breakers
- Custom protocol development

Run any demonstration:

```bash
python examples/demonstrations/basic_patterns/client_disconnect.py
python examples/demonstrations/production_scenarios/load_simulations.py
python examples/demonstrations/advanced_patterns/error_recovery.py
```

## Troubleshooting

### Common Issues

**"RuntimeError: Event object bound to different event loop"**
- Reset `AppStatus.should_exit_event = None` between tests

**Database session errors with async generators**
- Create database sessions inside generators, not as dependencies

**Hanging connections after client disconnect**
- Always check `await request.is_disconnected()` in loops
- Use `send_timeout` parameter to detect dead connections

If you are using Postman, please see: https://github.com/sysid/sse-starlette/issues/47#issuecomment-1445953826

### Performance Optimization

```python
import asyncio
from sse_starlette import EventSourceResponse

# Connection limits
class ConnectionLimiter:
    def __init__(self, max_connections=100):
        self.semaphore = asyncio.Semaphore(max_connections)

    async def _limited_events(self):
        # Hold the slot for the whole stream, not just while building the response
        async with self.semaphore:
            async for event in generate_events():
                yield event

    async def limited_endpoint(self, request):
        return EventSourceResponse(self._limited_events())
```

## Contributing

See examples and demonstrations for implementation patterns. Run tests with:

```bash
make test-unit  # Unit tests only
make test       # All tests including integration
```

<!-- Badges -->
[pypi-image]: https://badge.fury.io/py/sse-starlette.svg
[pypi-url]: https://pypi.org/project/sse-starlette/
[build-image]: https://github.com/sysid/sse-starlette/actions/workflows/build.yml/badge.svg
[build-url]: https://github.com/sysid/sse-starlette/actions/workflows/build.yml
[coverage-image]: https://codecov.io/gh/sysid/sse-starlette/branch/master/graph/badge.svg
[coverage-url]: https://codecov.io/gh/sysid/sse-starlette
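
When writing tests for SSE endpoints, it can help to decode a raw response body back into events before asserting on it. The following is a minimal sketch of such a decoder, assuming the whole body is already available as a string. The helper name `parse_sse` is hypothetical and not part of sse-starlette, and the parsing is simplified compared with what a browser's `EventSource` does (for example, it keeps events that have no `data` field).

```python
from typing import Dict, List


def parse_sse(body: str) -> List[Dict[str, str]]:
    """Split a raw text/event-stream body into a list of event dicts."""
    events: List[Dict[str, str]] = []
    current: Dict[str, str] = {}
    data_lines: List[str] = []
    for line in body.splitlines():
        if line == "":
            # A blank line ends the frame: emit what was collected, if anything.
            if current or data_lines:
                if data_lines:
                    current["data"] = "\n".join(data_lines)
                events.append(current)
            current, data_lines = {}, []
            continue
        if line.startswith(":"):
            continue  # Comment line (for example a ping); ignored
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # The format allows one optional space after the colon
        if field == "data":
            data_lines.append(value)  # Multiple data lines are joined with "\n"
        else:
            current[field] = value
    return events


raw = ": ping\r\n\r\nid: 1\r\nevent: update\r\ndata: a\r\ndata: b\r\n\r\n"
parsed = parse_sse(raw)
```

With the sample body above, the ping comment is dropped and `parsed` contains a single event whose `data` is the two payload lines joined by a newline.
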
