// ABOUTME: HTTP route handlers for A2A protocol endpoints and client management
// ABOUTME: Implements REST API endpoints for A2A authentication, tool execution, and client administration
//
// SPDX-License-Identifier: MIT OR Apache-2.0
// Copyright (c) 2025 Pierre Fitness Intelligence
//! A2A HTTP Routes
//!
//! HTTP endpoints for A2A (Agent-to-Agent) protocol management
use crate::a2a::{
agent_card::AgentCard,
auth::{A2AAuthenticator, A2AClient},
client::{
A2AClientManager, A2ARateLimitStatus, ClientCredentials, ClientRegistrationRequest,
ClientUsageStats,
},
protocol::A2AError,
};
use crate::constants::time::DAY_SECONDS;
use crate::database_plugins::DatabaseProvider;
use crate::mcp::resources::ServerResources;
use crate::protocols::universal::{UniversalRequest, UniversalToolExecutor};
use crate::utils::auth::extract_bearer_token;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::sync::Arc;
use subtle::ConstantTimeEq;
use tracing::warn;
use uuid::Uuid;
/// A2A dashboard overview statistics
#[derive(Debug, Serialize)]
pub struct A2ADashboardOverview {
/// Total number of registered A2A clients
pub total_clients: u32,
/// Number of active clients
pub active_clients: u32,
/// Total number of sessions
pub total_sessions: u32,
/// Number of active sessions
pub active_sessions: u32,
/// Request count today
pub requests_today: u32,
/// Request count this month
pub requests_this_month: u32,
/// Most frequently used capability
pub most_used_capability: Option<String>,
/// Error rate (0.0-1.0)
pub error_rate: f64,
/// Usage breakdown by client tier
pub usage_by_tier: Vec<A2ATierUsage>,
}
/// Usage statistics for an A2A client tier
#[derive(Debug, Serialize)]
pub struct A2ATierUsage {
/// Tier name (free, pro, enterprise)
pub tier: String,
/// Number of clients in this tier
pub client_count: u32,
/// Total requests from this tier
pub request_count: u32,
/// Percentage of total usage
pub percentage: f64,
}
/// Request to register a new A2A client
#[derive(Debug, Deserialize)]
pub struct A2AClientRequest {
/// Client application name
pub name: String,
/// Client description
pub description: String,
/// Requested A2A capabilities
pub capabilities: Vec<String>,
/// Optional OAuth redirect URIs
pub redirect_uris: Option<Vec<String>>,
/// Contact email for client owner
pub contact_email: String,
/// Agent software version
pub agent_version: Option<String>,
/// URL to client documentation
pub documentation_url: Option<String>,
}
/// A2A Routes handler
pub struct A2ARoutes {
resources: Arc<ServerResources>,
client_manager: Arc<A2AClientManager>,
authenticator: Arc<A2AAuthenticator>,
tool_executor: UniversalToolExecutor,
}
impl A2ARoutes {
/// Extract and validate JWT token from Authorization header
fn extract_jwt_token(auth_header: Option<&str>) -> Result<String, Value> {
let auth = auth_header.ok_or_else(|| {
json!({
"code": -32001,
"message": "Missing Authorization header"
})
})?;
let token = extract_bearer_token(auth).map_err(|e| {
warn!(
error = %e,
"Failed to extract bearer token from A2A authorization header"
);
json!({
"code": -32001,
"message": "Invalid authorization header format"
})
})?;
Ok(token.to_owned())
}
/// Validate JWT token and return user ID
fn validate_jwt_and_get_user_id(&self, token: &str) -> Result<String, Value> {
self.resources
.auth_manager
.validate_token(token, &self.resources.jwks_manager)
.map(|claims| claims.sub)
.map_err(|e| {
warn!(
error = %e,
"A2A authentication token validation failed"
);
json!({
"code": -32001,
"message": "Invalid or expired authentication token"
})
})
}
/// Creates a new A2A routes instance
#[must_use]
pub fn new(resources: Arc<ServerResources>) -> Self {
let client_manager = resources.a2a_client_manager.clone(); // Safe: Arc clone for shared client manager
let authenticator = Arc::new(A2AAuthenticator::new(resources.clone())); // Safe: Arc clone for authenticator creation
let tool_executor = UniversalToolExecutor::new(resources.clone()); // Safe: Arc clone for tool executor creation
Self {
resources,
client_manager,
authenticator,
tool_executor,
}
}
/// Get A2A agent card
///
/// # Errors
///
/// Returns `A2AError` if the agent card cannot be created
pub fn get_agent_card(&self) -> Result<AgentCard, A2AError> {
Ok(AgentCard::with_base_url(&self.resources.config.base_url))
}
/// Get A2A dashboard overview
///
/// # Errors
///
/// Returns `A2AError` if:
/// - Database operations fail
/// - Client list cannot be retrieved
pub async fn get_dashboard_overview(
&self,
_auth_header: Option<&str>,
) -> Result<A2ADashboardOverview, A2AError> {
// Use existing client manager methods for real data
let clients = self
.client_manager
.list_all_clients()
.await
.map_err(|e| A2AError::DatabaseError(e.to_string()))?;
let total_clients = u32::try_from(clients.len()).unwrap_or(u32::MAX);
let active_clients =
u32::try_from(clients.iter().filter(|c| c.is_active).count()).unwrap_or(0);
// Sessions and usage stats based on database queries
// These would need proper session tracking implementation
let total_sessions = 0; // No session tracking implemented yet
let active_sessions = 0; // No session tracking implemented yet
let requests_today = 0; // No usage logging implemented yet
let requests_this_month = 0; // No usage logging implemented yet
let most_used_capability = None; // No usage tracking implemented yet
let error_rate = 0.0; // No error tracking implemented yet
// Create tier structure based on user subscription level
let usage_tiers = if active_clients > 0 {
vec![A2ATierUsage {
tier: "basic".into(),
client_count: active_clients,
request_count: 0, // No usage tracking yet
percentage: 100.0,
}]
} else {
vec![]
};
let overview = A2ADashboardOverview {
total_clients,
active_clients,
total_sessions,
active_sessions,
requests_today,
requests_this_month,
most_used_capability,
error_rate,
usage_by_tier: usage_tiers,
};
Ok(overview)
}
/// Register new A2A client
///
/// # Errors
///
/// Returns `A2AError` if:
/// - Authentication fails or no valid auth header
/// - Client registration fails
/// - Database operations fail
pub async fn register_client(
&self,
auth_header: Option<&str>,
request: A2AClientRequest,
) -> Result<ClientCredentials, A2AError> {
// Extract and validate JWT to get the authenticated user's ID
let token = Self::extract_jwt_token(auth_header)
.map_err(|e| A2AError::AuthenticationFailed(format!("JWT extraction failed: {e}")))?;
let user_id_str = self
.validate_jwt_and_get_user_id(&token)
.map_err(|e| A2AError::AuthenticationFailed(format!("JWT validation failed: {e}")))?;
let user_id = Uuid::parse_str(&user_id_str)
.map_err(|e| A2AError::InternalError(format!("Invalid user ID format: {e}")))?;
let registration = ClientRegistrationRequest {
name: request.name,
description: request.description,
capabilities: request.capabilities,
redirect_uris: request.redirect_uris.unwrap_or_default(),
contact_email: request.contact_email,
};
self.client_manager
.register_client(registration, user_id)
.await
}
/// List A2A clients
///
/// # Errors
///
/// Returns `A2AError` if:
/// - Database operations fail
/// - Client list cannot be retrieved
pub async fn list_clients(
&self,
_auth_header: Option<&str>,
) -> Result<Vec<A2AClient>, A2AError> {
self.client_manager.list_all_clients().await
}
/// Get A2A client usage statistics
///
/// # Errors
///
/// Returns `A2AError` if:
/// - Client does not exist
/// - Database operations fail
pub async fn get_client_usage(
&self,
_auth_header: Option<&str>,
client_id: &str,
) -> Result<ClientUsageStats, A2AError> {
self.client_manager.get_client_usage(client_id).await
}
/// Get A2A client rate limit status
///
/// # Errors
///
/// Returns `A2AError` if:
/// - Client does not exist
/// - Database operations fail
pub async fn get_client_rate_limit(
&self,
_auth_header: Option<&str>,
client_id: &str,
) -> Result<A2ARateLimitStatus, A2AError> {
self.client_manager
.get_client_rate_limit_status(client_id)
.await
}
/// Deactivate A2A client
///
/// # Errors
///
/// Returns `A2AError` if:
/// - Client does not exist
/// - Database operations fail
pub async fn deactivate_client(
&self,
_auth_header: Option<&str>,
client_id: &str,
) -> Result<(), A2AError> {
self.client_manager.deactivate_client(client_id).await
}
/// Authenticate A2A request
///
/// # Errors
///
/// Returns `A2AError` if:
/// - Required fields are missing from the request
/// - Client authentication fails
/// - Session creation fails
pub async fn authenticate(&self, request: Value) -> Result<Value, A2AError> {
// Parse authentication request
let client_id = request
.get("client_id")
.and_then(|v| v.as_str())
.ok_or_else(|| A2AError::InvalidRequest("Missing client_id".into()))?;
let client_secret = request
.get("client_secret")
.and_then(|v| v.as_str())
.ok_or_else(|| A2AError::InvalidRequest("Missing client_secret".into()))?;
let scopes = request
.get("scopes")
.and_then(|v| v.as_array())
.map_or_else(
|| vec!["read".into()],
|arr| {
arr.iter()
.filter_map(|v| v.as_str())
.map(str::to_owned)
.collect::<Vec<String>>()
},
);
// Verify client exists and credentials are valid
let client = self
.client_manager
.get_client(client_id)
.await?
.ok_or_else(|| A2AError::AuthenticationFailed("Invalid client_id".into()))?;
if !client.is_active {
return Err(A2AError::AuthenticationFailed(
"Client is deactivated".into(),
));
}
// Verify client secret using constant-time comparison to prevent timing attacks
let credentials = self
.client_manager
.get_client_credentials(client_id)
.await?
.ok_or_else(|| A2AError::AuthenticationFailed("Invalid credentials".into()))?;
// Use constant-time comparison to prevent timing attacks
let expected_secret = credentials.client_secret.as_bytes();
let provided_secret = client_secret.as_bytes();
// Both secrets must be the same length and content for authentication to succeed
let secrets_match = expected_secret.len() == provided_secret.len()
&& expected_secret.ct_eq(provided_secret).into();
if !secrets_match {
return Err(A2AError::AuthenticationFailed(
"Invalid client_secret".into(),
));
}
// Create A2A session
let session_token = self
.resources
.database
.create_a2a_session(client_id, None, &scopes, 24)
.await
.map_err(|e| A2AError::InternalError(format!("Failed to create session: {e}")))?;
Ok(json!({
"status": "authenticated",
"session_token": session_token,
"expires_in": DAY_SECONDS,
"token_type": "Bearer",
"scope": scopes.join(" ")
}))
}
/// Handle tool execution method
async fn handle_tools_execute(
&self,
auth_header: Option<&str>,
params: &Value,
id: &Value,
) -> Result<Value, A2AError> {
let tool_name = params
.get("tool_name")
.and_then(|t| t.as_str())
.ok_or_else(|| A2AError::InvalidRequest("Missing tool_name in params".into()))?;
let parameters = params
.get("parameters")
.cloned()
.unwrap_or_else(|| json!({}));
// Extract and validate JWT token
let token = match Self::extract_jwt_token(auth_header) {
Ok(token) => token,
Err(error) => {
return Ok(json!({
"jsonrpc": "2.0",
"id": id,
"error": error
}));
}
};
let user_id = match self.validate_jwt_and_get_user_id(&token) {
Ok(user_id) => user_id,
Err(error) => {
return Ok(json!({
"jsonrpc": "2.0",
"id": id,
"error": error
}));
}
};
// Create universal request
let universal_request = UniversalRequest {
tool_name: tool_name.to_owned(),
parameters,
user_id,
protocol: "a2a".into(),
tenant_id: None, // A2A doesn't have tenant context yet
progress_token: None,
cancellation_token: None,
progress_reporter: None,
};
// Execute the tool
match self.tool_executor.execute_tool(universal_request).await {
Ok(universal_response) => {
let response = json!({
"jsonrpc": "2.0",
"id": id,
"result": universal_response.result.unwrap_or_else(|| json!({}))
});
Ok(response)
}
Err(e) => {
let error_response = json!({
"jsonrpc": "2.0",
"id": id,
"error": {
"code": -32000,
"message": format!("Tool execution failed: {e}"),
"data": null
}
});
Ok(error_response)
}
}
}
/// Handle client info method
fn handle_client_info(id: &Value) -> Value {
json!({
"jsonrpc": "2.0",
"id": id,
"result": {
"name": "Pierre Fitness AI",
"version": "1.0.0",
"capabilities": [
"fitness-data-analysis",
"goal-management",
"activity-insights",
"performance-metrics"
],
"protocols": ["A2A", "MCP"],
"description": "AI-powered fitness data analysis and insights platform"
}
})
}
/// Handle session heartbeat method
async fn handle_session_heartbeat(
&self,
auth_header: Option<&str>,
id: &Value,
) -> Result<Value, A2AError> {
let token = match Self::extract_jwt_token(auth_header) {
Ok(token) => token,
Err(error) => {
return Ok(json!({
"jsonrpc": "2.0",
"id": id,
"error": error
}));
}
};
match self
.resources
.database
.update_a2a_session_activity(&token)
.await
{
Ok(()) => Ok(json!({
"jsonrpc": "2.0",
"id": id,
"result": {
"status": "alive",
"timestamp": Utc::now().to_rfc3339()
}
})),
Err(e) => Ok(json!({
"jsonrpc": "2.0",
"id": id,
"error": {
"code": -32000,
"message": format!("Failed to update session: {e}")
}
})),
}
}
/// Handle capabilities list method
fn handle_capabilities_list(id: &Value) -> Value {
json!({
"jsonrpc": "2.0",
"id": id,
"result": {
"capabilities": [
{
"name": "fitness-data-analysis",
"description": "Analyze fitness and activity data for insights",
"version": "1.0.0"
},
{
"name": "goal-management",
"description": "Create and track fitness goals",
"version": "1.0.0"
},
{
"name": "activity-insights",
"description": "Generate insights from activity patterns",
"version": "1.0.0"
},
{
"name": "performance-metrics",
"description": "Calculate performance metrics and trends",
"version": "1.0.0"
}
]
}
})
}
/// Execute A2A tool
///
/// # Errors
///
/// Returns `A2AError` if:
/// - Request is malformed or missing required fields
/// - Authentication fails
/// - Tool execution fails
pub async fn execute_tool(
&self,
auth_header: Option<&str>,
request: Value,
) -> Result<Value, A2AError> {
// Parse the JSON-RPC request
let method = request
.get("method")
.and_then(|m| m.as_str())
.ok_or_else(|| A2AError::InvalidRequest("Missing method field".into()))?;
let params = request
.get("params")
.ok_or_else(|| A2AError::InvalidRequest("Missing params field".into()))?;
let id = request.get("id").cloned().unwrap_or_else(|| json!(1));
match method {
"tools.execute" => self.handle_tools_execute(auth_header, params, &id).await,
"client.info" => Ok(Self::handle_client_info(&id)),
"session.heartbeat" => self.handle_session_heartbeat(auth_header, &id).await,
"capabilities.list" => Ok(Self::handle_capabilities_list(&id)),
_ => Ok(json!({
"jsonrpc": "2.0",
"id": id,
"error": {
"code": -32601,
"message": format!("Method '{method}' not found"),
"data": {
"available_methods": [
"tools.execute",
"client.info",
"session.heartbeat",
"capabilities.list"
]
}
}
})),
}
}
}
impl Clone for A2ARoutes {
fn clone(&self) -> Self {
// Clone uses shared ServerResources - no expensive object recreation
let tool_executor = UniversalToolExecutor::new(self.resources.clone()); // Safe: Arc clone for tool executor creation
Self {
resources: self.resources.clone(), // Safe: Arc clone for resource sharing
client_manager: self.client_manager.clone(), // Safe: Arc clone for A2A context
authenticator: self.authenticator.clone(), // Safe: Arc clone for A2A context
tool_executor,
}
}
}