Skip to main content
Glama
process_gatherer.rs3.22 kB
use std::{ collections::HashSet, result, }; use nix::unistd::{ Pid, getpgrp, }; #[cfg(target_os = "linux")] use procfs::process::all_processes; use telemetry::{ prelude::info, tracing::debug, }; use telemetry_utils::metric; use thiserror::Error; use tokio::{ sync::{ Mutex, mpsc, }, time::{ Duration, sleep, }, }; use tokio_util::{ sync::CancellationToken, task::TaskTracker, }; type Result<T> = result::Result<T, ProcessGathererError>; #[remain::sorted] #[derive(Debug, Error)] pub enum ProcessGathererError { #[error("failed to get processes: {0}")] Process(#[from] procfs::ProcError), #[error("shutdown error: {0}")] Shutdown(#[from] mpsc::error::SendError<CancellationToken>), } pub struct ProcessGathererTask { client: ProcessGatherer, shutdown_token: CancellationToken, } impl ProcessGathererTask { const NAME: &'static str = "ProcessGathererTask"; fn create(client: ProcessGatherer, shutdown_token: CancellationToken) -> Result<Self> { Ok(Self { client, shutdown_token, }) } async fn run(mut self) { loop { tokio::select! { _ = self.shutdown_token.cancelled() => { debug!(task = Self::NAME, "received cancellation"); self.client.stop().await; break; } _ = self.client.start() => { } else => { break; } } } } } #[derive(Default)] pub struct ProcessGatherer { procs: Mutex<HashSet<String>>, } impl ProcessGatherer { pub fn new() -> Self { let procs = HashSet::new().into(); Self { procs } } pub async fn start(&mut self) -> Result<()> { loop { { let mut procs = self.procs.lock().await; for proc in all_processes()?.flatten() { if let Ok(stat) = proc.stat() { if Pid::from_raw(stat.pgrp) == getpgrp() { procs.insert(stat.comm); } } } } sleep(Duration::from_millis(100)).await; } } pub async fn stop(&mut self) { for proc in self.procs.lock().await.iter() { metric!(counter.cyclone.process = 1, proc = proc); info!(counter.cyclone.process = 1, proc = proc); } } } #[must_use] pub struct ProcessGathererShutdownGuard { shutdown_token: CancellationToken, } impl ProcessGathererShutdownGuard { pub async fn wait(self) -> Result<()> { self.shutdown_token.cancelled().await; Ok(()) } } pub fn init( enable: bool, tracker: &TaskTracker, shutdown_token: CancellationToken, ) -> Result<ProcessGathererShutdownGuard> { let client = ProcessGatherer::new(); let guard = ProcessGathererShutdownGuard { shutdown_token: shutdown_token.clone(), }; if enable { tracker.spawn(ProcessGathererTask::create(client, shutdown_token.clone())?.run()); } Ok(guard) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server