# config.py
"""
Configuration Management for EdgeLake MCP Server
Loads configuration from environment variables with sensible defaults.
License: Mozilla Public License 2.0
"""
import os
import logging
from typing import Optional
# Module-level logger; Config.from_env uses it to report the loaded host/port.
logger = logging.getLogger('edgelake-mcp-server.config')


class Config:
    """Server configuration loaded from environment variables.

    Instances are plain attribute holders. Use :meth:`from_env` to build one
    from the process environment, or call the constructor directly to supply
    values explicitly.
    """

    def __init__(
        self,
        edgelake_host: str = "127.0.0.1",
        edgelake_port: int = 32049,
        request_timeout: int = 20,
        max_workers: int = 10,
        log_level: str = "INFO"
    ) -> None:
        """
        Initialize configuration.

        Values are stored as given; no validation is performed here.

        Args:
            edgelake_host: EdgeLake node IP/hostname
            edgelake_port: EdgeLake REST API port
            request_timeout: HTTP request timeout in seconds
            max_workers: Maximum concurrent worker threads
            log_level: Logging level (DEBUG, INFO, WARNING, ERROR)
        """
        self.edgelake_host = edgelake_host
        self.edgelake_port = edgelake_port
        self.request_timeout = request_timeout
        self.max_workers = max_workers
        self.log_level = log_level

    @staticmethod
    def _env_str(name: str, default: str) -> str:
        """Return env var *name* stripped of whitespace, or *default* if unset/blank."""
        raw = os.getenv(name)
        if raw is None:
            return default
        # A variable that is set but empty (e.g. `EDGELAKE_HOST=` in a .env
        # file) is treated the same as an unset one.
        return raw.strip() or default

    @classmethod
    def _env_int(cls, name: str, default: int) -> int:
        """Return env var *name* parsed as an int, or *default* if unset/blank.

        Raises:
            ValueError: If the variable is set to a non-integer value. The
                message names the variable so the misconfiguration is easy
                to locate.
        """
        raw = cls._env_str(name, "")
        if not raw:
            return default
        try:
            return int(raw)
        except ValueError as err:
            raise ValueError(
                f"Environment variable {name} must be an integer, got {raw!r}"
            ) from err

    @classmethod
    def from_env(cls) -> 'Config':
        """
        Load configuration from environment variables.

        A variable that is unset, empty, or whitespace-only falls back to its
        default. String values are stripped of surrounding whitespace, and
        LOG_LEVEL is upper-cased.

        Environment Variables:
            EDGELAKE_HOST: EdgeLake node IP/hostname (default: 127.0.0.1)
            EDGELAKE_PORT: EdgeLake REST API port (default: 32049)
            EDGELAKE_TIMEOUT: Request timeout in seconds (default: 20)
            EDGELAKE_MAX_WORKERS: Max concurrent threads (default: 10)
            LOG_LEVEL: Logging level (default: INFO)

        Returns:
            Config instance

        Raises:
            ValueError: If EDGELAKE_PORT, EDGELAKE_TIMEOUT or
                EDGELAKE_MAX_WORKERS is set to a non-integer value.
        """
        config = cls(
            edgelake_host=cls._env_str("EDGELAKE_HOST", "127.0.0.1"),
            edgelake_port=cls._env_int("EDGELAKE_PORT", 32049),
            request_timeout=cls._env_int("EDGELAKE_TIMEOUT", 20),
            max_workers=cls._env_int("EDGELAKE_MAX_WORKERS", 10),
            log_level=cls._env_str("LOG_LEVEL", "INFO").upper(),
        )
        # Lazy %-style arguments: the message is only formatted when INFO is
        # enabled for this logger.
        logger.info(
            "Configuration loaded: host=%s, port=%s",
            config.edgelake_host,
            config.edgelake_port,
        )
        return config

    def __repr__(self) -> str:
        """Return a one-line summary of all configuration values."""
        return (
            f"Config(host={self.edgelake_host}, port={self.edgelake_port}, "
            f"timeout={self.request_timeout}, workers={self.max_workers}, "
            f"log_level={self.log_level})"
        )