Skip to main content
Glama

Convex MCP server

Official
by get-convex
metrics.rs (16.8 kB)
use std::time::Duration; use metrics::{ log_counter_with_labels, log_distribution_with_labels, register_convex_counter, register_convex_histogram, StaticMetricLabel, StatusTimer, STATUS_LABEL, }; use serde::{ Deserialize, Serialize, }; use serde_json::Value as JsonValue; use sync_types::{ types::ClientEvent, ClientMessage, Timestamp, }; register_convex_histogram!( SYNC_CONNECT_SECONDS, "Time between SyncWorker creation and receiving Connect message", &[STATUS_LABEL[0], "partition_id"] ); /// Measuring time between SyncWorker creation and receiving Connect message. pub fn connect_timer(partition_id: u64) -> StatusTimer { let mut timer = StatusTimer::new(&SYNC_CONNECT_SECONDS); timer.add_label(StaticMetricLabel::new( "partition_id", partition_id.to_string(), )); timer } register_convex_histogram!( SYNC_HANDLE_MESSAGE_SECONDS, "Time to handle a websocket message", &[STATUS_LABEL[0], "partition_id", "endpoint"] ); pub fn handle_message_timer(partition_id: u64, message: &ClientMessage) -> StatusTimer { let mut timer = StatusTimer::new(&SYNC_HANDLE_MESSAGE_SECONDS); let request_name = match message { ClientMessage::Authenticate { .. } => "Authenticate", ClientMessage::Connect { .. } => "Connect", ClientMessage::Action { .. } => "Action", ClientMessage::ModifyQuerySet { .. } => "ModifyQuerySet", ClientMessage::Mutation { .. } => "Mutation", ClientMessage::Event { .. 
} => "Event", }; timer.add_label(StaticMetricLabel::new("endpoint", request_name.to_owned())); timer.add_label(StaticMetricLabel::new( "partition_id", partition_id.to_string(), )); timer } register_convex_histogram!( SYNC_UPDATE_QUERIES_SECONDS, "Time to update queries", &[STATUS_LABEL[0], "partition_id"] ); pub fn update_queries_timer(partition_id: u64) -> StatusTimer { let mut timer = StatusTimer::new(&SYNC_UPDATE_QUERIES_SECONDS); timer.add_label(StaticMetricLabel::new( "partition_id", partition_id.to_string(), )); timer } register_convex_histogram!( MODIFY_QUERY_TO_TRANSITION_SECONDS, "Time between getting a ModifyQuerySet message and sending the Transition", &[STATUS_LABEL[0], "partition_id"] ); pub fn modify_query_to_transition_timer(partition_id: u64) -> StatusTimer { let mut timer = StatusTimer::new(&MODIFY_QUERY_TO_TRANSITION_SECONDS); timer.add_label(StaticMetricLabel::new( "partition_id", partition_id.to_string(), )); timer } register_convex_histogram!( SYNC_MUTATION_QUEUE_SECONDS, "Time between a mutation entering and exiting the single threaded sync worker queue", &[STATUS_LABEL[0], "partition_id"] ); pub fn mutation_queue_timer(partition_id: u64) -> StatusTimer { let mut timer = StatusTimer::new(&SYNC_MUTATION_QUEUE_SECONDS); timer.add_label(StaticMetricLabel::new( "partition_id", partition_id.to_string(), )); timer } register_convex_histogram!( SYNC_QUERY_MODIFICATION_ARGS_BYTES, "Size of query modification args in ClientMessages", &["partition_id"] ); pub fn log_query_modification_args_size(partition_id: u64, size: usize) { log_distribution_with_labels( &SYNC_QUERY_MODIFICATION_ARGS_BYTES, size as f64, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } register_convex_histogram!( SYNC_MUTATION_ARGS_BYTES, "Size of mutation args in ClientMessages", &["partition_id"] ); pub fn log_mutation_args_size(partition_id: u64, size: usize) { log_distribution_with_labels( &SYNC_MUTATION_ARGS_BYTES, size as f64, 
vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } register_convex_histogram!( SYNC_ACTION_ARGS_BYTES, "Size of action args in ClientMessages", &["partition_id"] ); pub fn log_action_args_size(partition_id: u64, size: usize) { log_distribution_with_labels( &SYNC_ACTION_ARGS_BYTES, size as f64, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } register_convex_counter!( SYNC_QUERY_FAILED_TOTAL, "Number of query failures", &["partition_id"] ); pub fn log_query_failed(partition_id: u64) { log_counter_with_labels( &SYNC_QUERY_FAILED_TOTAL, 1, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } register_convex_histogram!(SYNC_QUERY_SET_TOTAL, "Size of query set", &["partition_id"]); pub fn log_query_set_size(partition_id: u64, num_queries: usize) { log_distribution_with_labels( &SYNC_QUERY_SET_TOTAL, num_queries as f64, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } register_convex_counter!( SYNC_QUERY_RESULT_DEDUP_TOTAL, "Number of deduplicated query results", &["partition_id"] ); pub fn log_query_result_dedup(partition_id: u64, same_value: bool) { let sample = if same_value { 1 } else { 0 }; log_counter_with_labels( &SYNC_QUERY_RESULT_DEDUP_TOTAL, sample, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } register_convex_counter!( SYNC_EMPTY_TRANSITION_TOTAL, "Number of empty transitions", &["partition_id"] ); pub fn log_empty_transition(partition_id: u64) { log_counter_with_labels( &SYNC_EMPTY_TRANSITION_TOTAL, 1, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } register_convex_counter!( SYNC_CONNECT_TOTAL, "Number of new WS connections", &["partition_id", "reason"] ); register_convex_histogram!( SYNC_RECONNECT_PREV_CONNECTIONS, "How many previous connections happened on a given reconnect", &["partition_id"] ); pub fn log_connect(partition_id: u64, last_close_reason: String, 
connection_count: u32) { log_counter_with_labels( &SYNC_CONNECT_TOTAL, 1, vec![ StaticMetricLabel::new("reason", last_close_reason), StaticMetricLabel::new("partition_id", partition_id.to_string()), ], ); log_distribution_with_labels( &SYNC_RECONNECT_PREV_CONNECTIONS, connection_count.into(), vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } register_convex_histogram!( SYNC_TRANSITION_MESSAGE_SIZE_BYTES, "Heap size of Transition messages sent to clients", &["partition_id"] ); pub fn log_transition_size(partition_id: u64, transition_heap_size: usize) { log_distribution_with_labels( &SYNC_TRANSITION_MESSAGE_SIZE_BYTES, transition_heap_size as f64, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } register_convex_histogram!( SYNC_LINEARIZABILITY_DELAY_SECONDS, "How far behind the current backend is behind what the client has observed", &["partition_id"] ); pub fn log_linearizability_violation(partition_id: u64, delay_secs: f64) { log_distribution_with_labels( &SYNC_LINEARIZABILITY_DELAY_SECONDS, delay_secs, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } register_convex_histogram!( SYNC_PROCESS_CLIENT_MESSAGE_SECONDS, "Delay between receiving a client message over the web socket and processing it", &["partition_id"], ); pub fn log_process_client_message_delay(partition_id: u64, delay: Duration) { log_distribution_with_labels( &SYNC_PROCESS_CLIENT_MESSAGE_SECONDS, delay.as_secs_f64(), vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } register_convex_histogram!( SYNC_CLIENT_CONSTRUCT_TO_FIRST_MESSAGE_SECONDS, "Time from client construction to first message", &["partition_id"], ); pub fn log_client_construct_to_first_message_millis(partition_id: u64, ms: f64) { log_distribution_with_labels( &SYNC_CLIENT_CONSTRUCT_TO_FIRST_MESSAGE_SECONDS, ms / 1000.0, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } 
register_convex_histogram!( SYNC_CLIENT_CONSTRUCT_TO_WEBSOCKET_OPENED_SECONDS, "Time from client construction to websocket open", &["partition_id"] ); pub fn log_client_construct_to_websocket_opened_millis(partition_id: u64, ms: f64) { log_distribution_with_labels( &SYNC_CLIENT_CONSTRUCT_TO_WEBSOCKET_OPENED_SECONDS, ms / 1000.0, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } register_convex_counter!( SYNC_CLIENT_METRICS_INCOMPLETE_TOTAL, "Number of incomplete metric reports from the client", &["partition_id"] ); pub fn log_client_metrics_incomplete(partition_id: u64) { log_counter_with_labels( &SYNC_CLIENT_METRICS_INCOMPLETE_TOTAL, 1, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } pub fn log_client_connect_timings(partition_id: u64, marks: Vec<ClientMark>) { let mut client_constructed: Option<f64> = None; let mut websocket_opened: Option<f64> = None; let mut first_message_received: Option<f64> = None; for mark in marks { match mark.name { ClientMarkName::ClientConstructed => client_constructed = Some(mark.start_time), ClientMarkName::WebSocketOpen => websocket_opened = Some(mark.start_time), ClientMarkName::FirstMessageReceived => first_message_received = Some(mark.start_time), } } match (client_constructed, websocket_opened, first_message_received) { (Some(construct), Some(opened), Some(first)) => { log_client_construct_to_first_message_millis(partition_id, first - construct); log_client_construct_to_websocket_opened_millis(partition_id, opened - construct); }, _ => { log_client_metrics_incomplete(partition_id); }, } } #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] pub enum ClientMarkName { /// When a client is constructed (the browser client, not the React client) ClientConstructed, /// When the browser/runtime runs `WebSocket.onopen` WebSocketOpen, /// When the client receives its first WebSocket message from the server FirstMessageReceived, } 
#[derive(Clone, Debug, PartialEq)] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] pub struct ClientMark { pub name: ClientMarkName, pub start_time: f64, } impl TryFrom<JsonValue> for ClientMark { type Error = anyhow::Error; fn try_from(value: JsonValue) -> Result<Self, Self::Error> { let v: ClientMarkJson = serde_json::from_value(value)?; let client_mark = match v.name { ClientMarkNameJson::ClientConstructed => ClientMark { name: ClientMarkName::ClientConstructed, start_time: v.start_time, }, ClientMarkNameJson::WebSocketOpen => ClientMark { name: ClientMarkName::WebSocketOpen, start_time: v.start_time, }, ClientMarkNameJson::FirstMessageReceived => ClientMark { name: ClientMarkName::FirstMessageReceived, start_time: v.start_time, }, }; Ok(client_mark) } } pub fn log_client_transition(partition_id: u64, transition_transit_time: f64, message_length: f64) { log_distribution_with_labels( &SYNC_TRANSITION_TRANSIT_TIME_SECONDS, transition_transit_time / 1000.0, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); log_distribution_with_labels( &SYNC_TRANSITION_MESSAGE_LENGTH_BYTES, message_length, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); // Only log this for messages 100KB or larger so the portion due to clock skew // is smaller. if message_length >= 100_000.0 { log_distribution_with_labels( &SYNC_TRANSITION_BYTES_PER_SECOND, message_length / (transition_transit_time / 1000.0), vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } } #[derive(Clone, Debug)] pub enum TypedClientEvent { ClientConnect { marks: Vec<ClientMark>, }, ClientReceivedTransition { /// Time from the server sending the transition to the client fully /// receiving it (after finishing downloading it), corrected by /// an estimated clock skew observed when the client sends a /// smaller message in the other direction. 
transition_transit_time: f64, message_length: f64, }, } impl TryFrom<ClientEvent> for TypedClientEvent { type Error = anyhow::Error; fn try_from(value: ClientEvent) -> Result<Self, Self::Error> { match value.event_type.as_str() { "ClientConnect" => { let parsed_marks = if let JsonValue::Array(marks) = value.event { marks .into_iter() .map(ClientMark::try_from) .collect::<anyhow::Result<Vec<_>>>() } else { Err(anyhow::anyhow!("Client marks JSON is not an array")) }?; Ok(TypedClientEvent::ClientConnect { marks: parsed_marks, }) }, "ClientReceivedTransition" => { let event_data: ClientReceivedTransitionEvent = serde_json::from_value(value.event)?; Ok(TypedClientEvent::ClientReceivedTransition { transition_transit_time: event_data.transition_transit_time, message_length: event_data.message_length, }) }, _ => Err(anyhow::anyhow!("Unknown ClientEvent type")), } } } #[derive(Deserialize, Serialize)] #[serde(rename_all = "camelCase")] enum ClientMarkNameJson { ClientConstructed, WebSocketOpen, FirstMessageReceived, } #[derive(Deserialize, Serialize)] #[serde(rename_all = "camelCase")] struct ClientMarkJson { name: ClientMarkNameJson, start_time: f64, } #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct ClientReceivedTransitionEvent { transition_transit_time: f64, message_length: f64, } register_convex_histogram!( SYNC_TRANSITION_TRANSIT_TIME_SECONDS, "Time for transition to transit from client (corrected by an estimated clock skew)", &["partition_id"] ); register_convex_histogram!( SYNC_TRANSITION_MESSAGE_LENGTH_BYTES, "Length of transition message from client", &["partition_id"] ); register_convex_histogram!( SYNC_TRANSITION_BYTES_PER_SECOND, "Length of transition message over server-to-client transit time, from client", &["partition_id"] ); register_convex_histogram!( SYNC_QUERY_INVALIDATION_LAG_SECONDS, "Time between an invalidating write and a query being rerun", &["partition_id"] ); register_convex_counter!( SYNC_QUERY_INVALIDATION_LAG_UNKNOWN_TOTAL, 
"Count of query subscriptions invalidated where the correspoding invalidating write timestamp \ was unknown", &["partition_id"] ); pub fn log_query_invalidated( partition_id: u64, invalid_ts: Option<Timestamp>, current_ts: Timestamp, ) { if let Some(invalid_ts) = invalid_ts { log_distribution_with_labels( &SYNC_QUERY_INVALIDATION_LAG_SECONDS, current_ts.secs_since_f64(invalid_ts), vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } else { log_counter_with_labels( &SYNC_QUERY_INVALIDATION_LAG_UNKNOWN_TOTAL, 1, vec![StaticMetricLabel::new( "partition_id", partition_id.to_string(), )], ); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.