METADATAā¢10.3 kB
Metadata-Version: 2.4
Name: sse-starlette
Version: 2.4.1
Summary: SSE plugin for Starlette
Author-email: sysid <sysid@gmx.de>
License-Expression: BSD-3-Clause
Project-URL: Source, https://github.com/sysid/sse-starlette
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Web Environment
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Internet :: WWW/HTTP
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
License-File: AUTHORS
Requires-Dist: anyio>=4.7.0
Provides-Extra: examples
Requires-Dist: uvicorn>=0.34.0; extra == "examples"
Requires-Dist: fastapi>=0.115.12; extra == "examples"
Requires-Dist: sqlalchemy[asyncio,examples]>=2.0.41; extra == "examples"
Requires-Dist: starlette>=0.41.3; extra == "examples"
Requires-Dist: aiosqlite>=0.21.0; extra == "examples"
Provides-Extra: uvicorn
Requires-Dist: uvicorn>=0.34.0; extra == "uvicorn"
Provides-Extra: granian
Requires-Dist: granian>=2.3.1; extra == "granian"
Provides-Extra: daphne
Requires-Dist: daphne>=4.2.0; extra == "daphne"
Dynamic: license-file
# Server-Sent Events for [Starlette](https://github.com/encode/starlette) and [FastAPI](https://fastapi.tiangolo.com/)
[](https://pepy.tech/project/sse-starlette)
[![PyPI Version][pypi-image]][pypi-url]
[![Build Status][build-image]][build-url]
[![Code Coverage][coverage-image]][coverage-url]
> Background: https://sysid.github.io/server-sent-events/
Production ready Server-Sent Events implementation for Starlette and FastAPI following the [W3C SSE specification](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events).
## Installation
```bash
pip install sse-starlette
uv add sse-starlette
# To run the examples and demonstrations
uv add sse-starlette[examples]
# Recommended ASGI server
uv add sse-starlette[uvicorn,granian,daphne]
```
## Quick Start
```python
import asyncio
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette import EventSourceResponse
async def generate_events():
for i in range(10):
yield {"data": f"Event {i}"}
await asyncio.sleep(1)
async def sse_endpoint(request):
return EventSourceResponse(generate_events())
app = Starlette(routes=[Route("/events", sse_endpoint)])
```
## Core Features
- **Standards Compliant**: Full SSE specification implementation
- **Framework Integration**: Native Starlette and FastAPI support
- **Async/Await**: Built on modern Python async patterns
- **Connection Management**: Automatic client disconnect detection
- **Graceful Shutdown**: Proper cleanup on server termination
## Key Components
### EventSourceResponse
The main response class that handles SSE streaming:
```python
from sse_starlette import EventSourceResponse
# Basic usage
async def stream_data():
for item in data:
yield {"data": item, "event": "update", "id": str(item.id)}
return EventSourceResponse(stream_data())
```
### ServerSentEvent
For structured event creation:
```python
from sse_starlette import ServerSentEvent
event = ServerSentEvent(
data="Custom message",
event="notification",
id="msg-123",
retry=5000
)
```
### JSONServerSentEvent
For an easy way to send json data as SSE events:
```python
from sse_starlette import JSONServerSentEvent
event = JSONServerSentEvent(
data={"field":"value"}, # Anything serializable with json.dumps
)
```
## Advanced Usage
### Custom Ping Configuration
```python
from sse_starlette import ServerSentEvent
def custom_ping():
return ServerSentEvent(comment="Custom ping message")
return EventSourceResponse(
generate_events(),
ping=10, # Ping every 10 seconds
ping_message_factory=custom_ping
)
```
### Database Streaming (Thread-Safe)
```python
async def stream_database_results(request):
# CORRECT: Create session within generator context
async with AsyncSession() as session:
results = await session.execute(select(User))
for row in results:
if await request.is_disconnected():
break
yield {"data": row.name, "id": str(row.id)}
return EventSourceResponse(stream_database_results(request))
```
### Error Handling and Timeouts
```python
async def robust_stream(request):
try:
for i in range(100):
if await request.is_disconnected():
break
yield {"data": f"Item {i}"}
await asyncio.sleep(0.5)
except asyncio.CancelledError:
# Client disconnected - perform cleanup
raise
return EventSourceResponse(
robust_stream(request),
send_timeout=30, # Timeout hanging sends
headers={"Cache-Control": "no-cache"}
)
```
### Memory Channels Alternative
For complex data flows, use memory channels instead of generators:
```python
import anyio
from functools import partial
async def data_producer(send_channel):
async with send_channel:
for i in range(10):
await send_channel.send({"data": f"Item {i}"})
await anyio.sleep(1)
async def channel_endpoint(request):
send_channel, receive_channel = anyio.create_memory_object_stream(10)
return EventSourceResponse(
receive_channel,
data_sender_callable=partial(data_producer, send_channel)
)
```
## Configuration Options
### EventSourceResponse Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `content` | `ContentStream` | Required | Async generator or iterable |
| `ping` | `int` | 15 | Ping interval in seconds |
| `sep` | `str` | `"\r\n"` | Line separator (`\r\n`, `\r`, `\n`) |
| `send_timeout` | `float` | `None` | Send operation timeout |
| `headers` | `dict` | `None` | Additional HTTP headers |
| `ping_message_factory` | `Callable` | `None` | Custom ping message creator |
### Client Disconnection
```python
async def monitored_stream(request):
events_sent = 0
try:
while events_sent < 100:
if await request.is_disconnected():
print(f"Client disconnected after {events_sent} events")
break
yield {"data": f"Event {events_sent}"}
events_sent += 1
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Stream cancelled")
raise
```
## Testing
When testing SSE endpoints with pytest, reset the global event state:
```python
import pytest
from sse_starlette.sse import AppStatus
@pytest.fixture
def reset_sse_app_status():
AppStatus.should_exit_event = None
yield
AppStatus.should_exit_event = None
```
## Production Considerations
### Performance Limits
- **Memory**: Each connection maintains a buffer. Monitor memory usage.
- **Connections**: Limited by system file descriptors and application design.
- **Network**: High-frequency events can saturate bandwidth.
### Error Recovery
Implement client-side reconnection logic:
```javascript
function createEventSource(url) {
const eventSource = new EventSource(url);
eventSource.onerror = function() {
setTimeout(() => {
createEventSource(url); // Reconnect after delay
}, 5000);
};
return eventSource;
}
```
## Learning Resources
### Examples Directory
The `examples/` directory contains production-ready patterns:
- **`01_basic_sse.py`**: Fundamental SSE concepts
- **`02_message_broadcasting.py`**: Multi-client message distribution
- **`03_database_streaming.py`**: Thread-safe database integration
- **`04_advanced_features.py`**: Custom protocols and error handling
### Demonstrations Directory
The `examples/demonstrations/` directory provides educational scenarios:
**Basic Patterns** (`basic_patterns/`):
- Client disconnect detection and cleanup
- Graceful server shutdown behavior
**Production Scenarios** (`production_scenarios/`):
- Load testing with concurrent clients
- Network interruption handling
**Advanced Patterns** (`advanced_patterns/`):
- Memory channels vs generators
- Error recovery and circuit breakers
- Custom protocol development
Run any demonstration:
```bash
python examples/demonstrations/basic_patterns/client_disconnect.py
python examples/demonstrations/production_scenarios/load_simulations.py
python examples/demonstrations/advanced_patterns/error_recovery.py
```
## Troubleshooting
### Common Issues
**"RuntimeError: Event object bound to different event loop"**
- Reset `AppStatus.should_exit_event = None` between tests
**Database session errors with async generators**
- Create database sessions inside generators, not as dependencies
**Hanging connections after client disconnect**
- Always check `await request.is_disconnected()` in loops
- Use `send_timeout` parameter to detect dead connections
If you are using Postman, please see: https://github.com/sysid/sse-starlette/issues/47#issuecomment-1445953826
### Performance Optimization
```python
# Connection limits
class ConnectionLimiter:
def __init__(self, max_connections=100):
self.semaphore = asyncio.Semaphore(max_connections)
async def limited_endpoint(self, request):
async with self.semaphore:
return EventSourceResponse(generate_events())
```
## Contributing
See examples and demonstrations for implementation patterns. Run tests with:
```bash
make test-unit # Unit tests only
make test # All tests including integration
```
<!-- Badges -->
[pypi-image]: https://badge.fury.io/py/sse-starlette.svg
[pypi-url]: https://pypi.org/project/sse-starlette/
[build-image]: https://github.com/sysid/sse-starlette/actions/workflows/build.yml/badge.svg
[build-url]: https://github.com/sysid/sse-starlette/actions/workflows/build.yml
[coverage-image]: https://codecov.io/gh/sysid/sse-starlette/branch/master/graph/badge.svg
[coverage-url]: https://codecov.io/gh/sysid/sse-starlette