Skip to main content
Glama

Doris MCP Server

Official
by apache
logger.py24.6 kB
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ Enhanced Logging configuration for Doris MCP Server. Features: - Log level-based file separation - Timestamped log entries - Automatic log rotation - Comprehensive logging coverage """ import logging import logging.config import logging.handlers import sys import os import asyncio import time from pathlib import Path from typing import Any, Optional from datetime import datetime, timedelta import threading class TimestampedFormatter(logging.Formatter): """Custom formatter with enhanced timestamp and structured format""" def __init__(self, fmt=None, datefmt=None, style='%'): if fmt is None: fmt = "%(asctime)s.%(msecs)03d %(level_aligned)s %(name)s:%(lineno)d - %(message)s" if datefmt is None: datefmt = "%Y-%m-%d %H:%M:%S" super().__init__(fmt, datefmt, style) def format(self, record): """Format log record with enhanced information and proper alignment""" # Add process info if available if hasattr(record, 'process') and record.process: record.process_info = f"[PID:{record.process}]" else: record.process_info = "" # Add thread info if available if hasattr(record, 'thread') and record.thread: record.thread_info = f"[TID:{record.thread}]" else: record.thread_info = "" # Format with proper alignment after the level name # Calculate padding needed for alignment level_name = record.levelname max_level_length = 8 # Length of "CRITICAL" padding = max_level_length - len(level_name) record.level_aligned = f"[{level_name}]{' ' * padding}" return super().format(record) class LevelBasedFileHandler(logging.Handler): """Custom handler that writes different log levels to different files""" def __init__(self, log_dir: str, base_name: str = "doris_mcp_server", max_bytes: int = 10*1024*1024, backup_count: int = 5): super().__init__() self.log_dir = Path(log_dir) self.base_name = base_name self.max_bytes = max_bytes self.backup_count = backup_count # Ensure log directory exists self.log_dir.mkdir(parents=True, exist_ok=True) # Create handlers for different log levels self.handlers = {} self._setup_level_handlers() def _setup_level_handlers(self): """Setup rotating file handlers for different log levels""" level_files = { 'DEBUG': 'debug.log', 'INFO': 'info.log', 'WARNING': 'warning.log', 'ERROR': 'error.log', 'CRITICAL': 'critical.log' } formatter = TimestampedFormatter() for level, filename in level_files.items(): file_path = self.log_dir / f"{self.base_name}_{filename}" handler = logging.handlers.RotatingFileHandler( file_path, maxBytes=self.max_bytes, backupCount=self.backup_count, encoding='utf-8' ) handler.setFormatter(formatter) handler.setLevel(getattr(logging, level)) self.handlers[level] = handler def emit(self, record): """Emit log record to appropriate level-based file""" level_name = record.levelname if level_name in self.handlers: try: self.handlers[level_name].emit(record) except Exception: self.handleError(record) def close(self): """Close all handlers""" for handler in self.handlers.values(): handler.close() super().close() class LogCleanupManager: """Log file cleanup manager for automatic maintenance""" def __init__(self, log_dir: str, max_age_days: int = 30, cleanup_interval_hours: int = 24): """ Initialize log cleanup manager. Args: log_dir: Directory containing log files max_age_days: Maximum age of log files in days (default: 30 days) cleanup_interval_hours: Cleanup interval in hours (default: 24 hours) """ self.log_dir = Path(log_dir) self.max_age_days = max_age_days self.cleanup_interval_hours = cleanup_interval_hours self.cleanup_thread = None self.stop_event = threading.Event() self.logger = None def start_cleanup_scheduler(self): """Start the cleanup scheduler in a background thread""" if self.cleanup_thread and self.cleanup_thread.is_alive(): return self.stop_event.clear() self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True) self.cleanup_thread.start() # Get logger for this class if not self.logger: self.logger = logging.getLogger("doris_mcp_server.log_cleanup") self.logger.info(f"Log cleanup scheduler started - cleanup every {self.cleanup_interval_hours}h, max age {self.max_age_days} days") def stop_cleanup_scheduler(self): """Stop the cleanup scheduler""" if self.cleanup_thread and self.cleanup_thread.is_alive(): self.stop_event.set() self.cleanup_thread.join(timeout=5) if self.logger: self.logger.info("Log cleanup scheduler stopped") def _cleanup_loop(self): """Background loop for periodic cleanup""" while not self.stop_event.is_set(): try: self.cleanup_old_logs() # Sleep for the specified interval, but check stop event every 60 seconds for _ in range(self.cleanup_interval_hours * 60): # Convert hours to minutes if self.stop_event.wait(60): # Wait 60 seconds or until stop event break except Exception as e: if self.logger: self.logger.error(f"Error in log cleanup loop: {e}") # Sleep for 5 minutes before retrying self.stop_event.wait(300) def cleanup_old_logs(self): """Clean up old log files based on age""" if not self.log_dir.exists(): return current_time = datetime.now() cutoff_time = current_time - timedelta(days=self.max_age_days) cleaned_files = [] cleaned_size = 0 # Pattern for log files (including backup files) log_patterns = [ "doris_mcp_server_*.log", "doris_mcp_server_*.log.*" # Backup files ] for pattern in log_patterns: for log_file in self.log_dir.glob(pattern): try: # Get file modification time file_mtime = datetime.fromtimestamp(log_file.stat().st_mtime) if file_mtime < cutoff_time: file_size = log_file.stat().st_size log_file.unlink() # Delete the file cleaned_files.append(log_file.name) cleaned_size += file_size except Exception as e: if self.logger: self.logger.warning(f"Failed to cleanup log file {log_file}: {e}") if cleaned_files and self.logger: size_mb = cleaned_size / (1024 * 1024) self.logger.info(f"Cleaned up {len(cleaned_files)} old log files, freed {size_mb:.2f} MB") self.logger.debug(f"Cleaned files: {', '.join(cleaned_files)}") def get_cleanup_stats(self) -> dict: """Get statistics about log files and cleanup status""" if not self.log_dir.exists(): return {"error": "Log directory does not exist"} stats = { "log_directory": str(self.log_dir.absolute()), "max_age_days": self.max_age_days, "cleanup_interval_hours": self.cleanup_interval_hours, "scheduler_running": self.cleanup_thread and self.cleanup_thread.is_alive(), "total_files": 0, "total_size_mb": 0, "files_by_age": {"recent": 0, "old": 0}, "oldest_file": None, "newest_file": None } current_time = datetime.now() cutoff_time = current_time - timedelta(days=self.max_age_days) oldest_time = None newest_time = None log_patterns = ["doris_mcp_server_*.log", "doris_mcp_server_*.log.*"] for pattern in log_patterns: for log_file in self.log_dir.glob(pattern): try: file_stat = log_file.stat() file_mtime = datetime.fromtimestamp(file_stat.st_mtime) stats["total_files"] += 1 stats["total_size_mb"] += file_stat.st_size / (1024 * 1024) if file_mtime < cutoff_time: stats["files_by_age"]["old"] += 1 else: stats["files_by_age"]["recent"] += 1 if oldest_time is None or file_mtime < oldest_time: oldest_time = file_mtime stats["oldest_file"] = {"name": log_file.name, "age_days": (current_time - file_mtime).days} if newest_time is None or file_mtime > newest_time: newest_time = file_mtime stats["newest_file"] = {"name": log_file.name, "age_days": (current_time - file_mtime).days} except Exception: continue stats["total_size_mb"] = round(stats["total_size_mb"], 2) return stats class DorisLoggerManager: """Centralized logger manager for Doris MCP Server""" def __init__(self): self.is_initialized = False self.log_dir = None self.config = None self.loggers = {} self.cleanup_manager = None def setup_logging(self, level: str = "INFO", log_dir: str = "logs", enable_console: bool = True, enable_file: bool = True, enable_audit: bool = True, audit_file: Optional[str] = None, max_file_size: int = 10*1024*1024, backup_count: int = 5, enable_cleanup: bool = True, max_age_days: int = 30, cleanup_interval_hours: int = 24) -> None: """ Setup comprehensive logging configuration. Args: level: Base logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) log_dir: Directory for log files enable_console: Enable console output enable_file: Enable file logging enable_audit: Enable audit logging audit_file: Custom audit log file path max_file_size: Maximum size per log file (bytes) backup_count: Number of backup files to keep enable_cleanup: Enable automatic log cleanup max_age_days: Maximum age of log files in days (default: 30) cleanup_interval_hours: Cleanup interval in hours (default: 24) """ if self.is_initialized: return self.log_dir = Path(log_dir) log_dir_writable = True # Initialize the variable # Try to create log directory, fallback to console-only if fails try: self.log_dir.mkdir(parents=True, exist_ok=True) except (OSError, PermissionError) as e: # If we can't create log directory (e.g., read-only filesystem in stdio mode), # fall back to console-only logging log_dir_writable = False enable_file = False enable_audit = False enable_cleanup = False # Don't use print() in stdio mode as it interferes with MCP JSON protocol # Log the warning through the logging system instead, which will be handled after setup # Clear existing handlers root_logger = logging.getLogger() for handler in root_logger.handlers[:]: root_logger.removeHandler(handler) # Set root logger level root_logger.setLevel(logging.DEBUG) # Allow all levels, handlers will filter handlers = [] # Console handler if enable_console: console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(getattr(logging, level.upper())) console_formatter = TimestampedFormatter( fmt="%(asctime)s.%(msecs)03d %(level_aligned)s %(name)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) console_handler.setFormatter(console_formatter) handlers.append(console_handler) # Level-based file handlers if enable_file: level_handler = LevelBasedFileHandler( log_dir=str(self.log_dir), base_name="doris_mcp_server", max_bytes=max_file_size, backup_count=backup_count ) level_handler.setLevel(logging.DEBUG) # Accept all levels handlers.append(level_handler) # Combined application log (all levels in one file) if enable_file: app_log_file = self.log_dir / "doris_mcp_server_all.log" app_handler = logging.handlers.RotatingFileHandler( app_log_file, maxBytes=max_file_size, backupCount=backup_count, encoding='utf-8' ) app_handler.setLevel(getattr(logging, level.upper())) app_formatter = TimestampedFormatter() app_handler.setFormatter(app_formatter) handlers.append(app_handler) # Audit logger (separate from main logging) if enable_audit: audit_file_path = audit_file or str(self.log_dir / "doris_mcp_server_audit.log") audit_logger = logging.getLogger("audit") audit_logger.setLevel(logging.INFO) # Clear existing audit handlers for handler in audit_logger.handlers[:]: audit_logger.removeHandler(handler) audit_handler = logging.handlers.RotatingFileHandler( audit_file_path, maxBytes=max_file_size, backupCount=backup_count, encoding='utf-8' ) audit_formatter = TimestampedFormatter( fmt="%(asctime)s.%(msecs)03d [AUDIT] %(name)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) audit_handler.setFormatter(audit_formatter) audit_logger.addHandler(audit_handler) audit_logger.propagate = False # Don't propagate to root logger # Add all handlers to root logger for handler in handlers: root_logger.addHandler(handler) # Setup package-specific loggers self._setup_package_loggers(level) # Setup log cleanup manager if enable_cleanup and enable_file: self.cleanup_manager = LogCleanupManager( log_dir=str(self.log_dir), max_age_days=max_age_days, cleanup_interval_hours=cleanup_interval_hours ) self.cleanup_manager.start_cleanup_scheduler() self.is_initialized = True # Log initialization message logger = self.get_logger("doris_mcp_server.logger") logger.info("=" * 80) logger.info("Doris MCP Server Logging System Initialized") logger.info(f"Log Level: {level}") if log_dir_writable: logger.info(f"Log Directory: {self.log_dir.absolute()}") else: logger.info("Log Directory: Not available (console-only mode)") logger.info(f"Console Logging: {'Enabled' if enable_console else 'Disabled'}") logger.info(f"File Logging: {'Enabled' if enable_file else 'Disabled (fallback mode)'}") logger.info(f"Audit Logging: {'Enabled' if enable_audit else 'Disabled (fallback mode)'}") logger.info(f"Log Cleanup: {'Enabled' if enable_cleanup and enable_file else 'Disabled (fallback mode)'}") if enable_cleanup and enable_file: logger.info(f"Cleanup Settings: Max age {max_age_days} days, interval {cleanup_interval_hours}h") if not log_dir_writable: logger.warning("Running in console-only logging mode due to filesystem permissions") logger.warning(f"Could not create log directory '{log_dir}' - stdio mode fallback enabled") logger.info("=" * 80) def _setup_package_loggers(self, level: str): """Setup specific loggers for different modules""" package_loggers = [ "doris_mcp_server", "doris_mcp_server.main", "doris_mcp_server.utils", "doris_mcp_server.tools", "doris_mcp_client" ] for logger_name in package_loggers: logger = logging.getLogger(logger_name) logger.setLevel(getattr(logging, level.upper())) # Don't add handlers here - they inherit from root logger def get_logger(self, name: str) -> logging.Logger: """ Get a logger instance with proper configuration. Args: name: Logger name (usually __name__) Returns: Configured logger instance """ if name not in self.loggers: logger = logging.getLogger(name) self.loggers[name] = logger return self.loggers[name] def get_audit_logger(self) -> logging.Logger: """Get the audit logger""" return logging.getLogger("audit") def log_system_info(self): """Log system information for debugging""" logger = self.get_logger("doris_mcp_server.system") logger.info("System Information:") logger.info(f"Python Version: {sys.version}") logger.info(f"Platform: {sys.platform}") logger.info(f"Working Directory: {os.getcwd()}") logger.info(f"Process ID: {os.getpid()}") # Log environment variables (filtered) env_vars = ["LOG_LEVEL", "LOG_FILE_PATH", "ENABLE_AUDIT", "AUDIT_FILE_PATH"] for var in env_vars: value = os.getenv(var, "Not Set") logger.info(f"Environment {var}: {value}") def get_cleanup_stats(self) -> dict: """Get log cleanup statistics""" if self.cleanup_manager: return self.cleanup_manager.get_cleanup_stats() else: return {"error": "Log cleanup is not enabled"} def manual_cleanup(self) -> dict: """Manually trigger log cleanup and return statistics""" if self.cleanup_manager: self.cleanup_manager.cleanup_old_logs() return self.cleanup_manager.get_cleanup_stats() else: return {"error": "Log cleanup is not enabled"} def shutdown(self): """Shutdown logging system""" if not self.is_initialized: return logger = self.get_logger("doris_mcp_server.logger") logger.info("Shutting down logging system...") # Stop cleanup manager if self.cleanup_manager: self.cleanup_manager.stop_cleanup_scheduler() # Close all handlers root_logger = logging.getLogger() for handler in root_logger.handlers[:]: try: handler.close() except Exception as e: print(f"Error closing handler: {e}") # Close audit logger handlers audit_logger = logging.getLogger("audit") for handler in audit_logger.handlers[:]: try: handler.close() except Exception as e: print(f"Error closing audit handler: {e}") self.is_initialized = False # Global logger manager instance _logger_manager = DorisLoggerManager() def setup_logging(level: str = "INFO", log_dir: str = "logs", enable_console: bool = True, enable_file: bool = True, enable_audit: bool = True, audit_file: Optional[str] = None, max_file_size: int = 10*1024*1024, backup_count: int = 5, enable_cleanup: bool = True, max_age_days: int = 30, cleanup_interval_hours: int = 24) -> None: """ Setup logging configuration (convenience function). Args: level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) log_dir: Directory for log files enable_console: Enable console output enable_file: Enable file logging enable_audit: Enable audit logging audit_file: Custom audit log file path max_file_size: Maximum size per log file (bytes) backup_count: Number of backup files to keep enable_cleanup: Enable automatic log cleanup max_age_days: Maximum age of log files in days (default: 30) cleanup_interval_hours: Cleanup interval in hours (default: 24) """ _logger_manager.setup_logging( level=level, log_dir=log_dir, enable_console=enable_console, enable_file=enable_file, enable_audit=enable_audit, audit_file=audit_file, max_file_size=max_file_size, backup_count=backup_count, enable_cleanup=enable_cleanup, max_age_days=max_age_days, cleanup_interval_hours=cleanup_interval_hours ) def get_logger(name: str) -> logging.Logger: """ Get a logger instance. Args: name: Logger name (usually __name__) Returns: Configured logger instance """ return _logger_manager.get_logger(name) def get_audit_logger() -> logging.Logger: """Get the audit logger""" return _logger_manager.get_audit_logger() def log_system_info(): """Log system information for debugging""" _logger_manager.log_system_info() def get_cleanup_stats() -> dict: """Get log cleanup statistics""" return _logger_manager.get_cleanup_stats() def manual_cleanup() -> dict: """Manually trigger log cleanup and return statistics""" return _logger_manager.manual_cleanup() def shutdown_logging(): """Shutdown logging system""" _logger_manager.shutdown() # Compatibility function for existing code def setup_logging_old(level: str = "INFO", log_file: str | None = None, log_format: str | None = None) -> None: """ Legacy setup function for backward compatibility. Args: level: Logging level (DEBUG, INFO, WARNING, ERROR) log_file: Optional log file path (deprecated - use log_dir instead) log_format: Optional custom log format (deprecated) """ # Extract directory from log_file if provided log_dir = "logs" if log_file: log_dir = str(Path(log_file).parent) setup_logging( level=level, log_dir=log_dir, enable_console=True, enable_file=True, enable_audit=True )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/apache/doris-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server