Skip to main content
Glama
pg.rs8.28 kB
use std::{ collections::HashMap, sync::Arc, }; use si_data_pg::{ PgPool, PgPoolConfig, PgRow, postgres_types::ToSql, }; use telemetry::tracing::info; use telemetry_utils::monotonic; use crate::error::LayerDbResult; mod embedded { use refinery::embed_migrations; embed_migrations!("./src/migrations"); } pub const DBNAME: &str = "si_layer_db"; pub const APPLICATION_NAME: &str = "si-layer-db"; pub fn default_pg_pool_config() -> PgPoolConfig { PgPoolConfig { dbname: DBNAME.into(), application_name: APPLICATION_NAME.into(), ..Default::default() } } #[derive(Clone, Debug)] pub struct PgLayer { pool: Arc<PgPool>, pub table_name: String, delete_query: String, get_value_query: String, get_value_by_prefix_query: String, get_value_many_query: String, get_most_recent_query: String, insert_value_query: String, contains_key_query: String, search_query: String, } impl PgLayer { pub fn new(pg_pool: PgPool, table_name: impl Into<String>) -> Self { let table_name = table_name.into(); Self { pool: Arc::new(pg_pool), delete_query: format!("DELETE FROM {table_name} WHERE key = $1"), get_value_query: format!("SELECT value FROM {table_name} WHERE key = $1 LIMIT 1"), get_value_by_prefix_query: format!( "SELECT key, value FROM {table_name} WHERE key like $1" ), get_value_many_query: format!( "SELECT key, value FROM {table_name} WHERE key = any($1)" ), get_most_recent_query: format!( "SELECT key, value FROM {table_name} ORDER BY created_at LIMIT $1" ), insert_value_query: format!( "INSERT INTO {table_name} (key, sort_key, value) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING" ), contains_key_query: format!("SELECT key FROM {table_name} WHERE key = $1 LIMIT 1"), search_query: format!("SELECT value FROM {table_name} WHERE sort_key LIKE $1"), table_name, } } pub async fn migrate(&self) -> LayerDbResult<()> { self.pool.migrate(embedded::migrations::runner()).await?; Ok(()) } pub async fn get(&self, key: &str) -> LayerDbResult<Option<Vec<u8>>> { use std::time::Instant; use telemetry_utils::histogram; let start = Instant::now(); let key: String = key.into(); let client = self.pool.get().await?; let maybe_row = client.query_opt(&self.get_value_query, &[&key]).await?; match maybe_row { Some(row) => { monotonic!(layer_cache.hit.pg = 1); let value = row.get("value"); histogram!( layer_cache.read_latency_ms = start.elapsed().as_millis() as f64, cache_name = self.table_name.as_str(), backend = "postgres", result = "hit" ); Ok(Some(value)) } None => { histogram!( layer_cache.read_latency_ms = start.elapsed().as_millis() as f64, cache_name = self.table_name.as_str(), backend = "postgres", result = "miss" ); Ok(None) } } } pub async fn get_raw( &self, query: &str, params: &[&(dyn ToSql + Sync)], ) -> LayerDbResult<Option<PgRow>> { let client = self.pool.get().await?; match client.query_opt(query, params).await? { Some(row) => Ok(Some(row)), None => Ok(None), } } pub async fn get_many( &self, keys: &[Arc<str>], ) -> LayerDbResult<Option<HashMap<String, Vec<u8>>>> { use std::time::Instant; use telemetry_utils::histogram; let start = Instant::now(); let mut result = HashMap::new(); let client = self.pool.get().await?; let key_refs: Vec<&str> = keys.iter().map(|key_arc| key_arc.as_ref()).collect(); for row in client .query(&self.get_value_many_query, &[&key_refs]) .await? { monotonic!(layer_cache.hit.pg = 1); let key = row.get::<&str, String>("key").to_owned(); let value = row.get::<&str, Vec<u8>>("value"); // Emit metric for each key found (hit) histogram!( layer_cache.read_latency_ms = start.elapsed().as_millis() as f64, cache_name = self.table_name.as_str(), backend = "postgres", result = "hit" ); result.insert(key, value); } if result.is_empty() { // All keys missed - emit one miss metric histogram!( layer_cache.read_latency_ms = start.elapsed().as_millis() as f64, cache_name = self.table_name.as_str(), backend = "postgres", result = "miss" ); return Ok(None); } Ok(Some(result)) } pub async fn query( &self, query: &str, params: &[&(dyn ToSql + Sync)], ) -> LayerDbResult<Option<Vec<PgRow>>> { let client = self.pool.get().await?; Ok(Some(client.query(query, params).await?)) } pub async fn query_opt( &self, query: &str, params: &[&(dyn ToSql + Sync)], ) -> LayerDbResult<Option<PgRow>> { let client = self.pool.get().await?; Ok(client.query_opt(query, params).await?) } pub async fn get_many_by_prefix( &self, key: &str, ) -> LayerDbResult<Option<HashMap<String, Vec<u8>>>> { let mut result = HashMap::new(); let client = self.pool.get().await?; for row in client .query(&self.get_value_by_prefix_query, &[&format!("{}%", &key)]) .await? { result.insert( row.get::<&str, String>("key").to_owned(), row.get::<&str, Vec<u8>>("value"), ); } if result.is_empty() { return Ok(None); } Ok(Some(result)) } pub async fn get_most_recent( &self, limit: i64, ) -> LayerDbResult<Option<HashMap<String, Vec<u8>>>> { let mut result = HashMap::new(); let client = self.pool.get().await?; for row in client.query(&self.get_most_recent_query, &[&limit]).await? { result.insert( row.get::<&str, String>("key").to_owned(), row.get::<&str, Vec<u8>>("value"), ); } if result.is_empty() { return Ok(None); } Ok(Some(result)) } pub async fn search(&self, sort_key_like: impl AsRef<str>) -> LayerDbResult<Vec<Vec<u8>>> { let sort_key_like = sort_key_like.as_ref(); let client = self.pool.get().await?; let rows = client.query(&self.search_query, &[&sort_key_like]).await?; Ok(rows.into_iter().map(|r| r.get("value")).collect()) } pub async fn insert( &self, key: &str, sort_key: impl AsRef<str>, value: &[u8], ) -> LayerDbResult<()> { let client = self.pool.get().await?; let sort_key = sort_key.as_ref(); client .query(&self.insert_value_query, &[&key, &sort_key, &value]) .await?; Ok(()) } pub async fn insert_raw( &self, query: &str, params: &[&(dyn ToSql + Sync)], ) -> LayerDbResult<()> { let client = self.pool.get().await?; client.query(query, params).await?; Ok(()) } pub async fn delete(&self, key: &str) -> LayerDbResult<()> { let client = self.pool.get().await?; client.query(&self.delete_query, &[&key]).await?; Ok(()) } pub async fn contains_key(&self, key: &str) -> LayerDbResult<bool> { let client = self.pool.get().await?; let maybe_row = client.query_opt(&self.contains_key_query, &[&key]).await?; Ok(maybe_row.is_some()) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server