Skip to main content
Glama

Adaptive Graph of Thoughts MCP Server

config.pyβ€’29.8 kB
import json import logging import os from dataclasses import asdict, dataclass, field from pathlib import Path import threading from contextlib import contextmanager from typing import Any, Optional, Union logger = logging.getLogger(__name__) import yaml from pydantic import BaseModel, Field, ValidationError from pydantic_settings import BaseSettings, SettingsConfigDict from adaptive_graph_of_thoughts.services.secret_manager import load_external_secrets class AGoTSettings(BaseSettings): """Application settings loaded from environment variables or `.env`.""" llm_provider: str = Field( default="openai", description="LLM provider identifier: 'openai' or 'claude'", ) openai_api_key: Optional[str] = Field( default=None, description="API key for OpenAI completions" ) anthropic_api_key: Optional[str] = Field( default=None, description="API key for Anthropic Claude" ) rate_limit_enabled: bool = Field( default=False, description="Enable basic API rate limiting", ) rate_limit_per_minute: int = Field( default=60, description="Allowed requests per minute per client", ) model_config = SettingsConfigDict(env_file=".env") class EnvSettings(AGoTSettings): """Backward-compatible alias used in tests.""" # Load secrets from external manager before reading environment variables load_external_secrets() env_settings = EnvSettings() # Thread safety lock _config_lock = threading.RLock() class ConfigLimits: """Centralized configuration limits used for validation.""" LEARNING_RATE_MIN = 0.0 LEARNING_RATE_MAX = 1.0 BATCH_SIZE_MIN = 1 BATCH_SIZE_MAX = 10_000 MAX_STEPS_MIN = 1 MAX_STEPS_MAX = 1_000_000 # Database limits MAX_QUERY_LENGTH = 100_000 MAX_BATCH_SIZE = 10_000 CONNECTION_TIMEOUT = 30 # LLM limits MAX_PROMPT_LENGTH = 100_000 MAX_RESPONSE_LENGTH = 50_000 def validate_learning_rate(lr: float) -> None: """Validate learning rate is in valid range.""" if not isinstance(lr, (int, float)): raise TypeError(f"Learning rate must be a number, got {type(lr)}") if not (ConfigLimits.LEARNING_RATE_MIN < lr <= ConfigLimits.LEARNING_RATE_MAX): raise ValueError( f"Learning rate must be between {ConfigLimits.LEARNING_RATE_MIN} and " f"{ConfigLimits.LEARNING_RATE_MAX}, got {lr}" ) def validate_batch_size(batch_size: int) -> None: """Validate batch size is within allowed limits.""" if not isinstance(batch_size, int): raise TypeError(f"Batch size must be an integer, got {type(batch_size)}") if not ( ConfigLimits.BATCH_SIZE_MIN <= batch_size <= ConfigLimits.BATCH_SIZE_MAX ): raise ValueError( f"Batch size must be between {ConfigLimits.BATCH_SIZE_MIN} and " f"{ConfigLimits.BATCH_SIZE_MAX}, got {batch_size}" ) def validate_max_steps(max_steps: int) -> None: """Validate max steps is within allowed limits.""" if not isinstance(max_steps, int): raise TypeError(f"Max steps must be an integer, got {type(max_steps)}") if not (ConfigLimits.MAX_STEPS_MIN <= max_steps <= ConfigLimits.MAX_STEPS_MAX): raise ValueError( f"Max steps must be between {ConfigLimits.MAX_STEPS_MIN} and " f"{ConfigLimits.MAX_STEPS_MAX}, got {max_steps}" ) def validate_config_schema(_config_data: dict) -> bool: """Validate settings data against the Pydantic schema.""" try: SettingsFileModel.model_validate(_config_data) except ValidationError as exc: raise ValueError(str(exc)) from exc return True # Simple data classes for configuration class AppConfig: def __init__( self, name: str = "Adaptive Graph of Thoughts", version: str = "0.1.0", host: str = "0.0.0.0", port: int = 8000, reload: bool = True, log_level: str = "INFO", cors_allowed_origins_str: str = "*", auth_token: str | None = None, ) -> None: self.name = name self.version = version self.host = host self.port = port self.reload = reload self.log_level = log_level self.cors_allowed_origins_str = cors_allowed_origins_str self.auth_token = auth_token class ASRGoTDefaultParams: def __init__( self, initial_confidence=0.8, confidence_threshold=0.75, max_iterations=10, evidence_max_iterations=5, convergence_threshold=0.05, ): self.initial_confidence = initial_confidence self.confidence_threshold = confidence_threshold self.max_iterations = max_iterations self.evidence_max_iterations = evidence_max_iterations self.convergence_threshold = convergence_threshold class PubMedConfig: def __init__( self, api_key=None, base_url: str | None = None, email: str | None = None, max_results: int = 20, rate_limit_delay: float = 0.5, ): self.api_key = api_key self.base_url = base_url self.email = email self.max_results = max_results self.rate_limit_delay = rate_limit_delay class GoogleScholarConfig: def __init__( self, api_key: str | None = None, base_url: str | None = None, max_results: int = 10, rate_limit_delay: float = 1.0, ): self.api_key = api_key self.base_url = base_url self.max_results = max_results self.rate_limit_delay = rate_limit_delay class ExaSearchConfig: def __init__( self, api_key=None, base_url: str | None = None, max_results: int = 10, ): self.api_key = api_key self.base_url = base_url self.max_results = max_results class KnowledgeDomain: def __init__(self, name, description="", keywords=None): self.name = name self.description = description self.keywords = keywords or [] class LegacyConfig: def __init__( self, learning_rate=0.01, batch_size=32, max_steps=1000, frozen=False, **kwargs ): with _config_lock: # Main configuration attributes expected by tests validate_learning_rate(learning_rate) validate_batch_size(batch_size) validate_max_steps(max_steps) self.learning_rate = learning_rate self.batch_size = batch_size self.max_steps = max_steps self._frozen = frozen # Legacy configuration structure self.app = AppConfig() self.asr_got = ASRGoTDefaultParams() self.google_scholar = None self.pubmed = None self.exa_search = None self.knowledge_domains = [] # Load YAML configuration if exists config_file_path = ( Path(__file__).parent.parent.parent / "config" / "settings.yaml" ) if config_file_path.exists(): try: with open(config_file_path) as f: yaml.safe_load(f) # Just validate, don't store except Exception: pass # Apply additional kwargs for key, value in kwargs.items(): if hasattr(self, key): setattr(self, key, value) def __setattr__(self, name, value): if hasattr(self, "_frozen") and self._frozen and hasattr(self, name): raise AttributeError("Cannot modify frozen config") super().__setattr__(name, value) def __eq__(self, other): if not isinstance(other, LegacyConfig): return False return ( self.learning_rate == other.learning_rate and self.batch_size == other.batch_size and self.max_steps == other.max_steps ) def __repr__(self): return f"Config(learning_rate={self.learning_rate}, batch_size={self.batch_size}, max_steps={self.max_steps})" def model_dump(self) -> dict[str, Any]: """Convert to dictionary for pydantic v2 compatibility.""" return { "learning_rate": self.learning_rate, "batch_size": self.batch_size, "max_steps": self.max_steps, } def copy(self) -> "LegacyConfig": """Create a deep copy of the config.""" return LegacyConfig( learning_rate=self.learning_rate, batch_size=self.batch_size, max_steps=self.max_steps, frozen=False, ) def update(self, updates: dict[str, Any]) -> None: """Update config with new values.""" with _config_lock: for key, value in updates.items(): if hasattr(self, key): if key == "learning_rate": validate_learning_rate(value) elif key == "batch_size": validate_batch_size(value) elif key == "max_steps": validate_max_steps(value) setattr(self, key, value) def merge(self, other: "LegacyConfig") -> "LegacyConfig": """Merge with another config, other takes precedence.""" return LegacyConfig( learning_rate=( other.learning_rate if hasattr(other, "learning_rate") else self.learning_rate ), batch_size=( other.batch_size if hasattr(other, "batch_size") else self.batch_size ), max_steps=( other.max_steps if hasattr(other, "max_steps") else self.max_steps ), ) @classmethod def load(cls, file_path: str) -> "LegacyConfig": """Load config from file.""" path = Path(file_path) if not path.exists(): raise FileNotFoundError(f"Config file not found: {file_path}") content = path.read_text().strip() if not content: raise ValueError("Empty configuration file") try: if path.suffix.lower() in [".yaml", ".yml"]: data = yaml.safe_load(content) elif path.suffix.lower() == ".json": data = json.loads(content) else: raise ValueError(f"Unsupported file format: {path.suffix}") except yaml.YAMLError as err: raise yaml.YAMLError(f"Invalid YAML: {err}") from err except json.JSONDecodeError as err: raise json.JSONDecodeError(err.msg, err.doc, err.pos) from err if not data: raise ValueError("Empty configuration file") # Check for required keys required_keys = ["learning_rate", "batch_size"] for key in required_keys: if key not in data: raise ValueError(f"Missing required key: {key}") # Validate data types if not isinstance(data.get("learning_rate"), (int, float)): raise ValueError("learning_rate must be a number") if not isinstance(data.get("batch_size"), int): raise ValueError("batch_size must be an integer") if "max_steps" in data and not isinstance(data["max_steps"], int): raise ValueError("max_steps must be an integer") return cls(**data) def save(self, file_path: str) -> None: """Save config to file.""" path = Path(file_path) data = self.model_dump() if path.suffix.lower() in [".yaml", ".yml"]: content = yaml.dump(data, default_flow_style=False) elif path.suffix.lower() == ".json": content = json.dumps(data, indent=2) else: raise ValueError(f"Unsupported file format: {path.suffix}") try: with _config_lock: with open(file_path, "w") as f: f.write(content) except PermissionError as err: raise PermissionError(f"Permission denied writing to: {file_path}") from err @classmethod def from_env(cls) -> "LegacyConfig": """Load config from environment variables.""" data = {} if "LEARNING_RATE" in os.environ: data["learning_rate"] = float(os.environ["LEARNING_RATE"]) else: data["learning_rate"] = 0.01 if "BATCH_SIZE" in os.environ: data["batch_size"] = int(os.environ["BATCH_SIZE"]) else: data["batch_size"] = 32 if "MAX_STEPS" in os.environ: data["max_steps"] = int(os.environ["MAX_STEPS"]) else: data["max_steps"] = 1000 return cls(**data) @classmethod def load_with_overrides( cls, base_file: str, override_file: str, *, _loading_stack: Optional[set[Path]] = None, ) -> "LegacyConfig": """Load config with hierarchical overrides. Parameters ---------- base_file: Path to the base configuration file. override_file: Path to the file containing values that override the base configuration. _loading_stack: Internal set used to detect circular dependencies when configuration files reference each other recursively. """ if _loading_stack is None: _loading_stack = set() base_path = Path(base_file).resolve() override_path = Path(override_file).resolve() if base_path == override_path: raise ValueError(f"Circular dependency detected: config file '{base_path}' cannot override itself.") if base_path in _loading_stack or override_path in _loading_stack: # Add current base_path to the stack for a more informative error message current_chain = list(_loading_stack) + [base_path] raise ValueError( f"Circular dependency detected in config files: {' -> '.join(map(str, current_chain))}" ) _loading_stack.add(base_path) try: # Load base config normally base_config = cls.load(str(base_path)) finally: _loading_stack.remove(base_path) # Load override data without validation path = Path(override_file) if not path.exists(): raise FileNotFoundError(f"Override file not found: {override_file}") content = path.read_text().strip() if not content: return base_config # No overrides, return base try: if path.suffix.lower() in [".yaml", ".yml"]: override_data = yaml.safe_load(content) elif path.suffix.lower() == ".json": override_data = json.loads(content) else: raise ValueError(f"Unsupported file format: {path.suffix}") except yaml.YAMLError as err: raise yaml.YAMLError(f"Invalid YAML: {err}") from err except json.JSONDecodeError as err: raise json.JSONDecodeError(err.msg, err.doc, err.pos) from err if not override_data: return base_config # No overrides, return base # Apply overrides manually new_config = base_config.copy() new_config.update(override_data) return new_config # Create some aliases for backward compatibility class Settings(LegacyConfig): pass # Global config instance config = LegacyConfig() legacy_settings = config # For backward compatibility # --------------------------------------------------------------------------- # New Pydantic-based settings # --------------------------------------------------------------------------- class Neo4jSettingsModel(BaseSettings): """Connection details for Neo4j database.""" uri: str = "neo4j://localhost:7687" user: str = "neo4j" password: str = "password" database: str = "neo4j" model_config = SettingsConfigDict(env_prefix="NEO4J_", env_file=".env") class AppSettingsModel(BaseModel): """Application runtime settings loaded from YAML and env vars.""" name: str = "Adaptive Graph of Thoughts" version: str = "0.1.0" host: str = "0.0.0.0" port: int = 8000 uvicorn_reload: bool = True uvicorn_workers: int = 1 cors_allowed_origins_str: str = "*" auth_token: str | None = None mcp_transport_type: str = "http" mcp_stdio_enabled: bool = True mcp_http_enabled: bool = True log_level: str = "INFO" class MCPSettingsModel(BaseModel): """Model Context Protocol server settings.""" protocol_version: str server_name: str server_version: str vendor_name: str model_config = SettingsConfigDict(extra="forbid") class GoogleScholarSettingsModel(BaseModel): api_key: str | None = None base_url: str = "https://serpapi.com/search" model_config = SettingsConfigDict(extra="forbid") class PubMedSettingsModel(BaseModel): api_key: str | None = None base_url: str = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/" email: str | None = None model_config = SettingsConfigDict(extra="forbid") class ExaSearchSettingsModel(BaseModel): api_key: str | None = None base_url: str = "https://api.exa.ai" model_config = SettingsConfigDict(extra="forbid") class KnowledgeDomainModel(BaseModel): name: str keywords: list[str] = Field(default_factory=list) description: str | None = None model_config = SettingsConfigDict(extra="forbid") class SettingsFileModel(BaseModel): """Schema for validating `settings.yaml`.""" app: AppSettingsModel asr_got: dict[str, Any] mcp_settings: MCPSettingsModel google_scholar: GoogleScholarSettingsModel | None = None pubmed: PubMedSettingsModel | None = None exa_search: ExaSearchSettingsModel | None = None knowledge_domains: list[KnowledgeDomainModel] = Field(default_factory=list) model_config = SettingsConfigDict(extra="forbid") class RuntimeSettings(BaseSettings): """Central runtime settings loaded from YAML and environment.""" app: AppSettingsModel = Field( default_factory=AppSettingsModel, description="Web application configuration", ) neo4j: Neo4jSettingsModel = Field( default_factory=Neo4jSettingsModel, description="Neo4j connection options", ) asr_got: dict[str, Any] = Field( default_factory=dict, description="Advanced reasoning parameters", ) model_config = SettingsConfigDict( env_file=".env", env_nested_delimiter="__", extra="ignore" ) def load_runtime_settings() -> RuntimeSettings: """ Load runtime application settings from a YAML file and environment variables. If `config/settings.yaml` exists, its contents are loaded and used as defaults, with environment variables taking precedence. Returns a `RuntimeSettings` instance containing the merged configuration. Returns: RuntimeSettings: The combined runtime settings loaded from file and environment. """ yaml_path = Path(__file__).resolve().parents[2] / "config" / "settings.yaml" data: dict[str, Any] = {} if yaml_path.exists(): try: with open(yaml_path) as fh: data = yaml.safe_load(fh) or {} validate_config_schema(data) except yaml.YAMLError as exc: # pragma: no cover - config may be malformed in tests logger.error(f"Failed to load configuration: {exc}") data = {} return RuntimeSettings(**data) runtime_settings = load_runtime_settings() # Re-export runtime settings for application modules settings = runtime_settings # --------------------------------------------------------------------------- # Simplified configuration used in tests # --------------------------------------------------------------------------- @dataclass class ModelConfig: """Model related configuration.""" name: str = "gpt-4" temperature: float = 0.7 max_tokens: int = 2048 timeout: int = 30 api_key: Optional[str] = None base_url: Optional[str] = None @dataclass class GraphConfig: """Graph processing configuration.""" max_depth: int = 5 max_breadth: int = 3 pruning_threshold: float = 0.1 enable_caching: bool = True cache_size: int = 1000 @dataclass class LoggingConfig: """Logging configuration.""" level: str = "INFO" format: str = "{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}" file_path: Optional[str] = None enable_console: bool = True @dataclass class Config: """Main application configuration.""" model: ModelConfig = field(default_factory=ModelConfig) graph: GraphConfig = field(default_factory=GraphConfig) logging: LoggingConfig = field(default_factory=LoggingConfig) # ------------------------------------------------------------------ # Factory methods # ------------------------------------------------------------------ @classmethod def from_dict(cls, data: dict[str, Any]) -> "Config": return cls( model=ModelConfig(**data.get("model", {})), graph=GraphConfig(**data.get("graph", {})), logging=LoggingConfig(**data.get("logging", {})), ) def to_dict(self) -> dict[str, Any]: return { "model": asdict(self.model), "graph": asdict(self.graph), "logging": asdict(self.logging), } def to_json(self) -> str: return json.dumps(self.to_dict()) def to_yaml(self) -> str: return yaml.dump(self.to_dict()) @classmethod def from_file(cls, path: Union[str, Path]) -> "Config": p = Path(path) if not p.exists(): raise FileNotFoundError("Configuration file not found") text = p.read_text().strip() if not text: raise ValueError("Failed to parse configuration file") try: if p.suffix.lower() in {".yaml", ".yml"}: data = yaml.safe_load(text) elif p.suffix.lower() == ".json": data = json.loads(text) else: raise ValueError("Unsupported file format") except (json.JSONDecodeError, yaml.YAMLError) as exc: raise ValueError("Failed to parse configuration file") from exc except ValueError: raise return cls.from_dict(data or {}) def save_to_file(self, path: Union[str, Path]) -> None: p = Path(path) data = self.to_dict() if p.suffix.lower() in {".yaml", ".yml"}: content = yaml.dump(data) elif p.suffix.lower() == ".json": content = json.dumps(data) else: raise ValueError("Unsupported file format") try: with open(p, "w") as fh: fh.write(content) except Exception as exc: raise RuntimeError("Failed to save configuration") from exc @classmethod def from_env(cls, prefix: str = "AGOT_") -> "Config": prefix = prefix.upper() def read(key: str, cast, default=None): val = os.getenv(prefix + key) if val is None: return default if cast is bool: return val.lower() in {"1", "true", "yes", "on"} try: return cast(val) except ValueError as exc: raise ValueError(f"Invalid value for {prefix}{key}") from exc model = ModelConfig( name=read("MODEL_NAME", str, ModelConfig.name), temperature=read("MODEL_TEMPERATURE", float, ModelConfig.temperature), max_tokens=read("MODEL_MAX_TOKENS", int, ModelConfig.max_tokens), timeout=read("MODEL_TIMEOUT", int, ModelConfig.timeout), api_key=read("MODEL_API_KEY", str, None), base_url=read("MODEL_BASE_URL", str, None), ) graph = GraphConfig( max_depth=read("GRAPH_MAX_DEPTH", int, GraphConfig.max_depth), max_breadth=read("GRAPH_MAX_BREADTH", int, GraphConfig.max_breadth), pruning_threshold=read( "GRAPH_PRUNING_THRESHOLD", float, GraphConfig.pruning_threshold ), enable_caching=read( "GRAPH_ENABLE_CACHING", bool, GraphConfig.enable_caching ), cache_size=read("GRAPH_CACHE_SIZE", int, GraphConfig.cache_size), ) logging_cfg = LoggingConfig( level=read("LOGGING_LEVEL", str, LoggingConfig.level), format=read("LOGGING_FORMAT", str, LoggingConfig.format), file_path=read("LOGGING_FILE_PATH", str, None), enable_console=read( "LOGGING_ENABLE_CONSOLE", bool, LoggingConfig.enable_console ), ) return cls(model=model, graph=graph, logging=logging_cfg) # ------------------------------------------------------------------ # Validation and update helpers # ------------------------------------------------------------------ def validate(self) -> None: if not 0.0 <= self.model.temperature <= 2.0: raise ValueError("Model temperature must be between 0.0 and 2.0") if self.model.max_tokens <= 0: raise ValueError("Model max_tokens must be positive") if self.model.timeout <= 0: raise ValueError("Model timeout must be positive") if self.graph.max_depth <= 0: raise ValueError("Graph max_depth must be positive") if self.graph.max_breadth <= 0: raise ValueError("Graph max_breadth must be positive") if not 0.0 <= self.graph.pruning_threshold <= 1.0: raise ValueError("Graph pruning_threshold must be between 0.0 and 1.0") if self.graph.cache_size <= 0: raise ValueError("Graph cache_size must be positive") if self.logging.level not in {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}: raise ValueError( "Logging level must be one of DEBUG, INFO, WARNING, ERROR, CRITICAL" ) def update(self, **sections: dict[str, Any]) -> None: for section, values in sections.items(): target = getattr(self, section, None) if target is None: logging.warning("Unknown config key: %s", section) continue for key, value in values.items(): if hasattr(target, key): setattr(target, key, value) else: logging.warning("Unknown nested config key: %s.%s", section, key) class ThreadSafeConfig: """Thread-safe wrapper around a ``Config`` instance.""" def __init__(self, initial: Optional[Config] = None) -> None: self._lock = threading.RLock() self._config = initial or Config() @contextmanager def _acquire_lock(self): self._lock.acquire() try: yield finally: self._lock.release() def get_config(self) -> Config: with self._acquire_lock(): return self._config def set_config(self, cfg: Config) -> None: with self._acquire_lock(): cfg.validate() self._config = cfg def update_config(self, updates: dict[str, Any]) -> None: with self._acquire_lock(): self._config.update(**updates) _global_config = ThreadSafeConfig() def get_config() -> Config: """Return the process-wide configuration instance.""" return _global_config.get_config() def set_config(cfg: Config) -> None: """Replace the global configuration instance after validation.""" _global_config.set_config(cfg) def load_config( file_path: Optional[Union[str, Path]] = None, env_prefix: str = "AGOT_" ) -> Config: cfg = Config() if file_path: try: cfg = Config.from_file(file_path) except Exception as exc: logging.warning("Failed to load config from file: %s", exc) try: env_cfg = Config.from_env(prefix=env_prefix) updates = {} if ( os.getenv(f"{env_prefix}MODEL_NAME") or os.getenv(f"{env_prefix}MODEL_TEMPERATURE") or os.getenv(f"{env_prefix}MODEL_MAX_TOKENS") or os.getenv(f"{env_prefix}MODEL_TIMEOUT") or os.getenv(f"{env_prefix}MODEL_API_KEY") or os.getenv(f"{env_prefix}MODEL_BASE_URL") ): updates["model"] = { k: v for k, v in asdict(env_cfg.model).items() if v is not None } if ( os.getenv(f"{env_prefix}GRAPH_MAX_DEPTH") or os.getenv(f"{env_prefix}GRAPH_MAX_BREADTH") or os.getenv(f"{env_prefix}GRAPH_PRUNING_THRESHOLD") or os.getenv(f"{env_prefix}GRAPH_ENABLE_CACHING") or os.getenv(f"{env_prefix}GRAPH_CACHE_SIZE") ): updates["graph"] = { k: v for k, v in asdict(env_cfg.graph).items() if v is not None } if ( os.getenv(f"{env_prefix}LOGGING_LEVEL") or os.getenv(f"{env_prefix}LOGGING_FORMAT") or os.getenv(f"{env_prefix}LOGGING_FILE_PATH") or os.getenv(f"{env_prefix}LOGGING_ENABLE_CONSOLE") ): updates["logging"] = { k: v for k, v in asdict(env_cfg.logging).items() if v is not None } if updates: cfg.update(**updates) except Exception as exc: logging.warning("Failed to load config from environment: %s", exc) set_config(cfg) return cfg

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SaptaDey/Adaptive-Graph-of-Thoughts-MCP-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server