# Real-Time WebSocket Server
WebSocket server for real-time skill metrics broadcasting in Session-Buddy.
## Overview
The `RealTimeMetricsServer` provides live skill monitoring capabilities via WebSocket connections. It broadcasts metrics updates to connected clients every second, enabling real-time dashboards and monitoring tools.
## Features
- **Real-time Broadcasting**: Sends metrics updates every 1 second (configurable)
- **Client Subscriptions**: Support for all-skills or per-skill monitoring
- **Anomaly Detection**: Broadcasts recent performance anomalies
- **Graceful Reconnection**: Handles client disconnections automatically
- **Async/Await**: Built on `asyncio` for high-performance concurrency
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ RealTimeMetricsServer │
│ │
│ ┌──────────────┐ ┌───────────────────────────────────┐ │
│ │ Clients │ │ Background Broadcaster │ │
│ │ │ │ │ │
│ │ ┌────────┐ │ │ • Fetch top 10 skills/sec │ │
│ │ │Client 1│──│───│> • Detect anomalies │ │
│ │ └────────┘ │ │ • Build JSON payload │ │
│ │ ┌────────┐ │ │ • Broadcast to all clients │ │
│ │ │Client 2│──│───│> │ │
│ │ └────────┘ │ └───────────────────────────────────┘ │
│ │ ┌────────┐ │ │ │
│ │ │Client 3│──│───┐ │ │
│ │ └────────┘ │ │ │ │
│ └──────────────┘ │ ▼ │
│ │ ┌─────────────────────┐ │
│ └──│ SkillsStorage │ │
│ │ • get_top_skills() │ │
│ │ • _detect_anomalies│ │
│ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
## Installation
The WebSocket server is included in the `session-buddy` package. Ensure dependencies are installed:
```bash
pip install websockets>=15.0
```
## Quick Start
### Start the Server
```python
import asyncio
from pathlib import Path
from session_buddy.realtime.websocket_server import RealTimeMetricsServer
async def main():
server = RealTimeMetricsServer(
host="localhost",
port=8765,
db_path=Path(".session-buddy/skills.db"),
update_interval=1.0
)
await server.start()
# Keep server running
try:
await asyncio.Future()
except KeyboardInterrupt:
await server.stop()
asyncio.run(main())
```
Or use the standalone runner:
```bash
python examples/run_websocket_server.py
```
### Connect a Client
```python
import asyncio
import json
import websockets
async def client():
uri = "ws://localhost:8765"
async with websockets.asyncio.client.connect(uri) as websocket:
# Receive welcome message
message = await websocket.recv()
print(json.loads(message))
# Subscribe to updates
await websocket.send(json.dumps({"type": "subscribe"}))
# Receive metrics
while True:
message = await websocket.recv()
data = json.loads(message)
if data["type"] == "metrics_update":
print(f"Update: {data['data']['top_skills']}")
asyncio.run(client())
```
## Message Protocol
### Server → Client Messages
#### Welcome Message
```json
{
"type": "connected",
"message": "Connected to Session-Buddy real-time metrics",
"timestamp": "2026-02-10T12:00:00Z"
}
```
#### Metrics Update
```json
{
"type": "metrics_update",
"timestamp": "2026-02-10T12:00:00Z",
"data": {
"top_skills": [
{
"skill_name": "pytest-run",
"total_invocations": 150,
"completion_rate": 95.5,
"avg_duration_seconds": 5.2
}
],
"anomalies": [
{
"skill_name": "ruff-check",
"detected_at": "2026-02-10T11:55:00Z",
"anomaly_type": "drop",
"deviation_score": 2.5
}
]
}
}
```
#### Subscription Confirmation
```json
{
"type": "subscription_confirmed",
"skill": "pytest-run",
"timestamp": "2026-02-10T12:00:00Z"
}
```
#### Pong Response
```json
{
"type": "pong"
}
```
### Client → Server Messages
#### Subscribe to Skills
```json
{
"type": "subscribe",
"skill": "pytest-run"
}
```
Set `skill` to `null` or omit to subscribe to all skills.
#### Unsubscribe
```json
{
"type": "unsubscribe"
}
```
#### Ping
```json
{
"type": "ping"
}
```
## API Reference
### RealTimeMetricsServer
#### Constructor
```python
RealTimeMetricsServer(
host: str = "localhost",
port: int = 8765,
db_path: Path | None = None,
update_interval: float = 1.0
)
```
**Parameters:**
- `host`: Server host address
- `port`: Server port number
- `db_path`: Path to SQLite database (default: `.session-buddy/skills.db`)
- `update_interval`: Seconds between metric broadcasts
#### Methods
##### `async start()`
Start the WebSocket server and metrics broadcaster.
##### `async stop()`
Gracefully stop the server and close all connections.
##### `register_client(websocket)`
Register a new WebSocket client.
##### `unregister_client(websocket)`
Remove a WebSocket client.
##### `async broadcast_metrics()`
Broadcast skill metrics to all connected clients (runs in background).
##### `async handle_client_message(websocket, message)`
Handle incoming message from client.
##### `async handle_subscription(websocket, skill_name)`
Handle skill subscription request.
##### `async client_handler(websocket)`
Handle WebSocket client connection lifecycle.
## Configuration
### Update Interval
Adjust how frequently metrics are broadcast:
```python
server = RealTimeMetricsServer(
host="localhost",
port=8765,
update_interval=0.5 # 500ms
)
```
**Trade-offs:**
- Lower intervals = more responsive, higher CPU/network usage
- Higher intervals = less resource usage, less responsive
Recommended: 1.0 second for most use cases.
### Database Location
Specify custom database path:
```python
server = RealTimeMetricsServer(
db_path=Path("/custom/path/skills.db")
)
```
## Client Libraries
### Python
```python
import websockets
async with websockets.asyncio.client.connect("ws://localhost:8765") as ws:
# Handle messages
async for message in ws:
data = json.loads(message)
# Process data
```
### JavaScript
```javascript
const ws = new WebSocket("ws://localhost:8765");
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log("Received:", data);
};
// Subscribe
ws.send(JSON.stringify({ type: "subscribe" }));
```
### Node.js
```javascript
const WebSocket = require('ws');
const ws = new WebSocket('ws://localhost:8765');
ws.on('message', (data) => {
const message = JSON.parse(data);
console.log('Received:', message);
});
// Subscribe
ws.send(JSON.stringify({ type: 'subscribe', skill: null }));
```
## Performance Considerations
### Scalability
The server is designed for:
- **Development**: 10-50 concurrent connections
- **Production**: Up to 1000 concurrent connections with proper tuning
For higher scale, consider:
- Multiple server instances behind a load balancer
- Redis pub/sub for cross-server broadcasting
- Connection pooling and rate limiting
### Memory Usage
Approximate memory per client: ~1-2 KB
With 1000 clients: ~1-2 GB RAM
### CPU Usage
- Idle (no clients): ~0.1% CPU
- With 10 clients: ~0.5% CPU
- With 100 clients: ~2-3% CPU
## Testing
Run the test suite:
```bash
pytest tests/test_websocket_server.py -v
```
### Manual Testing
1. **Start Server:**
```bash
python examples/run_websocket_server.py
```
2. **Test Client:**
```bash
python examples/websocket_client_example.py
```
3. **Test Specific Skill:**
```bash
python examples/websocket_client_example.py pytest-run
```
4. **Test Ping/Pong:**
```bash
python examples/websocket_client_example.py --test-ping
```
## Troubleshooting
### Port Already in Use
```python
# Use different port
server = RealTimeMetricsServer(port=8766)
```
Or kill existing process:
```bash
lsof -ti:8765 | xargs kill -9
```
### Connection Refused
- Verify server is running
- Check firewall settings
- Ensure correct host/port
### No Metrics Received
- Verify database has data
- Check logs for errors
- Ensure `update_interval` is reasonable
### High Memory Usage
- Reduce `update_interval` for less frequent broadcasts
- Implement client timeouts
- Limit maximum connections
## Security Considerations
### Production Deployment
1. **Authentication:**
```python
# Add token-based auth
async def client_handler(self, websocket):
token = await websocket.recv()
if not self.validate_token(token):
await websocket.close()
return
```
2. **TLS/SSL:**
```python
import ssl
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain("server.crt", "server.key")
self._server = await websockets.asyncio.server.serve(
self.client_handler,
self.host,
self.port,
ssl=ssl_context
)
```
3. **Rate Limiting:**
- Limit connections per IP
- Throttle message frequency
- Implement backpressure
4. **Input Validation:**
- Validate all incoming JSON
- Sanitize skill names
- Limit message sizes
## Integration with Dashboards
### Grafana
Use the WebSocket data with a Grafana plugin:
```javascript
// Custom panel plugin
const ws = new WebSocket('ws://localhost:8765');
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
// Update panel
panel.update(data);
};
```
### Custom Dashboard
```python
import streamlit as st
import websockets
import json
st.title("Skill Metrics")
# WebSocket connection placeholder
if st.button("Connect"):
ws = websockets.asyncio.client.connect("ws://localhost:8765")
# Display metrics
```
## License
BSD-3-Clause
## See Also
- [Session-Buddy Documentation](../README.md)
- [V4 Schema](../storage/migrations/V4__phase4_extensions__up.sql)
- [SkillsStorage API](../storage/skills_storage.py)