# Log Search Execution

## Overview

The log search execution workflow processes natural language queries through intent parsing, VictoriaLogs query execution, and intelligent summarization. It supports dual-path execution (WebSocket/HTTP) with automatic fallback.

**Module**: croit_log_tools.py
**Functions**: `handle_log_search()`, `_execute_croit_websocket()`, `_execute_croit_http_export()`
**Duration**: 500ms - 10s depending on query scope

## Purpose

Executes log searches with:

- Natural language to structured query conversion
- Direct VictoriaLogs JSON query support
- Dual-path execution (WebSocket preferred, HTTP fallback)
- Response summarization and optimization
- Critical event extraction

## Execution Paths

### Path Selection Logic

```
User Query
    ↓
Is Direct JSON Query?
├── Yes → Skip intent parsing
└── No → Parse with LogSearchIntentParser
    ↓
Execution Strategy Selection
├── Small query (< 1000 logs expected) → WebSocket
└── Large query (> 1000 logs) → HTTP Export
    ↓
Try WebSocket
├── Success → Return results
└── Failure → Fall back to HTTP Export
```

## Complete Workflow

### Phase 1: Intent Parsing (Optional)

**Condition**: Query is natural language, not JSON
**Method**: `LogSearchIntentParser.parse(search_intent)`

```
Input: "Find OSD errors in the last hour"
    ↓
Pattern Detection
├── Detect "OSD" → services: ["ceph-osd"]
├── Detect "errors" → levels: ["ERROR"]
└── Detect "last hour" → time_range
    ↓
Service Translation
└── "OSD" → "ceph-osd"
    ↓
Output:
{
  "type": "query",
  "services": ["ceph-osd"],
  "levels": ["ERROR"],
  "keywords": ["failed", "error"],
  "time_range": {"start": "...", "end": "..."}
}
```

### Phase 2: Query Building

**Two paths**:

#### A. From Intent (Natural Language)

**Method**: `LogsQLBuilder.build(intent)`

```
intent → LogsQL string

Example:
_time:[2024-01-01T00:00:00Z, 2024-01-01T01:00:00Z]
AND service:(ceph-osd)
AND level:(ERROR)
AND _msg:"failed"
```
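
The builder step can be pictured as a small pure function over the intent dict. The following is a minimal sketch assuming the intent shape shown in Phase 1; `build_logsql` is a hypothetical stand-in for `LogsQLBuilder.build`, not the actual implementation:

```python
from typing import Any

def build_logsql(intent: dict[str, Any]) -> str:
    """Hypothetical sketch of LogsQL assembly from a parsed intent.

    Mirrors the example above; the real LogsQLBuilder may differ.
    """
    clauses = []
    time_range = intent.get("time_range")
    if time_range:
        clauses.append(f'_time:[{time_range["start"]}, {time_range["end"]}]')
    if intent.get("services"):
        clauses.append("service:(" + " OR ".join(intent["services"]) + ")")
    if intent.get("levels"):
        clauses.append("level:(" + " OR ".join(intent["levels"]) + ")")
    for keyword in intent.get("keywords", []):
        clauses.append(f'_msg:"{keyword}"')
    return " AND ".join(clauses)

# Equivalent to the example LogsQL string above, on a single line
print(build_logsql({
    "services": ["ceph-osd"],
    "levels": ["ERROR"],
    "keywords": ["failed"],
    "time_range": {"start": "2024-01-01T00:00:00Z",
                   "end": "2024-01-01T01:00:00Z"},
}))
```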

#### B. Direct JSON (Advanced)

**User provides VictoriaLogs JSON directly**:

```json
{
  "where": {
    "_and": [
      {"_SYSTEMD_UNIT": {"_contains": "ceph-osd"}},
      {"PRIORITY": {"_lte": 3}}
    ]
  },
  "hours_back": 1,
  "limit": 1000
}
```

### Phase 3: WebSocket Execution

**Method**: `_execute_croit_websocket()`

```
Step 1: Connection Establishment
├── Connect to wss://{host}/api/log/websocket
├── SSL verification based on protocol
└── Connection timeout: 30s

Step 2: Binary Authentication
├── Send API token as binary message
└── Wait for acknowledgment

Step 3: Query Transmission
├── Convert query to JSON
├── Add current timestamp context
└── Send as text message

Step 4: Control Message Processing
├── "empty" → No logs found
├── "too_wide" → Time range too large
├── "hits: N" → Result count
└── "error: msg" → Error occurred

Step 5: Log Entry Streaming
├── Receive JSON log entries
├── Parse and accumulate
├── Track progress
└── Handle connection drops

Step 6: Completion
└── Connection closes → Return accumulated logs
```

**Error Conditions**:

- Connection timeout → Fall back to HTTP
- Authentication failure → Raise error
- "too_wide" control message → Suggest narrower range
- WebSocket error → Fall back to HTTP
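
Condensed into code, the happy path looks roughly like the sketch below. It is a minimal illustration assuming the third-party `websockets` package; the acknowledgment wait after authentication and progress tracking are omitted, and control-message handling is simplified relative to the real `_execute_croit_websocket()`:

```python
import asyncio
import json
import ssl

import websockets  # assumed dependency; the real client may differ

async def websocket_search(host: str, token: str, query: dict) -> list[dict]:
    """Sketch of the WebSocket path: binary auth, text query, then
    control messages and streamed JSON log entries until close."""
    ctx = ssl.create_default_context()  # SSL verification per Step 1
    uri = f"wss://{host}/api/log/websocket"
    async with websockets.connect(uri, ssl=ctx, open_timeout=30) as ws:
        await ws.send(token.encode())     # Step 2: token as a binary frame
        await ws.send(json.dumps(query))  # Step 3: query as a text frame
        logs: list[dict] = []
        async for message in ws:          # Steps 4-6: runs until server closes
            if isinstance(message, str):
                if message == "empty":           # no logs found
                    break
                if message == "too_wide":        # time range too large
                    raise ValueError("Time range too wide; narrow the query")
                if message.startswith("error:"):
                    raise RuntimeError(message)
                if message.startswith("hits:"):  # result count, informational
                    continue
            logs.append(json.loads(message))     # Step 5: accumulate entries
        return logs

# Example invocation (hypothetical host/token):
# logs = asyncio.run(websocket_search("cluster.example", "API_TOKEN", query))
```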

### Phase 4: HTTP Export Execution (Fallback)

**Method**: `_execute_croit_http_export()`

```
Step 1: Build Export Request
├── POST to /api/log/export
├── Body: VictoriaLogs JSON query
└── Headers: Authorization, Accept

Step 2: Download ZIP Archive
├── Response: application/zip
├── Read into memory
└── Size can be 1MB - 100MB+

Step 3: Extract Logs
├── Open ZIP in memory
├── Read logs.json file
├── Parse JSON array
└── Extract log entries

Step 4: Return Results
└── Same format as WebSocket path
```

**Advantages over WebSocket**:

- Handles large result sets
- Reliable for bulk operations
- No streaming complexity

**Disadvantages**:

- Slower (2-10s vs 500ms-2s)
- Higher memory usage
- No progress updates
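
A minimal sketch of the export path, assuming the `requests` package and the `logs.json` archive member described above; `http_export_search` and the Bearer auth scheme are illustrative assumptions, not the actual implementation:

```python
import io
import json
import zipfile

import requests  # assumed dependency for the sketch

def http_export_search(host: str, token: str, query: dict) -> list[dict]:
    """Sketch of the HTTP export path: POST query, download ZIP,
    extract logs.json, and return the parsed entries."""
    response = requests.post(                  # Step 1: build export request
        f"https://{host}/api/log/export",
        json=query,
        headers={"Authorization": f"Bearer {token}",  # auth scheme assumed
                 "Accept": "application/zip"},
        timeout=60,
    )
    response.raise_for_status()
    archive = zipfile.ZipFile(io.BytesIO(response.content))  # Step 2: in memory
    with archive.open("logs.json") as fh:      # Step 3: extract and parse
        return json.load(fh)                   # Step 4: same shape as WebSocket
```

Combining this with the WebSocket sketch above, the automatic fallback (the Chain of Responsibility noted under Design Patterns below) reduces to something like:

```python
import asyncio

async def execute_search(host: str, token: str, query: dict) -> list[dict]:
    """WebSocket first; a WebSocket failure falls back to HTTP export."""
    try:
        return await websocket_search(host, token, query)
    except Exception:
        # Simplified: the real flow re-raises authentication errors rather
        # than falling back. Run the blocking export off the event loop.
        return await asyncio.to_thread(http_export_search, host, token, query)
```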

### Phase 5: Response Processing

#### Server Auto-Discovery

**Method**: `ServerIDDetector.detect_servers(logs)`

```
Scan logs for CROIT_SERVER_ID field
    ↓
Group by server ID
    ↓
Extract:
├── Server ID
├── Hostnames
├── Log counts
├── Activity percentages
└── Service distribution
```

#### Transport Analysis

**Method**: `LogTransportAnalyzer.analyze(logs)`

```
Scan logs for _TRANSPORT field
    ↓
Count by transport type:
├── kernel: Direct kernel messages
├── syslog: Syslog-forwarded
└── journal: Systemd journal
    ↓
Calculate percentages and provide samples
```

#### Summarization

**Method**: `LogSummaryEngine.generate_summary(logs)`

```
Step 1: Priority Breakdown
└── Count by PRIORITY field (0-7)

Step 2: Service Breakdown
└── Count by _SYSTEMD_UNIT field

Step 3: Critical Event Extraction
├── Score each log entry
│   ├── Priority weight (ERROR=-10, WARN=-5)
│   └── Keyword penalties (fail=-3, timeout=-2)
├── Sort by score (lower = more critical)
└── Extract top N events

Step 4: Trend Analysis
├── Group logs by hour
├── Identify peak periods
└── Find busiest services

Step 5: Recommendations
├── Check for patterns
└── Generate actionable guidance
```

#### Response Optimization

**Method**: `CroitLogSearchClient.optimize_response_size()`

```
If optimize_response=True:
├── Truncate to max_log_entries (default: 50)
├── Shorten messages to max_message_length (default: 150)
├── Prioritize critical events
└── Add truncation metadata
```

### Phase 6: Result Assembly

**Complete Response Structure**:

```python
{
    "logs": [...],                      # Log entries (possibly truncated)
    "total_count": 1247,
    "returned_count": 50,
    "summary": {
        "priority_breakdown": {...},
        "service_breakdown": {...},
        "critical_events": [...],
        "trends": {...},
        "recommendations": [...]
    },
    "server_info": {...},               # Auto-discovered servers
    "transport_analysis": {...},        # If kernel logs requested
    "query_info": {
        "execution_method": "websocket|http_export",
        "execution_time_ms": 1234,
        "cache_hit": False
    },
    "_optimization_applied": True,      # If truncated
    "_truncation_metadata": {...}       # If truncated
}
```

## Query Examples

### Natural Language Queries

**OSD Issues**:

```
Input: "Find OSD failures in the last 24 hours"
→ services: ["ceph-osd"]
→ levels: ["ERROR", "CRITICAL"]
→ keywords: ["failed", "down", "crashed"]
→ hours_back: 24
```

**Slow Requests**:

```
Input: "Show slow requests in the past hour"
→ keywords: ["slow request", "blocked"]
→ services: ["ceph-osd", "ceph-mon", "ceph-mds"]
→ levels: ["WARN", "ERROR"]
→ hours_back: 1
```

**Kernel Logs**:

```
Input: "Get kernel errors related to Ceph"
→ transport: "kernel"
→ levels: ["ERROR", "WARNING"]
→ keywords: ["ceph"]
```

### Direct JSON Queries

**Monitor Logs on Server 1**:

```json
{
  "where": {
    "_and": [
      {"_SYSTEMD_UNIT": {"_contains": "ceph-mon"}},
      {"CROIT_SERVER_ID": {"_eq": "1"}},
      {"PRIORITY": {"_lte": 6}}
    ]
  },
  "hours_back": 24
}
```

**Kernel Errors with Text Search**:

```json
{
  "where": {
    "_and": [
      {"_TRANSPORT": {"_eq": "kernel"}},
      {"PRIORITY": {"_lte": 4}}
    ]
  },
  "_search": "error",
  "hours_back": 48
}
```

## Caching Strategy

**Cache Key Generation**:

```python
query_hash = hashlib.md5(json.dumps(query, sort_keys=True).encode()).hexdigest()
```

**Cache Lookup** (5 minute TTL):

```python
entry = cache.get(query_hash)
if entry and datetime.now() - entry["timestamp"] < timedelta(minutes=5):
    return entry["result"]
```

**Cache Population**:

```python
cache[query_hash] = {
    "result": response,
    "timestamp": datetime.now()
}
```

**Cache Invalidation**:

- Time-based: 5 minute TTL
- No manual invalidation
- No size limits (in-memory only)

## Performance Characteristics

**WebSocket Path**:

- Connection: 200-500ms
- Streaming: 50-200ms per 100 logs
- Small query (< 100 logs): 500-1000ms total
- Medium query (100-1000 logs): 1-3s total
- Large query (> 1000 logs): Falls back to HTTP

**HTTP Export Path**:

- Request: 1-5s
- ZIP download: 1-10s (size dependent)
- Extraction: 0.5-2s
- **Total**: 2.5-17s

**Summarization**:

- 100 logs: <50ms
- 1,000 logs: 100-300ms
- 10,000 logs: 1-3s

## Error Handling

**Query Errors**:

- Invalid syntax → Parse error to LLM
- Invalid field names → VictoriaLogs error to LLM
- Too wide time range → "too_wide" message to LLM

**Connection Errors**:

- WebSocket timeout → Fall back to HTTP
- HTTP failure → Raise error to LLM
- Authentication failure → Auth error to LLM

**Data Processing Errors**:

- Invalid JSON → Logged, empty result
- Missing fields → Use defaults
- Corrupt ZIP → Raise error

## Design Patterns

- **Strategy Pattern**: WebSocket vs HTTP execution
- **Chain of Responsibility**: Fallback mechanism
- **Template Method**: Common processing structure
- **Builder Pattern**: Query construction
- **Observer Pattern**: Streaming log reception

## Extension Points

1. **New Query Patterns**: Extend LogSearchIntentParser.PATTERNS
2. **Custom Summarization**: Modify LogSummaryEngine
3. **Additional Transports**: Extend transport analysis
4. **Caching Strategies**: Replace in-memory cache
5. **Execution Paths**: Add new execution methods

## Relevance

Read this document when:

- Understanding log search flow
- Debugging query issues
- Implementing new search patterns
- Optimizing log search performance
- Troubleshooting WebSocket issues
- Understanding summarization logic

## Related Documentation

- [ARCHITECTURE.log-search-system.md](ARCHITECTURE.log-search-system.md) - System overview
- [ARCHITECTURE.intent-parsing.md](ARCHITECTURE.intent-parsing.md) - Intent parsing
- [ARCHITECTURE.victorialogs-websocket-protocol.md](ARCHITECTURE.victorialogs-websocket-protocol.md) - WebSocket protocol
- [ARCHITECTURE.log-summary-engine.md](ARCHITECTURE.log-summary-engine.md) - Summarization
