Skip to main content
Glama

Spark EventLog MCP Server

by yhyyz
helpers.py (8.12 kB)
""" Helper utilities for Spark Event Log Analysis MCP Server """ import os import json import logging from typing import Dict, Any, Optional from pathlib import Path def setup_logging(log_level: str = "INFO") -> logging.Logger: """ Setup logging configuration for the MCP server Args: log_level: Logging level (DEBUG, INFO, WARNING, ERROR) Returns: Configured logger instance """ # 详细的日志格式,包含时间、日志级别、文件名、行号、函数名和消息 log_format = ( '%(asctime)s - %(levelname)-8s - ' '[%(filename)s:%(lineno)d:%(funcName)s] - ' '%(name)s - %(message)s' ) # 时间格式 date_format = '%Y-%m-%d %H:%M:%S' logging.basicConfig( level=getattr(logging, log_level.upper()), format=log_format, datefmt=date_format ) logger = logging.getLogger("spark-eventlog-mcp") return logger def load_config_from_env() -> Dict[str, Any]: """ Load configuration from environment variables with optimized parsing Returns: Configuration dictionary with properly typed values """ # Helper function for boolean conversion def get_bool(key: str, default: str = "false") -> bool: return os.getenv(key, default).lower() in ("true", "1", "yes", "on") # Helper function for integer conversion with validation def get_int(key: str, default: int) -> int: try: return int(os.getenv(key, str(default))) except (ValueError, TypeError): return default # Server settings server_config = { "server_name": os.getenv("MCP_SERVER_NAME", "Spark EventLog Analyzer"), "server_version": os.getenv("MCP_SERVER_VERSION", "1.0.0"), "log_level": os.getenv("LOG_LEVEL", "INFO").upper(), "mcp_host": os.getenv("MCP_HOST"), "mcp_port": os.getenv("MCP_PORT"), } # AWS settings with S3 as default aws_config = { "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"), "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"), "aws_region": os.getenv("AWS_DEFAULT_REGION", "us-east-1"), "default_source_type": os.getenv("DEFAULT_SOURCE_TYPE", "s3"), # 默认使用 S3 } # Cache settings cache_config = { "cache_enabled": get_bool("CACHE_ENABLED", "true"), "cache_ttl": get_int("CACHE_TTL", 
300), } # Analysis settings (暂无实际使用的分析配置) analysis_config = { # 所有分析配置都已移除,因为它们在业务逻辑中未被使用 # 分析深度由用户请求参数控制,不使用环境变量默认值 } # Performance settings (未实现的功能,保留配置结构) performance_config = { "enable_metrics": get_bool("ENABLE_METRICS", "false"), "metrics_port": get_int("METRICS_PORT", 9090), } # Merge all configurations config = { **server_config, **aws_config, **cache_config, **analysis_config, **performance_config, } return config def validate_file_path(file_path: str) -> bool: """ Validate if a file path exists and is accessible Args: file_path: Path to validate Returns: True if valid, False otherwise """ try: path = Path(file_path) return path.exists() and path.is_file() except Exception: return False def validate_s3_path(s3_path: str) -> bool: """ Validate S3 path format Args: s3_path: S3 path to validate Returns: True if valid format, False otherwise """ return s3_path.startswith("s3://") and len(s3_path.split("/")) >= 4 def validate_url(url: str) -> bool: """ Validate URL format Args: url: URL to validate Returns: True if valid format, False otherwise """ return url.startswith(("http://", "https://")) def format_bytes(bytes_value: int) -> str: """ Format bytes to human readable format Args: bytes_value: Number of bytes Returns: Formatted string (e.g., "1.5 GB") """ if bytes_value == 0: return "0 B" for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if bytes_value < 1024: return f"{bytes_value:.2f} {unit}" bytes_value /= 1024 return f"{bytes_value:.2f} PB" def format_duration_ms(duration_ms: int) -> str: """ Format duration in milliseconds to human readable format Args: duration_ms: Duration in milliseconds Returns: Formatted string (e.g., "2m 30s") """ if duration_ms == 0: return "0ms" seconds = duration_ms / 1000 if seconds < 60: return f"{seconds:.2f}s" minutes = int(seconds // 60) remaining_seconds = seconds % 60 if minutes < 60: if remaining_seconds > 0: return f"{minutes}m {remaining_seconds:.0f}s" else: return f"{minutes}m" hours = int(minutes // 60) remaining_minutes = 
minutes % 60 if remaining_minutes > 0: return f"{hours}h {remaining_minutes}m" else: return f"{hours}h" def safe_divide(numerator: float, denominator: float, default: float = 0.0) -> float: """ Safely divide two numbers, returning default if denominator is zero Args: numerator: Numerator value denominator: Denominator value default: Default value to return if division by zero Returns: Result of division or default """ if denominator == 0: return default return numerator / denominator def extract_application_id_from_path(path: str) -> Optional[str]: """ Extract Spark application ID from file path Args: path: File path that might contain application ID Returns: Application ID if found, None otherwise """ # Common patterns for Spark application IDs import re # Pattern: application_1234567890123_0001 pattern = r'application_\d+_\d+' match = re.search(pattern, path) if match: return match.group(0) return None def create_error_response(error_type: str, message: str, details: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Create standardized error response Args: error_type: Type of error (e.g., "ValidationError", "ProcessingError") message: Error message details: Optional additional error details Returns: Standardized error response dictionary """ response = { "success": False, "error": { "type": error_type, "message": message } } if details: response["error"]["details"] = details return response def create_success_response(data: Any, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Create standardized success response Args: data: Response data metadata: Optional metadata Returns: Standardized success response dictionary """ response = { "success": True, "data": data } if metadata: response["metadata"] = metadata return response def sanitize_filename(filename: str) -> str: """ Sanitize filename by removing/replacing invalid characters Args: filename: Original filename Returns: Sanitized filename """ import re # Remove or replace invalid 
characters sanitized = re.sub(r'[<>:"/\\|?*]', '_', filename) # Remove multiple underscores sanitized = re.sub(r'_{2,}', '_', sanitized) # Remove leading/trailing underscores and dots sanitized = sanitized.strip('_.') return sanitized or "unnamed_file" def get_file_size_mb(file_path: str) -> float: """ Get file size in megabytes Args: file_path: Path to file Returns: File size in MB, 0 if file doesn't exist """ try: size_bytes = Path(file_path).stat().st_size return size_bytes / (1024 * 1024) except Exception: return 0.0

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yhyyz/spark-eventlog-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.