Skip to main content
Glama
stream.rs (4.47 kB)
use std::path::{ Path, PathBuf, }; use nix::unistd::{ Gid, Uid, chown, }; use thiserror::Error; use tokio::{ fs, io::{ AsyncRead, AsyncWrite, copy_bidirectional, }, net::{ TcpListener, TcpStream, UnixListener, }, task::JoinHandle, }; use tokio_vsock::{ VMADDR_CID_HOST, VsockAddr, VsockStream, }; use tracing::debug; const UID_BASE: u32 = 5000; const GID: u32 = 10000; const DEFAULT_OTEL_PORT: u32 = 4317; #[remain::sorted] #[derive(Debug, Error)] pub enum StreamForwarderError { #[error("failed to accept stream: {0}")] Accept(#[source] std::io::Error), #[error("failed to bind to socket: {1}")] Bind(#[source] std::io::Error, PathBuf), #[error("chown error: {0}")] Chown(#[source] nix::errno::Errno), #[error("stream copy error: {0}")] Copy(#[source] std::io::Error), #[error("Unix Read error: {0}")] Read(#[source] std::io::Error), #[error("TCP Stream error: {0}")] Tcp(#[source] std::io::Error), #[error("Vsock Stream error: {0}")] Vsock(#[source] std::io::Error), } type Result<T> = std::result::Result<T, StreamForwarderError>; #[derive(Debug)] pub struct UnixStreamForwarder { source: UnixListener, port: u32, } impl UnixStreamForwarder { pub async fn new(in_path: impl AsRef<Path>, id: u32) -> Result<Self> { let port = DEFAULT_OTEL_PORT; let mut path = in_path.as_ref().to_path_buf(); path.push(format!("v.sock_{port}")); // cleanup just in case the socket already exists somehow let _ignored = fs::remove_file(path.clone()).await; let source = UnixListener::bind(&path) .map_err(|err| StreamForwarderError::Bind(err, path.clone()))?; // the jailer runs as a specific user and that user must own the socket chown_for_id(path, id)?; Ok(Self { source, port }) } #[allow(clippy::let_underscore_future)] // These needs to just run in the background forever. 
pub async fn start(self) -> Result<()> { debug!(port = %self.port, "starting uds -> tcp forwarder"); let _: JoinHandle<Result<()>> = tokio::spawn(async move { loop { let (uds_stream, _) = self .source .accept() .await .map_err(StreamForwarderError::Accept)?; let tcp_stream = TcpStream::connect(format!("127.0.0.1:{}", self.port)) .await .map_err(StreamForwarderError::Tcp)?; tokio::spawn(read_and_forward(uds_stream, tcp_stream)); } }); Ok(()) } } #[derive(Debug)] pub struct TcpStreamForwarder { source: TcpListener, port: u32, } impl TcpStreamForwarder { pub async fn new() -> Result<Self> { let port = DEFAULT_OTEL_PORT; let addr = format!("127.0.0.1:{port}"); let source = TcpListener::bind(addr) .await .map_err(StreamForwarderError::Tcp)?; Ok(Self { source, port }) } #[allow(clippy::let_underscore_future)] // These needs to just run in the background forever. pub async fn start(self) -> Result<()> { debug!(port = %self.port, "starting tcp -> vsock forwarder"); let _: JoinHandle<Result<()>> = tokio::spawn(async move { loop { let (tcp_stream, _) = self .source .accept() .await .map_err(StreamForwarderError::Accept)?; let vsock_stream = VsockStream::connect(VsockAddr::new(VMADDR_CID_HOST, self.port)) .await .map_err(StreamForwarderError::Vsock)?; tokio::spawn(read_and_forward(tcp_stream, vsock_stream)); } }); Ok(()) } } async fn read_and_forward<StreamA, StreamB>(mut a: StreamA, mut b: StreamB) -> Result<()> where StreamA: AsyncRead + AsyncWrite + Unpin + 'static, StreamB: AsyncRead + AsyncWrite + Unpin + 'static, { copy_bidirectional(&mut a, &mut b) .await .map_err(StreamForwarderError::Copy)?; Ok(()) } fn chown_for_id(path: PathBuf, id: u32) -> Result<()> { let uid = Uid::from_raw(UID_BASE + id); let gid = Gid::from_raw(GID); chown(&path, Some(uid), Some(gid)).map_err(StreamForwarderError::Chown)?; Ok(()) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.