Skip to main content
Glama
config.rs24.5 kB
use std::{ env, net::{ SocketAddr, ToSocketAddrs, }, path::{ Path, PathBuf, }, time::Duration, }; use buck2_resources::Buck2Resources; use derive_builder::Builder; use serde::{ Deserialize, Serialize, }; use si_crypto::VeritechCryptoConfig; use si_data_nats::NatsConfig; use si_pool_noodle::{ Instance, instance::cyclone::{ LocalHttpInstance, LocalHttpInstanceSpec, LocalHttpSocketStrategy, LocalUdsInstance, LocalUdsInstanceSpec, LocalUdsRuntimeStrategy, LocalUdsSocketStrategy, }, }; use si_service_endpoints::ServiceEndpointsConfig; pub use si_settings::{ StandardConfig, StandardConfigFile, }; use si_std::CanonicalFileError; use telemetry::prelude::*; use thiserror::Error; use ulid::Ulid; const DEFAULT_VERITECH_REQUESTS_CONCURRENCY_LIMIT: usize = 1000; const DEFAULT_POOL_SIZE: u32 = 50; const DEFAULT_CYCLONE_CLIENT_EXECUTION_TIMEOUT_SECS: u64 = 60 * 35; const DEFAULT_CYCLONE_CLIENT_EXECUTION_TIMEOUT: Duration = Duration::from_secs(DEFAULT_CYCLONE_CLIENT_EXECUTION_TIMEOUT_SECS); const DEFAULT_HEARTBEAT_APP_SLEEP_SECONDS: u64 = 15; const DEFAULT_HEARTBEAT_APP_PUBLISH_TIMEOUT_SECONDS: u64 = 10; const DEFAULT_HEARTBEAT_APP_SLEEP_DURATION: Duration = Duration::from_secs(DEFAULT_HEARTBEAT_APP_SLEEP_SECONDS); const DEFAULT_HEARTBEAT_APP_PUBLISH_TIMEOUT_DURATION: Duration = Duration::from_secs(DEFAULT_HEARTBEAT_APP_PUBLISH_TIMEOUT_SECONDS); #[remain::sorted] #[derive(Debug, Error)] pub enum ConfigError { #[error("builder error: {0}")] Builder(#[from] ConfigBuilderError), #[error("canonical file error: {0}")] CanonicalFile(#[from] CanonicalFileError), #[error("cyclone spec build error")] CycloneSpecBuild(#[source] Box<dyn std::error::Error + 'static + Sync + Send>), #[error("no socket addrs where resolved")] NoSocketAddrResolved, #[error("settings error: {0}")] Settings(#[from] si_settings::SettingsError), #[error("failed to resolve socket addrs")] SocketAddrResolve(#[source] std::io::Error), } impl ConfigError { fn cyclone_spec_build(err: impl std::error::Error + 'static + Sync + Send) -> Self { Self::CycloneSpecBuild(Box::new(err)) } } type Result<T> = std::result::Result<T, ConfigError>; #[derive(Debug, Builder, Serialize)] pub struct Config { #[builder(default = "NatsConfig::default()")] nats: NatsConfig, #[serde(skip_serializing)] cyclone_spec: CycloneSpec, #[builder(default = "VeritechCryptoConfig::default()")] crypto: VeritechCryptoConfig, #[builder(default = "default_healthcheck_pool()")] healthcheck_pool: bool, #[builder(default = "default_cyclone_client_execution_timeout()")] cyclone_client_execution_timeout: Duration, #[builder(default = "default_veritech_requests_concurrency_limit()")] veritech_requests_concurrency_limit: usize, #[builder(default = "random_instance_id()")] instance_id: String, #[builder(default = "default_heartbeat_app()")] heartbeat_app: bool, #[builder(default = "default_heartbeat_app_sleep_duration()")] heartbeat_app_sleep_duration: Duration, #[builder(default = "default_heartbeat_app_publish_timeout_duration()")] heartbeat_app_publish_timeout_duration: Duration, #[builder(default = "default_service_endpoints_config()")] service_endpoints: ServiceEndpointsConfig, } impl StandardConfig for Config { type Builder = ConfigBuilder; } impl Config { /// Gets a reference to the config's cyclone spec. pub fn cyclone_spec(&self) -> &CycloneSpec { &self.cyclone_spec } /// Gets a reference to the config's nats. #[must_use] pub fn nats(&self) -> &NatsConfig { &self.nats } /// Gets a reference to the config's subject prefix. pub fn subject_prefix(&self) -> Option<&str> { self.nats.subject_prefix.as_deref() } /// Gets a reference to the config's cyclone public key path. pub fn crypto(&self) -> &VeritechCryptoConfig { &self.crypto } /// Gets the config's healthcheck value. pub fn healthcheck_pool(&self) -> bool { self.healthcheck_pool } /// Consumes into a [`CycloneSpec`]. pub fn into_cyclone_spec(self) -> CycloneSpec { self.cyclone_spec } /// Gets the config's cyclone client execution timeout. pub fn cyclone_client_execution_timeout(&self) -> Duration { self.cyclone_client_execution_timeout } /// Gets the config's veritech requests concurrency limit. pub fn veritech_requests_concurrency_limit(&self) -> usize { self.veritech_requests_concurrency_limit } /// Gets the config's instance ID. pub fn instance_id(&self) -> &str { self.instance_id.as_ref() } /// Indicates if the heartbeat app will be enabled. pub fn heartbeat_app(&self) -> bool { self.heartbeat_app } /// Gets the config's sleep duration. pub fn heartbeat_app_sleep_duration(&self) -> Duration { self.heartbeat_app_sleep_duration } /// Gets the config's publish timeout duration. pub fn heartbeat_app_publish_timeout_duration(&self) -> Duration { self.heartbeat_app_publish_timeout_duration } /// Gets a reference to the config's service endpoints configuration. #[must_use] pub fn service_endpoints(&self) -> &ServiceEndpointsConfig { &self.service_endpoints } } #[derive(Clone, Debug, Deserialize, Serialize)] pub struct ConfigFile { #[serde(default)] pub nats: NatsConfig, pub cyclone: CycloneConfig, #[serde(default)] pub crypto: VeritechCryptoConfig, #[serde(default = "default_healthcheck_pool")] healthcheck_pool: bool, #[serde(default = "default_cyclone_client_execution_timeout_secs")] cyclone_client_execution_timeout_secs: u64, #[serde(default = "default_veritech_requests_concurrency_limit")] veritech_requests_concurrency_limit: usize, #[serde(default = "random_instance_id")] instance_id: String, #[serde(default = "default_heartbeat_app")] pub heartbeat_app: bool, #[serde(default = "default_heartbeat_app_sleep_secs")] heartbeat_app_sleep_secs: u64, #[serde(default = "default_heartbeat_app_publish_timeout_secs")] heartbeat_app_publish_timeout_secs: u64, #[serde(default = "default_service_endpoints_config")] service_endpoints: ServiceEndpointsConfig, } impl Default for ConfigFile { fn default() -> Self { Self::default_local_uds() } } impl ConfigFile { pub fn default_local_http() -> Self { Self { nats: Default::default(), cyclone: CycloneConfig::default_local_http(), crypto: Default::default(), healthcheck_pool: default_healthcheck_pool(), cyclone_client_execution_timeout_secs: default_cyclone_client_execution_timeout_secs(), veritech_requests_concurrency_limit: default_veritech_requests_concurrency_limit(), instance_id: random_instance_id(), heartbeat_app: default_heartbeat_app(), heartbeat_app_sleep_secs: default_heartbeat_app_sleep_secs(), heartbeat_app_publish_timeout_secs: default_heartbeat_app_publish_timeout_secs(), service_endpoints: default_service_endpoints_config(), } } pub fn default_local_uds() -> Self { Self { nats: Default::default(), cyclone: CycloneConfig::default_local_uds(), crypto: Default::default(), healthcheck_pool: default_healthcheck_pool(), cyclone_client_execution_timeout_secs: default_cyclone_client_execution_timeout_secs(), veritech_requests_concurrency_limit: default_veritech_requests_concurrency_limit(), instance_id: random_instance_id(), heartbeat_app: default_heartbeat_app(), heartbeat_app_sleep_secs: default_heartbeat_app_sleep_secs(), heartbeat_app_publish_timeout_secs: default_heartbeat_app_publish_timeout_secs(), service_endpoints: default_service_endpoints_config(), } } } impl StandardConfigFile for ConfigFile { type Error = ConfigError; } impl TryFrom<ConfigFile> for Config { type Error = ConfigError; fn try_from(mut value: ConfigFile) -> Result<Self> { detect_and_configure_development(&mut value)?; let mut config = Config::builder(); config.nats(value.nats); config.cyclone_spec(value.cyclone.try_into()?); config.crypto(value.crypto); config.cyclone_client_execution_timeout(Duration::from_secs( value.cyclone_client_execution_timeout_secs, )); config.veritech_requests_concurrency_limit(value.veritech_requests_concurrency_limit); config.instance_id(value.instance_id); config.heartbeat_app(value.heartbeat_app); config.heartbeat_app_sleep_duration(Duration::from_secs(value.heartbeat_app_sleep_secs)); config.heartbeat_app_publish_timeout_duration(Duration::from_secs( value.heartbeat_app_publish_timeout_secs, )); config.service_endpoints(value.service_endpoints); config.build().map_err(Into::into) } } #[remain::sorted] #[derive(Clone, Debug)] pub enum CycloneSpec { LocalHttp(LocalHttpInstanceSpec), LocalUds(LocalUdsInstanceSpec), } #[remain::sorted] #[derive(Clone, Debug, Eq, PartialEq)] pub enum CycloneStream { HttpSocket(SocketAddr), UnixDomainSocket(PathBuf), } impl Default for CycloneStream { fn default() -> Self { Self::HttpSocket(SocketAddr::from(([0, 0, 0, 0], 5157))) } } impl CycloneStream { pub fn http_socket(socket_addrs: impl ToSocketAddrs) -> Result<Self> { let socket_addr = socket_addrs .to_socket_addrs() .map_err(ConfigError::SocketAddrResolve)? .next() .ok_or(ConfigError::NoSocketAddrResolved)?; Ok(Self::HttpSocket(socket_addr)) } pub fn unix_domain_socket(path: impl Into<PathBuf>) -> Self { let pathbuf = path.into(); Self::UnixDomainSocket(pathbuf) } } #[remain::sorted] #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(tag = "kind")] pub enum CycloneConfig { LocalHttp { #[serde(default = "default_cyclone_cmd_path")] cyclone_cmd_path: String, #[serde(default = "default_lang_server_cmd_path")] lang_server_cmd_path: String, #[serde(default)] lang_server_function_timeout: Option<usize>, #[serde(default)] socket_strategy: LocalHttpSocketStrategy, #[serde(default)] watch_timeout: Option<Duration>, #[serde(default = "default_limit_requests")] limit_requets: Option<u32>, #[serde(default = "default_enable_endpoint")] ping: bool, #[serde(default = "default_enable_endpoint")] resolver: bool, #[serde(default = "default_enable_endpoint")] action: bool, }, LocalUds { #[serde(default = "default_cyclone_cmd_path")] cyclone_cmd_path: String, #[serde(default = "default_lang_server_cmd_path")] lang_server_cmd_path: String, #[serde(default)] lang_server_function_timeout: Option<usize>, #[serde(default)] socket_strategy: LocalUdsSocketStrategy, #[serde(default)] runtime_strategy: LocalUdsRuntimeStrategy, #[serde(default)] watch_timeout: Option<Duration>, #[serde(default = "default_limit_requests")] limit_requets: Option<u32>, #[serde(default = "default_enable_endpoint")] ping: bool, #[serde(default = "default_enable_endpoint")] resolver: bool, #[serde(default = "default_enable_endpoint")] action: bool, #[serde(default)] pool_size: u32, #[serde(default)] connect_timeout: u64, #[serde(default = "default_create_firecracker_setup_scripts")] create_firecracker_setup_scripts: bool, }, } impl CycloneConfig { pub fn default_local_http() -> Self { Self::LocalHttp { cyclone_cmd_path: default_cyclone_cmd_path(), lang_server_cmd_path: default_lang_server_cmd_path(), lang_server_function_timeout: Default::default(), socket_strategy: Default::default(), watch_timeout: Default::default(), limit_requets: default_limit_requests(), ping: default_enable_endpoint(), resolver: default_enable_endpoint(), action: default_enable_endpoint(), } } pub fn default_local_uds() -> Self { Self::LocalUds { cyclone_cmd_path: default_cyclone_cmd_path(), lang_server_cmd_path: default_lang_server_cmd_path(), lang_server_function_timeout: Default::default(), socket_strategy: Default::default(), runtime_strategy: default_runtime_strategy(), watch_timeout: Default::default(), limit_requets: default_limit_requests(), ping: default_enable_endpoint(), resolver: default_enable_endpoint(), action: default_enable_endpoint(), pool_size: default_pool_size(), connect_timeout: default_connect_timeout(), create_firecracker_setup_scripts: default_create_firecracker_setup_scripts(), } } pub fn cyclone_cmd_path(&self) -> &str { match self { CycloneConfig::LocalUds { cyclone_cmd_path, .. } => cyclone_cmd_path, CycloneConfig::LocalHttp { cyclone_cmd_path, .. } => cyclone_cmd_path, } } pub fn set_cyclone_cmd_path(&mut self, value: String) { match self { CycloneConfig::LocalUds { cyclone_cmd_path, .. } => *cyclone_cmd_path = value, CycloneConfig::LocalHttp { cyclone_cmd_path, .. } => *cyclone_cmd_path = value, }; } pub fn lang_server_cmd_path(&self) -> &str { match self { CycloneConfig::LocalUds { lang_server_cmd_path, .. } => lang_server_cmd_path, CycloneConfig::LocalHttp { lang_server_cmd_path, .. } => lang_server_cmd_path, } } pub fn set_lang_server_cmd_path(&mut self, value: String) { match self { CycloneConfig::LocalUds { lang_server_cmd_path, .. } => *lang_server_cmd_path = value, CycloneConfig::LocalHttp { lang_server_cmd_path, .. } => *lang_server_cmd_path = value, }; } pub fn set_limit_requests(&mut self, value: impl Into<Option<u32>>) { match self { CycloneConfig::LocalUds { limit_requets, .. } => *limit_requets = value.into(), CycloneConfig::LocalHttp { limit_requets, .. } => *limit_requets = value.into(), }; } pub fn set_ping(&mut self, value: bool) { match self { CycloneConfig::LocalUds { ping, .. } => *ping = value, CycloneConfig::LocalHttp { ping, .. } => *ping = value, }; } pub fn set_resolver(&mut self, value: bool) { match self { CycloneConfig::LocalUds { resolver, .. } => *resolver = value, CycloneConfig::LocalHttp { resolver, .. } => *resolver = value, }; } pub fn set_action(&mut self, value: bool) { match self { CycloneConfig::LocalUds { action, .. } => *action = value, CycloneConfig::LocalHttp { action, .. } => *action = value, }; } pub fn set_pool_size(&mut self, value: u32) { if let CycloneConfig::LocalUds { pool_size, .. } = self { *pool_size = value }; } } impl Default for CycloneConfig { fn default() -> Self { Self::default_local_uds() } } impl TryFrom<CycloneConfig> for CycloneSpec { type Error = ConfigError; fn try_from(value: CycloneConfig) -> std::result::Result<Self, Self::Error> { match value { CycloneConfig::LocalUds { cyclone_cmd_path, lang_server_cmd_path, lang_server_function_timeout, socket_strategy, runtime_strategy, watch_timeout, limit_requets, ping, resolver, action, pool_size, connect_timeout, create_firecracker_setup_scripts, } => { let mut builder = LocalUdsInstance::spec(); //we only need these if running local process. Maybe the builder should handle //this? if matches!(runtime_strategy, LocalUdsRuntimeStrategy::LocalProcess) { builder .try_cyclone_cmd_path(cyclone_cmd_path) .map_err(ConfigError::cyclone_spec_build)?; builder .try_lang_server_cmd_path(lang_server_cmd_path) .map_err(ConfigError::cyclone_spec_build)?; } builder.lang_server_function_timeout(lang_server_function_timeout); builder.socket_strategy(socket_strategy); builder.runtime_strategy(runtime_strategy); if let Some(watch_timeout) = watch_timeout { builder.watch_timeout(watch_timeout); } builder.limit_requests(limit_requets); if ping { builder.ping(); } if resolver { builder.resolver(); } if action { builder.action(); } builder.pool_size(pool_size); builder.connect_timeout(connect_timeout); builder.create_firecracker_setup_scripts(create_firecracker_setup_scripts); Ok(Self::LocalUds( builder.build().map_err(ConfigError::cyclone_spec_build)?, )) } CycloneConfig::LocalHttp { cyclone_cmd_path, lang_server_cmd_path, lang_server_function_timeout, socket_strategy, watch_timeout, limit_requets, ping, resolver, action, } => { let mut builder = LocalHttpInstance::spec(); builder .try_cyclone_cmd_path(cyclone_cmd_path) .map_err(ConfigError::cyclone_spec_build)?; builder .try_lang_server_cmd_path(lang_server_cmd_path) .map_err(ConfigError::cyclone_spec_build)?; builder.lang_server_function_timeout(lang_server_function_timeout); builder.socket_strategy(socket_strategy); if let Some(watch_timeout) = watch_timeout { builder.watch_timeout(watch_timeout); } builder.limit_requests(limit_requets); if ping { builder.ping(); } if resolver { builder.resolver(); } if action { builder.action(); } Ok(Self::LocalHttp( builder.build().map_err(ConfigError::cyclone_spec_build)?, )) } } } } fn random_instance_id() -> String { Ulid::new().to_string() } fn default_cyclone_cmd_path() -> String { "/usr/local/bin/cyclone".to_string() } fn default_lang_server_cmd_path() -> String { "/usr/local/bin/lang-js".to_string() } fn default_limit_requests() -> Option<u32> { Some(1) } fn default_enable_endpoint() -> bool { true } fn default_runtime_strategy() -> LocalUdsRuntimeStrategy { LocalUdsRuntimeStrategy::default() } fn default_pool_size() -> u32 { DEFAULT_POOL_SIZE } fn default_connect_timeout() -> u64 { 10 } fn default_healthcheck_pool() -> bool { true } fn default_cyclone_client_execution_timeout() -> Duration { DEFAULT_CYCLONE_CLIENT_EXECUTION_TIMEOUT } fn default_cyclone_client_execution_timeout_secs() -> u64 { DEFAULT_CYCLONE_CLIENT_EXECUTION_TIMEOUT_SECS } fn default_veritech_requests_concurrency_limit() -> usize { DEFAULT_VERITECH_REQUESTS_CONCURRENCY_LIMIT } fn default_heartbeat_app() -> bool { true } fn default_heartbeat_app_sleep_duration() -> Duration { DEFAULT_HEARTBEAT_APP_SLEEP_DURATION } fn default_heartbeat_app_sleep_secs() -> u64 { DEFAULT_HEARTBEAT_APP_SLEEP_SECONDS } fn default_heartbeat_app_publish_timeout_duration() -> Duration { DEFAULT_HEARTBEAT_APP_PUBLISH_TIMEOUT_DURATION } fn default_heartbeat_app_publish_timeout_secs() -> u64 { DEFAULT_HEARTBEAT_APP_PUBLISH_TIMEOUT_SECONDS } fn default_create_firecracker_setup_scripts() -> bool { true } fn default_service_endpoints_config() -> ServiceEndpointsConfig { ServiceEndpointsConfig::new(0) } #[allow(clippy::disallowed_methods)] // Used to determine if running in development pub fn detect_and_configure_development(config: &mut ConfigFile) -> Result<()> { if env::var("BUCK_RUN_BUILD_ID").is_ok() || env::var("BUCK_BUILD_ID").is_ok() { buck2_development(config) } else if let Ok(dir) = env::var("CARGO_MANIFEST_DIR") { cargo_development(dir, config) } else { Ok(()) } } fn buck2_development(config: &mut ConfigFile) -> Result<()> { let resources = Buck2Resources::read().map_err(ConfigError::cyclone_spec_build)?; let cyclone_cmd_path = resources .get_ends_with("cyclone") .map_err(ConfigError::cyclone_spec_build)? .to_string_lossy() .to_string(); let decryption_key_path = resources .get_ends_with("dev.decryption.key") .map_err(ConfigError::cyclone_spec_build)? .to_string_lossy() .to_string(); let lang_server_cmd_path = resources .get_ends_with("lang-js") .map_err(ConfigError::cyclone_spec_build)? .to_string_lossy() .to_string(); warn!( cyclone_cmd_path = cyclone_cmd_path.as_str(), decryption_key_path = decryption_key_path.as_str(), lang_server_cmd_path = lang_server_cmd_path, "detected development run", ); config.cyclone.set_cyclone_cmd_path(cyclone_cmd_path); config.crypto.decryption_key_file = decryption_key_path.parse().ok(); config .cyclone .set_lang_server_cmd_path(lang_server_cmd_path.to_string()); Ok(()) } fn cargo_development(dir: String, config: &mut ConfigFile) -> Result<()> { let cyclone_cmd_path = Path::new(&dir) .join("../../target/debug/cyclone") .canonicalize() .expect("failed to canonicalize local dev build of <root>/target/debug/cyclone") .to_string_lossy() .to_string(); let decryption_key_path = Path::new(&dir) .join("../../lib/veritech-server/src/dev.decryption.key") .canonicalize() .expect( "failed to canonicalize local key at <root>/lib/veritech-server/src/dev.decryption.key", ) .to_string_lossy() .to_string(); let lang_server_cmd_path = Path::new(&dir) .join("../../bin/lang-js/target/lang-js") .canonicalize() .expect("failed to canonicalize local dev build of <root>/bin/lang-js/target/lang-js") .to_string_lossy() .to_string(); warn!( cyclone_cmd_path = cyclone_cmd_path.as_str(), decryption_key_path = decryption_key_path.as_str(), lang_server_cmd_path = lang_server_cmd_path.as_str(), "detected development run", ); config.cyclone.set_cyclone_cmd_path(cyclone_cmd_path); config.crypto.decryption_key_file = decryption_key_path.parse().ok(); config .cyclone .set_lang_server_cmd_path(lang_server_cmd_path); Ok(()) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server