Skip to main content
Glama
lib.rs (17.4 kB)
// #![warn // clippy::unwrap_in_result, // clippy::unwrap_used, // clippy::panic, // clippy::missing_panics_doc, // clippy::panic_in_result_fn // )] // #![allow(clippy::missing_errors_doc)] use std::{ io, net::ToSocketAddrs, result, sync::Arc, }; use futures::TryStreamExt; use serde::{ Deserialize, Serialize, }; use si_std::SensitiveString; use spicedb_client::{ SpicedbClient, builder::WriteRelationshipsRequestBuilder, types::ConsistencyRequirement, }; use spicedb_grpc::authzed::api::v1::{ Consistency, LookupSubjectsRequest, ObjectReference, WriteRelationshipsRequest, relationship_update::Operation, }; use telemetry::prelude::*; use thiserror::Error; use tonic::Code; use url::Url; mod types; pub use types::{ Permission, ReadSchemaResponse, Relationship, Relationships, SpiceDBObject, ZedToken, }; #[remain::sorted] #[derive(Error, Debug)] pub enum Error { #[error("error connecting to spicedb at {1}: {0}")] Connection(#[source] spicedb_client::result::Error, Url), #[error("spicedb endpoint has no host part: {0}")] EndpointNoHost(Url), #[error("cannot determine spicedb endpoint port number: {0}")] EndpointUnknownPort(Url), #[error("GRPC streaming error: {0}")] GRPC(#[source] spicedb_client::result::Error), #[error("error resolving ip addr for spicedb endpoint hostname: {0}")] ResolveHostname(#[source] io::Error), #[error("resolved hostname returned no entries")] ResolveHostnameNoEntries, #[error("spicedb client error: {0}")] SpiceDb(#[from] spicedb_client::result::Error), #[error("tokio task join error: {0}")] TokioJoin(#[from] tokio::task::JoinError), } pub type SpiceDbError = Error; pub type SpiceDbClient = Client; type Result<T> = result::Result<T, Error>; #[derive(Clone)] pub struct Client { inner: SpicedbClient, metadata: Arc<ConnectionMetadata>, } impl Client { #[instrument( name = "spicedb_client::new", level = "debug", skip_all, fields( db.connection_string = Empty, db.system = Empty, network.peer.address = Empty, network.protocol.name = Empty, 
network.transport = Empty, otel.kind = SpanKind::Client.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = Empty, server.port = Empty, ), )] pub async fn new(config: &SpiceDbConfig) -> Result<Self> { let span = current_span_for_instrument_at!("debug"); let db_system = "spicedb"; let db_connection_string = config.endpoint.to_string(); let network_transport = "ip_tcp"; let network_protocol_name = "spicedb"; let server_port = config .endpoint .port_or_known_default() .ok_or(Error::EndpointUnknownPort(config.endpoint.clone())) .map_err(|err| span.record_err(err))?; let server_address = match config.endpoint.host() { Some(url::Host::Domain(domain)) => { let domain = domain.to_owned(); // Resolve hostname to an IP address tokio::task::spawn_blocking(move || { (format!("{domain}:{server_port}")) .to_socket_addrs() .map_err(Error::ResolveHostname) .and_then(|mut iter| iter.next().ok_or(Error::ResolveHostnameNoEntries)) .map(|socket_addr| socket_addr.ip().to_string()) }) .await .map_err(|err| span.record_err(err))? .map_err(|err| span.record_err(err))? 
} Some(url::Host::Ipv4(addr)) => addr.to_string(), Some(url::Host::Ipv6(addr)) => addr.to_string(), None => return Err(span.record_err(Error::EndpointNoHost(config.endpoint.clone()))), }; let metadata = ConnectionMetadata { db_system, db_connection_string, network_peer_address: server_address.clone(), network_protocol_name, network_transport, server_address, server_port, }; span.record( "db.connection_string", metadata.db_connection_string.as_str(), ); span.record("db.system", metadata.db_system); span.record( "network.peer.address", metadata.network_peer_address.as_str(), ); span.record("network.protocol.name", metadata.network_protocol_name); span.record("network.transport", metadata.network_transport); span.record("server.address", metadata.server_address.as_str()); span.record("server.port", metadata.server_port); let mut inner = SpicedbClient::from_url_and_preshared_key( config.endpoint.to_string(), config.preshared_key.as_str(), ) .await .map_err(|err| span.record_err(Error::Connection(err, config.endpoint.clone())))?; // validate the connection. 
There is no native way to do this in spicedb, but if we can // either read a schema or be told that there is no schema, we know we can talk to spicedb match inner.read_schema().await { Ok(_) => (), // connection good and schema exists Err(spicedb_client::result::Error::TonicStatus(status)) if status.code() == Code::NotFound && status.message().contains("No schema has been defined") => {} // connection good but no schema Err(err) => return Err(Error::Connection(err, config.endpoint.clone())), // connection failed } info!("Connected to spicedb!"); span.record_ok(); Ok(Self { inner, metadata: Arc::new(metadata), }) } #[instrument( name = "spicedb_client::new_sync", level = "debug", skip_all, fields( db.connection_string = Empty, db.system = Empty, network.peer.address = Empty, network.protocol.name = Empty, network.transport = Empty, otel.kind = SpanKind::Client.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = Empty, server.port = Empty, ), )] pub fn new_sync(config: &SpiceDbConfig) -> Result<Self> { futures::executor::block_on(Self::new(config)) } #[instrument( name = "spicedb_client.read_schema", level = "debug", skip_all, fields( db.connection_string = %self.metadata.db_connection_string(), db.system = %self.metadata.db_system(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ), )] pub async fn read_schema(&mut self) -> Result<ReadSchemaResponse> { let span = current_span_for_instrument_at!("debug"); let resp = self .inner .read_schema() .await .map_err(|err| span.record_err(Error::SpiceDb(err)))? 
.into(); span.record_ok(); Ok(resp) } #[instrument( name = "spicedb_client.write_schema", level = "debug", skip_all, fields( db.connection_string = %self.metadata.db_connection_string(), db.system = %self.metadata.db_system(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ), )] pub async fn write_schema(&mut self, schema: impl ToString) -> Result<Option<ZedToken>> { let span = current_span_for_instrument_at!("debug"); let resp = self .inner .write_schema(schema) .await .map_err(|err| span.record_err(Error::SpiceDb(err)))? .written_at .map(|value| value.into()); span.record_ok(); Ok(resp) } #[instrument( name = "spicedb_client.read_relationships", level = "debug", skip_all, fields( db.connection_string = %self.metadata.db_connection_string(), db.system = %self.metadata.db_system(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ), )] pub async fn read_relationship(&mut self, relationship: Relationship) -> Result<Relationships> { let span = current_span_for_instrument_at!("debug"); let mut relationships = vec![]; let results: result::Result<Vec<_>, _> = self .inner .read_relationships(relationship.into_read_request()) .await? .try_collect() .await; for r in results.map_err(|e| span.record_err(Error::GRPC(e.into())))? 
{ if let Some(relationship) = r.relationship { relationships.push(relationship.into()); } } span.record_ok(); Ok(relationships) } #[instrument( name = "spicedb_client.create_relationships", level = "debug", skip_all, fields( db.connection_string = %self.metadata.db_connection_string(), db.system = %self.metadata.db_system(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ), )] pub async fn create_relationships( &mut self, relationships: Relationships, ) -> Result<Option<ZedToken>> { self.update_relationships(relationships, Operation::Create) .await } #[instrument( name = "spicedb_client.delete_relationships", level = "debug", skip_all, fields( db.connection_string = %self.metadata.db_connection_string(), db.system = %self.metadata.db_system(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ), )] pub async fn delete_relationships( &mut self, relationships: Relationships, ) -> Result<Option<ZedToken>> { self.update_relationships(relationships, Operation::Delete) .await } async fn update_relationships( &mut self, relationships: Relationships, operation: Operation, ) -> Result<Option<ZedToken>> { let span = current_span_for_instrument_at!("debug"); let request: WriteRelationshipsRequest = WriteRelationshipsRequest::new( relationships .into_iter() .map(|r| r.into_relationship_update(operation)), ); let resp = self .inner 
.write_relationships(request) .await .map_err(|err| span.record_err(Error::SpiceDb(err)))? .written_at .map(|value| value.into()); span.record_ok(); Ok(resp) } #[instrument( name = "spicedb_client.check_permissions", level = "debug", skip_all, fields( db.connection_string = %self.metadata.db_connection_string(), db.system = %self.metadata.db_system(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ), )] pub async fn check_permissions(&mut self, permission: Permission) -> Result<bool> { let span = current_span_for_instrument_at!("debug"); let resp = self .inner .check_permission(permission.into_request()) .await .map_err(|err| span.record_err(Error::SpiceDb(err)))? .permissionship; span.record_ok(); Ok(Permission::has_permission(resp)) } #[instrument( name = "spicedb_client.lookup_subjects", level = "debug", skip_all, fields( db.connection_string = %self.metadata.db_connection_string(), db.system = %self.metadata.db_system(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ), )] pub async fn lookup_subjects( &mut self, resource_type: String, resource_id: String, permission: String, object_type: String, ) -> Result<Vec<String>> { let mut subjects = vec![]; let span = current_span_for_instrument_at!("debug"); let consistency = ConsistencyRequirement::MinimizeLatency(true); let resource_ref = ObjectReference { object_type: resource_type, 
object_id: resource_id, }; let lookup_request = LookupSubjectsRequest { resource: Some(resource_ref), permission, consistency: Some(Consistency { requirement: Some(consistency), }), subject_object_type: object_type, ..Default::default() }; let results: result::Result<Vec<_>, _> = self .inner .lookup_subjects(lookup_request) .await? .try_collect() .await; for r in results.map_err(|e| span.record_err(Error::GRPC(e.into())))? { if let Some(subject) = r.subject { subjects.push(subject.subject_object_id); } } span.record_ok(); Ok(subjects) } } #[derive(Clone, Debug, Deserialize, Serialize)] pub struct SpiceDbConfig { pub enabled: bool, pub endpoint: Url, pub preshared_key: SensitiveString, } impl Default for SpiceDbConfig { fn default() -> Self { Self { enabled: false, endpoint: Url::parse("http://localhost:50051").expect("string is a valid URL"), preshared_key: SensitiveString::from("hobgoblin"), } } } #[derive(Clone, Debug)] pub struct ConnectionMetadata { db_system: &'static str, db_connection_string: String, network_peer_address: String, network_protocol_name: &'static str, network_transport: &'static str, server_address: String, server_port: u16, } impl ConnectionMetadata { pub fn db_system(&self) -> &str { self.db_system } pub fn db_connection_string(&self) -> &str { &self.db_connection_string } pub fn network_peer_address(&self) -> &str { &self.network_peer_address } pub fn network_protocol_name(&self) -> &str { self.network_protocol_name } pub fn network_transport(&self) -> &str { self.network_transport } pub fn server_address(&self) -> &str { &self.server_address } pub fn server_port(&self) -> u16 { self.server_port } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.