Skip to main content
Glama
client.rs63.6 kB
use std::{ marker::PhantomData, net::{ SocketAddr, ToSocketAddrs, }, path::PathBuf, result, str::{ self, FromStr, }, sync::Arc, time::Duration, }; use async_trait::async_trait; use cyclone_core::{ CycloneRequest, CycloneRequestable, LivenessStatus, LivenessStatusParseError, ReadinessStatus, ReadinessStatusParseError, }; use http::{ request::Builder, uri::{ Authority, InvalidUri, InvalidUriParts, PathAndQuery, Scheme, }, }; use hyper::{ Body, Method, Request, Response, StatusCode, Uri, body, client::{ HttpConnector, ResponseFuture, connect::Connection, }, service::Service, }; use hyperlocal::{ UnixClientExt, UnixConnector, UnixStream, }; use telemetry::prelude::*; use telemetry_http::propagation; use thiserror::Error; use tokio::{ io::{ AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, }, net::TcpStream, }; use tokio_tungstenite::{ WebSocketStream, tungstenite::{ client::IntoClientRequest, handshake::client::Request as WsRequest, }, }; use crate::{ Execution, PingExecution, Watch, new_unstarted_execution, ping, watch, }; #[remain::sorted] #[derive(Debug, Error)] pub enum ClientError { #[error("cannot create client uri")] ClientUri(#[source] http::Error), #[error("failed to connect")] Connect(#[source] Box<dyn std::error::Error + Send + Sync>), #[error("failed to connect to the Firecracker VM")] FirecrackerConnect, #[error("invalid liveness status")] InvalidLivenessStatus(#[from] LivenessStatusParseError), #[error("invalid readiness status")] InvalidReadinessStatus(#[from] ReadinessStatusParseError), #[error("invalid URI")] InvalidUri(#[from] InvalidUri), #[error("invalid websocket uri scheme: {0}")] InvalidWebsocketScheme(String), #[error("IO error: {0}")] Io(#[from] std::io::Error), #[error("missing authority")] MissingAuthority, #[error("missing websocket scheme")] MissingWebsocketScheme, #[error("no socket addrs where resolved")] NoSocketAddrResolved, #[error("failed reading http response body")] ReadResponseBody(#[source] hyper::Error), #[error("failed to create an http request")] Request(#[source] hyper::http::Error), #[error("failed to create request uri")] RequestUri(#[source] InvalidUriParts), #[error("http response failed")] Response(#[source] hyper::Error), #[error("failed to resolve socket addrs")] SocketAddrResolve(#[source] std::io::Error), #[error("failed to create a tungstenite http request")] TungsteniteRequest(#[source] Box<tokio_tungstenite::tungstenite::Error>), #[error("unexpected status code: {0}")] UnexpectedStatusCode(StatusCode), #[error("client is not healthy")] Unhealthy(#[source] Box<dyn std::error::Error + Send + Sync>), #[error("failed to decode as a UTF8 string")] Utf8Decode(#[from] std::str::Utf8Error), #[error("failed to establish a websocket connection")] WebsocketConnection(#[source] Box<tokio_tungstenite::tungstenite::Error>), } impl ClientError { pub fn unhealthy(source: impl std::error::Error + Send + Sync + 'static) -> Self { Self::Unhealthy(Box::new(source)) } } type Result<T> = result::Result<T, ClientError>; #[derive(Debug)] pub struct Client<Conn, Strm, Sock> { config: Arc<ClientConfig>, inner_client: hyper::Client<Conn, Body>, connector: Conn, socket: Sock, uri: Uri, _phantom: PhantomData<Strm>, } impl<Conn, Strm, Sock> Clone for Client<Conn, Strm, Sock> where Conn: Clone, Sock: Clone, { fn clone(&self) -> Self { Self { config: self.config.clone(), inner_client: self.inner_client.clone(), connector: self.connector.clone(), socket: self.socket.clone(), uri: self.uri.clone(), _phantom: PhantomData, } } } pub type UdsClient = Client<UnixConnector, UnixStream, PathBuf>; pub type HttpClient = Client<HttpConnector, TcpStream, SocketAddr>; #[async_trait] pub trait CycloneClient<Strm> where Strm: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { async fn watch(&mut self) -> result::Result<Watch<Strm>, ClientError>; async fn liveness(&mut self) -> result::Result<LivenessStatus, ClientError>; async fn readiness(&mut self) -> result::Result<ReadinessStatus, ClientError>; async fn execute_ping(&mut self) -> result::Result<PingExecution<Strm>, ClientError>; async fn prepare_execution<Request>( &mut self, request: CycloneRequest<Request>, ) -> result::Result<Execution<Strm, Request, Request::Response>, ClientError> where Request: CycloneRequestable + Send + Sync; } impl Client<(), (), ()> { pub fn http( socket_addrs: impl ToSocketAddrs, ) -> Result<Client<HttpConnector, TcpStream, SocketAddr>> { let socket = socket_addrs .to_socket_addrs() .map_err(ClientError::SocketAddrResolve)? .next() .ok_or(ClientError::NoSocketAddrResolved)?; let connector = HttpConnector::new(); let inner_client = hyper::Client::builder().build(connector.clone()); let scheme = Scheme::HTTP; let authority = Authority::try_from(format!("{}:{}", socket.ip(), socket.port()))?; let uri = Uri::builder() .scheme(scheme) .authority(authority) .path_and_query("/") .build() .map_err(ClientError::ClientUri)?; let config = Arc::new(ClientConfig::default()); Ok(Client { config, inner_client, connector, socket, uri, _phantom: PhantomData, }) } // TODO(scott): firecracker connect here feels really flimsy. This likely needs an enum to // select behavior over. pub fn uds( socket: impl Into<PathBuf>, config: Arc<ClientConfig>, ) -> Result<Client<UnixConnector, UnixStream, PathBuf>> { let socket = socket.into(); let connector = UnixConnector; let inner_client = hyper::Client::unix(); let scheme = Scheme::try_from("unix")?; let authority = Uri::from(hyperlocal::Uri::new(&socket, "/")) .into_parts() .authority .ok_or(ClientError::MissingAuthority)?; let uri = Uri::builder() .scheme(scheme) .authority(authority) .path_and_query("/") .build() .map_err(ClientError::ClientUri)?; Ok(Client { config, inner_client, connector, socket, uri, _phantom: PhantomData, }) } } #[async_trait] impl<Conn, Strm, Sock> CycloneClient<Strm> for Client<Conn, Strm, Sock> where Conn: Service<Uri, Response = Strm> + Clone + Send + Sync + 'static, Conn::Error: Into<Box<dyn std::error::Error + Send + Sync>>, Conn::Future: Unpin + Send, Strm: AsyncRead + AsyncWrite + Connection + Unpin + Send + Sync + 'static, Sock: Send + Sync + std::fmt::Debug, { async fn watch(&mut self) -> Result<Watch<Strm>> { let stream = self.websocket_stream("/watch").await?; Ok(watch::watch(stream, self.config.watch_timeout)) } async fn liveness(&mut self) -> Result<LivenessStatus> { let response = self.get("/liveness").await?; if response.status() != StatusCode::OK { return Err(ClientError::UnexpectedStatusCode(response.status())); } let body = body::to_bytes(response) .await .map_err(ClientError::ReadResponseBody)?; let result = LivenessStatus::from_str(str::from_utf8(body.as_ref())?)?; Ok(result) } async fn readiness(&mut self) -> Result<ReadinessStatus> { let response = self.get("/readiness").await?; if response.status() != StatusCode::OK { return Err(ClientError::UnexpectedStatusCode(response.status())); } let body = body::to_bytes(response) .await .map_err(ClientError::ReadResponseBody)?; let result = ReadinessStatus::from_str(str::from_utf8(body.as_ref())?)?; Ok(result) } async fn execute_ping(&mut self) -> Result<PingExecution<Strm>> { let stream = self.websocket_stream("/execute/ping").await?; Ok(ping::execute(stream)) } async fn prepare_execution<Request>( &mut self, request: CycloneRequest<Request>, ) -> result::Result<Execution<Strm, Request, Request::Response>, ClientError> where Request: CycloneRequestable + Send + Sync, { let stream = self.websocket_stream(request.websocket_path()).await?; Ok(new_unstarted_execution(stream, request)) } } impl<Conn, Strm, Sock> Client<Conn, Strm, Sock> where Conn: Service<Uri, Response = Strm> + Clone + Send + Sync + 'static, Conn::Error: Into<Box<dyn std::error::Error + Send + Sync>>, Conn::Future: Unpin + Send, Strm: AsyncRead + AsyncWrite + Connection + Unpin + Send + Sync + 'static, Sock: Send + Sync + std::fmt::Debug, { fn http_request_uri<P>(&self, path_and_query: P) -> Result<Uri> where P: TryInto<PathAndQuery, Error = InvalidUri>, { let mut parts = self.uri.clone().into_parts(); parts.path_and_query = Some(path_and_query.try_into()?); let uri = Uri::from_parts(parts).map_err(ClientError::RequestUri)?; Ok(uri) } fn ws_request_uri<P>(&self, path_and_query: P) -> Result<Uri> where P: TryInto<PathAndQuery, Error = InvalidUri>, { let mut parts = self.uri.clone().into_parts(); let uri_scheme = parts.scheme.take(); match uri_scheme { Some(scheme) => match scheme.as_str() { "http" | "unix" => { let _replaced = parts.scheme.replace(Scheme::try_from("ws")?); } "https" => { let _replaced = parts.scheme.replace(Scheme::try_from("wss")?); } unsupported => { return Err(ClientError::InvalidWebsocketScheme(unsupported.to_string())); } }, None => return Err(ClientError::MissingWebsocketScheme), } parts.path_and_query = Some(path_and_query.try_into()?); let uri = Uri::from_parts(parts).map_err(ClientError::RequestUri)?; Ok(uri) } fn new_http_request<P>(&self, path_and_query: P) -> Result<Builder> where P: TryInto<PathAndQuery, Error = InvalidUri>, { let uri = self.http_request_uri(path_and_query)?; Ok(Request::builder().uri(uri)) } fn new_ws_request<P>(&self, path_and_query: P) -> Result<WsRequest> where P: TryInto<PathAndQuery, Error = InvalidUri>, { let uri = self.ws_request_uri(path_and_query)?; let mut request = uri .into_client_request() .map_err(|err| ClientError::TungsteniteRequest(Box::new(err)))?; propagation::inject_headers(request.headers_mut()); Ok(request) } async fn get<P>(&self, path_and_query: P) -> Result<Response<Body>> where P: TryInto<PathAndQuery, Error = InvalidUri>, { let mut builder = self.new_http_request(path_and_query)?.method(Method::GET); match builder.headers_mut() { Some(headers) => propagation::inject_headers(headers), None => trace!("request builder has an error"), }; let request = builder.body(Body::empty()).map_err(ClientError::Request)?; self.request(request).await.map_err(ClientError::Response) } fn request(&self, req: Request<Body>) -> ResponseFuture { self.inner_client.request(req) } async fn connect(&mut self) -> Result<Strm> { let mut stream = self .connector .call(self.uri.clone()) .await .map_err(|err| ClientError::Connect(err.into()))?; // Firecracker requires a special connection method to be inserted at the head of the // stream. if self.config.firecracker_connect { let connect_cmd = "CONNECT 52\n"; let mut retries = 30; let mut single_byte = vec![0u8; 1]; stream.write_all(connect_cmd.as_bytes()).await?; loop { // We need to read off the response to clear the stream, but sometimes this connect // message hangs if the VM is still being allocated when we ask. stream = match tokio::time::timeout( self.config.connect_timeout, stream.read_exact(&mut single_byte), ) .await { Ok(_) => { if single_byte == [b'\n'] || single_byte == [b'\0'] { break; }; stream } Err(_) => { // We timed out, let's get a new stream and try again. retries -= 1; stream.shutdown().await?; stream = self .connector .call(self.uri.clone()) .await .map_err(|err| ClientError::Connect(err.into()))?; stream.write_all(connect_cmd.as_bytes()).await?; stream } }; if retries <= 0 { return Err(ClientError::FirecrackerConnect); } } } Ok(stream) } pub async fn websocket_stream<P>(&mut self, path_and_query: P) -> Result<WebSocketStream<Strm>> where P: TryInto<PathAndQuery, Error = InvalidUri>, { let stream = self.connect().await?; let request = self.new_ws_request(path_and_query)?; let (websocket_stream, response) = tokio_tungstenite::client_async(request, stream) .await .map_err(|err| ClientError::WebsocketConnection(Box::new(err)))?; if response.status() != StatusCode::SWITCHING_PROTOCOLS { return Err(ClientError::UnexpectedStatusCode(response.status())); } Ok(websocket_stream) } } #[derive(Debug)] pub struct ClientConfig { pub connect_timeout: Duration, pub firecracker_connect: bool, pub watch_timeout: Duration, } impl Default for ClientConfig { fn default() -> Self { Self { connect_timeout: Duration::from_millis(10), // firecracker-setup: change firecracker_connect to "true" firecracker_connect: false, watch_timeout: Duration::from_secs(10), } } } #[allow(clippy::panic, clippy::assertions_on_constants)] #[cfg(test)] mod tests { use std::{ collections::HashMap, env, path::Path, }; use base64::{ Engine, engine::general_purpose, }; use buck2_resources::Buck2Resources; use cyclone_core::{ ActionRunRequest, ComponentKind, ComponentView, ComponentViewWithGeometry, DebugRequest, FunctionResult, ManagementRequest, ProgressMessage, ResolverFunctionComponent, ResolverFunctionRequest, SchemaVariantDefinitionRequest, ValidationRequest, }; use cyclone_server::{ Config, ConfigBuilder, Runnable as _, Server, }; use futures::StreamExt; use serde_json::json; use tempfile::{ NamedTempFile, TempPath, }; use test_log::test; use tracing::warn; use super::*; fn rand_uds() -> TempPath { NamedTempFile::new() .expect("failed to create named tempfile") .into_temp_path() } #[allow(clippy::disallowed_methods)] // Used to determine if running in development fn lang_server_path() -> String { if env::var("BUCK_RUN_BUILD_ID").is_ok() || env::var("BUCK_BUILD_ID").is_ok() { let resources = Buck2Resources::read().expect("failed to read buck2 resources"); let lang_server_cmd_path = resources .get_ends_with("lang-js") .expect("failed to get lang-js resource") .to_string_lossy() .to_string(); warn!( lang_server_cmd_path = lang_server_cmd_path.as_str(), "detected development run", ); lang_server_cmd_path } else if let Ok(dir) = env::var("CARGO_MANIFEST_DIR") { let lang_server_cmd_path = Path::new(&dir) .join("../../bin/lang-js/target/lang-js") .canonicalize() .expect( "failed to canonicalize local dev build of <root>/bin/lang-js/target/lang-js", ) .to_string_lossy() .to_string(); warn!( lang_server_cmd_path = lang_server_cmd_path.as_str(), "detected development run", ); lang_server_cmd_path } else { unimplemented!("tests must be run either with Cargo or Buck2"); } } async fn uds_server(builder: &mut ConfigBuilder, tmp_socket: &TempPath) -> Server { let config = builder .unix_domain_socket(tmp_socket) .try_lang_server_path(lang_server_path()) .expect("failed to resolve lang server path") .build() .expect("failed to build config"); Server::from_config(config, Box::new(telemetry::NoopClient)) .await .expect("failed to init server") } async fn uds_client_for_running_server( builder: &mut ConfigBuilder, tmp_socket: &TempPath, ) -> UdsClient { // TODO: Audit that the environment access only happens in single-threaded code. unsafe { env::set_var("SI_LANG_JS_LOG", "debug") }; let server = uds_server(builder, tmp_socket).await; let path = server .local_socket() .as_domain_socket() .expect("expected a domain socket") .to_owned(); tokio::spawn(async move { server.run().await }); let config = Arc::new(ClientConfig::default()); Client::uds(path, config).expect("failed to create uds client") } async fn http_server(builder: &mut ConfigBuilder) -> Server { let config = builder .http_socket("127.0.0.1:0") .expect("failed to resolve socket addr") .try_lang_server_path(lang_server_path()) .expect("failed to resolve lang server path") .build() .expect("failed to build config"); Server::from_config(config, Box::new(telemetry::NoopClient)) .await .expect("failed to init server") } async fn http_client_for_running_server(builder: &mut ConfigBuilder) -> HttpClient { // TODO: Audit that the environment access only happens in single-threaded code. unsafe { env::set_var("SI_LANG_JS_LOG", "debug") }; let server = http_server(builder).await; let socket = *server .local_socket() .as_socket_addr() .expect("expected a socket addr"); tokio::spawn(async move { server.run().await }); Client::http(socket).expect("failed to create client") } fn base64_encode(input: impl AsRef<[u8]>) -> String { general_purpose::STANDARD_NO_PAD.encode(input) } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test)] async fn http_watch() { let mut builder = Config::builder(); let mut client = http_client_for_running_server(builder.watch(Some(Duration::from_secs(2)))).await; // Start the protocol let mut progress = client .watch() .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); // Consume 3 pings for _ in 0..2 { match progress.next().await { Some(Ok(_)) => assert!(true), Some(Err(err)) => panic!("failed to receive ping; err={err:?}"), None => panic!("stream ended early"), } } // Signal the client's desire to stop the watch progress.stop().await.expect("failed to stop protocol"); } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test)] async fn uds_watch() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let mut client = uds_client_for_running_server(builder.watch(Some(Duration::from_secs(2))), &tmp_socket) .await; // Start the protocol let mut progress = client .watch() .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); // Consume 3 pings for _ in 0..2 { match progress.next().await { Some(Ok(_)) => assert!(true), Some(Err(err)) => panic!("failed to receive ping; err={err:?}"), None => panic!("stream ended early"), } } // Signal the client's desire to stop the watch progress.stop().await.expect("failed to stop protocol"); } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test)] async fn http_liveness() { let mut builder = Config::builder(); let mut client = http_client_for_running_server(&mut builder).await; let response = client.liveness().await.expect("failed to get liveness"); assert_eq!(response, LivenessStatus::Ok); } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test)] async fn uds_liveness() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let mut client = uds_client_for_running_server(&mut builder, &tmp_socket).await; let response = client.liveness().await.expect("failed to get liveness"); assert_eq!(response, LivenessStatus::Ok); } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test)] async fn http_readiness() { let mut builder = Config::builder(); let mut client = http_client_for_running_server(&mut builder).await; let response = client.readiness().await.expect("failed to get readiness"); assert_eq!(response, ReadinessStatus::Ready); } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test)] async fn uds_readiness() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let mut client = uds_client_for_running_server(&mut builder, &tmp_socket).await; let response = client.readiness().await.expect("failed to get readiness"); assert_eq!(response, ReadinessStatus::Ready); } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test)] async fn http_execute_ping() { let mut builder = Config::builder(); let mut client = http_client_for_running_server(builder.enable_ping(true)).await; client .execute_ping() .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol") } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test)] async fn http_execute_ping_not_enabled() { let mut builder = Config::builder(); let mut client = http_client_for_running_server(builder.enable_ping(false)).await; match client.execute_ping().await { Err(ClientError::WebsocketConnection(_)) => assert!(true), Err(unexpected) => panic!("unexpected error: {unexpected:?}"), Ok(_) => panic!("stream not expected"), } } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test)] async fn uds_execute_ping() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let mut client = uds_client_for_running_server(builder.enable_ping(true), &tmp_socket).await; client .execute_ping() .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol") } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test)] async fn uds_execute_ping_not_enabled() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let mut client = uds_client_for_running_server(builder.enable_ping(false), &tmp_socket).await; match client.execute_ping().await { Err(ClientError::WebsocketConnection(_)) => assert!(true), Err(unexpected) => panic!("unexpected error: {unexpected:?}"), Ok(_) => panic!("stream not expected"), } } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))] async fn http_execute_resolver() { let mut builder = Config::builder(); let mut client = http_client_for_running_server(builder.enable_resolver(true)).await; let req = ResolverFunctionRequest { execution_id: "1234".to_string(), handler: "doit".to_string(), component: ResolverFunctionComponent { data: ComponentView { properties: serde_json::json!({"salt": "n", "peppa": "pig"}), kind: ComponentKind::Standard, }, parents: vec![ ComponentView { properties: serde_json::json!({}), kind: ComponentKind::Standard, }, ComponentView { properties: serde_json::json!({}), kind: ComponentKind::Standard, }, ], }, response_type: cyclone_core::ResolverFunctionResponseType::Object, code_base64: base64_encode( r#"function doit(input) { console.log(`${Object.keys(input).length}`); console.log('my butt'); const v = { a: 'b' }; return v; }"#, ), before: vec![], }; // Start the protocol let mut progress = client .prepare_execution(CycloneRequest::from_parts(req.clone(), Default::default())) .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); // Consume the output messages match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "2") } Some(Ok(unexpected)) => panic!("unexpected msg kind: {unexpected:?}"), Some(Err(err)) => panic!("failed to receive 'i like' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "my butt") } Some(Ok(unexpected)) => panic!("unexpected msg kind: {unexpected:?}"), Some(Err(err)) => panic!("failed to receive 'i like' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert!(output.message.starts_with("Output:")); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'second' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } // TODO(fnichol): until we've determined how to handle processing the result server side, // we're going to see a heartbeat come back when a request is processed match progress.next().await { Some(Ok(ProgressMessage::Heartbeat)) => assert!(true), Some(Ok(unexpected)) => panic!("unexpected msg kind: {unexpected:?}"), Some(Err(err)) => panic!("failed to receive heartbeat: err={err:?}"), None => panic!("output stream ended early"), } match progress.next().await { None => assert!(true), Some(unexpected) => panic!("output stream should be done: {unexpected:?}"), }; // Get the result let result = progress.finish().await.expect("failed to return result"); match result { FunctionResult::Success(success) => { assert!(!success.unset); assert_eq!(success.data, json!({"a": "b"})); } FunctionResult::Failure(failure) => { panic!("result should be success; failure={failure:?}") } } } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))] async fn uds_execute_resolver() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let mut client = uds_client_for_running_server(builder.enable_resolver(true), &tmp_socket).await; let req = ResolverFunctionRequest { execution_id: "1234".to_string(), handler: "doit".to_string(), component: ResolverFunctionComponent { data: ComponentView { properties: serde_json::json!({"salt": "n", "peppa": "pig"}), kind: ComponentKind::Standard, }, parents: vec![ ComponentView { properties: serde_json::json!({}), kind: ComponentKind::Standard, }, ComponentView { properties: serde_json::json!({}), kind: ComponentKind::Standard, }, ], }, response_type: cyclone_core::ResolverFunctionResponseType::Object, code_base64: base64_encode( r#"function doit(input) { console.log(`${Object.keys(input).length}`); console.log('my butt'); const v = { a: 'b' }; return v; }"#, ), before: vec![], }; // Start the protocol let mut progress = client .prepare_execution(CycloneRequest::from_parts(req.clone(), Default::default())) .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); // Consume the output messages match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "2") } Some(Ok(unexpected)) => panic!("unexpected msg kind: {unexpected:?}"), Some(Err(err)) => panic!("failed to receive 'i like' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "my butt") } Some(Ok(unexpected)) => panic!("unexpected msg kind: {unexpected:?}"), Some(Err(err)) => panic!("failed to receive 'i like' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert!(output.message.starts_with("Output:")); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'second' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { None => { assert!(true); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(unexpected) => panic!("output stream should be done: {unexpected:?}"), }; } // Get the result let result = progress.finish().await.expect("failed to return result"); match result { FunctionResult::Success(success) => { assert!(!success.unset); assert_eq!(success.data, json!({"a": "b"})); } FunctionResult::Failure(failure) => { panic!("result should be success; failure={failure:?}") } } } async fn execute_validation<C, Strm>(mut client: C) where Strm: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, C: CycloneClient<Strm>, { let req = ValidationRequest { execution_id: "31337".to_string(), handler: "".to_string(), value: Some(33.into()), validation_format: r#"{"type":"number","flags":{"presence":"required"},"rules":[{"name":"integer"},{"name":"min","args":{"limit":33}},{"name":"max","args":{"limit":33}}]}"#.to_string(), code_base64: "".to_string(), before: vec![], }; let mut progress = client .prepare_execution(CycloneRequest::from_parts(req, Default::default())) .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); loop { match progress.next().await { None => { assert!(true); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(unexpected) => panic!("output stream should be done: {unexpected:?}"), }; } let result = progress.finish().await.expect("failed to return result"); match result { FunctionResult::Success(success) => { assert!(success.error.is_none()); } FunctionResult::Failure(failure) => { panic!("result should be success; failure={failure:?}") } } } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))] async fn http_execute_validation() { let mut builder = Config::builder(); let client = http_client_for_running_server(builder.enable_validation(true)).await; execute_validation(client).await } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))] async fn uds_execute_validation() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let client = uds_client_for_running_server(builder.enable_validation(true), &tmp_socket).await; execute_validation(client).await } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))] async fn http_execute_action_run() { let mut builder = Config::builder(); let mut client = http_client_for_running_server(builder.enable_action_run(true)).await; let req = ActionRunRequest { execution_id: "1234".to_string(), handler: "workit".to_string(), args: Default::default(), code_base64: base64_encode( r#"function workit() { console.log('first'); console.log('second'); return { status: 'ok' }; }"#, ), before: vec![], }; // Start the protocol let mut progress = client .prepare_execution(CycloneRequest::from_parts(req.clone(), Default::default())) .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); // Consume the output messages loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "first"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'first' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "second"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'second' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } } } loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert!(output.message.starts_with("Output:")); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive Output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { None => { assert!(true); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(unexpected) => panic!("output stream should be done: {unexpected:?}"), }; } let result = progress.finish().await.expect("failed to return result"); match result { FunctionResult::Success(_success) => { // TODO(fnichol): assert some result data } FunctionResult::Failure(failure) => { panic!("result should be success; failure={failure:?}") } } } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))] async fn uds_execute_action_run() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let mut client = uds_client_for_running_server(builder.enable_action_run(true), &tmp_socket).await; let req = ActionRunRequest { execution_id: "1234".to_string(), handler: "workit".to_string(), args: Default::default(), code_base64: base64_encode( r#"function workit() { console.log('first'); console.log('second'); return { status: 'ok' }; }"#, ), before: vec![], }; // Start the protocol let mut progress = client .prepare_execution(CycloneRequest::from_parts(req.clone(), Default::default())) .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); // Consume the output messages loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "first"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'first' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "second"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'second' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert!(output.message.starts_with("Output:")); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'second' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { None => { assert!(true); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(unexpected) => panic!("output stream should be done: {unexpected:?}"), }; } // Get the result let result = progress.finish().await.expect("failed to return result"); match result { FunctionResult::Success(_success) => { // TODO(fnichol): assert some result data } FunctionResult::Failure(failure) => { panic!("result should be success; failure={failure:?}") } } } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))] async fn http_execute_schema_variant_definition() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let mut client = uds_client_for_running_server( builder.enable_schema_variant_definition(true), &tmp_socket, ) .await; let req = SchemaVariantDefinitionRequest { execution_id: "1234".to_string(), handler: "createAsset".to_string(), code_base64: base64_encode( r#"function createAsset() { console.log('first'); console.log('second'); return new AssetBuilder().build(); }"#, ), }; // Start the protocol let mut progress = client .prepare_execution(CycloneRequest::from_parts(req.clone(), Default::default())) .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); // Consume the output messages loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "first"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'first' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "second"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'second' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { None => { assert!(true); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(unexpected) => panic!("output stream should be done: {unexpected:?}"), }; } // Get the result let result = progress.finish().await.expect("failed to return result"); match result { FunctionResult::Success(_success) => { // TODO(fnichol): assert some result data } FunctionResult::Failure(failure) => { panic!("result should be success; failure={failure:?}") } } } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))] async fn uds_execute_schema_variant_definition() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let mut client = uds_client_for_running_server( builder.enable_schema_variant_definition(true), &tmp_socket, ) .await; let req = SchemaVariantDefinitionRequest { execution_id: "1234".to_string(), handler: "createAsset".to_string(), code_base64: base64_encode( r#"function createAsset() { console.log('first'); console.log('second'); return new AssetBuilder().build(); }"#, ), }; // Start the protocol let mut progress = client .prepare_execution(CycloneRequest::from_parts(req.clone(), Default::default())) .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); // Consume the output messages loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "first"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'first' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "second"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'second' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { None => { assert!(true); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(unexpected) => panic!("output stream should be done: {unexpected:?}"), }; } // Get the result let result = progress.finish().await.expect("failed to return result"); match result { FunctionResult::Success(_success) => { // TODO(fnichol): assert some result data } FunctionResult::Failure(failure) => { panic!("result should be success; failure={failure:?}") } } } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))] async fn http_execute_management_func() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let mut client = uds_client_for_running_server(builder.enable_management(true), &tmp_socket).await; let req = ManagementRequest { execution_id: "1234".to_string(), handler: "manage".to_string(), current_view: "DEFAULT".to_string(), this_component: ComponentViewWithGeometry { kind: None, properties: serde_json::json!({"it": "is", "a": "principle", "of": "music", "to": "repeat the theme"}), sources: serde_json::json!({}), geometry: serde_json::json!({"x": "1", "y": "2"}), incoming_connections: serde_json::json!({}), }, components: HashMap::new(), variant_socket_map: HashMap::new(), code_base64: base64_encode( r#"function manage(input) { console.log('first'); console.log('second'); return { status: 'ok', message: input.thisComponent.properties.to, } }"#, ), before: vec![], }; // Start the protocol let mut progress = client .prepare_execution(CycloneRequest::from_parts(req.clone(), Default::default())) .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); // Consume the output messages loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "first"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'first' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "second"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'second' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { None => { assert!(true); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(unexpected) => panic!("output stream should be done: {unexpected:?}"), }; } // Get the result let result = progress.finish().await.expect("failed to return result"); match result { FunctionResult::Success(success) => { assert_eq!(Some("repeat the theme"), success.message.as_deref()); } FunctionResult::Failure(failure) => { panic!("result should be success; failure={failure:?}") } } } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))] async fn uds_execute_management_func() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let mut client = uds_client_for_running_server(builder.enable_management(true), &tmp_socket).await; let req = ManagementRequest { execution_id: "1234".to_string(), handler: "manage".to_string(), current_view: "DEFAULT".to_string(), this_component: ComponentViewWithGeometry { kind: None, properties: serde_json::json!({"it": "is", "a": "principle", "of": "music", "to": "repeat the theme"}), sources: serde_json::json!({}), geometry: serde_json::json!({"x": "1", "y": "2"}), incoming_connections: serde_json::json!({}), }, components: HashMap::new(), variant_socket_map: HashMap::new(), code_base64: base64_encode( r#"function manage({ thisComponent }) { console.log('first'); console.log('second'); return { status: 'ok', message: thisComponent.properties.to, } }"#, ), before: vec![], }; // Start the protocol let mut progress = client .prepare_execution(CycloneRequest::from_parts(req.clone(), Default::default())) .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); // Consume the output messages loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "first"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'first' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "second"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => panic!("failed to receive 'second' output: err={err:?}"), None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { None => { assert!(true); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(unexpected) => panic!("output stream should be done: {unexpected:?}"), }; } // Get the result let result = progress.finish().await.expect("failed to return result"); match result { FunctionResult::Success(success) => { assert_eq!(Some("repeat the theme"), success.message.as_deref()); } FunctionResult::Failure(failure) => { panic!("result should be success; failure={failure:?}") } } } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test)] async fn http_execute_debug_func() { let mut builder = Config::builder(); let mut client = http_client_for_running_server(builder.enable_debug(true)).await; let req = DebugRequest { execution_id: "debug-1234".to_string(), handler: "debug".to_string(), component: ComponentView { kind: ComponentKind::Standard, properties: serde_json::json!({ "value": 42, }), }, debug_input: Some(serde_json::json!({ "some": true, "other": false, "data": ["hello", "goodbye"] })), code_base64: base64_encode( r#"function debug({ component, debugInput }) { console.log('Debugging'); return { a: component.properties.value, b: debugInput }; }"#, ), before: vec![], }; let mut progress = client .prepare_execution(CycloneRequest::from_parts(req.clone(), Default::default())) .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "Debugging"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => { panic!("failed to receive 'Starting debug analysis' output: err={err:?}") } None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { None => { assert!(true); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(unexpected) => panic!("output stream should be done: {unexpected:?}"), }; } let result = progress.finish().await.expect("failed to return result"); match result { FunctionResult::Success(success) => { let output = &success.output; assert_eq!(output["a"], 42); assert_eq!(output["b"]["data"], serde_json::json!(["hello", "goodbye"])) } FunctionResult::Failure(failure) => { panic!("result should be success; failure={failure:?}") } } } #[allow(clippy::disallowed_methods)] // `$RUST_LOG` is checked for in macro #[test(tokio::test)] async fn uds_execute_debug_func() { let tmp_socket = rand_uds(); let mut builder = Config::builder(); let mut client = uds_client_for_running_server(builder.enable_debug(true), &tmp_socket).await; let req = DebugRequest { execution_id: "debug-1234".to_string(), handler: "debug".to_string(), component: ComponentView { kind: ComponentKind::Standard, properties: serde_json::json!({ "value": 42, }), }, debug_input: Some(serde_json::json!({ "some": true, "other": false, "data": ["hello", "goodbye"] })), code_base64: base64_encode( r#"function debug({ component, debugInput }) { console.log('Debugging'); return { a: component.properties.value, b: debugInput }; }"#, ), before: vec![], }; let mut progress = client .prepare_execution(CycloneRequest::from_parts(req.clone(), Default::default())) .await .expect("failed to establish websocket stream") .start() .await .expect("failed to start protocol"); loop { match progress.next().await { Some(Ok(ProgressMessage::OutputStream(output))) => { assert_eq!(output.message, "Debugging"); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(Err(err)) => { panic!("failed to receive 'Starting debug analysis' output: err={err:?}") } None => { dbg!(req); panic!("output stream ended early") } }; } loop { match progress.next().await { None => { assert!(true); break; } Some(Ok(ProgressMessage::Heartbeat)) => continue, Some(unexpected) => panic!("output stream should be done: {unexpected:?}"), }; } let result = progress.finish().await.expect("failed to return result"); match result { FunctionResult::Success(success) => { let output = &success.output; assert_eq!(output["a"], 42); assert_eq!(output["b"]["data"], serde_json::json!(["hello", "goodbye"])) } FunctionResult::Failure(failure) => { panic!("result should be success; failure={failure:?}") } } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server