Skip to main content
Glama
8b-is
by 8b-is
sse.rs6.67 kB
//! Server-Sent Events (SSE) input adapter //! //! Visualizes real-time event streams as living trees use super::*; use anyhow::Result; use async_trait::async_trait; use reqwest; use std::time::{Duration, SystemTime}; pub struct SseAdapter; #[async_trait] impl InputAdapter for SseAdapter { fn name(&self) -> &'static str { "SSE" } fn supported_formats(&self) -> Vec<&'static str> { vec!["sse", "events", "stream"] } async fn can_handle(&self, input: &InputSource) -> bool { match input { InputSource::Url(url) => { url.contains("/events") || url.contains("/stream") || url.contains("sse") } InputSource::Raw { format_hint, .. } => { format_hint.as_ref().map(|h| h == "sse").unwrap_or(false) } _ => false, } } async fn parse(&self, input: InputSource) -> Result<ContextNode> { match input { InputSource::Url(url) => self.parse_sse_stream(&url).await, InputSource::Raw { data, .. } => self.parse_sse_data(&data).await, _ => anyhow::bail!("SSE adapter requires URL or raw data"), } } } impl SseAdapter { async fn parse_sse_stream(&self, url: &str) -> Result<ContextNode> { let _client = reqwest::Client::new(); // Create root node for event stream let mut root = ContextNode { id: url.to_string(), name: "Event Stream".to_string(), node_type: NodeType::EventSource, quantum_state: Some(QuantumState { amplitude: 1.0, frequency: 1.0, // 1 Hz update rate phase: 0.0, collapse_probability: 0.0, // Never collapses, always streaming }), children: vec![], metadata: serde_json::json!({ "url": url, "status": "connecting", "event_count": 0 }), entanglements: vec![], }; // For demo, just show structure // In real implementation, would stream events root.children = vec![ ContextNode { id: format!("{}/types", url), name: "Event Types".to_string(), node_type: NodeType::Directory, quantum_state: None, children: vec![ self.create_event_type_node("user_action", 10.5), self.create_event_type_node("system_update", 2.0), self.create_event_type_node("error", 0.1), ], metadata: serde_json::json!({}), entanglements: vec![], }, ContextNode { id: format!("{}/timeline", url), name: "Timeline".to_string(), node_type: NodeType::Directory, quantum_state: None, children: self.create_timeline_nodes(), metadata: serde_json::json!({}), entanglements: vec![], }, ]; Ok(root) } async fn parse_sse_data(&self, data: &[u8]) -> Result<ContextNode> { let content = String::from_utf8_lossy(data); let mut events = Vec::new(); // Parse SSE format for line in content.lines() { if let Some(event_data) = line.strip_prefix("data: ") { if let Ok(json) = serde_json::from_str::<serde_json::Value>(event_data) { events.push(json); } } } Ok(ContextNode { id: "sse_data".to_string(), name: "SSE Events".to_string(), node_type: NodeType::EventSource, quantum_state: None, children: events .iter() .enumerate() .map(|(i, event)| ContextNode { id: format!("event_{}", i), name: event .get("type") .and_then(|t| t.as_str()) .unwrap_or("event") .to_string(), node_type: NodeType::EventType, quantum_state: None, children: vec![], metadata: event.clone(), entanglements: vec![], }) .collect(), metadata: serde_json::json!({ "event_count": events.len() }), entanglements: vec![], }) } fn create_event_type_node(&self, event_type: &str, frequency: f64) -> ContextNode { ContextNode { id: format!("type_{}", event_type), name: event_type.to_string(), node_type: NodeType::EventType, quantum_state: Some(QuantumState { amplitude: frequency / 10.0, // Normalize frequency, phase: 0.0, collapse_probability: 0.5, }), children: vec![], metadata: serde_json::json!({ "average_per_second": frequency, "total_count": (frequency * 3600.0) as u64, // Last hour }), entanglements: vec![], } } fn create_timeline_nodes(&self) -> Vec<ContextNode> { let now = SystemTime::now(); let mut nodes = Vec::new(); // Create nodes for last 5 time buckets for i in 0..5 { let bucket_time = now - Duration::from_secs(i * 60); nodes.push(ContextNode { id: format!("bucket_{}", i), name: format!("{} min ago", i), node_type: NodeType::Directory, quantum_state: Some(QuantumState { amplitude: 1.0 / (i as f64 + 1.0), // Decay over time frequency: 1.0, phase: i as f64 * std::f64::consts::PI / 5.0, collapse_probability: i as f64 / 5.0, }), children: vec![], metadata: serde_json::json!({ "timestamp": bucket_time, "event_count": 42 - (i * 8), // Simulated decay }), entanglements: if i > 0 { vec![Entanglement { target_id: format!("bucket_{}", i - 1), strength: 0.8, relationship: "temporal_sequence".to_string(), }] } else { vec![] }, }); } nodes } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/8b-is/smart-tree'

If you have feedback or need assistance with the MCP directory API, please join our Discord server