// ABOUTME: MCP protocol streaming implementation for session-based bidirectional communication
// ABOUTME: Handles SSE streaming of MCP protocol messages with session management
//
// SPDX-License-Identifier: MIT OR Apache-2.0
// Copyright (c) 2025 Pierre Fitness Intelligence
use crate::{
database::oauth_notifications::OAuthNotification,
errors::AppError,
mcp::{
protocol::{McpRequest, McpResponse},
resources::ServerResources,
tool_handlers::ToolHandlers,
},
};
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::{
broadcast::{self, Sender},
RwLock,
};
use tracing::{debug, error, info};
/// MCP protocol stream for a specific session
pub struct McpProtocolStream {
/// Server resources including database and configuration
resources: Arc<ServerResources>,
/// Broadcast sender for streaming messages to clients
sender: Arc<RwLock<Option<broadcast::Sender<String>>>>,
/// Optional session identifier for this stream
session_id: Option<String>,
/// Maximum number of messages to buffer
buffer_size: usize,
}
impl McpProtocolStream {
/// Creates a new MCP protocol stream for SSE communication
#[must_use]
pub fn new(resources: Arc<ServerResources>) -> Self {
let buffer_size = resources.config.sse.max_buffer_size;
Self {
resources,
sender: Arc::new(RwLock::new(None)),
session_id: None,
buffer_size,
}
}
/// Subscribe to MCP protocol messages for this session
pub async fn subscribe(&self) -> broadcast::Receiver<String> {
let mut sender_guard = self.sender.write().await;
let sender = if let Some(existing_sender) = sender_guard.take() {
*sender_guard = Some(existing_sender.clone());
existing_sender
} else {
let (tx, _) = broadcast::channel(self.buffer_size);
*sender_guard = Some(tx.clone());
tx
};
drop(sender_guard);
sender.subscribe()
}
/// Handle MCP request and stream response
///
/// # Errors
///
/// Returns an error if no active sender is available for this stream
pub async fn handle_request(&self, request: McpRequest) -> Result<(), AppError> {
// Process the MCP request using existing handlers
let response =
ToolHandlers::handle_tools_call_with_resources(request, &self.resources).await;
// Stream the response
self.send_response(response).await?;
Ok(())
}
/// Send MCP response through SSE stream
async fn send_response(&self, response: McpResponse) -> Result<(), AppError> {
let sender_guard = self.sender.read().await;
if let Some(sender) = sender_guard.as_ref() {
// Send only the JSON data - SSE formatting handled by Axum SSE helper
let json_data = serde_json::to_string(&response)
.map_err(|e| AppError::internal(format!("Failed to serialize response: {e}")))?;
sender
.send(json_data)
.map_err(|e| AppError::internal(format!("Failed to send MCP response: {e}")))?;
Ok(())
} else {
Err(AppError::internal("No active sender for protocol stream"))
}
}
/// Send error event through SSE stream
///
/// # Errors
///
/// Returns an error if:
/// - No active sender is available for this stream
/// - JSON serialization fails
/// - Sending the error event fails
pub async fn send_error(&self, error_message: &str) -> Result<(), AppError> {
let error_response =
McpResponse::error(Some(Value::Null), -32603, error_message.to_owned());
let sender_guard = self.sender.read().await;
if let Some(sender) = sender_guard.as_ref() {
// Send only the JSON data - SSE formatting handled by Axum SSE helper
let json_data = serde_json::to_string(&error_response)
.map_err(|e| AppError::internal(format!("Failed to serialize error: {e}")))?;
sender
.send(json_data)
.map_err(|e| AppError::internal(format!("Failed to send error event: {e}")))?;
Ok(())
} else {
Err(AppError::internal("No active sender for protocol stream"))
}
}
/// Check if stream has active subscribers
pub async fn has_subscribers(&self) -> bool {
let sender_guard = self.sender.read().await;
sender_guard
.as_ref()
.is_some_and(|sender| sender.receiver_count() > 0)
}
/// Set session ID for this stream
pub fn set_session_id(&mut self, session_id: String) {
self.session_id = Some(session_id);
}
/// Get session ID for this stream
#[must_use]
pub const fn get_session_id(&self) -> Option<&String> {
self.session_id.as_ref()
}
/// Send OAuth notification through MCP protocol stream
///
/// Build JSON-RPC notification for OAuth completion
fn build_oauth_notification_json(notification: &OAuthNotification) -> Result<String, AppError> {
let mcp_notification = serde_json::json!({
"jsonrpc": "2.0",
"method": "notifications/oauth_completed",
"params": {
"provider": notification.provider,
"success": notification.success,
"message": notification.message,
"user_id": notification.user_id,
}
});
serde_json::to_string(&mcp_notification)
.map_err(|e| AppError::internal(format!("Failed to serialize notification: {e}")))
}
/// Broadcast notification via sender
fn broadcast_notification(
sender: &Sender<String>,
json_data: String,
provider: &str,
) -> Result<(), AppError> {
match sender.send(json_data) {
Ok(count) => {
info!(
"OAuth notification broadcast succeeded! Reached {} receiver(s) for provider {}",
count, provider
);
Ok(())
}
Err(e) => {
error!("OAuth notification broadcast FAILED: {}", e);
Err(AppError::internal(format!(
"Failed to send OAuth notification: {e}"
)))
}
}
}
/// # Errors
///
/// Returns an error if:
/// - No active sender is available for this stream
/// - JSON serialization fails
/// - Sending the notification fails
pub async fn send_oauth_notification(
&self,
notification: &OAuthNotification,
) -> Result<(), AppError> {
debug!(
"send_oauth_notification called for provider: {}",
notification.provider
);
let sender = self.get_active_sender().await?;
let json_data = Self::build_oauth_notification_json(notification)?;
debug!("JSON data to send: {}", json_data);
Self::broadcast_notification(&sender, json_data, ¬ification.provider)
}
async fn get_active_sender(&self) -> Result<Sender<String>, AppError> {
let sender_guard = self.sender.read().await;
let Some(sender) = sender_guard.as_ref().cloned() else {
drop(sender_guard);
error!("No active sender for protocol stream");
return Err(AppError::internal("No active sender for protocol stream"));
};
drop(sender_guard);
debug!(
"Active SSE receivers for this stream: {}",
sender.receiver_count()
);
Ok(sender)
}
}