Skip to main content
Glama
local_uds.rs (27.4 kB)
use std::{ io, path::{ Path, PathBuf, }, result, sync::Arc, time::Duration, }; use async_trait::async_trait; use bollard::{ Docker, container::{ Config, CreateContainerOptions, RemoveContainerOptions, StartContainerOptions, }, errors::Error, models::{ HostConfig, Mount, MountTypeEnum, }, }; use cyclone_client::{ Client, ClientConfig, ClientError, Connection, CycloneClient, Execution, LivenessStatus, PingExecution, ReadinessStatus, UdsClient, UnixStream, Watch, WatchError, WatchStarted, new_unstarted_execution, }; use cyclone_core::{ CanonicalCommand, CycloneRequest, CycloneRequestable, process::{ self, ShutdownError, }, }; use derive_builder::Builder; use futures::StreamExt; use rand::{ Rng, distributions::Alphanumeric, thread_rng, }; use serde::{ Deserialize, Serialize, }; #[cfg(target_os = "linux")] use si_firecracker::{ errors::FirecrackerJailError, firecracker::FirecrackerJail, }; use tempfile::{ NamedTempFile, TempPath, }; use thiserror::Error; use tokio::{ io::{ AsyncRead, AsyncWrite, }, process::{ Child, Command, }, sync::oneshot, time, }; use tracing::{ debug, trace, }; use crate::instance::{ Instance, Spec, SpecBuilder, }; /// Error type for [`LocalUdsInstance`]. #[remain::sorted] #[derive(Debug, Error)] pub enum LocalUdsInstanceError { /// Spec builder error. #[error("builder error: {0}")] Builder(#[from] LocalUdsInstanceSpecBuilderError), /// Error when waiting for child process to shutdown. #[error("child shutdown error: {0}")] ChildShutdown(#[from] ShutdownError), /// Failed to spawn a child process. #[error("failed to spawn cyclone child process: {0}")] ChildSpawn(#[source] io::Error), /// Cyclone client error. #[error("client error: {0}")] Client(#[from] Box<ClientError>), /// Failed to build a container. #[error("failed to build a cyclone container: {0}")] ContainerBuild(#[source] Error), /// Failed to run a container. #[error("failed to spawn cyclone container: {0}")] ContainerRun(#[source] Error), /// Error when shutting down a container. 
#[error("container shutdown error: {0}")] ContainerShutdown(#[from] Error), /// Docker api not found #[error("no docker api")] DockerAPINotFound, #[cfg(target_os = "linux")] /// Failed to firecracker jail. #[error("firecracker error: {0}")] Firecracker(#[from] FirecrackerJailError), /// Failed to create firecracker-setup file. #[error("failed to create firecracker-setup file: {0}")] FirecrackerSetupCreate(#[source] io::Error), /// Failed to set permissions on the firecracker-setup file. #[error("failed to set permissions on the firecracker-setup file: {0}")] FirecrackerSetupPermissions(#[source] io::Error), /// Failed to run firecracker-setup file. #[error("failed to run firecracker-setup file: {0}")] FirecrackerSetupRun(String), /// Failed to write to firecracker-setup file. #[error("failed to write to firecracker-setup file: {0}")] FirecrackerSetupWrite(#[source] io::Error), /// Instance has exhausted its predefined request count. #[error("no remaining requests, cyclone server is considered unhealthy")] NoRemainingRequests, /// Failed to setup the host correctly. #[error("failed to setup host")] SetupFailed, /// Failed to create socket from temporary file. #[error("failed to create temp socket: {0}")] TempSocket(#[source] io::Error), /// Cyclone client `watch` endpoint error. #[error("watch error: {0}")] Watch(#[from] Box<WatchError>), /// Cyclone client `watch` session ended earlier than expected. #[error("server closed watch session before expected")] WatchClosed, /// Cyclone client initial `watch` session connection with retries timed out. #[error("timeout while retrying to start a client watch session")] WatchInitTimeout, /// Cyclone client `watch` session shut down earlier than expected. 
#[error("watch session is shut down, cyclone server is considered unhealthy")] WatchShutDown, } impl From<ClientError> for LocalUdsInstanceError { fn from(value: ClientError) -> Self { Box::new(value).into() } } impl From<WatchError> for LocalUdsInstanceError { fn from(value: WatchError) -> Self { Box::new(value).into() } } type Result<T> = result::Result<T, LocalUdsInstanceError>; /// A local Cyclone [`Instance`], managed as a spawned child process, communicating over a Unix /// domain socket ("Uds"). pub struct LocalUdsInstance { // The `TempPath` type is kept around as an [RAII // guard](https://rust-unofficial.github.io/patterns/patterns/behavioural/RAII.html), that is, // when `LocalUdsInstance` is dropped, the temp file is marked for deletion. _temp_path: Option<TempPath>, client: UdsClient, limit_requests: Option<u32>, runtime: Box<dyn LocalInstanceRuntime>, watch_shutdown_tx: oneshot::Sender<()>, } // TODO(nick): make this more useful. impl std::fmt::Debug for LocalUdsInstance { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("LocalUdsInstance").finish() } } #[async_trait] impl Instance for LocalUdsInstance { type SpecBuilder = LocalUdsInstanceSpecBuilder; type Error = LocalUdsInstanceError; async fn terminate(&mut self) -> result::Result<(), Self::Error> { self.runtime.terminate().await } async fn ensure_healthy(&mut self) -> result::Result<(), Self::Error> { self.ensure_healthy_client().await?; Ok(()) } fn id(&self) -> u32 { self.runtime.id() } } #[async_trait] impl CycloneClient<UnixStream> for LocalUdsInstance { async fn watch(&mut self) -> result::Result<Watch<UnixStream>, ClientError> { self.ensure_healthy_client() .await .map_err(ClientError::unhealthy)?; self.client.watch().await } async fn liveness(&mut self) -> result::Result<LivenessStatus, ClientError> { self.ensure_healthy_client() .await .map_err(ClientError::unhealthy)?; self.client.liveness().await } async fn readiness(&mut self) -> 
result::Result<ReadinessStatus, ClientError> { self.ensure_healthy_client() .await .map_err(ClientError::unhealthy)?; self.client.readiness().await } async fn execute_ping(&mut self) -> result::Result<PingExecution<UnixStream>, ClientError> { self.ensure_healthy_client() .await .map_err(ClientError::unhealthy)?; let result = self.client.execute_ping().await; self.count_request(); result } async fn prepare_execution<Request>( &mut self, request: CycloneRequest<Request>, ) -> result::Result<Execution<UnixStream, Request, Request::Response>, ClientError> where Request: CycloneRequestable + Send + Sync, { self.ensure_healthy_client() .await .map_err(ClientError::unhealthy)?; let stream = self .client .websocket_stream(request.websocket_path()) .await?; let result = new_unstarted_execution(stream, request); self.count_request(); Ok(result) } } impl LocalUdsInstance { async fn ensure_healthy_client(&mut self) -> Result<()> { if !self.is_watch_shutdown_open() { return Err(LocalUdsInstanceError::WatchShutDown); } if !self.has_remaining_requests() { return Err(LocalUdsInstanceError::NoRemainingRequests); } Ok(()) } fn has_remaining_requests(&self) -> bool { match self.limit_requests { Some(0) => false, Some(_) | None => true, } } fn is_watch_shutdown_open(&self) -> bool { !self.watch_shutdown_tx.is_closed() } fn count_request(&mut self) { if let Some(limit_requests) = self.limit_requests.as_mut() { *limit_requests = limit_requests.saturating_sub(1); } } } /// The [`Spec`] for [`LocalUdsInstance`] #[derive(Builder, Clone, Debug, Default)] pub struct LocalUdsInstanceSpec { /// Canonical path to the `cyclone` program. #[builder(try_setter, setter(into), default)] cyclone_cmd_path: CanonicalCommand, /// Canonical path to the language server program. #[builder(try_setter, setter(into), default)] lang_server_cmd_path: CanonicalCommand, /// Overrides the default function timeout for the language server program, in seconds. 
#[builder(default)] lang_server_function_timeout: Option<usize>, /// Socket strategy for a spawned Cyclone server. #[builder(default)] socket_strategy: LocalUdsSocketStrategy, /// Runtime strategy for a spawned Cyclone server. #[builder(default)] runtime_strategy: LocalUdsRuntimeStrategy, /// Sets the watch timeout value for a spawned Cyclone server. #[builder(setter(into, strip_option), default)] watch_timeout: Option<Duration>, /// Sets the limit requests strategy for a spawned Cyclone server. #[builder(setter(into), default = "Some(1)")] limit_requests: Option<u32>, /// Enables the `ping` execution endpoint for a spawned Cyclone server. #[builder(private, setter(name = "_ping"), default = "false")] ping: bool, /// Enables the `resolver` execution endpoint for a spawned Cyclone server. #[builder(private, setter(name = "_resolver"), default = "false")] resolver: bool, /// Enables the `action` execution endpoint for a spawned Cyclone server. #[builder(private, setter(name = "_action"), default = "false")] action: bool, /// Size of the pool to configure for the spec. #[builder(setter(into), default = "500")] pub pool_size: u32, /// Sets the timeout for connecting to firecracker #[builder(setter(into), default = "10")] connect_timeout: u64, /// Sets whether or not the firecracker setup scripts will be created. 
#[builder(default = "true")] create_firecracker_setup_scripts: bool, } #[async_trait] impl Spec for LocalUdsInstanceSpec { type Instance = LocalUdsInstance; type Error = LocalUdsInstanceError; #[allow(unused_variables)] async fn clean(&self, id: u32) -> result::Result<(), Self::Error> { match self.runtime_strategy { LocalUdsRuntimeStrategy::LocalDocker => Ok(()), LocalUdsRuntimeStrategy::LocalProcess => Ok(()), #[cfg(target_os = "linux")] LocalUdsRuntimeStrategy::LocalFirecracker => LocalFirecrackerRuntime::clean(id).await, } } #[allow(unused_variables)] async fn prepare(&self, id: u32) -> result::Result<(), Self::Error> { match self.runtime_strategy { LocalUdsRuntimeStrategy::LocalDocker => Ok(()), LocalUdsRuntimeStrategy::LocalProcess => Ok(()), #[cfg(target_os = "linux")] LocalUdsRuntimeStrategy::LocalFirecracker => LocalFirecrackerRuntime::prepare(id).await, } } #[allow(unused_variables)] async fn setup(&mut self) -> result::Result<(), Self::Error> { match self.runtime_strategy { LocalUdsRuntimeStrategy::LocalDocker => Ok(()), LocalUdsRuntimeStrategy::LocalProcess => Ok(()), #[cfg(target_os = "linux")] LocalUdsRuntimeStrategy::LocalFirecracker => { LocalFirecrackerRuntime::setup_firecracker(self).await } } } #[allow(unused_assignments, unused_mut)] async fn spawn(&self, id: u32) -> result::Result<Self::Instance, Self::Error> { let (temp_path, socket) = temp_path_and_socket_from(&self.socket_strategy)?; let mut runtime = runtime_instance_from_spec(self, &socket, id).await?; runtime.spawn().await?; //TODO(scott): Firecracker requires the client to add a special connection detail. We //should find a better way to handle this. 
let mut firecracker_connect = false; #[cfg(target_os = "linux")] { firecracker_connect = matches!( self.runtime_strategy, LocalUdsRuntimeStrategy::LocalFirecracker ); } let config = ClientConfig { connect_timeout: Duration::from_millis(self.connect_timeout), firecracker_connect, ..Default::default() }; let mut client = Client::uds(runtime.socket(), Arc::new(config))?; // Establish the client watch session. As the process may be booting, we will retry for a // period before giving up and assuming that the server instance has failed. let watch = { let mut retries = 300; loop { match client.watch().await { Ok(watch) => { break watch; } Err(err) => err, }; if retries < 1 { runtime.terminate().await?; return Err(Self::Error::WatchInitTimeout); } retries -= 1; time::sleep(Duration::from_millis(64)).await; } }; let mut watch_progress = watch.start().await?; // Establish that we have received our first watch ping, which should happen immediately // after establishing a watch session watch_progress .next() .await .ok_or(Self::Error::WatchClosed)??; let (watch_shutdown_tx, watch_shutdown_rx) = oneshot::channel(); // Spawn a task to keep the watch session open until we shut it down tokio::spawn(watch_task(watch_progress, watch_shutdown_rx)); Ok(Self::Instance { _temp_path: temp_path, client, limit_requests: self.limit_requests, runtime, watch_shutdown_tx, }) } } impl SpecBuilder for LocalUdsInstanceSpecBuilder { type Spec = LocalUdsInstanceSpec; type Error = LocalUdsInstanceError; fn build(&self) -> result::Result<Self::Spec, Self::Error> { self.build().map_err(Into::into) } } impl LocalUdsInstanceSpecBuilder { /// Sets the limit requests strategy to `1` for a spawned Cyclone server. pub fn oneshot(&mut self) -> &mut Self { self.limit_requests(Some(1)) } /// Enables the `ping` execution endpoint for a spawned Cyclone server. pub fn ping(&mut self) -> &mut Self { self._ping(true) } /// Enables the `resolver` execution endpoint for a spawned Cyclone server. 
pub fn resolver(&mut self) -> &mut Self { self._resolver(true) } /// Enables the `action` execution endpoint for a spawned Cyclone server. pub fn action(&mut self) -> &mut Self { self._action(true) } /// Enables all available endpoints for a spawned Cyclone server pub fn all_endpoints(&mut self) -> &mut Self { self.action().resolver() } } /// Socket strategy when spawning [`Instance`]s using a local Unix domain socket. #[remain::sorted] #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[serde(untagged)] pub enum LocalUdsSocketStrategy { /// Use the given path as the socket location. Custom(PathBuf), /// Randomly assign a socket from a temp file. Random, /// Randomly assign a socket from a temp file in the given parent directory. RandomIn(PathBuf), } impl Default for LocalUdsSocketStrategy { fn default() -> Self { Self::Random } } impl LocalUdsSocketStrategy { /// Creates a random socket strategy. #[must_use] pub fn random() -> Self { Self::Random } /// Creates a random socket strategy in the given parent directory. pub fn random_in(path: impl Into<PathBuf>) -> Self { Self::RandomIn(path.into()) } /// Creates a custom socket strategy for the given socket location. pub fn custom(path: impl Into<PathBuf>) -> Self { Self::Custom(path.into()) } } fn temp_path_and_socket_from( socket_strategy: &LocalUdsSocketStrategy, ) -> Result<(Option<TempPath>, PathBuf)> { match socket_strategy { LocalUdsSocketStrategy::Random => { let temp_path = NamedTempFile::with_prefix("cyclone") .map_err(LocalUdsInstanceError::TempSocket)? .into_temp_path(); let socket = PathBuf::from(&temp_path); Ok((Some(temp_path), socket)) } LocalUdsSocketStrategy::RandomIn(parent_path) => { let temp_path = NamedTempFile::new_in(parent_path) .map_err(LocalUdsInstanceError::TempSocket)? 
.into_temp_path(); let socket = PathBuf::from(&temp_path); Ok((Some(temp_path), socket)) } LocalUdsSocketStrategy::Custom(socket) => Ok((None, socket.clone())), } } #[remain::sorted] /// Runtime strategy when spawning [`Instance`]s. #[derive(Clone, Debug, Copy, Serialize, Deserialize)] pub enum LocalUdsRuntimeStrategy { /// Run Docker containers on the local machine LocalDocker, #[cfg(target_os = "linux")] /// Run processes on firecracker LocalFirecracker, /// Run processes on the local machine LocalProcess, } impl Default for LocalUdsRuntimeStrategy { fn default() -> Self { // firecracker-setup: change LocalProcess to LocalFirecracker Self::LocalProcess } } #[async_trait] pub trait LocalInstanceRuntime: Send + Sync { fn id(&self) -> u32; fn socket(&mut self) -> PathBuf; async fn spawn(&mut self) -> result::Result<(), LocalUdsInstanceError>; async fn terminate(&mut self) -> result::Result<(), LocalUdsInstanceError>; } #[derive(Debug)] struct LocalProcessRuntime { cmd: Command, child: Option<Child>, socket: PathBuf, } impl LocalProcessRuntime { async fn build( socket: &PathBuf, spec: LocalUdsInstanceSpec, ) -> Result<Box<dyn LocalInstanceRuntime>> { let mut cmd = Command::new(&spec.cyclone_cmd_path); cmd.arg("--bind-uds") .arg(socket) .arg("--lang-server") .arg(&spec.lang_server_cmd_path) .arg("--enable-watch"); if let Some(timeout) = spec.lang_server_function_timeout { cmd.arg("--timeout").arg(timeout.to_string()); } if let Some(limit_requests) = spec.limit_requests { cmd.arg("--limit-requests").arg(limit_requests.to_string()); } if let Some(timeout) = spec.watch_timeout { cmd.arg("--watch-timeout") .arg(timeout.as_secs().to_string()); } if spec.ping { cmd.arg("--enable-ping"); } if spec.resolver { cmd.arg("--enable-resolver"); } if spec.action { cmd.arg("--enable-action-run"); } Ok(Box::new(LocalProcessRuntime { cmd, child: None, socket: socket.to_path_buf(), })) } } #[async_trait] impl LocalInstanceRuntime for LocalProcessRuntime { fn id(&self) -> u32 { 0 } fn 
socket(&mut self) -> PathBuf { self.socket.to_path_buf() } async fn spawn(&mut self) -> result::Result<(), LocalUdsInstanceError> { self.child = Some( self.cmd .spawn() .map_err(LocalUdsInstanceError::ChildSpawn)?, ); Ok(()) } async fn terminate(&mut self) -> result::Result<(), LocalUdsInstanceError> { match self.child.as_mut() { Some(c) => { process::child_shutdown(c, Some(process::Signal::SIGTERM), None).await?; Ok(()) } None => Ok(()), } } } #[derive(Debug)] struct LocalDockerRuntime { container_id: String, docker: Docker, socket: PathBuf, } impl LocalDockerRuntime { async fn build( socket: &Path, spec: LocalUdsInstanceSpec, ) -> Result<Box<dyn LocalInstanceRuntime>> { let mut cmd = vec![ String::from("--bind-uds"), socket.to_string_lossy().to_string(), String::from("--lang-server"), String::from("/usr/local/bin/lang-js"), String::from("--enable-watch"), ]; if let Some(limit_requests) = spec.limit_requests { cmd.push(String::from("--limit-requests")); cmd.push(limit_requests.to_string()) } if let Some(timeout) = spec.watch_timeout { cmd.push(String::from("--watch-timeout")); cmd.push(timeout.as_secs().to_string()); } if spec.ping { cmd.push(String::from("--enable-ping")); } if spec.resolver { cmd.push(String::from("--enable-resolver")); } if spec.action { cmd.push(String::from("--enable-action-run")); } let docker = Docker::connect_with_local_defaults()?; let rand_string: String = thread_rng() .sample_iter(&Alphanumeric) .take(10) .map(char::from) .collect(); let socket_dir = socket .parent() .expect("socket path not available") .to_str() .expect("unable to unpack path"); let mounts = vec![Mount { source: Some(String::from(socket_dir)), target: Some(String::from(socket_dir)), typ: Some(MountTypeEnum::BIND), ..Default::default() }]; let container_id = docker .create_container( Some(CreateContainerOptions { name: format!("cyclone-container-{rand_string}"), platform: Some(String::from("linux/amd64")), }), Config { image: 
Some(String::from("systeminit/cyclone:stable")), cmd: Some(cmd), host_config: Some(HostConfig { mounts: Some(mounts), ..Default::default() }), ..Default::default() }, ) .await? .id; Ok(Box::new(LocalDockerRuntime { container_id, docker, socket: socket.to_path_buf(), })) } } #[async_trait] impl LocalInstanceRuntime for LocalDockerRuntime { fn id(&self) -> u32 { 0 } fn socket(&mut self) -> PathBuf { self.socket.to_path_buf() } async fn spawn(&mut self) -> result::Result<(), LocalUdsInstanceError> { self.docker .start_container( &self.container_id.clone(), None::<StartContainerOptions<String>>, ) .await?; Ok(()) } async fn terminate(&mut self) -> result::Result<(), LocalUdsInstanceError> { self.docker .remove_container( &self.container_id, Some(RemoveContainerOptions { force: true, ..Default::default() }), ) .await?; Ok(()) } } #[derive(Debug)] #[cfg(target_os = "linux")] struct LocalFirecrackerRuntime { jail: FirecrackerJail, vm_id: u32, } #[cfg(target_os = "linux")] impl LocalFirecrackerRuntime { async fn build(_spec: LocalUdsInstanceSpec, id: u32) -> Result<Box<dyn LocalInstanceRuntime>> { let jail = FirecrackerJail::build(id).await?; Ok(Box::new(LocalFirecrackerRuntime { jail, vm_id: id })) } } #[async_trait] #[cfg(target_os = "linux")] impl LocalInstanceRuntime for LocalFirecrackerRuntime { fn id(&self) -> u32 { self.vm_id } fn socket(&mut self) -> PathBuf { self.jail.socket() } async fn spawn(&mut self) -> Result<()> { Ok(self.jail.spawn().await?) } async fn terminate(&mut self) -> Result<()> { Ok(self.jail.terminate().await?) } } #[cfg(target_os = "linux")] impl LocalFirecrackerRuntime { async fn clean(id: u32) -> Result<()> { Ok(FirecrackerJail::clean(id).await?) } async fn prepare(id: u32) -> Result<()> { Ok(FirecrackerJail::prepare(id).await?) } async fn setup_firecracker(spec: &LocalUdsInstanceSpec) -> Result<()> { Ok(FirecrackerJail::setup(spec.pool_size, spec.create_firecracker_setup_scripts).await?) 
} } #[allow(unused_variables)] async fn runtime_instance_from_spec( spec: &LocalUdsInstanceSpec, socket: &PathBuf, id: u32, ) -> Result<Box<dyn LocalInstanceRuntime>> { match spec.runtime_strategy { LocalUdsRuntimeStrategy::LocalProcess => { LocalProcessRuntime::build(socket, spec.clone()).await } LocalUdsRuntimeStrategy::LocalDocker => { LocalDockerRuntime::build(socket, spec.clone()).await } #[cfg(target_os = "linux")] LocalUdsRuntimeStrategy::LocalFirecracker => { LocalFirecrackerRuntime::build(spec.clone(), id).await } } } async fn watch_task<Strm>( mut watch_progress: WatchStarted<Strm>, mut shutdown_rx: oneshot::Receiver<()>, ) where Strm: AsyncRead + AsyncWrite + Connection + Unpin + Send + Sync + 'static, { loop { tokio::select! { // Got a shutdown message _ = Pin::new(&mut shutdown_rx) => { trace!("watch task received shutdown"); if let Err(err) = watch_progress.stop().await { trace!(error = ?err, "failed to cleanly close the watch session"); } break; } // Got progress on the watch session result = watch_progress.next() => { match result { // Got a ping, good news, proceed Some(Ok(())) => { }, // An error occurred on the stream. We are going to treat this as catastrophic // and end the watch. Some(Err(err)) => { debug!(error = ?err, "error on watch stream"); if let Err(err) = watch_progress.stop().await { debug!(error = ?err, "failed to cleanly close the watch session"); } break } // Stream is closed None => { trace!("watch stream has closed"); break } } } // All other arms are closed, nothing left to do but return else => { trace!("returning from watch task with all select arms closed"); break } } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.