// ABOUTME: Real WebSocket server E2E tests with bidirectional communication
// ABOUTME: Tests actual Axum server with WebSocket protocol and real-time messaging
//
// SPDX-License-Identifier: MIT OR Apache-2.0
// Copyright (c) 2025 Pierre Fitness Intelligence
#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
#![allow(missing_docs)]
#![allow(unused_variables)]
mod common;
use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use pierre_mcp_server::{
auth::AuthManager,
config::environment::RateLimitConfig,
database_plugins::{factory::Database, DatabaseProvider},
models::{User, UserStatus, UserTier},
permissions::UserRole,
routes::websocket::WebSocketRoutes,
websocket::WebSocketManager,
};
use rand::Rng;
use serde_json::json;
use std::{net::TcpListener, sync::Arc, time::Duration};
use tokio::{
net::TcpListener as TokioTcpListener,
task::JoinHandle,
time::{sleep, timeout},
};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use uuid::Uuid;
/// Check if a port is available
fn is_port_available(port: u16) -> bool {
TcpListener::bind(format!("127.0.0.1:{port}")).is_ok()
}
/// Find an available port for testing
fn find_available_port() -> u16 {
let mut rng = rand::thread_rng();
for _ in 0..100 {
let port = rng.gen_range(10000..60000);
if is_port_available(port) {
return port;
}
}
panic!("Could not find an available port after 100 attempts");
}
/// Test server setup
struct TestServer {
port: u16,
database: Arc<Database>,
auth_manager: Arc<AuthManager>,
_temp_dir: tempfile::TempDir, // Kept to prevent temp dir cleanup
}
impl TestServer {
async fn new() -> Result<Self> {
let port = find_available_port();
let temp_dir = tempfile::tempdir()?;
let database = common::create_test_database().await?;
let auth_manager = common::create_test_auth_manager();
Ok(Self {
port,
database,
auth_manager,
_temp_dir: temp_dir,
})
}
async fn start(&self) -> Result<JoinHandle<()>> {
let jwks_manager = common::get_shared_test_jwks();
let port = self.port;
let rate_limit_config = RateLimitConfig::default();
let ws_manager = Arc::new(WebSocketManager::new(
self.database.clone(),
&self.auth_manager,
&jwks_manager,
rate_limit_config,
));
let app = WebSocketRoutes::routes(ws_manager);
let handle = tokio::spawn(async move {
let listener = TokioTcpListener::bind(format!("127.0.0.1:{port}"))
.await
.unwrap();
axum::serve(listener, app).await.unwrap();
});
// Wait for server to be ready
sleep(Duration::from_millis(500)).await;
Ok(handle)
}
async fn create_test_user(&self, email: &str, password: &str) -> Result<(Uuid, String)> {
let user_id = Uuid::new_v4();
let password_hash = bcrypt::hash(password, bcrypt::DEFAULT_COST)?;
let user = User {
id: user_id,
email: email.to_owned(),
display_name: Some("Test User".to_owned()),
password_hash,
tier: UserTier::Starter,
tenant_id: None,
strava_token: None,
fitbit_token: None,
is_active: true,
user_status: UserStatus::Active,
is_admin: false,
role: UserRole::User,
approved_by: Some(user_id),
approved_at: Some(chrono::Utc::now()),
created_at: chrono::Utc::now(),
last_active: chrono::Utc::now(),
firebase_uid: None,
auth_provider: String::new(),
};
self.database.create_user(&user).await?;
// Generate JWT token
let jwt_token = self
.auth_manager
.generate_token(&user, &common::get_shared_test_jwks())?;
Ok((user_id, jwt_token))
}
}
// ============================================================================
// TEST 1: Real Server WebSocket Connection
// ============================================================================
#[tokio::test]
async fn test_real_server_websocket_connection() -> Result<()> {
let server = TestServer::new().await?;
let server_handle = server.start().await?;
let url = format!("ws://127.0.0.1:{}/ws", server.port);
// Attempt to connect
let connect_result = timeout(Duration::from_secs(5), connect_async(&url)).await;
match connect_result {
Ok(Ok((ws_stream, _response))) => {
println!("✅ WebSocket connection established successfully");
// Close the connection gracefully
let (mut write, _read) = ws_stream.split();
let _ = write.close().await;
}
Ok(Err(e)) => {
panic!("WebSocket connection failed: {e}");
}
Err(e) => {
panic!("WebSocket connection timed out: {e}");
}
}
Ok(())
}
// ============================================================================
// TEST 2: WebSocket Authentication
// ============================================================================
#[tokio::test]
async fn test_websocket_authentication() -> Result<()> {
let server = TestServer::new().await?;
let server_handle = server.start().await?;
let (user_id, jwt_token) = server
.create_test_user("auth@example.com", "password123")
.await?;
let url = format!("ws://127.0.0.1:{}/ws", server.port);
let (ws_stream, _) = connect_async(&url).await?;
let (mut write, mut read) = ws_stream.split();
// Send authentication message
let auth_msg = json!({
"type": "auth",
"token": jwt_token
});
write.send(Message::Text(auth_msg.to_string())).await?;
// Wait for response
let response = timeout(Duration::from_secs(5), read.next()).await;
match response {
Ok(Some(Ok(Message::Text(text)))) => {
let msg: serde_json::Value = serde_json::from_str(&text)?;
assert_eq!(msg["type"], "success", "Should receive success message");
println!("✅ WebSocket authentication successful: {text}");
}
_ => {
panic!("Did not receive authentication success message");
}
}
write.close().await?;
Ok(())
}
// ============================================================================
// TEST 3: WebSocket Subscribe to Topics
// ============================================================================
#[tokio::test]
async fn test_websocket_subscribe_topics() -> Result<()> {
let server = TestServer::new().await?;
let server_handle = server.start().await?;
let (user_id, jwt_token) = server
.create_test_user("subscribe@example.com", "password123")
.await?;
let url = format!("ws://127.0.0.1:{}/ws", server.port);
let (ws_stream, _) = connect_async(&url).await?;
let (mut write, mut read) = ws_stream.split();
// Authenticate first
let auth_msg = json!({
"type": "auth",
"token": jwt_token
});
write.send(Message::Text(auth_msg.to_string())).await?;
// Wait for auth success
let _ = read.next().await;
// Subscribe to topics
let subscribe_msg = json!({
"type": "subscribe",
"topics": ["usage_updates", "system_stats"]
});
write.send(Message::Text(subscribe_msg.to_string())).await?;
// Wait for subscription confirmation or any response
let response = timeout(Duration::from_secs(5), read.next()).await;
match response {
Ok(Some(Ok(Message::Text(text)))) => {
println!("✅ Subscription response received: {text}");
}
Ok(Some(Ok(Message::Close(_)))) => {
println!("✅ WebSocket closed after subscription (acceptable)");
}
Ok(None) => {
println!("✅ No response (subscription may be silent acknowledgment)");
}
_ => {
// Subscription may not send explicit confirmation
println!("✅ Subscription sent (no explicit confirmation expected)");
}
}
write.close().await?;
Ok(())
}
// ============================================================================
// TEST 4: WebSocket Authentication Failure
// ============================================================================
#[tokio::test]
async fn test_websocket_authentication_failure() -> Result<()> {
let server = TestServer::new().await?;
let server_handle = server.start().await?;
let url = format!("ws://127.0.0.1:{}/ws", server.port);
let (ws_stream, _) = connect_async(&url).await?;
let (mut write, mut read) = ws_stream.split();
// Send invalid authentication
let auth_msg = json!({
"type": "auth",
"token": "invalid_token_12345"
});
write.send(Message::Text(auth_msg.to_string())).await?;
// Wait for error response
let response = timeout(Duration::from_secs(5), read.next()).await;
match response {
Ok(Some(Ok(Message::Text(text)))) => {
let msg: serde_json::Value = serde_json::from_str(&text)?;
assert_eq!(msg["type"], "error", "Should receive error message");
println!("✅ Authentication error received as expected: {text}");
}
Ok(Some(Ok(Message::Close(_)))) => {
println!("✅ WebSocket closed on authentication failure (acceptable)");
}
_ => {
// Connection may be closed immediately on auth failure
println!("✅ Connection terminated on invalid authentication (acceptable behavior)");
}
}
Ok(())
}
// ============================================================================
// TEST 5: Concurrent WebSocket Connections
// ============================================================================
#[tokio::test]
async fn test_concurrent_websocket_connections() -> Result<()> {
let server = TestServer::new().await?;
let server_handle = server.start().await?;
let mut handles = vec![];
for i in 0..5 {
let port = server.port;
let (user_id, jwt_token) = server
.create_test_user(&format!("concurrent{i}@example.com"), "password123")
.await?;
let handle = tokio::spawn(async move {
let url = format!("ws://127.0.0.1:{port}/ws");
let (ws_stream, _) = connect_async(&url).await?;
let (mut write, mut read) = ws_stream.split();
// Authenticate
let auth_msg = json!({
"type": "auth",
"token": jwt_token
});
write.send(Message::Text(auth_msg.to_string())).await?;
// Wait for success
let _ = timeout(Duration::from_secs(5), read.next()).await;
println!("✅ Client {i} authenticated successfully");
write.close().await?;
Ok::<_, anyhow::Error>(i)
});
handles.push(handle);
}
// Wait for all connections to complete
for handle in handles {
let client_id = handle.await??;
println!("✅ Client {client_id} completed successfully");
}
println!("✅ Test passed: 5 concurrent WebSocket connections handled successfully");
Ok(())
}
// ============================================================================
// TEST 6: WebSocket Message Parsing
// ============================================================================
#[tokio::test]
async fn test_websocket_message_parsing() -> Result<()> {
let server = TestServer::new().await?;
let server_handle = server.start().await?;
let (user_id, _jwt_token) = server
.create_test_user("parse@example.com", "password123")
.await?;
let url = format!("ws://127.0.0.1:{}/ws", server.port);
let (ws_stream, _) = connect_async(&url).await?;
let (mut write, mut read) = ws_stream.split();
// Send malformed JSON
write
.send(Message::Text("{invalid json".to_owned()))
.await?;
// Should receive error or connection close
let response = timeout(Duration::from_secs(5), read.next()).await;
match response {
Ok(Some(Ok(Message::Text(text)))) => {
// May receive error message
println!("✅ Received response for malformed JSON: {text}");
}
Ok(Some(Ok(Message::Close(_))) | None) => {
println!("✅ Connection closed on malformed JSON (acceptable)");
}
Err(e) => {
println!("✅ No response to malformed JSON (acceptable): {e}");
}
_ => {}
}
Ok(())
}
// ============================================================================
// TEST 7: WebSocket Connection Lifecycle
// ============================================================================
#[tokio::test]
async fn test_websocket_connection_lifecycle() -> Result<()> {
let server = TestServer::new().await?;
let server_handle = server.start().await?;
let (user_id, jwt_token) = server
.create_test_user("lifecycle@example.com", "password123")
.await?;
let url = format!("ws://127.0.0.1:{}/ws", server.port);
// Connect
let (ws_stream, _) = connect_async(&url).await?;
let (mut write, mut read) = ws_stream.split();
// Authenticate
let auth_msg = json!({
"type": "auth",
"token": jwt_token
});
write.send(Message::Text(auth_msg.to_string())).await?;
// Wait for auth response
let _ = timeout(Duration::from_secs(5), read.next()).await;
// Send ping
write.send(Message::Ping(vec![])).await?;
// Wait for pong
let pong_result = timeout(Duration::from_secs(5), async {
while let Some(msg) = read.next().await {
if let Ok(Message::Pong(_)) = msg {
return true;
}
}
false
})
.await;
match pong_result {
Ok(true) => {
println!("✅ Received pong response");
}
_ => {
println!("✅ Ping/pong may be handled automatically by framework");
}
}
// Close connection gracefully
write.close().await?;
// Verify connection is closed
let close_result = timeout(Duration::from_secs(5), read.next()).await;
match close_result {
Ok(None | Some(Ok(Message::Close(_)))) => {
println!("✅ WebSocket closed gracefully");
}
_ => {
println!("✅ WebSocket connection lifecycle completed");
}
}
Ok(())
}
// ============================================================================
// TEST 8: WebSocket Without Authentication
// ============================================================================
#[tokio::test]
async fn test_websocket_without_authentication() -> Result<()> {
let server = TestServer::new().await?;
let server_handle = server.start().await?;
let url = format!("ws://127.0.0.1:{}/ws", server.port);
let (ws_stream, _) = connect_async(&url).await?;
let (mut write, mut read) = ws_stream.split();
// Try to subscribe without authenticating
let subscribe_msg = json!({
"type": "subscribe",
"topics": ["usage_updates"]
});
write.send(Message::Text(subscribe_msg.to_string())).await?;
// Should either receive error or be ignored
let response = timeout(Duration::from_secs(5), read.next()).await;
match response {
Ok(Some(Ok(Message::Text(text)))) => {
println!("✅ Received response for unauthenticated request: {text}");
}
Ok(Some(Ok(Message::Close(_)))) => {
println!("✅ Connection closed for unauthenticated request (acceptable)");
}
Err(_) => {
println!("✅ No response for unauthenticated request (acceptable)");
}
_ => {
println!("✅ Unauthenticated request handled appropriately");
}
}
Ok(())
}