// ABOUTME: Complete OAuth+SSE integration test for real-time notifications
// ABOUTME: Tests end-to-end OAuth flow with SSE notification delivery to MCP client
//
// SPDX-License-Identifier: MIT OR Apache-2.0
// Copyright (c) 2025 Pierre Fitness Intelligence
#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
#![allow(missing_docs)]
use anyhow::Result;
use pierre_mcp_server::{
database::oauth_notifications::OAuthNotification, database_plugins::DatabaseProvider,
sse::manager::SseManager,
};
use reqwest::Client;
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;
mod common;
/// Test complete OAuth flow with SSE notifications
#[tokio::test]
async fn test_oauth_strava_with_sse_notifications() -> Result<()> {
println!("đ Testing complete OAuth+SSE notification flow");
// Create test server resources
let resources = common::create_test_server_resources().await?;
let (user_id, user) = common::create_test_user(&resources.database).await?;
// Create SSE manager
let sse_manager = Arc::new(SseManager::new(100));
// Simulate SSE connection registration (MCP client connects)
let mut sse_receiver = sse_manager.register_notification_stream(user_id).await;
println!("â
SSE connection registered for user: {user_id}");
// Generate JWT token for user
let jwks_manager = common::get_shared_test_jwks();
let jwt_token = resources
.auth_manager
.generate_token(&user, &jwks_manager)?;
println!("â
JWT token generated for user");
// Simulate OAuth authorization request (user clicks "Connect to Strava")
let client = Client::new();
let auth_url = format!("http://127.0.0.1:8081/api/oauth/auth/strava/{user_id}");
// Test OAuth authorization URL generation
println!("đ Testing OAuth authorization URL generation");
let auth_response = client
.get(&auth_url)
.header("Authorization", format!("Bearer {jwt_token}"))
.header("strava-client-id", "test_client_id")
.header("strava-client-secret", "test_client_secret")
.send()
.await;
match auth_response {
Ok(resp) => {
println!("â
OAuth authorization URL generated: {}", resp.status());
if resp.status().is_redirection() {
if let Some(location) = resp.headers().get("location") {
println!(" Redirect location: {location:?}");
}
}
}
Err(e) => {
println!("âšī¸ OAuth authorization test skipped (server not running): {e}");
}
}
// Simulate OAuth callback (Strava redirects back with auth code)
println!("đ Testing OAuth callback with SSE notification");
// Create a mock OAuth notification
let oauth_notification = OAuthNotification {
id: "test-oauth-notification".to_owned(),
user_id: user_id.to_string(),
provider: "strava".to_owned(),
success: true,
message: "Successfully connected to Strava! You can now access your fitness data."
.to_owned(),
expires_at: None,
created_at: chrono::Utc::now(),
read_at: None,
};
// Save notification to database using the correct method signature
resources
.database
.store_oauth_notification(
user_id,
&oauth_notification.provider,
oauth_notification.success,
&oauth_notification.message,
None, // expires_at
)
.await?;
println!("â
OAuth notification saved to database");
// Send notification via SSE
let notification_result = sse_manager
.send_notification(user_id, &oauth_notification)
.await;
println!(
"đ¤ SSE notification sent: {:?}",
notification_result.is_ok()
);
// Test SSE message reception with timeout
println!("đĨ Testing SSE message reception");
let sse_timeout = timeout(Duration::from_millis(100), sse_receiver.recv()).await;
match sse_timeout {
Ok(Ok(message)) => {
println!("â
SSE notification received: {message}");
// Verify message content
assert!(message.contains("oauth_notification"));
assert!(message.contains("strava"));
assert!(message.contains("Successfully connected"));
}
Ok(Err(e)) => {
println!("â ī¸ SSE receiver error: {e:?}");
}
Err(_) => {
println!("â° SSE message reception timeout (expected in unit test)");
}
}
// Test cleanup
sse_manager.unregister_notification_stream(user_id).await;
assert_eq!(sse_manager.active_notification_streams().await, 0);
println!("â
SSE connection cleanup successful");
println!("â
OAuth+SSE integration test completed successfully!");
Ok(())
}
/// Test MCP client token refresh with SSE notifications
#[tokio::test]
async fn test_mcp_client_oauth_notification_flow() -> Result<()> {
println!("đ Testing MCP client OAuth notification flow");
let resources = common::create_test_server_resources().await?;
let (user_id, user) = common::create_test_user(&resources.database).await?;
// Create SSE manager
let sse_manager = Arc::new(SseManager::new(100));
// Test token refresh endpoint (simulates MCP client auto-refresh)
let jwks_manager = common::get_shared_test_jwks();
let initial_token = resources
.auth_manager
.generate_token(&user, &jwks_manager)?;
println!("â
Initial JWT token generated");
let client = Client::new();
let refresh_request = json!({
"token": initial_token,
"user_id": user_id.to_string()
});
// Test refresh endpoint (would be called by MCP client)
let refresh_url = "http://127.0.0.1:8081/api/auth/refresh";
println!("đ Testing token refresh for MCP client");
let refresh_response = client.post(refresh_url).json(&refresh_request).send().await;
match refresh_response {
Ok(resp) if resp.status().is_success() => {
println!("â
Token refresh successful for MCP client");
}
Ok(resp) => {
println!(
"âšī¸ Token refresh response: {} (server may not be running)",
resp.status()
);
}
Err(e) => {
println!("âšī¸ Token refresh test skipped (server not running): {e}");
}
}
// Test SSE connection for real-time notifications
let mut receiver = sse_manager.register_notification_stream(user_id).await;
println!("â
MCP client SSE connection established");
// Simulate OAuth completion notification
let notification = OAuthNotification {
id: "mcp-client-notification".to_owned(),
user_id: user_id.to_string(),
provider: "strava".to_owned(),
success: true,
message: "OAuth completed - data ready for MCP tools".to_owned(),
expires_at: None,
created_at: chrono::Utc::now(),
read_at: None,
};
// Send notification
sse_manager
.send_notification(user_id, ¬ification)
.await?;
// Test notification delivery to MCP client
let msg_result = timeout(Duration::from_millis(50), receiver.recv()).await;
match msg_result {
Ok(Ok(msg)) => {
println!("â
MCP client received OAuth notification: {msg}");
assert!(msg.contains("data ready for MCP tools"));
}
_ => {
println!("â° MCP client notification timeout (expected in unit test)");
}
}
sse_manager.unregister_notification_stream(user_id).await;
println!("â
MCP client disconnected");
println!("â
MCP client OAuth notification flow test completed!");
Ok(())
}
/// Test error scenarios and edge cases
#[tokio::test]
async fn test_oauth_sse_error_scenarios() -> Result<()> {
println!("đ Testing OAuth+SSE error scenarios");
let resources = common::create_test_server_resources().await?;
let (user_id, _user) = common::create_test_user(&resources.database).await?;
let sse_manager = Arc::new(SseManager::new(100));
// Test notification to non-existent SSE connection
let notification = OAuthNotification {
id: "error-test".to_owned(),
user_id: user_id.to_string(),
provider: "strava".to_owned(),
success: false,
message: "OAuth failed - invalid credentials".to_owned(),
expires_at: None,
created_at: chrono::Utc::now(),
read_at: None,
};
// Create a random user ID that doesn't have a connection
let non_existent_user = uuid::Uuid::new_v4();
let result = sse_manager
.send_notification(non_existent_user, ¬ification)
.await;
assert!(result.is_err());
println!("â
Error handling for non-existent SSE connection");
// Test connection cleanup
let test_user_id = uuid::Uuid::new_v4();
let receiver = sse_manager.register_notification_stream(test_user_id).await;
assert_eq!(sse_manager.active_notification_streams().await, 1);
drop(receiver); // Simulate client disconnect
// Connection should still exist until explicitly cleaned up
sse_manager
.unregister_notification_stream(test_user_id)
.await;
assert_eq!(sse_manager.active_notification_streams().await, 0);
println!("â
SSE connection cleanup on client disconnect");
println!("â
Error scenario tests completed!");
Ok(())
}