
MCP Croit Ceph

Official
by croit
croit_log_tools.py (99.7 kB)
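The file below exposes CroitLogSearchClient as its main entry point for natural-language log search against a Croit/Ceph cluster. A minimal usage sketch (editor's illustration, not part of the file; it assumes the file is importable as croit_log_tools and that a Croit management node is reachable with a valid API token):

import asyncio
from croit_log_tools import CroitLogSearchClient

async def main():
    # Host, port, and token below are placeholders for your environment.
    client = CroitLogSearchClient(
        "croit.example.local", port=8080, api_token="YOUR_API_TOKEN", use_ssl=True
    )
    # Natural-language query; the client parses it, builds a LogsQL filter,
    # tries the WebSocket endpoint first, and falls back to HTTP export on failure.
    result = await client.search_logs("osd errors in the last hour", limit=200)
    print(result["insights"]["summary"])
    for log in result["results"][:5]:
        print(log)

asyncio.run(main())
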
#!/usr/bin/env python3
"""
Croit Log Intelligence Tools for MCP Server
Provides advanced log search and analysis capabilities
"""
import json
import asyncio
import websockets
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import re
import hashlib
from collections import defaultdict, Counter
import aiohttp
import zipfile
import io

logger = logging.getLogger(__name__)


class LogSearchIntentParser:
    """Parse natural language into structured search intents"""

    PATTERNS = {
        "osd_issues": {
            "regex": r"(osd|OSD|object.?storage).*?(fail|down|crash|slow|error|flap|timeout)",
            "services": ["ceph-osd", "ceph-mon"],
            "levels": ["ERROR", "WARN", "FATAL"],
            "keywords": ["OSD", "failed", "down", "crashed", "flapping"],
        },
        "slow_requests": {
            "regex": r"(slow|blocked|stuck|delayed)\s+(request|operation|op|query|io)",
            "services": ["ceph-osd", "ceph-mon", "ceph-mds"],
            "levels": ["WARN", "ERROR"],
            "keywords": ["slow request", "blocked", "timeout", "stuck"],
        },
        "auth_failures": {
            "regex": r"(auth|authentication|login|permission).*?(fail|denied|error)",
            "services": ["ceph-mon", "ceph-mgr"],
            "levels": ["ERROR", "WARN"],
            "keywords": ["authentication", "failed", "denied", "unauthorized"],
        },
        "network_problems": {
            "regex": r"(network|connection|timeout|unreachable|heartbeat|msgr)",
            "services": ["ceph-mon", "ceph-osd", "ceph-mds", "ceph-mgr"],
            "levels": ["ERROR", "WARN"],
            "keywords": [
                "connection",
                "timeout",
                "network",
                "unreachable",
                "heartbeat",
            ],
        },
        "pool_issues": {
            "regex": r"pool.*?(full|create|delete|error)",
            "services": ["ceph-mon", "ceph-mgr"],
            "levels": ["ERROR", "WARN"],
            "keywords": ["pool", "full", "quota", "space"],
        },
    }

    def parse(self, search_intent: str) -> Dict[str, Any]:
        """Parse natural language search intent"""
        intent = search_intent.lower()

        # Detect Ceph service references and translate them
        ceph_services = CephServiceTranslator.detect_ceph_services_in_text(
            search_intent
        )
        translated_services = []
        for service in ceph_services:
            translated = CephServiceTranslator.translate_service_name(service)
            translated_services.append(translated)
            # Replace in intent for better pattern detection
            intent = intent.replace(service.lower(), translated.lower())

        # Detect patterns
        detected_patterns = []
        for pattern_name, pattern_def in self.PATTERNS.items():
            if re.search(pattern_def["regex"], intent, re.IGNORECASE):
                detected_patterns.append(pattern_name)

        # Extract components
        services = set()
        levels = set()
        keywords = set()

        # Add translated Ceph services
        services.update(translated_services)

        for pattern_name in detected_patterns:
            pattern = self.PATTERNS[pattern_name]
            services.update(pattern["services"])
            levels.update(pattern["levels"])
            keywords.update(pattern["keywords"])

        # Enhanced level detection with kernel-specific handling
        intent_lower = intent.lower()

        # Explicit level requests
        if (
            "all level" in intent_lower
            or "all log" in intent_lower
            or "everything" in intent_lower
        ):
            levels = set()  # No level filter
        elif "critical" in intent_lower or "emergency" in intent_lower:
            levels.update(["EMERGENCY", "ALERT", "CRITICAL"])
        elif "error" in intent_lower and "no error" not in intent_lower:
            levels.update(["ERROR", "CRITICAL", "ALERT", "EMERGENCY"])
        elif "warning" in intent_lower or "warn" in intent_lower:
            levels.update(["WARNING", "ERROR", "CRITICAL", "ALERT", "EMERGENCY"])
        elif "info" in intent_lower and "info" not in " ".join(keywords).lower():
            levels.update(
                ["INFO", "WARNING", "ERROR", "CRITICAL", "ALERT", "EMERGENCY"]
            )
        elif "debug" in intent_lower:
            levels.update(
                ["DEBUG",
"INFO", "WARNING", "ERROR", "CRITICAL", "ALERT", "EMERGENCY"] ) elif "trace" in intent_lower: levels = set() # All levels for trace # Kernel-specific optimizations kernel_mentioned = any( word in intent_lower for word in ["kernel", "hardware", "driver", "system"] ) if kernel_mentioned: # For kernel logs, focus on more critical levels by default if not levels and not any( word in intent_lower for word in ["all", "everything", "debug", "trace", "info"] ): levels.update(["WARNING", "ERROR", "CRITICAL", "ALERT", "EMERGENCY"]) # Smart defaults based on context problem_indicators = [ "error", "fail", "problem", "issue", "crash", "wrong", "slow", "timeout", "stuck", ] if not levels and not any( word in intent_lower for word in problem_indicators + ["all", "everything"] ): # No explicit level and no problem indicators - get reasonable subset if kernel_mentioned: levels.update( ["NOTICE", "WARNING", "ERROR", "CRITICAL", "ALERT", "EMERGENCY"] ) else: levels = set() # For service logs, get all levels # Performance queries often need broader scope performance_indicators = [ "performance", "slow", "fast", "latency", "throughput", "bandwidth", ] if any(word in intent_lower for word in performance_indicators): if not levels or levels == {"ERROR", "WARNING"}: levels.update( ["INFO", "NOTICE", "WARNING", "ERROR"] ) # Include info for performance data # Parse time range time_range = self._parse_time_range(intent) # Determine query type query_type = "tail" if "monitor" in intent or "stream" in intent else "query" return { "type": query_type, "services": list(services), "levels": ( list(levels) if levels else [] ), # Empty list = no level filter = all logs "keywords": list(keywords), "time_range": time_range, } def _parse_time_range(self, text: str) -> Dict[str, str]: """Extract time range from text""" now = datetime.now() text_lower = text.lower() # Pattern matching for time expressions patterns = { "last hour": timedelta(hours=1), "past hour": timedelta(hours=1), "last day": timedelta(days=1), "past day": timedelta(days=1), "last week": timedelta(days=7), "recent": timedelta(minutes=15), } for pattern, delta in patterns.items(): if pattern in text_lower: return { "start": (now - delta).isoformat() + "Z", "end": now.isoformat() + "Z", } # Check for "X ago" pattern (e.g., "one hour ago", "5 minutes ago") match = re.search( r"(\d+|one|two|three|four|five|six|seven|eight|nine|ten)\s+(second|minute|hour|day|week)s?\s+ago", text_lower, ) if match: amount_str = match.group(1) unit = match.group(2) # Convert word numbers to digits word_to_num = { "one": 1, "two": 2, "three": 3, "four": 4, "five": 5, "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10, } amount = word_to_num.get( amount_str, int(amount_str) if amount_str.isdigit() else 1 ) if "second" in unit: delta = timedelta(seconds=amount) elif "minute" in unit: delta = timedelta(minutes=amount) elif "hour" in unit: delta = timedelta(hours=amount) elif "day" in unit: delta = timedelta(days=amount) elif "week" in unit: delta = timedelta(weeks=amount) else: delta = timedelta(hours=1) return { "start": (now - delta).isoformat() + "Z", "end": now.isoformat() + "Z", } # Check for relative time with "last/past" match = re.search(r"(last|past)\s+(\d+)\s+(minute|hour|day|week)s?", text_lower) if match: amount = int(match.group(2)) unit = match.group(3) if "minute" in unit: delta = timedelta(minutes=amount) elif "hour" in unit: delta = timedelta(hours=amount) elif "day" in unit: delta = timedelta(days=amount) elif "week" in unit: delta = timedelta(weeks=amount) else: 
delta = timedelta(hours=1) return { "start": (now - delta).isoformat() + "Z", "end": now.isoformat() + "Z", } # Default to last hour return { "start": (now - timedelta(hours=1)).isoformat() + "Z", "end": now.isoformat() + "Z", } class LogsQLBuilder: """Build LogsQL queries from parsed intents""" def build(self, intent: Dict[str, Any]) -> str: """Build LogsQL query from intent""" conditions = [] # Add time filter first for optimization if intent.get("time_range"): start = intent["time_range"].get("start") end = intent["time_range"].get("end") if start and end: conditions.append(f"_time:[{start}, {end}]") # Add service filters if intent.get("services"): service_conditions = [f"service:{s}" for s in intent["services"]] if len(service_conditions) > 1: conditions.append(f"({' OR '.join(service_conditions)})") else: conditions.append(service_conditions[0]) # Add severity filters if intent.get("levels"): level_conditions = [f"level:{l}" for l in intent["levels"]] if len(level_conditions) > 1: conditions.append(f"({' OR '.join(level_conditions)})") else: conditions.append(level_conditions[0]) # Add keyword search if intent.get("keywords"): keyword_conditions = [f'_msg:"{k}"' for k in intent["keywords"]] if len(keyword_conditions) > 1: conditions.append(f"({' OR '.join(keyword_conditions)})") else: conditions.append(keyword_conditions[0]) return " AND ".join(conditions) if conditions else "" class CroitLogSearchClient: """Client for Croit log searching via WebSocket""" def __init__( self, host: str, port: int = 8080, api_token: Optional[str] = None, use_ssl: bool = False, ): self.host = host self.port = port self.api_token = api_token self.use_ssl = use_ssl # Build URLs with correct protocol ws_protocol = "wss" if use_ssl else "ws" http_protocol = "https" if use_ssl else "http" self.ws_url = f"{ws_protocol}://{host}:{port}/api/logs" self.http_url = f"{http_protocol}://{host}:{port}" self.parser = LogSearchIntentParser() self.builder = LogsQLBuilder() self.server_detector = ServerIDDetector(self) self.transport_analyzer = LogTransportAnalyzer(self) # Cache for results self.cache = {} self.cache_ttl = 300 # 5 minutes async def search_logs(self, search_query: str, limit: int = 1000) -> Dict[str, Any]: """Search logs using natural language query""" # Check cache cache_key = hashlib.md5(f"{search_query}{limit}".encode()).hexdigest() if cache_key in self.cache: cached = self.cache[cache_key] if (datetime.now() - cached["timestamp"]).seconds < self.cache_ttl: return cached["data"] # Parse intent intent = self.parser.parse(search_query) # Build LogsQL query query = self.builder.build(intent) # Prepare request request = { "type": intent.get("type", "query"), "query": {"where": query, "limit": limit}, } if intent.get("time_range"): request["start"] = intent["time_range"].get("start") request["end"] = intent["time_range"].get("end") # Execute query try: logs = await self._execute_websocket_query(request) logger.debug(f"WebSocket query successful: {len(logs)} logs returned") except Exception as e: logger.error(f"WebSocket failed: {e}, falling back to HTTP") logs = await self._execute_http_query(request) logger.debug(f"HTTP fallback completed: {len(logs)} logs returned") # Analyze results with intelligent prioritization patterns = self._analyze_patterns(logs) if logs else [] insights = self._generate_insights(logs, patterns) # Create log summary for better overview summary_engine = LogSummaryEngine() log_summary = summary_engine.summarize_logs(logs, max_details=20) # Intelligent truncation: prioritize critical 
events if logs and len(logs) > 100: # Get critical events with full log data critical_events = log_summary["critical_events"] critical_logs = [ event["log"] for event in critical_events[:50] ] # Top 50 critical # Fill remaining space with recent logs (avoiding duplicates) critical_log_ids = {id(log) for log in critical_logs} recent_logs = [log for log in logs[-50:] if id(log) not in critical_log_ids] intelligent_results = critical_logs + recent_logs intelligent_results = intelligent_results[:100] # Final limit truncation_info = { "total_logs": len(logs), "shown_logs": len(intelligent_results), "critical_events_shown": len(critical_logs), "recent_logs_shown": len(recent_logs), "truncation_method": "intelligent_priority", } else: intelligent_results = logs[:100] if logs else [] truncation_info = { "total_logs": len(logs) if logs else 0, "shown_logs": len(intelligent_results), "truncation_method": "simple_limit", } result = { "query": query, "intent": intent, "total_count": len(logs) if logs else 0, "results": intelligent_results, "patterns": patterns[:10], # Limit patterns "insights": insights, "summary": log_summary, "truncation_info": truncation_info, } # Cache result self.cache[cache_key] = {"timestamp": datetime.now(), "data": result} return result # Log Level Shortcuts async def search_errors( self, query: str = "", hours_back: int = 24, limit: int = 100 ) -> Dict[str, Any]: """Quick shortcut to search ERROR level logs""" search_query = f"error level priority ≤3 {query}".strip() return await self.search_logs_with_params( search_query=search_query, priority_max=3, hours_back=hours_back, limit=limit, ) async def search_warnings( self, query: str = "", hours_back: int = 24, limit: int = 200 ) -> Dict[str, Any]: """Quick shortcut to search WARNING level logs""" search_query = f"warning level priority ≤4 {query}".strip() return await self.search_logs_with_params( search_query=search_query, priority_max=4, hours_back=hours_back, limit=limit, ) async def search_info( self, query: str = "", hours_back: int = 6, limit: int = 500 ) -> Dict[str, Any]: """Quick shortcut to search INFO level logs""" search_query = f"info level priority ≤6 {query}".strip() return await self.search_logs_with_params( search_query=search_query, priority_max=6, hours_back=hours_back, limit=limit, ) async def search_critical( self, query: str = "", hours_back: int = 48, limit: int = 50 ) -> Dict[str, Any]: """Quick shortcut to search CRITICAL/EMERGENCY level logs""" search_query = f"critical emergency level priority ≤2 {query}".strip() return await self.search_logs_with_params( search_query=search_query, priority_max=2, hours_back=hours_back, limit=limit, ) async def search_logs_with_params( self, search_query: str, priority_max: Optional[int] = None, hours_back: int = 24, limit: int = 1000, server_id: Optional[str] = None, ) -> Dict[str, Any]: """Enhanced search with explicit parameters""" # Build query with specific parameters query_conditions = [] # Priority filter if priority_max is not None: query_conditions.append({"PRIORITY": {"_lte": priority_max}}) # Server filter if server_id: query_conditions.append({"CROIT_SERVERID": {"_eq": server_id}}) # Time range start_time = int((datetime.now() - timedelta(hours=hours_back)).timestamp()) end_time = int(datetime.now().timestamp()) # Search text search_text = search_query.strip() if search_query.strip() else "" # Build the query base_query = { "type": "query", "start": start_time, "end": end_time, "query": {"where": {"_search": search_text}, "limit": limit}, } # Add 
conditions if any if query_conditions: if len(query_conditions) == 1: # Merge single condition with _search base_query["query"]["where"] = { "_and": [ query_conditions[0], {"_search": search_text} if search_text else {}, ] } # Remove empty _search if not search_text: base_query["query"]["where"]["_and"] = query_conditions else: # Multiple conditions all_conditions = query_conditions.copy() if search_text: all_conditions.append({"_search": search_text}) base_query["query"]["where"] = {"_and": all_conditions} # Execute query try: logs = await self._execute_http_query(base_query) logger.debug(f"Parameterized search completed: {len(logs)} logs returned") except Exception as e: logger.error(f"Parameterized search failed: {e}") logs = [] # Calculate actual hours searched actual_hours_searched = (end_time - start_time) / 3600.0 # Create summary summary_engine = LogSummaryEngine() log_summary = summary_engine.summarize_logs(logs, max_details=15) # Intelligent truncation if logs and len(logs) > limit // 2: # Apply intelligent truncation critical_events = log_summary["critical_events"] critical_logs = [event["log"] for event in critical_events[: limit // 3]] recent_logs = logs[-(limit // 3) :] if len(logs) > limit // 3 else logs # Avoid duplicates critical_log_ids = {id(log) for log in critical_logs} recent_logs = [ log for log in recent_logs if id(log) not in critical_log_ids ] final_logs = critical_logs + recent_logs final_logs = final_logs[: limit // 2] # Final size control else: final_logs = logs return { "query_params": { "search_query": search_query, "priority_max": priority_max, "hours_back": hours_back, "server_id": server_id, "limit": limit, }, "actual_query": base_query, "total_count": len(logs), "displayed_count": len(final_logs), "hours_searched": actual_hours_searched, "results": final_logs, "summary": log_summary, "execution_timestamp": datetime.now().isoformat(), } # Server Discovery async def discover_servers(self, force_refresh: bool = False) -> Dict[str, Any]: """Discover available server IDs from recent logs""" return await self.server_detector.detect_servers(force_refresh) async def get_server_summary(self) -> str: """Get human-readable server summary""" server_info = await self.discover_servers() return self.server_detector.get_server_summary(server_info) # Transport Analysis async def analyze_log_transports(self, hours_back: int = 24) -> Dict[str, Any]: """Analyze available log transport types""" return await self.transport_analyzer.analyze_transports(hours_back) async def find_kernel_logs_debug(self, hours_back: int = 24) -> Dict[str, Any]: """Debug kernel log availability with multiple strategies""" return await self.transport_analyzer.find_kernel_logs(hours_back) # Response Size Optimization def optimize_response_size( self, data: Dict, max_log_entries: int = 50, max_message_length: int = 200 ) -> Dict: """Optimize response size while preserving critical information""" optimized = data.copy() # Optimize log results if "results" in optimized and isinstance(optimized["results"], list): logs = optimized["results"] if len(logs) > max_log_entries: # Keep critical events + recent logs summary = optimized.get("summary", {}) critical_events = summary.get("critical_events", []) # Get critical log entries critical_logs = [] if critical_events: critical_logs = [ event["log"] for event in critical_events[: max_log_entries // 2] ] # Get recent logs (avoiding duplicates) critical_ids = {id(log) for log in critical_logs} recent_logs = [ log for log in logs[-(max_log_entries // 2) :] if 
id(log) not in critical_ids ] optimized_logs = critical_logs + recent_logs optimized["results"] = optimized_logs[:max_log_entries] # Add optimization info optimized["optimization_applied"] = { "original_count": len(logs), "optimized_count": len(optimized["results"]), "critical_events_kept": len(critical_logs), "recent_logs_kept": len(recent_logs), "method": "critical_events_plus_recent", } # Truncate long messages if "results" in optimized: for log in optimized["results"]: if "MESSAGE" in log and len(log["MESSAGE"]) > max_message_length: log["MESSAGE"] = ( log["MESSAGE"][:max_message_length] + "...[truncated]" ) log["_message_truncated"] = True # Optimize summary critical events if "summary" in optimized and "critical_events" in optimized["summary"]: events = optimized["summary"]["critical_events"] for event in events: if ( "message_preview" in event and len(event["message_preview"]) > max_message_length ): event["message_preview"] = ( event["message_preview"][:max_message_length] + "..." ) # Optimize patterns (keep only top patterns) if "patterns" in optimized and isinstance(optimized["patterns"], list): optimized["patterns"] = optimized["patterns"][:5] # Top 5 patterns only return optimized async def search_optimized( self, search_query: str, limit: int = 1000, optimize_response: bool = True ) -> Dict[str, Any]: """Search with automatic response optimization""" result = await self.search_logs(search_query, limit) if optimize_response: # Apply size optimization result = self.optimize_response_size( result, max_log_entries=50, max_message_length=150 ) return result async def _execute_websocket_query(self, request: Dict) -> List[Dict]: """Execute query via WebSocket""" logs = [] headers = {} if self.api_token: headers["Authorization"] = f"Bearer {self.api_token}" try: async with websockets.connect( self.ws_url, extra_headers=headers if headers else None, ping_interval=20, ) as websocket: # Send query await websocket.send(json.dumps(request)) # Collect responses start = datetime.now() while (datetime.now() - start).seconds < 30: try: response = await asyncio.wait_for(websocket.recv(), timeout=5.0) if response: try: log_entry = json.loads(response) logs.append(log_entry) except json.JSONDecodeError: logger.warning(f"Non-JSON response: {response[:100]}") except asyncio.TimeoutError: break except websockets.exceptions.ConnectionClosed: break except Exception as e: logger.error(f"WebSocket error: {e}") raise return logs async def _execute_http_query(self, request: Dict) -> List[Dict]: """Fallback HTTP query execution""" logs = [] headers = {"Content-Type": "application/json"} if self.api_token: headers["Authorization"] = f"Bearer {self.api_token}" try: async with aiohttp.ClientSession() as session: url = f"{self.http_url}/logs/export" params = {"format": "json", "query": json.dumps(request)} logger.debug(f"HTTP GET {url} with params: {params}") logger.debug(f"HTTP headers: {headers}") async with session.get(url, params=params, headers=headers) as response: response_text = await response.text() logger.debug(f"HTTP response status: {response.status}") logger.debug(f"HTTP response headers: {dict(response.headers)}") logger.debug( f"HTTP response body (first 500 chars): {response_text[:500]}" ) if response.status == 200: try: data = json.loads(response_text) logs = data.get("logs", []) logger.debug( f"Successfully parsed JSON: {len(logs)} logs found" ) if not logs: logger.warning( f"HTTP response had no logs. 
Full response: {data}" ) except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON response: {e}") logger.error(f"Raw response: {response_text}") else: logger.error(f"HTTP query failed with status {response.status}") logger.error(f"Error response body: {response_text}") except Exception as e: logger.error(f"HTTP query failed: {e}") return logs def _analyze_patterns(self, logs: List[Dict]) -> List[Dict]: """Analyze log patterns""" patterns = [] if not logs: return patterns # Error clustering error_clusters = defaultdict(list) for log in logs: if log.get("level") in ["ERROR", "FATAL"]: msg = log.get("message", "") # Normalize for clustering normalized = re.sub(r"\b\d+\b", "N", msg) normalized = re.sub(r"\b[0-9a-f]{8,}\b", "HEX", normalized)[:100] error_clusters[normalized].append(log) # Create patterns for cluster_key, cluster_logs in error_clusters.items(): if len(cluster_logs) >= 2: patterns.append( { "type": "repeated_error", "pattern": cluster_key[:50], "count": len(cluster_logs), "hosts": list(set(l.get("host", "") for l in cluster_logs)), "services": list( set(l.get("service", "") for l in cluster_logs) ), } ) # Detect bursts time_buckets = defaultdict(list) for log in logs: if "timestamp" in log: try: ts = datetime.fromisoformat(log["timestamp"].replace("Z", "+00:00")) bucket = ts.strftime("%Y-%m-%d %H:%M") time_buckets[bucket].append(log) except: continue for bucket, bucket_logs in time_buckets.items(): if len(bucket_logs) > 50: patterns.append( { "type": "burst", "time": bucket, "count": len(bucket_logs), "error_count": sum( 1 for l in bucket_logs if l.get("level") in ["ERROR", "FATAL"] ), } ) return patterns def _generate_insights(self, logs: List[Dict], patterns: List[Dict]) -> Dict: """Generate insights from logs and patterns""" insights = {"summary": "", "severity": "normal", "recommendations": []} if not logs: insights["summary"] = "No logs found matching the search criteria" return insights # Calculate metrics total = len(logs) errors = sum(1 for l in logs if l.get("level") == "ERROR") fatals = sum(1 for l in logs if l.get("level") == "FATAL") # Determine severity if fatals > 0: insights["severity"] = "critical" insights["summary"] = f"CRITICAL: {fatals} fatal errors found" elif errors > 20: insights["severity"] = "high" insights["summary"] = f"HIGH: {errors} errors detected" elif errors > 5: insights["severity"] = "medium" insights["summary"] = f"MEDIUM: {errors} errors found" else: insights["summary"] = f"Analyzed {total} logs" # Generate recommendations for pattern in patterns[:3]: if pattern["type"] == "repeated_error": insights["recommendations"].append( f"Investigate repeated error on {len(pattern['hosts'])} hosts" ) elif pattern["type"] == "burst": insights["recommendations"].append( f"Check event at {pattern['time']} ({pattern['count']} logs)" ) return insights class CephServiceTranslator: """Translate Ceph service names to systemd service names""" @staticmethod def translate_service_name(service_name: str) -> str: """Translate Ceph service names to systemd service names Examples: - osd.12 -> ceph-osd@12.service - mon.hostname -> ceph-mon@hostname.service - mgr.node1 -> ceph-mgr@node1.service """ import re # Handle Ceph OSD services: osd.12 -> ceph-osd@12.service osd_match = re.match(r"^osd\.(\d+)$", service_name) if osd_match: osd_id = osd_match.group(1) return f"ceph-osd@{osd_id}.service" # Handle Ceph MON services: mon.hostname -> ceph-mon@hostname.service mon_match = re.match(r"^mon\.(.+)$", service_name) if mon_match: mon_id = mon_match.group(1) return 
f"ceph-mon@{mon_id}.service" # Handle Ceph MGR services: mgr.hostname -> ceph-mgr@hostname.service mgr_match = re.match(r"^mgr\.(.+)$", service_name) if mgr_match: mgr_id = mgr_match.group(1) return f"ceph-mgr@{mgr_id}.service" # Handle Ceph MDS services: mds.hostname -> ceph-mds@hostname.service mds_match = re.match(r"^mds\.(.+)$", service_name) if mds_match: mds_id = mds_match.group(1) return f"ceph-mds@{mds_id}.service" # Handle Ceph RGW services: rgw.hostname -> ceph-radosgw@hostname.service rgw_match = re.match(r"^rgw\.(.+)$", service_name) if rgw_match: rgw_id = rgw_match.group(1) return f"ceph-radosgw@{rgw_id}.service" # If no translation needed, return as-is (might already be systemd format) return service_name @staticmethod def detect_ceph_services_in_text(text: str) -> List[str]: """Detect Ceph service references in natural language text""" import re services = [] # Look for patterns like "osd.12", "mon.host1", etc. ceph_service_pattern = r"\b(osd|mon|mgr|mds|rgw)\.[\w\-\.]+\b" matches = re.findall(ceph_service_pattern, text, re.IGNORECASE) for match in matches: full_match = re.search( rf"\b{re.escape(match)}\.[\w\-\.]+\b", text, re.IGNORECASE ) if full_match: services.append(full_match.group()) return services class CephDebugTemplates: """Pre-built templates for common Ceph debugging scenarios""" @staticmethod def get_templates() -> Dict[str, Dict]: """Get all available debug templates""" return { "osd_health_check": { "name": "OSD Health Check", "description": "Check for OSD failures, flapping, and performance issues", "query": { "where": { "_and": [ {"_SYSTEMD_UNIT": {"_regex": "ceph-osd@.*"}}, {"PRIORITY": {"_lte": 4}}, ] } }, "hours_back": 24, "limit": 100, }, "cluster_status_errors": { "name": "Cluster Status Errors", "description": "Find critical cluster-wide errors and warnings", "query": { "where": { "_and": [ {"_SYSTEMD_UNIT": {"_contains": "ceph-mon"}}, {"PRIORITY": {"_lte": 3}}, {"MESSAGE": {"_regex": "(error|fail|critical|emergency)"}}, ] } }, "hours_back": 48, "limit": 50, }, "slow_requests": { "name": "Slow Request Analysis", "description": "Identify slow operations and blocked requests", "query": { "where": { "_and": [ {"MESSAGE": {"_contains": "slow request"}}, {"PRIORITY": {"_lte": 5}}, ] } }, "hours_back": 12, "limit": 200, }, "pg_issues": { "name": "Placement Group Issues", "description": "Find PG-related problems: inconsistent, incomplete, degraded", "query": { "where": { "_and": [ {"MESSAGE": {"_regex": "(pg|placement.?group)"}}, { "MESSAGE": { "_regex": "(inconsistent|incomplete|degraded|stuck|unclean)" } }, {"PRIORITY": {"_lte": 4}}, ] } }, "hours_back": 72, "limit": 100, }, "network_errors": { "name": "Network Connectivity Issues", "description": "Detect network timeouts, connection failures, and heartbeat issues", "query": { "where": { "_and": [ { "MESSAGE": { "_regex": "(network|connection|timeout|heartbeat|unreachable)" } }, {"PRIORITY": {"_lte": 4}}, ] } }, "hours_back": 24, "limit": 150, }, "mon_election": { "name": "Monitor Election Issues", "description": "Check for monitor election problems and quorum issues", "query": { "where": { "_and": [ {"_SYSTEMD_UNIT": {"_contains": "ceph-mon"}}, {"MESSAGE": {"_regex": "(election|quorum|leader|paxos)"}}, {"PRIORITY": {"_lte": 5}}, ] } }, "hours_back": 24, "limit": 100, }, "storage_errors": { "name": "Storage Hardware Errors", "description": "Find disk errors, SMART failures, and storage subsystem issues", "query": { "where": { "_and": [ { "MESSAGE": { "_regex": "(disk|storage|smart|hardware|device)" } }, 
{"MESSAGE": {"_regex": "(error|fail|abort|timeout)"}}, {"PRIORITY": {"_lte": 4}}, ] } }, "hours_back": 168, # 1 week for hardware issues "limit": 100, }, "kernel_ceph_errors": { "name": "Kernel Ceph Issues", "description": "Check kernel-level Ceph messages and errors", "query": { "where": { "_and": [ {"_TRANSPORT": {"_eq": "kernel"}}, {"MESSAGE": {"_regex": "(ceph|rbd|rados)"}}, {"PRIORITY": {"_lte": 4}}, ] } }, "hours_back": 48, "limit": 100, }, "rbd_mapping_issues": { "name": "RBD Mapping Problems", "description": "Find RBD image mapping/unmapping issues and client problems", "query": { "where": { "_and": [ {"MESSAGE": {"_contains": "rbd"}}, {"MESSAGE": {"_regex": "(map|unmap|mount|unmount|client)"}}, {"PRIORITY": {"_lte": 5}}, ] } }, "hours_back": 24, "limit": 100, }, "recent_startup": { "name": "Recent Service Startups", "description": "Check recent Ceph service startups and initialization", "query": { "where": { "_and": [ {"_SYSTEMD_UNIT": {"_regex": "ceph-.*"}}, {"MESSAGE": {"_regex": "(start|init|boot|mount|active)"}}, {"PRIORITY": {"_lte": 6}}, ] } }, "hours_back": 6, "limit": 200, }, "specific_osd_analysis": { "name": "Specific OSD Analysis (Ceph-friendly syntax)", "description": "Analyze specific OSD using natural Ceph syntax (e.g., 'osd.12')", "query": { "where": { "_and": [ {"_SYSTEMD_UNIT": {"_contains": "ceph-osd@12"}}, {"PRIORITY": {"_lte": 5}}, ] } }, "hours_back": 48, "limit": 150, "user_friendly_example": "Search for 'osd.12 issues' - automatically translates to systemd service name", }, "mon_specific_debugging": { "name": "Monitor Service Debugging (Ceph-friendly syntax)", "description": "Debug specific monitor using natural Ceph syntax (e.g., 'mon.node1')", "query": { "where": { "_and": [ {"_SYSTEMD_UNIT": {"_contains": "ceph-mon@node1"}}, {"PRIORITY": {"_lte": 4}}, { "MESSAGE": { "_regex": "(error|warn|fail|election|quorum)" } }, ] } }, "hours_back": 24, "limit": 100, "user_friendly_example": "Search for 'mon.node1 election problems' - auto-translates service names", }, "ceph_service_translation_showcase": { "name": "Ceph Service Translation Examples", "description": "Showcase automatic translation of Ceph service names to systemd format", "examples": { "osd.12": "Translates to ceph-osd@12.service", "mon.hostname": "Translates to ceph-mon@hostname.service", "mgr.node1": "Translates to ceph-mgr@node1.service", "mds.fs-node": "Translates to ceph-mds@fs-node.service", "rgw.gateway": "Translates to ceph-radosgw@gateway.service", }, "usage_examples": [ "Search: 'osd.5 slow requests' → Automatically finds ceph-osd@5.service logs", "Search: 'mon.ceph01 election' → Automatically finds ceph-mon@ceph01.service logs", "Search: 'mgr.primary errors' → Automatically finds ceph-mgr@primary.service logs", ], "query": { "where": { "_and": [ { "_SYSTEMD_UNIT": { "_regex": "ceph-(osd|mon|mgr|mds|radosgw)@.*" } }, {"PRIORITY": {"_lte": 6}}, ] } }, "hours_back": 12, "limit": 100, }, } @staticmethod def get_template_by_scenario(scenario: str) -> Optional[Dict]: """Get a specific template by scenario name""" templates = CephDebugTemplates.get_templates() return templates.get(scenario) @staticmethod def list_scenarios() -> List[str]: """List all available debug scenarios""" return list(CephDebugTemplates.get_templates().keys()) @staticmethod def search_templates(keyword: str) -> List[Dict]: """Search templates by keyword in name or description""" templates = CephDebugTemplates.get_templates() results = [] keyword_lower = keyword.lower() for template_id, template in templates.items(): if ( 
keyword_lower in template["name"].lower() or keyword_lower in template["description"].lower() ): results.append({"id": template_id, "template": template}) return results class ServerIDDetector: """Auto-detect available server IDs and suggest optimal filters""" def __init__(self, client): self.client = client self.server_cache = {} self.cache_timestamp = None self.cache_ttl = 3600 # 1 hour cache async def detect_servers(self, force_refresh: bool = False) -> Dict[str, Any]: """Detect all available server IDs from recent logs""" # Check cache if ( not force_refresh and self.cache_timestamp and (datetime.now() - self.cache_timestamp).seconds < self.cache_ttl ): return self.server_cache # Query recent logs to find server IDs detection_query = { "type": "query", "start": int((datetime.now() - timedelta(hours=24)).timestamp()), "end": int(datetime.now().timestamp()), "query": { "where": {"_search": ""}, # Get any logs to analyze server distribution "limit": 1000, }, } try: logs = await self.client._execute_http_query(detection_query) server_info = self._analyze_server_distribution(logs) self.server_cache = server_info self.cache_timestamp = datetime.now() return server_info except Exception as e: logger.error(f"Server detection failed: {e}") return {} def _analyze_server_distribution(self, logs: List[Dict]) -> Dict[str, Any]: """Analyze server distribution from logs""" server_counts = Counter() server_services = defaultdict(set) server_hostnames = {} for log in logs: # Try both field names server_id = log.get("CROIT_SERVERID") or log.get("CROIT_SERVER_ID") if server_id: server_counts[str(server_id)] += 1 # Track services per server unit = log.get("_SYSTEMD_UNIT", "unknown") server_services[str(server_id)].add(unit) # Track hostnames hostname = log.get("_HOSTNAME") if hostname and server_id not in server_hostnames: server_hostnames[str(server_id)] = hostname # Generate server analysis total_logs = len(logs) servers = {} for server_id, count in server_counts.items(): servers[server_id] = { "log_count": count, "log_percentage": ( round((count / total_logs) * 100, 1) if total_logs > 0 else 0 ), "services": list(server_services[server_id]), "hostname": server_hostnames.get(server_id, "unknown"), "active": count > 10, # Consider active if > 10 logs in 24h } return { "servers": servers, "total_servers": len(servers), "most_active": ( max(server_counts.keys(), key=server_counts.get) if server_counts else None ), "detection_timestamp": datetime.now().isoformat(), "logs_analyzed": total_logs, } def suggest_server_filter( self, intent: str, server_info: Dict = None ) -> Optional[Dict]: """Suggest server-specific filters based on intent""" if not server_info: return None servers = server_info.get("servers", {}) if not servers: return None intent_lower = intent.lower() # Specific server mentioned for server_id in servers.keys(): if ( f"server {server_id}" in intent_lower or f"node {server_id}" in intent_lower ): return { "type": "specific_server", "server_id": server_id, "filter": {"CROIT_SERVERID": {"_eq": server_id}}, "reason": f"User mentioned server {server_id}", } # Hostname mentioned for server_id, info in servers.items(): hostname = info.get("hostname", "").lower() if hostname and hostname != "unknown" and hostname in intent_lower: return { "type": "hostname_match", "server_id": server_id, "hostname": hostname, "filter": {"CROIT_SERVERID": {"_eq": server_id}}, "reason": f"User mentioned hostname {hostname}", } # Service-specific suggestions if "osd" in intent_lower: osd_servers = [ server_id for 
server_id, info in servers.items() if any("ceph-osd" in service for service in info.get("services", [])) ] if len(osd_servers) == 1: return { "type": "service_specific", "server_id": osd_servers[0], "filter": {"CROIT_SERVERID": {"_eq": osd_servers[0]}}, "reason": f"Only server {osd_servers[0]} runs OSD services", } # Performance-based suggestions if any( word in intent_lower for word in ["slow", "performance", "issue", "problem"] ): # Suggest the most active server for performance issues most_active = server_info.get("most_active") if most_active: return { "type": "performance_focus", "server_id": most_active, "filter": {"CROIT_SERVERID": {"_eq": most_active}}, "reason": f'Server {most_active} is most active ({servers[most_active]["log_percentage"]}% of logs)', } return None def get_server_summary(self, server_info: Dict = None) -> str: """Generate human-readable server summary""" if not server_info or not server_info.get("servers"): return "No servers detected in recent logs" servers = server_info["servers"] total = server_info["total_servers"] most_active = server_info.get("most_active") lines = [f"🖥️ Detected {total} active server(s):"] for server_id, info in sorted(servers.items()): hostname = info.get("hostname", "unknown") log_count = info.get("log_count", 0) percentage = info.get("log_percentage", 0) services = len(info.get("services", [])) status = "🟢" if info.get("active", False) else "🟡" lines.append( f"{status} Server {server_id} ({hostname}): {log_count:,} logs ({percentage}%), {services} services" ) if most_active: lines.append(f"📈 Most active: Server {most_active}") return "\n".join(lines) class LogTransportAnalyzer: """Analyze available log transports and debug kernel log availability""" def __init__(self, client): self.client = client async def analyze_transports(self, hours_back: int = 24) -> Dict[str, Any]: """Analyze what transport types are available in the logs""" # Query recent logs to analyze transports analysis_query = { "type": "query", "start": int((datetime.now() - timedelta(hours=hours_back)).timestamp()), "end": int(datetime.now().timestamp()), "query": { "where": { "_search": "" # Get any logs to analyze transport distribution }, "limit": 2000, # Larger sample for transport analysis }, } try: logs = await self.client._execute_http_query(analysis_query) return self._analyze_transport_distribution(logs) except Exception as e: logger.error(f"Transport analysis failed: {e}") return {} def _analyze_transport_distribution(self, logs: List[Dict]) -> Dict[str, Any]: """Analyze transport field distribution in logs""" transport_counts = Counter() transport_priorities = defaultdict(Counter) transport_services = defaultdict(set) sample_messages = defaultdict(list) for log in logs: # Check all possible transport field names transport = ( log.get("_TRANSPORT") or log.get("TRANSPORT") or log.get("transport") or "unknown" ) transport_counts[transport] += 1 # Track priority distribution per transport priority = log.get("PRIORITY", 6) transport_priorities[transport][priority] += 1 # Track services per transport service = log.get("_SYSTEMD_UNIT", log.get("SYSLOG_IDENTIFIER", "unknown")) transport_services[transport].add(service) # Collect sample messages (first 3 per transport) if len(sample_messages[transport]) < 3: message = log.get("MESSAGE", "")[:100] if message: sample_messages[transport].append(message) # Generate analysis total_logs = len(logs) analysis = { "total_logs_analyzed": total_logs, "transports_found": len(transport_counts), "transport_distribution": 
dict(transport_counts), "analysis_timestamp": datetime.now().isoformat(), } # Detailed transport info transport_details = {} for transport, count in transport_counts.items(): priority_dist = dict(transport_priorities[transport]) services = list(transport_services[transport]) transport_details[transport] = { "log_count": count, "percentage": ( round((count / total_logs) * 100, 1) if total_logs > 0 else 0 ), "priority_distribution": priority_dist, "services": services[:10], # Top 10 services "sample_messages": sample_messages[transport], "critical_logs": priority_dist.get(0, 0) + priority_dist.get(1, 0) + priority_dist.get(2, 0) + priority_dist.get(3, 0), } analysis["transport_details"] = transport_details # Kernel log investigation kernel_transports = [ t for t in transport_counts.keys() if "kernel" in t.lower() ] syslog_count = transport_counts.get("syslog", 0) journal_count = transport_counts.get("journal", 0) analysis["kernel_investigation"] = { "kernel_transports_found": kernel_transports, "kernel_direct_count": transport_counts.get("kernel", 0), "syslog_count": syslog_count, "journal_count": journal_count, "recommendation": self._recommend_kernel_query_strategy(transport_counts), } return analysis def _recommend_kernel_query_strategy(self, transport_counts: Counter) -> str: """Recommend the best strategy to find kernel logs""" kernel_direct = transport_counts.get("kernel", 0) syslog_count = transport_counts.get("syslog", 0) journal_count = transport_counts.get("journal", 0) if kernel_direct > 0: return "Use _TRANSPORT: 'kernel' - direct kernel logs found" elif syslog_count > 0: return "Try _TRANSPORT: 'syslog' with SYSLOG_IDENTIFIER: 'kernel' - kernel logs likely in syslog" elif journal_count > 0: return "Try _TRANSPORT: 'journal' with systemd journal filtering - kernel logs in journal" else: available = list(transport_counts.keys()) return f"No kernel transport found. Available: {available}. Try SYSLOG_IDENTIFIER filtering instead." 
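    # Editor's usage sketch (illustrative only, not part of the original file):
    # given any CroitLogSearchClient instance `client`, the transport analysis
    # above can be consumed like this to pick a kernel-log filter to try first:
    #
    #     analyzer = LogTransportAnalyzer(client)
    #     report = await analyzer.analyze_transports(hours_back=6)
    #     print(report["kernel_investigation"]["recommendation"])
    #
    # The recommendation string points at the _TRANSPORT / SYSLOG_IDENTIFIER
    # filter most likely to surface kernel logs on this cluster.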
async def find_kernel_logs( self, hours_back: int = 24, limit: int = 100 ) -> Dict[str, Any]: """Try multiple strategies to find kernel logs""" strategies = [ { "name": "Direct kernel transport", "query": { "where": { "_and": [ {"_TRANSPORT": {"_eq": "kernel"}}, {"PRIORITY": {"_lte": 5}}, ] } }, }, { "name": "Syslog with kernel identifier", "query": { "where": { "_and": [ {"_TRANSPORT": {"_eq": "syslog"}}, {"SYSLOG_IDENTIFIER": {"_eq": "kernel"}}, {"PRIORITY": {"_lte": 5}}, ] } }, }, { "name": "Kernel in message content", "query": { "where": { "_and": [ {"MESSAGE": {"_contains": "kernel"}}, {"PRIORITY": {"_lte": 4}}, ] } }, }, { "name": "Hardware/driver messages", "query": { "where": { "_and": [ { "MESSAGE": { "_regex": "(hardware|driver|device|disk|network)" } }, {"PRIORITY": {"_lte": 4}}, ] } }, }, ] results = {} for strategy in strategies: try: query = { "type": "query", "start": int( (datetime.now() - timedelta(hours=hours_back)).timestamp() ), "end": int(datetime.now().timestamp()), "query": {**strategy["query"], "limit": limit}, } logs = await self.client._execute_http_query(query) results[strategy["name"]] = { "success": len(logs) > 0, "log_count": len(logs), "sample_messages": [ log.get("MESSAGE", "")[:100] for log in logs[:3] ], "transports_found": list( set(log.get("_TRANSPORT", "unknown") for log in logs) ), "query_used": strategy["query"], } except Exception as e: results[strategy["name"]] = { "success": False, "error": str(e), "query_used": strategy["query"], } return { "kernel_search_results": results, "recommendations": self._generate_kernel_recommendations(results), "search_timestamp": datetime.now().isoformat(), } def _generate_kernel_recommendations(self, results: Dict) -> List[str]: """Generate recommendations based on kernel search results""" recommendations = [] successful_strategies = [ name for name, result in results.items() if result.get("success", False) ] if successful_strategies: best_strategy = max( successful_strategies, key=lambda x: results[x].get("log_count", 0) ) recommendations.append(f"✅ Best kernel log strategy: {best_strategy}") recommendations.append( f"Found {results[best_strategy]['log_count']} logs with this method" ) else: recommendations.append("❌ No kernel logs found with standard methods") recommendations.append( "💡 Try checking VictoriaLogs configuration for kernel log ingestion" ) recommendations.append( "🔍 Consider using broader searches with hardware/system keywords" ) return recommendations class LogSummaryEngine: """Intelligent log summarization and critical event prioritization""" def __init__(self): self.critical_keywords = [ "failed", "error", "crash", "panic", "fatal", "abort", "exception", "timeout", "unreachable", "down", "offline", "corruption", "loss", ] self.priority_levels = { 0: "EMERGENCY", 1: "ALERT", 2: "CRITICAL", 3: "ERROR", 4: "WARNING", 5: "NOTICE", 6: "INFO", 7: "DEBUG", } def summarize_logs(self, logs: List[Dict], max_details: int = 10) -> Dict: """Create intelligent summary with critical events first""" if not logs: return { "summary": "No logs found", "total_logs": 0, "critical_events": [], "trends": {}, "recommendations": [], } total_logs = len(logs) # Priority analysis priority_stats = self._analyze_priorities(logs) # Service analysis service_stats = self._analyze_services(logs) # Critical events (prioritized) critical_events = self._extract_critical_events(logs, max_details) # Time-based trends trends = self._analyze_trends(logs) # Generate summary text summary_text = self._generate_summary_text( total_logs, 
priority_stats, service_stats, critical_events ) # Smart recommendations recommendations = self._generate_recommendations( priority_stats, service_stats, critical_events, trends ) return { "summary": summary_text, "total_logs": total_logs, "priority_breakdown": priority_stats, "service_breakdown": service_stats, "critical_events": critical_events, "trends": trends, "recommendations": recommendations, "time_range": self._get_time_range(logs), } def _analyze_priorities(self, logs: List[Dict]) -> Dict: """Analyze log priority distribution""" priority_counts = Counter() for log in logs: priority = log.get("PRIORITY", 6) # Default to INFO if missing priority_name = self.priority_levels.get(priority, f"LEVEL_{priority}") priority_counts[priority_name] += 1 return dict(priority_counts) def _analyze_services(self, logs: List[Dict]) -> Dict: """Analyze service/unit distribution""" service_counts = Counter() for log in logs: unit = log.get("_SYSTEMD_UNIT", log.get("SYSLOG_IDENTIFIER", "unknown")) service_counts[unit] += 1 return dict(service_counts.most_common(10)) # Top 10 services def _extract_critical_events(self, logs: List[Dict], max_events: int) -> List[Dict]: """Extract and prioritize critical events""" critical_logs = [] for log in logs: priority = log.get("PRIORITY", 6) message = log.get("MESSAGE", "").lower() # Score criticality (lower = more critical) criticality_score = priority * 10 # Base on priority # Boost score for critical keywords for keyword in self.critical_keywords: if keyword in message: criticality_score -= 20 # Boost score for OSD-specific issues if "osd" in message and any( word in message for word in ["failed", "down", "crash"] ): criticality_score -= 15 critical_logs.append( { "log": log, "score": criticality_score, "timestamp": log.get("__REALTIME_TIMESTAMP", ""), "service": log.get("_SYSTEMD_UNIT", "unknown"), "priority": self.priority_levels.get(priority, f"LEVEL_{priority}"), "message_preview": ( log.get("MESSAGE", "")[:100] + "..." 
if len(log.get("MESSAGE", "")) > 100 else log.get("MESSAGE", "") ), } ) # Sort by criticality (lowest score = most critical) critical_logs.sort(key=lambda x: x["score"]) return critical_logs[:max_events] def _analyze_trends(self, logs: List[Dict]) -> Dict: """Analyze time-based trends and patterns""" if not logs: return {} # Group by hour for trend analysis hourly_counts = Counter() service_trends = defaultdict(Counter) for log in logs: timestamp = log.get("__REALTIME_TIMESTAMP") if timestamp: try: # Convert microseconds to datetime dt = datetime.fromtimestamp(int(timestamp) / 1000000) hour_key = dt.strftime("%Y-%m-%d %H:00") hourly_counts[hour_key] += 1 service = log.get("_SYSTEMD_UNIT", "unknown") service_trends[service][hour_key] += 1 except (ValueError, OverflowError): continue # Find peak hours peak_hours = hourly_counts.most_common(3) return { "hourly_distribution": dict(hourly_counts), "peak_hours": peak_hours, "active_services": len(service_trends), "busiest_service": ( max( service_trends.keys(), key=lambda s: sum(service_trends[s].values()) ) if service_trends else None ), } def _generate_summary_text( self, total_logs: int, priority_stats: Dict, service_stats: Dict, critical_events: List, ) -> str: """Generate human-readable summary""" lines = [] lines.append(f"📊 **Log Analysis Summary** - {total_logs:,} total entries") # Priority summary if priority_stats: critical_count = ( priority_stats.get("CRITICAL", 0) + priority_stats.get("EMERGENCY", 0) + priority_stats.get("ALERT", 0) ) error_count = priority_stats.get("ERROR", 0) warning_count = priority_stats.get("WARNING", 0) if critical_count > 0: lines.append(f"🚨 {critical_count} critical/emergency events") if error_count > 0: lines.append(f"❌ {error_count} errors") if warning_count > 0: lines.append(f"⚠️ {warning_count} warnings") # Top services if service_stats: top_service = list(service_stats.keys())[0] top_count = service_stats[top_service] lines.append(f"🔧 Most active: {top_service} ({top_count} logs)") # Critical events preview if critical_events: lines.append(f"⚡ {len(critical_events)} high-priority events identified") return "\n".join(lines) def _generate_recommendations( self, priority_stats: Dict, service_stats: Dict, critical_events: List, trends: Dict, ) -> List[str]: """Generate actionable recommendations""" recommendations = [] # Priority-based recommendations critical_total = ( priority_stats.get("CRITICAL", 0) + priority_stats.get("EMERGENCY", 0) + priority_stats.get("ALERT", 0) ) if critical_total > 5: recommendations.append( f"🚨 Immediate attention needed: {critical_total} critical events" ) error_count = priority_stats.get("ERROR", 0) if error_count > 20: recommendations.append( f"🔍 Investigate error patterns: {error_count} errors found" ) # Service-based recommendations if service_stats: ceph_services = { k: v for k, v in service_stats.items() if "ceph" in k.lower() } if ceph_services: total_ceph_logs = sum(ceph_services.values()) if total_ceph_logs > len(service_stats) * 0.7: # Ceph dominates logs recommendations.append( "🐙 High Ceph activity detected - monitor cluster health" ) # Critical events recommendations if critical_events: osd_issues = [ e for e in critical_events if "osd" in e["message_preview"].lower() ] if len(osd_issues) > 3: recommendations.append( "💾 Multiple OSD issues detected - check storage health" ) # Trends recommendations if trends.get("peak_hours"): recommendations.append( f"📈 Peak activity: {trends['peak_hours'][0][0]} - review load patterns" ) return recommendations def 
_get_time_range(self, logs: List[Dict]) -> Dict: """Calculate actual time range of logs""" timestamps = [] for log in logs: timestamp = log.get("__REALTIME_TIMESTAMP") if timestamp: try: timestamps.append(int(timestamp) / 1000000) except (ValueError, OverflowError): continue if not timestamps: return {} start_time = min(timestamps) end_time = max(timestamps) return { "start": datetime.fromtimestamp(start_time).isoformat(), "end": datetime.fromtimestamp(end_time).isoformat(), "duration_hours": round((end_time - start_time) / 3600, 2), } async def _extract_logs_from_zip(zip_data: bytes) -> List[Dict]: """Extract log entries from ZIP file""" import zipfile import io import json logs = [] try: with zipfile.ZipFile(io.BytesIO(zip_data)) as zf: for filename in zf.namelist(): logger.debug(f"Processing ZIP file: {filename}") with zf.open(filename) as file: content = file.read().decode("utf-8") lines = content.strip().split("\n") for line_num, line in enumerate(lines): if line.strip(): try: log_entry = json.loads(line) logs.append(log_entry) except json.JSONDecodeError as e: logger.warning( f"Failed to parse log line {line_num}: {e}" ) # Add as raw text if JSON parsing fails logs.append( { "raw_message": line, "parse_error": str(e), "line_number": line_num, } ) except Exception as e: logger.error(f"Failed to extract logs from ZIP: {e}") return logs async def _execute_croit_http_export( host: str, port: int, api_token: str, use_ssl: bool, query: Dict ) -> Dict: """Execute Croit log query via HTTP /api/logs/export endpoint""" import aiohttp # Build HTTP URL http_protocol = "https" if use_ssl else "http" url = f"{http_protocol}://{host}:{port}/api/logs/export" headers = { "Authorization": f"Bearer {api_token}", "Content-Type": "application/json", } params = { "format": "RAW", # Use RAW format as discovered "query": json.dumps(query), } logger.debug(f"HTTP GET {url}") logger.debug(f"Params: {params}") logger.debug(f"Headers: {headers}") try: async with aiohttp.ClientSession() as session: async with session.get(url, params=params, headers=headers) as response: logger.debug(f"HTTP response status: {response.status}") logger.debug(f"HTTP response headers: {dict(response.headers)}") if response.status == 200: response_json = await response.json() logger.debug(f"HTTP response JSON: {response_json}") # The response contains a download URL to a ZIP file download_url = response_json.get("url") if download_url: logger.debug(f"Downloading logs from: {download_url}") # Download and extract the ZIP file async with session.get( download_url, headers={"Authorization": f"Bearer {api_token}"}, ) as zip_response: if zip_response.status == 200: zip_data = await zip_response.read() logger.debug( f"Downloaded ZIP file: {len(zip_data)} bytes" ) # Extract logs from ZIP logs = await _extract_logs_from_zip(zip_data) logger.debug( f"Extracted {len(logs)} log entries from ZIP" ) return { "logs": logs, "control_messages": [ { "type": "success", "message": f"Downloaded {len(logs)} logs", } ], "download_info": response_json, } else: logger.error( f"Failed to download logs: {zip_response.status}" ) return { "logs": [], "control_messages": [ { "type": "error", "message": f"Download failed: {zip_response.status}", } ], } else: logger.error("No download URL in response") return { "logs": [], "control_messages": [ {"type": "error", "message": "No download URL"} ], } else: error_text = await response.text() logger.error(f"HTTP query failed: {response.status} - {error_text}") return { "logs": [], "control_messages": [ { "type": "error", 
"message": f"HTTP {response.status}: {error_text}", } ], } except Exception as e: logger.error(f"HTTP query exception: {e}") return {"logs": [], "control_messages": [{"type": "error", "message": str(e)}]} async def _execute_croit_websocket( host: str, port: int, api_token: str, use_ssl: bool, query: Dict ) -> List[Dict]: """Execute direct Croit WebSocket query with VictoriaLogs JSON format""" import json import asyncio import websockets # Build WebSocket URL with token authentication ws_protocol = "wss" if use_ssl else "ws" if api_token: ws_url = f"{ws_protocol}://{host}:{port}/api/logs?token={api_token}" logger.debug(f"Using query param authentication") else: ws_url = f"{ws_protocol}://{host}:{port}/api/logs" logger.warning("No API token provided for WebSocket authentication") logs = [] control_messages = [] logger.debug(f"Attempting WebSocket connection to: {ws_url}") try: async with websockets.connect(ws_url, ping_interval=20) as websocket: logger.debug(f"WebSocket connection established successfully") # Send Croit JSON query directly (auth via URL params) query_json = json.dumps(query, indent=2) logger.debug(f"Sending WebSocket query: {query_json}") await websocket.send(query_json) logger.debug("Query sent successfully") # Collect responses with longer timeout for query param auth start_time = asyncio.get_event_loop().time() while ( asyncio.get_event_loop().time() - start_time ) < 45: # Increased timeout try: response = await asyncio.wait_for(websocket.recv(), timeout=5.0) if response: logger.debug(f"WebSocket response: {response[:200]}...") # Handle control messages if response == "clear": control_messages.append( {"type": "clear", "message": "Log display cleared"} ) logger.debug("Received 'clear' control message") elif response == "empty": control_messages.append( { "type": "empty", "message": "No logs found for current query", } ) logger.debug( "Received 'empty' control message - no logs found" ) elif response == "too_wide": control_messages.append( { "type": "too_wide", "message": "Query too broad (>1M logs), please add more filters", } ) logger.debug("Received 'too_wide' control message") elif response.startswith("hits:"): try: hits_data = ( json.loads(response[5:].strip()) if response[5:].strip() != "null" else None ) control_messages.append( {"type": "hits", "data": hits_data} ) logger.debug(f"Received hits data: {hits_data}") except json.JSONDecodeError: logger.warning(f"Failed to parse hits data: {response}") elif response.startswith("error:"): error_msg = response[6:].strip() control_messages.append( {"type": "error", "message": error_msg} ) logger.error(f"VictoriaLogs error: {error_msg}") else: # Regular log entry try: log_entry = json.loads(response) logs.append(log_entry) logger.debug( f"Added log entry {len(logs)}: {log_entry.get('timestamp', 'no-timestamp')}" ) except json.JSONDecodeError: logger.warning(f"Non-JSON response: {response[:100]}") except asyncio.TimeoutError: break except websockets.exceptions.ConnectionClosed: break except Exception as e: logger.error(f"WebSocket query failed: {e}") raise logger.debug( f"WebSocket query completed: {len(logs)} logs, {len(control_messages)} control messages" ) return {"logs": logs, "control_messages": control_messages} # Integration functions for MCP Server async def handle_log_search( arguments: Dict, host: str, port: int = 8080 ) -> Dict[str, Any]: """Handle direct VictoriaLogs JSON query""" import time from datetime import datetime, timedelta where_clause = arguments.get("where") search_text = arguments.get("_search", "") 
limit = arguments.get("limit", 1000) after = arguments.get("after", 0) hours_back = arguments.get("hours_back", 1) start_timestamp = arguments.get("start_timestamp") end_timestamp = arguments.get("end_timestamp") api_token = arguments.get("api_token") use_ssl = arguments.get("use_ssl", False) if not where_clause: return {"code": 400, "error": "VictoriaLogs 'where' clause is required"} try: # Calculate time range if start_timestamp and end_timestamp: start = start_timestamp end = end_timestamp else: end = int(time.time()) start = end - (hours_back * 3600) # Build Croit WebSocket query - match working examples query_where = where_clause.copy() if where_clause else {} # Add _search to the where clause (always, even if empty string) query_where["_search"] = search_text croit_query = { "type": "query", "start": start, "end": end, "query": {"where": query_where, "after": after, "limit": limit}, } # Execute query via HTTP (not WebSocket!) logger.debug(f"Executing HTTP query to {host}:{port}") response = await _execute_croit_http_export( host, port, api_token, use_ssl, croit_query ) logs = response.get("logs", []) control_messages = response.get("control_messages", []) logger.debug( f"HTTP response summary: {len(logs)} logs, {len(control_messages)} control messages" ) if control_messages: logger.debug( f"Control messages received: {[msg.get('type', 'unknown') for msg in control_messages]}" ) if not logs and control_messages: logger.warning(f"No logs returned. Control messages: {control_messages}") # Calculate actual hours searched from timestamp difference actual_hours_searched = (end - start) / 3600.0 return { "code": 200, "result": { "logs": logs, "total_count": len(logs), "control_messages": control_messages, "time_range": { "start_timestamp": start, "end_timestamp": end, "hours_searched": actual_hours_searched, }, }, "debug": { "croit_query": croit_query, "where_clause": where_clause, "time_range_human": f"{datetime.fromtimestamp(start)} to {datetime.fromtimestamp(end)}", "timestamp_diff_seconds": end - start, "calculated_hours": actual_hours_searched, "input_hours_back": hours_back, }, } except Exception as e: logger.error(f"Log search failed: {e}") return { "code": 500, "error": str(e), "debug": { "attempted_query": croit_query if "croit_query" in locals() else None }, } async def handle_log_check( arguments: Dict, host: str, port: int = 8080 ) -> Dict[str, Any]: """ Check log conditions immediately (snapshot) - suitable for LLMs Returns results immediately instead of monitoring for a duration """ conditions = arguments.get("conditions", []) alert_threshold = arguments.get("threshold", 5) time_window = arguments.get("time_window", 300) # Check last 5 minutes by default api_token = arguments.get("api_token") use_ssl = arguments.get("use_ssl", False) if not conditions: return {"code": 400, "error": "Conditions are required"} try: client = CroitLogSearchClient(host, port, api_token, use_ssl) alerts = [] checks = [] # Check each condition ONCE for condition in conditions: # Add time window to condition enhanced_condition = f"{condition} in the last {time_window} seconds" result = await client.search_logs(enhanced_condition, limit=100) check_result = { "condition": condition, "count": result["total_count"], "threshold": alert_threshold, "triggered": result["total_count"] >= alert_threshold, "severity": ( result["insights"]["severity"] if result["total_count"] > 0 else "none" ), "timestamp": datetime.now().isoformat(), } checks.append(check_result) if check_result["triggered"]: alerts.append( { 
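                        # Alert record: the triggering condition, how many
                        # logs matched, the severity reported by the search
                        # insights, and up to three sample log entries.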
"condition": condition, "count": result["total_count"], "severity": result["insights"]["severity"], "sample_logs": ( result["results"][:3] if result["results"] else [] ), } ) return { "code": 200, "result": { "checks": checks, "alerts": alerts, "summary": f"{len(alerts)} of {len(conditions)} conditions triggered", "time_window": f"Last {time_window} seconds", "recommendation": ( "Run again later to check for changes" if alerts else "All clear" ), }, } except Exception as e: logger.error(f"Log check failed: {e}") return {"code": 500, "error": str(e)} # Keep for backwards compatibility but mark as deprecated async def handle_log_monitor( arguments: Dict, host: str, port: int = 8080 ) -> Dict[str, Any]: """DEPRECATED: Use handle_log_check instead - this blocks for too long""" # Redirect to log_check with a warning logger.warning("handle_log_monitor is deprecated - using handle_log_check instead") return await handle_log_check(arguments, host, port) # Tool definitions for MCP LOG_SEARCH_TOOLS = [ { "name": "croit_log_search", "description": """Search Croit/Ceph cluster logs using comprehensive VictoriaLogs JSON syntax. AVAILABLE FILTER FIELDS: • _SYSTEMD_UNIT: systemd service unit (string) - e.g. "ceph-mon", "ceph-osd@12" • PRIORITY: log priority/severity (integer: 0=emerg, 1=alert, 2=crit, 3=err, 4=warning, 5=notice, 6=info, 7=debug) • CROIT_SERVER_ID: specific Ceph node ID (string/integer) - e.g. "1", "2", "3" • CROIT_SERVERID: alternative field name for server ID (string/integer) - same as CROIT_SERVER_ID • MESSAGE: log message content (string) • _TRANSPORT: log transport method (string) - e.g. "kernel", "syslog", "journal" • _HOSTNAME: server hostname (string) - e.g. "storage-node-01" • _MACHINE_ID: unique machine identifier (string) • SYSLOG_IDENTIFIER: service identifier (string) - e.g. "ceph-osd" • THREAD: thread identifier (string) - e.g. 
"worker-1" • _search: full-text search across all fields (string) - searches within message content COMPARISON OPERATORS: STRING OPERATORS: • _eq: exact match {"field": {"_eq": "value"}} • _contains: substring match {"field": {"_contains": "substring"}} • _starts_with: prefix match {"MESSAGE": {"_starts_with": "ERROR:"}} • _ends_with: suffix match {"MESSAGE": {"_ends_with": "failed"}} NUMERIC OPERATORS: • _eq: exact equal {"PRIORITY": {"_eq": 4}} • _neq: not equal {"PRIORITY": {"_neq": 7}} • _gt: greater than {"PRIORITY": {"_gt": 3}} • _gte: greater than or equal {"PRIORITY": {"_gte": 4}} • _lt: less than {"PRIORITY": {"_lt": 6}} • _lte: less than or equal {"PRIORITY": {"_lte": 6}} LIST/ARRAY OPERATORS: • _in: value in list {"PRIORITY": {"_in": [0, 1, 2, 3]}} • _nin: value not in list {"CROIT_SERVERID": {"_nin": ["1", "2"]}} PATTERN OPERATORS: • _regex: regular expression match {"MESSAGE": {"_regex": "OSD\\.[0-9]+"}} EXISTENCE OPERATORS: • _exists: field exists {"OPTIONAL_FIELD": {"_exists": true}} • _missing: field missing {"OPTIONAL_FIELD": {"_missing": true}} LOGICAL OPERATORS: • _and: logical AND - all conditions must match • _or: logical OR - at least one condition must match • _not: negation - condition must NOT match CEPH SERVICE MAPPING: • OSD N: {"_SYSTEMD_UNIT": {"_contains": "ceph-osd@N"}} • Monitor: {"_SYSTEMD_UNIT": {"_contains": "ceph-mon"}} • Manager: {"_SYSTEMD_UNIT": {"_contains": "ceph-mgr"}} • MDS: {"_SYSTEMD_UNIT": {"_contains": "ceph-mds"}} • RGW: {"_SYSTEMD_UNIT": {"_contains": "ceph-radosgw"}} TRANSPORT TYPES: • kernel: Kernel-level logs (hardware, drivers, low-level system) • syslog: Standard system logs • journal: Systemd journal logs COMPLEX QUERY EXAMPLES: 1. Monitor logs on server 1 (from your example): {"where": {"_and": [ {"_SYSTEMD_UNIT": {"_contains": "ceph-mon"}}, {"PRIORITY": {"_lte": 6}}, {"CROIT_SERVERID": {"_eq": "1"}} ]}} 2. Kernel logs on server 1: {"where": {"_and": [ {"_TRANSPORT": {"_eq": "kernel"}}, {"PRIORITY": {"_lte": 6}}, {"CROIT_SERVERID": {"_eq": "1"}} ]}} 3. Kernel logs with error search (note _search is outside where clause): { "where": {"_and": [ {"_TRANSPORT": {"_eq": "kernel"}}, {"PRIORITY": {"_lte": 6}}, {"CROIT_SERVERID": {"_eq": "1"}} ]}, "_search": "error" } 4. OSD 12 errors and warnings only: {"where": {"_and": [ {"_SYSTEMD_UNIT": {"_contains": "ceph-osd@12"}}, {"PRIORITY": {"_lte": 4}} ]}} 5. Critical errors on specific server: {"where": {"_and": [ {"PRIORITY": {"_lte": 3}}, {"CROIT_SERVERID": {"_eq": "1"}} ]}} 3. Multiple priority levels (errors + warnings): {"where": {"PRIORITY": {"_in": [3, 4]}}} 4. Exclude specific servers from search: {"where": {"_and": [ {"_SYSTEMD_UNIT": {"_contains": "ceph-osd"}}, {"CROIT_SERVERID": {"_not_in": ["1", "2", "3"]}} ]}} 5. Info and above (exclude debug): {"where": {"PRIORITY": {"_ne": 7}}} 6. Monitor OR manager logs with text search: {"where": {"_and": [ {"_or": [ {"_SYSTEMD_UNIT": {"_contains": "ceph-mon"}}, {"_SYSTEMD_UNIT": {"_contains": "ceph-mgr"}} ]}, {"_search": "election"} ]}} 7. Range filtering - warnings to critical: {"where": {"_and": [ {"PRIORITY": {"_gte": 2}}, {"PRIORITY": {"_lte": 4}} ]}} 8. Error messages with specific prefix: {"where": {"_and": [ {"PRIORITY": {"_lt": 5}}, {"MESSAGE": {"_starts_with": "failed to"}} ]}} 9. 

12. Complex filtering with message content:
   {"where": {"_and": [
     {"_SYSTEMD_UNIT": {"_contains": "ceph-osd"}},
     {"PRIORITY": {"_lte": 6}},
     {"_or": [
       {"MESSAGE": {"_contains": "slow"}},
       {"MESSAGE": {"_contains": "timeout"}}
     ]},
     {"_not": {"MESSAGE": {"_contains": "heartbeat"}}}
   ]}}

13. Multiple OSDs with priority filtering:
   {"where": {"_and": [
     {"_or": [
       {"_SYSTEMD_UNIT": {"_contains": "ceph-osd@12"}},
       {"_SYSTEMD_UNIT": {"_contains": "ceph-osd@13"}},
       {"_SYSTEMD_UNIT": {"_contains": "ceph-osd@14"}}
     ]},
     {"PRIORITY": {"_in": [0, 1, 2, 3, 4]}}
   ]}}

PRIORITY LEVELS (syslog standard):
• 0: Emergency (system unusable)
• 1: Alert (immediate action required)
• 2: Critical (critical conditions)
• 3: Error (error conditions)
• 4: Warning (warning conditions)
• 5: Notice (normal but significant)
• 6: Info (informational messages)
• 7: Debug (debug-level messages)

NESTED LOGIC SUPPORT:
Use unlimited nesting with _and/_or/_not for complex conditions.

TIME CONTROL:
• hours_back: number of hours to search back (default: 1)
• start_timestamp/end_timestamp: explicit Unix timestamps

OUTPUT: Logs + debug info showing exact query sent to VictoriaLogs""",
        "inputSchema": {
            "type": "object",
            "properties": {
                "where": {
                    "type": "object",
                    "description": "VictoriaLogs JSON where clause (see examples above)",
                },
                "_search": {
                    "type": "string",
                    "default": "",
                    "description": "Full-text search string (optional) - searches within message content",
                },
                "limit": {
                    "type": "integer",
                    "default": 1000,
                    "description": "Maximum number of logs to return",
                },
                "after": {
                    "type": "integer",
                    "default": 0,
                    "description": "Offset for pagination (number of logs to skip)",
                },
                "hours_back": {
                    "type": "integer",
                    "default": 1,
                    "description": "Hours to search back from now (ignored if timestamps provided)",
                },
                "start_timestamp": {
                    "type": "integer",
                    "description": "Unix timestamp start (optional)",
                },
                "end_timestamp": {
                    "type": "integer",
                    "description": "Unix timestamp end (optional)",
                },
            },
            "required": ["where"],
        },
    },
    {
        "name": "croit_log_check",
        "description": """Check specific log conditions instantly (non-blocking snapshot).

USE CASES:
• Quick health checks: "Are there any OSD failures right now?"
• Validation after operations: "Check for errors after pool creation"
• Threshold monitoring: "Alert if more than 5 slow requests"

CONDITIONS FORMAT:
• Natural language conditions to check
• Each condition is evaluated separately
• Returns matches for each condition

EXAMPLES:
• conditions: ["OSD failures", "slow requests over 5s", "authentication errors"]
• conditions: ["pool full warnings", "network timeouts"]
• conditions: ["any ERROR level logs"]

PARAMETERS:
• threshold: How many logs must match to trigger alert (default: 5)
• time_window: Check logs from last N seconds (default: 300 = 5 minutes)

RETURNS:
• List of triggered conditions with matching log counts
• Sample of matching logs for each condition
• Overall status (triggered/clear)""",
        "inputSchema": {
            "type": "object",
            "properties": {
                "conditions": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "List of conditions to check in natural language",
                },
                "threshold": {
                    "type": "integer",
                    "default": 5,
                    "description": "Number of matching logs to trigger alert (default: 5)",
                },
                "time_window": {
                    "type": "integer",
                    "default": 300,
                    "description": "Time window in seconds to check (default: 300 = 5 min)",
                },
                "api_token": {
                    "type": "string",
                    "description": "Optional API token for authentication",
                },
            },
            "required": ["conditions"],
        },
    },
]
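

# ---------------------------------------------------------------------------
# Minimal usage sketch (an illustration, not part of the MCP tool contract):
# builds a croit_log_search-style argument dict following the schema above and
# runs it through handle_log_search(). The CROIT_HOST / CROIT_PORT /
# CROIT_API_TOKEN environment variables and this __main__ block are assumed
# names for local testing only.
if __name__ == "__main__":
    import os

    example_arguments = {
        # Errors and worse (PRIORITY <= 3) from any OSD on server 1
        "where": {
            "_and": [
                {"_SYSTEMD_UNIT": {"_contains": "ceph-osd"}},
                {"PRIORITY": {"_lte": 3}},
                {"CROIT_SERVERID": {"_eq": "1"}},
            ]
        },
        "_search": "",
        "limit": 100,
        "hours_back": 2,
        "api_token": os.environ.get("CROIT_API_TOKEN"),
        "use_ssl": False,
    }

    croit_host = os.environ.get("CROIT_HOST")
    if croit_host:
        # Run the query against a reachable Croit management node.
        result = asyncio.run(
            handle_log_search(
                example_arguments,
                host=croit_host,
                port=int(os.environ.get("CROIT_PORT", "8080")),
            )
        )
        print(json.dumps(result.get("result", result), indent=2, default=str))
    else:
        # Without a host, just show the query that would be sent.
        print("Set CROIT_HOST to execute; example where clause:")
        print(json.dumps(example_arguments["where"], indent=2))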
