Skip to main content
Glama
manifest_poller.rs8.55 kB
use std::collections::HashMap; use std::path::PathBuf; use std::pin::Pin; use super::event::Event; use crate::uplink::UplinkConfig; use crate::uplink::persisted_queries::manifest::PersistedQueryManifest; use crate::uplink::persisted_queries::manifest::SignedUrlChunk; use crate::uplink::persisted_queries::{ MaybePersistedQueriesManifestChunks, PersistedQueriesManifestChunk, PersistedQueriesManifestQuery, }; use crate::uplink::stream_from_uplink_transforming_new_response; use futures::prelude::*; use reqwest::Client; use tokio::fs::read_to_string; use tower::BoxError; /// Holds the current state of persisted queries #[derive(Debug)] pub struct PersistedQueryManifestPollerState { /// The current persisted query manifest pub persisted_query_manifest: PersistedQueryManifest, } #[derive(Clone, Debug)] pub enum ManifestSource { LocalStatic(Vec<PathBuf>), LocalHotReload(Vec<PathBuf>), Uplink(UplinkConfig), } impl ManifestSource { pub async fn into_stream(self) -> impl Stream<Item = Event> { match create_manifest_stream(self).await { Ok(stream) => stream .map(|result| match result { Ok(manifest) => Event::UpdateManifest( manifest .iter() .map(|(k, v)| (k.operation_id.clone(), v.clone())) .collect(), ), Err(e) => { tracing::error!("error from manifest stream: {}", e); Event::UpdateManifest(vec![]) } }) .boxed(), Err(e) => { tracing::error!("failed to create manifest stream: {}", e); futures::stream::empty().boxed() } } } } async fn manifest_from_uplink_chunks( new_chunks: Vec<PersistedQueriesManifestChunk>, http_client: Client, ) -> Result<PersistedQueryManifest, BoxError> { let mut new_persisted_query_manifest = PersistedQueryManifest::default(); tracing::debug!("ingesting new persisted queries: {:?}", &new_chunks); // TODO: consider doing these fetches in parallel for new_chunk in new_chunks { fetch_chunk_into_manifest( new_chunk, &mut new_persisted_query_manifest, http_client.clone(), ) .await? 
} tracing::debug!( "Loaded {} persisted queries.", new_persisted_query_manifest.len() ); Ok(new_persisted_query_manifest) } async fn fetch_chunk_into_manifest( chunk: PersistedQueriesManifestChunk, manifest: &mut PersistedQueryManifest, http_client: Client, ) -> Result<(), BoxError> { let mut it = chunk.urls.iter().peekable(); while let Some(chunk_url) = it.next() { match fetch_chunk(http_client.clone(), chunk_url).await { Ok(chunk) => { manifest.add_chunk(&chunk); return Ok(()); } Err(e) => { if it.peek().is_some() { // There's another URL to try, so log as debug and move on. tracing::debug!( "failed to fetch persisted query list chunk from {}: {}. \ Other endpoints will be tried", chunk_url, e ); continue; } else { // No more URLs; fail the function. return Err(e); } } } } // The loop always returns unless there's another iteration after it, so the // only way we can fall off the loop is if we never entered it. Err("persisted query chunk did not include any URLs to fetch operations from".into()) } async fn fetch_chunk(http_client: Client, chunk_url: &String) -> Result<SignedUrlChunk, BoxError> { let chunk = http_client .get(chunk_url.clone()) .send() .await .and_then(|r| r.error_for_status()) .map_err(|e| -> BoxError { format!("error fetching persisted queries manifest chunk from {chunk_url}: {e}").into() })? 
.json::<SignedUrlChunk>() .await .map_err(|e| -> BoxError { format!("error reading body of persisted queries manifest chunk from {chunk_url}: {e}") .into() })?; chunk.validate() } /// A stream of manifest updates type ManifestStream = dyn Stream<Item = Result<PersistedQueryManifest, BoxError>> + Send + 'static; async fn create_manifest_stream( source: ManifestSource, ) -> Result<Pin<Box<ManifestStream>>, BoxError> { match source { ManifestSource::LocalStatic(paths) => Ok(stream::once(load_local_manifests(paths)).boxed()), ManifestSource::LocalHotReload(paths) => Ok(create_hot_reload_stream(paths).boxed()), ManifestSource::Uplink(uplink_config) => { let client = Client::builder() .timeout(uplink_config.timeout) .gzip(true) .build()?; Ok(create_uplink_stream(uplink_config, client).boxed()) } } } async fn load_local_manifests(paths: Vec<PathBuf>) -> Result<PersistedQueryManifest, BoxError> { let mut complete_manifest = PersistedQueryManifest::default(); for path in paths.iter() { let raw_file_contents = read_to_string(path).await.map_err(|e| -> BoxError { format!( "Failed to read persisted query list file at path: {}, {}", path.to_string_lossy(), e ) .into() })?; let chunk = SignedUrlChunk::parse_and_validate(&raw_file_contents)?; complete_manifest.add_chunk(&chunk); } tracing::debug!( "Loaded {} persisted queries from local files.", complete_manifest.len() ); Ok(complete_manifest) } fn create_uplink_stream( uplink_config: UplinkConfig, http_client: Client, ) -> impl Stream<Item = Result<PersistedQueryManifest, BoxError>> { stream_from_uplink_transforming_new_response::< PersistedQueriesManifestQuery, MaybePersistedQueriesManifestChunks, Option<PersistedQueryManifest>, >(uplink_config, move |response| { let http_client = http_client.clone(); Box::new(Box::pin(async move { match response { Some(chunks) => manifest_from_uplink_chunks(chunks, http_client) .await .map(Some) .map_err(|e| -> BoxError { e }), None => Ok(None), } })) }) .filter_map(|result| async move { match 
result { Ok(Some(manifest)) => Some(Ok(manifest)), Ok(None) => Some(Ok(PersistedQueryManifest::default())), Err(e) => Some(Err(e.into())), } }) } fn create_hot_reload_stream( paths: Vec<PathBuf>, ) -> impl Stream<Item = Result<PersistedQueryManifest, BoxError>> { // Create file watchers for each path let file_watchers = paths.into_iter().map(|raw_path| { crate::files::watch(raw_path.as_ref()).then(move |_| { let path = raw_path.clone(); async move { match read_to_string(&path).await { Ok(raw_file_contents) => { match SignedUrlChunk::parse_and_validate(&raw_file_contents) { Ok(chunk) => Ok((path, chunk)), Err(e) => Err(e), } } Err(e) => Err(e.into()), } } .boxed() }) }); // We need to keep track of the local manifest chunks so we can replace them when // they change. let mut chunks: HashMap<String, SignedUrlChunk> = HashMap::new(); // Combine all watchers into a single stream stream::select_all(file_watchers).map(move |result| { result.map(|(path, chunk)| { tracing::debug!( "hot reloading persisted query manifest file at path: {}", path.to_string_lossy() ); chunks.insert(path.to_string_lossy().to_string(), chunk); let mut manifest = PersistedQueryManifest::default(); for chunk in chunks.values() { manifest.add_chunk(chunk); } manifest }) }) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/apollographql/apollo-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.