Skip to main content
Glama
operation_source.rs7.7 kB
use std::{ collections::HashMap, fs, path::PathBuf, sync::{Arc, Mutex}, }; use apollo_mcp_registry::{ files, platform_api::operation_collections::{ collection_poller::CollectionSource, event::CollectionEvent, }, uplink::persisted_queries::{ManifestSource, event::Event as ManifestEvent}, }; use futures::{Stream, StreamExt as _}; use tracing::warn; use crate::event::Event; use super::RawOperation; const OPERATION_DOCUMENT_EXTENSION: &str = "graphql"; /// The source of the operations exposed as MCP tools #[derive(Clone, Debug)] pub enum OperationSource { /// GraphQL document files Files(Vec<PathBuf>), /// Persisted Query manifest Manifest(ManifestSource), /// Operation collection Collection(CollectionSource), /// No operations provided None, } impl OperationSource { #[tracing::instrument(skip_all, fields(operation_source = ?self))] pub async fn into_stream(self) -> impl Stream<Item = Event> { match self { OperationSource::Files(paths) => Self::stream_file_changes(paths).boxed(), OperationSource::Manifest(manifest_source) => manifest_source .into_stream() .await .map(|event| { let ManifestEvent::UpdateManifest(operations) = event; Event::OperationsUpdated( operations.into_iter().map(RawOperation::from).collect(), ) }) .boxed(), OperationSource::Collection(collection_source) => collection_source .into_stream() .map(|event| match event { CollectionEvent::UpdateOperationCollection(operations) => { match operations .iter() .map(RawOperation::try_from) .collect::<Result<Vec<_>, _>>() { Ok(operations) => Event::OperationsUpdated(operations), Err(e) => Event::CollectionError(e), } } CollectionEvent::CollectionError(error) => Event::CollectionError(error), }) .boxed(), OperationSource::None => { futures::stream::once(async { Event::OperationsUpdated(vec![]) }).boxed() } } } #[tracing::instrument] fn stream_file_changes(paths: Vec<PathBuf>) -> impl Stream<Item = Event> { let path_count = paths.len(); let state = Arc::new(Mutex::new(HashMap::<PathBuf, Vec<RawOperation>>::new())); futures::stream::select_all(paths.into_iter().map(|path| { let state = Arc::clone(&state); files::watch(path.as_ref()) .filter_map(move |_| { let path = path.clone(); let state = Arc::clone(&state); async move { let mut operations = Vec::new(); if path.is_dir() { // Handle a directory if let Ok(entries) = fs::read_dir(&path) { for entry in entries.flatten() { let entry_path = entry.path(); if entry_path.extension().and_then(|e| e.to_str()) == Some(OPERATION_DOCUMENT_EXTENSION) { match fs::read_to_string(&entry_path) { Ok(content) => { // Be forgiving of empty files in the directory case. // It likely means a new file was created in an editor, // but the operation hasn't been written yet. if !content.trim().is_empty() { operations.push(RawOperation::from(( content, entry_path.to_str().map(|s| s.to_string()), ))); } } Err(e) => { return Some(Event::OperationError( e, path.to_str().map(|s| s.to_string()), )); } } } } } } else { // Handle a single file match fs::read_to_string(&path) { Ok(content) => { if !content.trim().is_empty() { operations.push(RawOperation::from(( content, path.to_str().map(|s| s.to_string()), ))); } else { warn!(?path, "Empty operation file"); } } Err(e) => { return Some(Event::OperationError( e, path.to_str().map(|s| s.to_string()), )); } } } match state.lock() { Ok(mut state) => { state.insert(path.clone(), operations); // All paths send an initial event on startup. To avoid repeated // operation events on startup, wait until all paths have been // loaded, then send a single event with the operations for all // paths. if state.len() == path_count { Some(Event::OperationsUpdated( state.values().flatten().cloned().collect::<Vec<_>>(), )) } else { None } } Err(_) => Some(Event::OperationError( std::io::Error::other("State mutex poisoned"), path.to_str().map(|s| s.to_string()), )), } } }) .boxed() })) .boxed() } } impl From<ManifestSource> for OperationSource { fn from(manifest_source: ManifestSource) -> Self { OperationSource::Manifest(manifest_source) } } impl From<Vec<PathBuf>> for OperationSource { fn from(paths: Vec<PathBuf>) -> Self { OperationSource::Files(paths) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/apollographql/apollo-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server