
Observe MCP Server

by rustomax
observe_server.py (58.4 kB)
#!/usr/bin/env python3
"""
Observe MCP Server

A Model Context Protocol server that provides access to Observe API
functionality using organized modules for better maintainability and
reusability.
"""

import os
import sys
from typing import Dict, Any, Optional, List, Union, Tuple

try:
    from typing_extensions import TypedDict
except ImportError:
    from typing import TypedDict


# Type definitions for better type safety
class ErrorResponse(TypedDict):
    error: bool
    message: str


try:
    from dotenv import load_dotenv
except ImportError:
    # python-dotenv is required to load configuration from .env
    raise

# Load environment variables from .env file first
load_dotenv()

# Initialize OpenTelemetry instrumentation early
from src.telemetry import initialize_telemetry, initialize_metrics
from src.telemetry.decorators import trace_mcp_tool, trace_observe_api_call, trace_database_operation
from src.telemetry.utils import add_mcp_context, add_observe_context, add_database_context

telemetry_enabled = initialize_telemetry()

# Initialize metrics if telemetry is enabled
if telemetry_enabled:
    metrics_enabled = initialize_metrics()
else:
    metrics_enabled = False

# Import Gemini-powered document search
from src.observe.gemini_search import search_docs_gemini as search_docs

# Import organized Observe API modules
from src.observe import (
    execute_opal_query as observe_execute_opal_query
)

# Import organized auth modules
from src.auth import (
    create_authenticated_mcp,
    requires_scopes,
    initialize_auth_middleware,
    setup_auth_provider
)

# Import standardized logging
from src.logging import (
    get_logger,
    set_session_context,
    log_session_context,
    session_logger,
    semantic_logger,
    opal_logger
)

from fastmcp import Context

# Create FastMCP instance with authentication
mcp = create_authenticated_mcp(server_name="observe-community")

# Initialize auth middleware for statistics and logging
auth_provider = setup_auth_provider()
initialize_auth_middleware(auth_provider)

# Configure FastAPI instrumentation if telemetry is enabled
if telemetry_enabled:
    from src.telemetry.config import instrument_fastapi_app
    # Note: FastMCP wraps FastAPI, so we'll instrument the underlying app
    if hasattr(mcp, 'app'):
        instrument_fastapi_app(mcp.app)


# Input validation utility to prevent DoS via large payloads (H-INPUT-2)
def validate_input_size(value: Optional[str], param_name: str, max_bytes: int) -> None:
    """
    Validate input parameter size to prevent DoS attacks.

    Args:
        value: Input string to validate
        param_name: Parameter name for error messages
        max_bytes: Maximum allowed size in bytes

    Raises:
        ValueError: If input exceeds maximum size
    """
    if value is None:
        return

    size_bytes = len(value.encode('utf-8'))
    if size_bytes > max_bytes:
        max_kb = max_bytes / 1024
        actual_kb = size_bytes / 1024
        raise ValueError(
            f"{param_name} exceeds maximum size limit. "
            f"Maximum: {max_kb:.1f}KB, Actual: {actual_kb:.1f}KB. "
            f"Please reduce the size of your input."
        )
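
# Illustrative behavior of the size guard above (values hypothetical):
#
#     validate_input_size("x" * 5_000, "query", 10 * 1024)   # under 10KB -> returns None
#     validate_input_size("x" * 20_000, "query", 10 * 1024)  # over 10KB  -> raises ValueError
#
# Size is measured in UTF-8 bytes, so multi-byte characters count more than
# once toward the limit.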

# Import OPAL query validation from shared module
from src.observe.opal_validation import validate_opal_query_structure

# Legacy comment for reference:
# OPAL query validation utility to prevent injection and catch errors early (H-INPUT-1)
# Function has been moved to src/observe/opal_validation.py to avoid circular imports


@mcp.tool()
@requires_scopes(['admin', 'write', 'read'])
@trace_mcp_tool(tool_name="execute_opal_query", record_args=True, record_result=False)
async def execute_opal_query(
    ctx: Context,
    query: str,
    dataset_id: Optional[str] = None,
    primary_dataset_id: Optional[str] = None,
    secondary_dataset_ids: Optional[str] = None,
    dataset_aliases: Optional[str] = None,
    time_range: Optional[str] = "1h",
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    format: Optional[str] = "csv",
    timeout: Optional[float] = None
) -> str:
    """
    Execute OPAL (Observe Processing and Analytics Language) queries on datasets and metrics in Observe.

    ## Overview

    Observe stores three fundamental types of observability data, each requiring different OPAL approaches:

    1. **Events** (Logs) - Point-in-time occurrences with no duration
    2. **Intervals** (Spans, Resources) - Time-bounded entities with start/end times
    3. **Metrics** - Pre-aggregated measurements collected at regular intervals

    **Critical Rule:** Always use `discover_context()` BEFORE writing queries to get exact field names, metric types, and available dimensions.

    ---

    ## Understanding Data Types

    ### Events (Logs)

    **Characteristics:**
    - Point-in-time entries (single timestamp)
    - High volume, text-heavy
    - Examples: Application logs, system logs, audit logs

    **Interface:** `log`

    **Best for:** Searching text, error analysis, event counting

    **Query approach:** Direct filtering and aggregation with `statsby`

    ---

    ### Intervals (Spans, Resources)

    **Characteristics:**
    - Time-bounded with start and end timestamps
    - Have duration
    - Examples: Distributed traces (spans), Kubernetes pods, database connections

    **Interface:** `otel_span`, `resource`

    **Best for:** Latency analysis, request tracing, resource lifecycle tracking

    **Query approach:** Calculate duration, filter by time ranges, percentile analysis

    ---

    ### Metrics

    **Characteristics:**
    - Pre-aggregated data points
    - Collected at regular intervals (e.g., every 5 minutes)
    - Efficient for time-series analysis
    - Types: `gauge`, `counter`, `delta`, `tdigest`

    **Best for:** Performance dashboards, trending over time, efficient aggregations

    **Query approach:** MUST use `align` verb to work with time buckets

    ---

    ## OPAL Patterns by Data Type

    ### Pattern 1: Events (Logs)

    **Use `filter` → `make_col` → `statsby`**

    ```opal
    # Count errors by service
    filter body ~ "error"
    | make_col svc:string(resource_attributes."service.name")
    | statsby error_count:count(), group_by(svc)
    | sort desc(error_count)
    | limit 20

    # Search with multiple conditions
    filter contains(body, "error") or contains(body, "exception")
    | filter string(resource_attributes."k8s.namespace.name") = "production"
    | statsby count(), group_by(string(resource_attributes."service.name"))
    ```

    **Key points:**
    - Use `statsby` for aggregations (NOT `aggregate`)
    - Quote nested field names with dots: `resource_attributes."k8s.namespace.name"`
    - Results: One row per group across entire time range
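
    For example, the error-count pattern above could be submitted through this
    tool as follows (the dataset ID is hypothetical; get a real one from
    `discover_context()`):

    ```python
    execute_opal_query(
        query='filter body ~ "error" | statsby error_count:count(), group_by(string(resource_attributes."service.name"))',
        primary_dataset_id="42161740",
        time_range="1h"
    )
    ```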

    ---

    ### Pattern 2: Intervals (Spans)

    **Use `filter` → `make_col` (calculate duration) → `statsby`**

    ```opal
    # Service latency percentiles
    make_col svc:service_name, dur_ms:float64(duration)/1000000
    | statsby p50:percentile(dur_ms, 0.50),
        p95:percentile(dur_ms, 0.95),
        p99:percentile(dur_ms, 0.99),
        group_by(svc)
    | sort desc(p95)
    | limit 20

    # Count requests by service
    make_col svc:service_name
    | statsby request_count:count(), group_by(svc)
    | sort desc(request_count)

    # Error analysis
    filter error = true
    | make_col svc:service_name, error_msg:string(error_message)
    | statsby error_count:count(), group_by(svc, error_msg)
    | sort desc(error_count)
    ```

    **Key points:**
    - Duration is typically in nanoseconds - divide by 1,000,000 for milliseconds
    - Use `percentile()` function for latency analysis
    - `statsby` aggregates across the entire time range
    - Results: Summary statistics, one row per group

    ---

    ### Pattern 3: Metrics (Pre-aggregated Data)

    **CRITICAL: Metrics require `align` verb**

    Metrics queries produce different output based on binning strategy:

    #### Option A: Summary Output (One Row Per Service)

    **Use `align options(bins: 1)` for single summary across time range**

    ```opal
    # Total request count per service
    align options(bins: 1), rate:sum(m("span_call_count_5m"))
    aggregate total_requests:sum(rate), group_by(service_name)
    fill total_requests:0

    # Average error rate per service
    align options(bins: 1), errors:sum(m("span_error_count_5m"))
    aggregate total_errors:sum(errors), group_by(service_name)
    filter total_errors > 0
    ```

    **Result:** One row per service (summary across entire time window)

    **Note:** No pipe `|` between `align` and `aggregate` when using `options(bins: 1)`

    #### Option B: Time-Series Output (Multiple Rows Per Service)

    **Use `align 5m` or `align` (auto bins) for trending over time**

    ```opal
    # Request rate trending over time
    align 5m, rate:sum(m("span_call_count_5m"))
    | aggregate total_requests:sum(rate), group_by(service_name)
    | sort desc(total_requests)
    ```

    **Result:** Multiple rows per service (one per time bucket)

    **Output includes:**
    - `_c_bucket` - Time bucket identifier
    - `valid_from`, `valid_to` - Time bucket boundaries
    - One row per (service, time_bucket) combination
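
    For example, the Option A summary form could be submitted through this tool
    as follows (the dataset ID is hypothetical):

    ```python
    execute_opal_query(
        query='''align options(bins: 1), rate:sum(m("span_call_count_5m"))
    aggregate total_requests:sum(rate), group_by(service_name)''',
        primary_dataset_id="42161740",
        time_range="24h"
    )
    ```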

    ---

    ## Working with Metrics

    ### Metric Types and Functions

    **1. Counter/Gauge/Delta Metrics**

    Use `m()` function:

    ```opal
    # Summary (single row per service)
    align options(bins: 1), total:sum(m("span_call_count_5m"))
    aggregate total_calls:sum(total), group_by(service_name)

    # Time-series (multiple rows)
    align total:sum(m("span_call_count_5m"))
    | aggregate total_calls:sum(total), group_by(service_name)
    ```

    **2. TDigest Metrics (Percentile/Latency)**

    Use `m_tdigest()` with special combine/quantile pattern:

    ```opal
    # Summary (single row per service)
    align options(bins: 1), combined:tdigest_combine(m_tdigest("span_duration_tdigest_5m"))
    aggregate p50:tdigest_quantile(tdigest_combine(combined), 0.50),
        p95:tdigest_quantile(tdigest_combine(combined), 0.95),
        p99:tdigest_quantile(tdigest_combine(combined), 0.99),
        group_by(service_name)
    make_col p50_ms:p50/1000000, p95_ms:p95/1000000, p99_ms:p99/1000000
    ```

    **Critical pattern for tdigest:**
    1. `align` → `tdigest_combine(m_tdigest("metric"))`
    2. `aggregate` → `tdigest_quantile(tdigest_combine(column), percentile)`
    3. Note: `tdigest_combine` appears TWICE - once in align, once nested in aggregate

    **How to know which function to use:**

    ```bash
    # Check metric type in discover_context() output:
    # - type: "tdigest" → use m_tdigest()
    # - type: "gauge", "counter", "delta" → use m()
    ```

    ---

    ## Time Bucketing and Aggregation

    ### Understanding align options(bins: N)

    | Pattern | Output | Use Case |
    |---------|--------|----------|
    | `align options(bins: 1)` | One row per group | Summary reports, totals, single percentiles |
    | `align 5m` | Many rows per group | Time-series charts, trending |
    | `align 1h` | Many rows per group | Hourly trends |
    | `align` (default) | Auto-sized buckets | Dashboards (Observe picks optimal size) |

    **Important syntax note:** When using `options(bins: 1)`, do NOT use pipe `|` between `align` and `aggregate`

    ### Decision Tree

    ```
    ┌─────────────────────────────────────┐
    │ What do you need?                   │
    └─────────────────────────────────────┘
                      │
      ├─ Single summary per service (e.g., "total requests in 24h")
      │  └─> Use: Metrics + align options(bins: 1)
      │      Result: 1 row per service
      │
      ├─ Trends over time (e.g., "requests per hour")
      │  └─> Use: Metrics + align 5m + aggregate
      │      Result: Multiple rows per service (time-series)
      │
      └─ Raw detailed analysis (no metrics available)
         └─> Use: Raw dataset + statsby (no align)
             Result: 1 row per group
    ```

    ---

    ## Complete Examples

    ### Example 1: RED Methodology (Summary)

    **Goal:** Get Rate, Error, Duration summary for all services over 24 hours

    ```opal
    # RATE - Total requests per service
    align options(bins: 1), rate:sum(m("span_call_count_5m"))
    aggregate total_requests:sum(rate), group_by(service_name)
    fill total_requests:0

    # ERRORS - Total errors per service
    align options(bins: 1), errors:sum(m("span_error_count_5m"))
    aggregate total_errors:sum(errors), group_by(service_name)
    filter total_errors > 0

    # DURATION - Latency percentiles per service
    align options(bins: 1), combined:tdigest_combine(m_tdigest("span_sn_service_node_duration_tdigest_5m"))
    aggregate p50:tdigest_quantile(tdigest_combine(combined), 0.50),
        p95:tdigest_quantile(tdigest_combine(combined), 0.95),
        p99:tdigest_quantile(tdigest_combine(combined), 0.99),
        group_by(service_name)
    make_col p50_ms:p50/1000000, p95_ms:p95/1000000, p99_ms:p99/1000000
    ```

    **Output:** One row per service with summary statistics

    **Important:** No pipe `|` between verbs when using `options(bins: 1)`

    ---

    ### Example 2: RED Methodology (Alternative - Raw Spans)

    **When to use:** Metrics don't exist, or you need span-level details

    ```opal
    # RATE - Request count
    make_col svc:service_name
    | statsby request_count:count(), group_by(svc)
    | sort desc(request_count)

    # ERRORS - Error count
    filter error = true
    | make_col svc:service_name
    | statsby error_count:count(), group_by(svc)
    | sort desc(error_count)

    # DURATION - Latency percentiles
    make_col svc:service_name, dur_ms:float64(duration)/1000000
    | statsby p50:percentile(dur_ms, 0.50),
        p95:percentile(dur_ms, 0.95),
        p99:percentile(dur_ms, 0.99),
        group_by(svc)
    | sort desc(p95)
    ```

    **Note:** Slower than metrics approach but works on raw data

    ---

    ### Example 3: Top Errors with Details

    ```opal
    # Find top errors from spans with full details
    filter error = true
    | make_col svc:service_name,
        error_msg:string(error_message),
        span:span_name,
        status:string(status_code)
    | statsby error_count:count(), group_by(svc, error_msg, span)
    | sort desc(error_count)
    | limit 20
    ```

    ---

    ### Example 4: Time-Series for Dashboard

    ```opal
    # Request rate over time (for charting)
    align 5m, rate:sum(m("span_call_count_5m"))
    | aggregate requests_per_5min:sum(rate), group_by(service_name)

    # Error rate over time
    align 5m, errors:sum(m("span_error_count_5m"))
    | aggregate errors_per_5min:sum(errors), group_by(service_name)
    | filter errors_per_5min > 0
    ```

    **Result:** Time-series data suitable for line charts

    ---

    ## Common Patterns

    ### Pattern: Conditional Counting (No count_if!)

    OPAL doesn't have `count_if()`. Use this pattern instead:

    ```opal
    # Count errors vs total requests
    make_col svc:service_name, is_error:if(error = true, 1, 0)
    | statsby total:count(), error_count:sum(is_error), group_by(svc)
    | make_col error_rate:float64(error_count)/float64(total)
    | sort desc(error_rate)
    ```

    ---

    ### Pattern: Nested Field Access

    ```opal
    # Fields with dots MUST be quoted
    make_col namespace:string(resource_attributes."k8s.namespace.name"),
        service:string(resource_attributes."service.name")
    | filter namespace = "production"
    ```

    **Rule:** `object."field.with.dots"` - quote only the field name, not the whole path

    ---

    ### Pattern: Time Unit Conversion

    ```opal
    # Nanoseconds to milliseconds
    make_col dur_ms:duration/1000000

    # Nanoseconds to seconds
    make_col dur_sec:duration/1000000000

    # Check field samples in discover_context() to identify units!
    # 19 digits (1760201545280843522) = nanoseconds
    # 13 digits (1758543367916) = milliseconds
    ```
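
    As a rough sanity check, the unit of an epoch-style value can be inferred
    from its digit count (an illustrative sketch; `infer_epoch_unit` is a
    hypothetical helper, not part of this server):

    ```python
    def infer_epoch_unit(ts: int) -> str:
        digits = len(str(abs(ts)))
        if digits >= 19:
            return "nanoseconds"   # e.g., 1760201545280843522
        if digits >= 13:
            return "milliseconds"  # e.g., 1758543367916
        return "seconds"
    ```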

    ---

    ## Troubleshooting

    ### Issue: "Same service appears multiple times!"

    **Cause:** Using metrics with `align` produces time-series data

    **Solution:**
    - For summary: Use `align options(bins: 1)`
    - For trends: This is correct behavior - each row is a time bucket

    ---

    ### Issue: "Only getting one service in results"

    **Diagnosis:**
    1. Check if metric has data for other services: `discover_context(metric_name="...")`
    2. Verify dimensions available in metric
    3. Try querying raw dataset to confirm other services exist

    **Solution:** You might be using a metric that only captures one service, or need to filter differently

    ---

    ### Issue: "Field not found" error

    **Cause:**
    - Field name spelled incorrectly (case-sensitive!)
    - Missing quotes around nested fields with dots
    - Using wrong dataset

    **Solution:**
    1. Run `discover_context(dataset_id="...")` to get exact field names
    2. Copy field names exactly as shown
    3. Quote nested fields: `resource_attributes."k8s.namespace.name"`

    ---

    ### Issue: "Percentiles look wrong"

    **Check:**
    1. **Time units** - Duration often in nanoseconds (divide by 1M for ms)
    2. **TDigest pattern** - Must use `tdigest_combine` twice (align + aggregate)
    3. **Correct syntax:**

       ```opal
       aggregate p95:tdigest_quantile(tdigest_combine(combined), 0.95)
       ```

       NOT:

       ```opal
       aggregate agg:tdigest_combine(combined)
       | make_col p95:tdigest_quantile(agg, 0.95)
       ```

    ---

    ### Issue: "Unknown function" error

    **Common mistakes:**
    - Using `count_if()` - doesn't exist, use `if()` + `sum()` pattern
    - Using `pick` - doesn't exist, use `make_col`
    - Using SQL syntax like `CASE/WHEN` - use `if(condition, true_val, false_val)`

    **Solution:** See OPAL documentation or use `get_relevant_docs()` to find correct syntax

    ---

    ### Issue: Metric query fails with "column has to be aggregated or grouped"

    **Cause:** Trying to use a column from `align` directly in `aggregate` without re-combining

    **Solution for tdigest:**

    ```opal
    # WRONG
    align combined:tdigest_combine(m_tdigest("metric"))
    | aggregate p95:tdigest_quantile(combined, 0.95)  ❌

    # CORRECT
    align combined:tdigest_combine(m_tdigest("metric"))
    | aggregate p95:tdigest_quantile(tdigest_combine(combined), 0.95)  ✓
    ```

    ---

    ## Parameters Reference

    ### Required Parameters

    **`query`** (string)
    - The OPAL query to execute
    - Must be valid OPAL syntax
    - See patterns above for examples

    **`primary_dataset_id`** (string)
    - Dataset ID from `discover_context()`
    - Use `dataset_id` for backward compatibility
    - Get from Phase 2 of discovery (detailed lookup)

    ### Optional Parameters

    **`time_range`** (string)
    - Relative time range: `"1h"`, `"24h"`, `"7d"`, `"30d"`
    - Defaults to `"1h"`
    - Alternative: Use `start_time` and `end_time`

    **`start_time`** / **`end_time`** (string)
    - ISO format: `"2024-01-20T16:20:00Z"`
    - For absolute time ranges
    - Overrides `time_range` if specified

    **`secondary_dataset_ids`** (string)
    - JSON array for joins: `'["44508111"]'`
    - Used with `dataset_aliases`

    **`dataset_aliases`** (string)
    - JSON object for joins: `'{"volumes": "44508111"}'`
    - Use with `@alias` in join syntax

    **`format`** (string)
    - `"csv"` (default) or `"ndjson"`
    - CSV limited to first 1000 rows

    **`timeout`** (number)
    - Seconds to wait for query completion
    - Default: 30s

    ---

    ## Best Practices Summary

    ### 1. Always discover_context() first

    ```
    Phase 1: Search for datasets/metrics
    Phase 2: Get detailed schema with field names and types
    Phase 3: Write query using exact field names
    ```

    ### 2. Choose the right approach

    | Need | Approach | Why |
    |------|----------|-----|
    | Summary stats | Metrics + `options(bins: 1)` | Fastest, one row per group |
    | Time trends | Metrics + `align 5m` | Efficient time-series |
    | Raw analysis | Dataset + `statsby` | Full details, slower |

    ### 3. Know your data type

    - **Events (logs):** `filter` → `statsby`
    - **Intervals (spans):** Calculate duration → `statsby`
    - **Metrics:** `align` → `aggregate` (REQUIRED)

    ### 4. Metric type matters

    - **Counter/Gauge:** `m("metric_name")`
    - **TDigest:** `m_tdigest("metric_name")` + double combine pattern

    ### 5. Field naming

    - Copy exactly from `discover_context()`
    - Quote nested fields with dots: `object."field.with.dots"`
    - Case-sensitive!

    ---

    ## Additional Resources

    - **Unknown syntax:** `get_relevant_docs("opal <keyword>")`
    - **Error debugging:** Check error message keywords in docs
    - **Examples:** Search docs for specific use cases (e.g., "OPAL join syntax")

    ---

    **Remember:** When in doubt about OPAL syntax or seeing unexpected results, use `get_relevant_docs()` to search official Observe documentation.
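
    ---

    ## Example Invocation

    A multi-dataset call might look like this (dataset IDs are hypothetical and
    the join body is only a sketch - verify the exact join form with
    `get_relevant_docs("OPAL join syntax")`):

    ```python
    execute_opal_query(
        query='join on(id=@volumes.id), volume_name:@volumes.name',
        primary_dataset_id="42161740",
        secondary_dataset_ids='["44508111"]',
        dataset_aliases='{"volumes": "44508111"}',
        time_range="24h"
    )
    ```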
""" import json # Validate input sizes to prevent DoS attacks (H-INPUT-2) validate_input_size(query, "query", 10 * 1024) # 10KB max for OPAL queries validate_input_size(dataset_id, "dataset_id", 1024) # 1KB max validate_input_size(primary_dataset_id, "primary_dataset_id", 1024) # 1KB max validate_input_size(secondary_dataset_ids, "secondary_dataset_ids", 100 * 1024) # 100KB max for JSON validate_input_size(dataset_aliases, "dataset_aliases", 100 * 1024) # 100KB max for JSON # Validate OPAL query structure and apply auto-fix transformations (H-INPUT-1) validation_result = validate_opal_query_structure(query, time_range=time_range) if not validation_result.is_valid: return f"OPAL Query Validation Error: {validation_result.error_message}" # Use transformed query if auto-fixes were applied original_query = query if validation_result.transformed_query: opal_logger.info(f"Using auto-fixed query (applied {len(validation_result.transformations)} transformations)") query = validation_result.transformed_query # Log the OPAL query operation with sanitized query (truncated for security) query_preview = query[:100] + "..." if len(query) > 100 else query dataset_info = primary_dataset_id or dataset_id opal_logger.info(f"query execution | dataset:{dataset_info} | query:'{query_preview}' | time_range:{time_range}") # Parse JSON string parameters if provided parsed_secondary_dataset_ids = None parsed_dataset_aliases = None if secondary_dataset_ids: try: parsed_secondary_dataset_ids = json.loads(secondary_dataset_ids) except (json.JSONDecodeError, TypeError) as e: return f"Error parsing secondary_dataset_ids: {e}. Expected JSON array like ['44508111']" if dataset_aliases: try: parsed_dataset_aliases = json.loads(dataset_aliases) except (json.JSONDecodeError, TypeError) as e: return f"Error parsing dataset_aliases: {e}. 
    # Normalize time_range: accept bare numbers (assume hours) and convert days to hours
    # Examples: "24" -> "24h", "7d" -> "168h", "1.5" -> "1.5h"
    normalized_time_range = time_range
    if time_range:
        time_str = str(time_range).strip()
        try:
            # Handle days conversion: "7d" -> "168h" (7 * 24)
            if time_str.endswith('d'):
                days = float(time_str[:-1])
                hours = days * 24
                normalized_time_range = f"{hours}h"
                opal_logger.info(f"time_range normalization | original:'{time_range}' | normalized:'{normalized_time_range}' | reason:days_to_hours")
            # Handle bare numbers: "24" -> "24h"
            elif time_str and not any(time_str.endswith(unit) for unit in ['h', 'm', 's', 'w']):
                float(time_str)  # Validate it's numeric
                normalized_time_range = f"{time_str}h"
                opal_logger.info(f"time_range normalization | original:'{time_range}' | normalized:'{normalized_time_range}' | reason:bare_number")
        except ValueError:
            # Not a valid number, keep as-is (might be a valid format or will error downstream)
            pass

    result = await observe_execute_opal_query(
        query=query,
        dataset_id=dataset_id,
        primary_dataset_id=primary_dataset_id,
        secondary_dataset_ids=parsed_secondary_dataset_ids,
        dataset_aliases=parsed_dataset_aliases,
        time_range=normalized_time_range,
        start_time=start_time,
        end_time=end_time,
        format=format,
        timeout=timeout
    )

    # Append transformation feedback if auto-fixes were applied at this level
    # (Note: inner function may also apply transformations, which will have their own feedback)
    if validation_result.transformations:
        transformation_notes = "\n\n" + "=" * 60 + "\n"
        transformation_notes += "AUTO-FIX APPLIED - Query Transformations\n"
        transformation_notes += "=" * 60 + "\n\n"
        for i, transformation in enumerate(validation_result.transformations, 1):
            transformation_notes += f"{transformation}\n\n"
        transformation_notes += "The query above was automatically corrected and executed successfully.\n"
        transformation_notes += "Please use the corrected syntax in future queries.\n"
        transformation_notes += "=" * 60
        result = result + transformation_notes

    return result


@mcp.tool()
@requires_scopes(['admin', 'read'])
@trace_mcp_tool(tool_name="get_relevant_docs", record_args=True, record_result=False)
async def get_relevant_docs(ctx: Context, query: str, n_results: int = 5) -> str:
    """
    Search Observe documentation using Gemini Search for OPAL syntax and platform guidance.

    This tool uses Google's Gemini AI with search grounding to find relevant, up-to-date
    documentation from docs.observeinc.com about OPAL syntax, functions, features, and best practices.
    WHEN YOU MUST USE THIS TOOL
    ═══════════════════════════════════════════════════════════════════════════
    MANDATORY: Call this tool if you receive ANY of these errors from execute_opal_query:
    • "field not found" → Search for field access syntax
    • "invalid syntax" → Search for the OPAL construct you're trying to use
    • "unknown function" → Search for function name and proper usage
    • "parse error" → Search for syntax of the operation that failed
    • Any other query execution failure → Search for error keywords

    RECOMMENDED: Call BEFORE attempting these complex operations:
    • Multi-dataset joins
    • Time bucketing or window functions
    • Advanced aggregations beyond statsby
    • Regex or pattern matching
    • Custom operators or functions you haven't used before

    ERROR RECOVERY WORKFLOW
    ───────────────────────────────────────────────────────────────────────────
    execute_opal_query() fails
        ↓
    get_relevant_docs("error message keywords" or "feature name")
        ↓
    Review official syntax from documentation
        ↓
    Retry execute_opal_query() with corrected syntax

    SEARCH TIPS:
    ───────────────
    • Use specific error keywords: "statsby syntax", "join datasets"
    • Include OPAL in your search: "OPAL filter operators"
    • Search for function names directly: "make_col examples"

    TYPICAL USE CASES
    ────────────────
    - "OPAL filter syntax" → Learn filtering operators and patterns
    - "OPAL time functions" → Understand time manipulation functions
    - "kubernetes resource attributes" → Find available K8s fields
    - "statsby group_by" → Learn aggregation syntax
    - "OPAL join syntax" → Multi-dataset join patterns
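
    A minimal recovery round-trip might look like this (illustrative only;
    the query and dataset ID are placeholders):

        result = execute_opal_query(query="...", primary_dataset_id="42161740")
        if "Error" in result:
            docs = get_relevant_docs("OPAL statsby syntax")
            # correct the query using the documented syntax, then retry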
    Args:
        query: Documentation search query describing what you need to learn
        n_results: Number of documents to return (default: 5, recommended: 3-10)

    Returns:
        Relevant documentation sections with:
        - Full document content
        - Source filename for reference
        - Relevance score indicating match quality

    Examples:
        # Learn OPAL syntax
        get_relevant_docs("OPAL filter syntax")
        get_relevant_docs("time range functions")

        # Find schema information
        get_relevant_docs("kubernetes resource attributes")
        get_relevant_docs("opentelemetry span fields")

        # Advanced features
        get_relevant_docs("OPAL join multiple datasets", n_results=3)
        get_relevant_docs("aggregation functions statsby")

    Performance:
        - Search time: 1-3 seconds (includes web search + AI processing)
        - Returns AI-curated documentation excerpts with citations
        - Rate limited to 400 requests per day (Gemini Tier 1 limit)
    """
    # Validate input sizes to prevent DoS attacks (H-INPUT-2)
    validate_input_size(query, "query", 1024)  # 1KB max for search queries

    try:
        # Import required modules
        import os
        from collections import defaultdict

        # Log the documentation search operation
        semantic_logger.info(f"docs search | query:'{query}' | n_results:{n_results}")

        # Get more chunks than requested to ensure we have enough from relevant docs
        chunk_results = await search_docs(query, n_results=max(n_results * 3, 15))

        if not chunk_results:
            return f"No relevant documents found for: '{query}'"

        # Group chunks by source document
        docs_by_source = defaultdict(list)
        for result in chunk_results:
            source = result.get("source", "")
            if source and source != "error":
                docs_by_source[source].append(result)

        # Calculate average score for each document
        doc_scores = {}
        for source, chunks in docs_by_source.items():
            avg_score = sum(chunk.get("score", 0.0) for chunk in chunks) / len(chunks)
            doc_scores[source] = avg_score

        # Sort documents by average score and limit to requested number
        sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)[:n_results]

        if not sorted_docs:
            return f"No valid documents found for: '{query}'"

        response = f"Found {len(sorted_docs)} relevant documents for: '{query}'\n\n"

        # Read and format each full document
        for i, (source, score) in enumerate(sorted_docs, 1):
            try:
                # Read the entire document file
                with open(source, 'r', encoding='utf-8') as f:
                    document_content = f.read()

                # Get metadata from the first chunk of this source
                first_chunk = docs_by_source[source][0]
                title = first_chunk.get("title", os.path.basename(source).replace(".md", "").replace("_", " ").title())
                source_filename = os.path.basename(source)

                response += f"### Document {i}: {title}\n"
                response += f"Source: {source_filename}\n"
                response += f"Relevance Score: {score:.2f}\n\n"
                response += f"{document_content}\n\n\n"
                response += "----------------------------------------\n\n"
            except Exception as e:
                # Use the chunk text as fallback if we can't read the file
                chunks_text = "\n\n".join([chunk.get("text", "") for chunk in docs_by_source[source]])
                title = os.path.basename(source).replace(".md", "").replace("_", " ").title()

                response += f"### Document {i}: {title}\n"
                response += f"Source: {os.path.basename(source)}\n"
                response += f"Relevance Score: {score:.2f}\n"
                response += "Note: Could not read the full document file. Showing available chunks.\n\n"
                response += f"{chunks_text}\n\n\n"
                response += "----------------------------------------\n\n"

        # Log successful documentation search
        semantic_logger.info(f"docs search complete | found:{len(sorted_docs)} documents | chunks:{len(chunk_results)}")

        return response
    except Exception as e:
        return f"Error retrieving relevant documents: {str(e)}. Make sure GEMINI_API_KEY is set in your environment."
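
# Illustration of get_relevant_docs' grouping step (hypothetical chunk scores):
#   chunks: [("intro.md", 0.9), ("intro.md", 0.7), ("joins.md", 0.8)]
#   -> docs_by_source: {"intro.md": [2 chunks], "joins.md": [1 chunk]}
#   -> doc_scores:     {"intro.md": 0.8, "joins.md": 0.8}
# Documents are ranked by average chunk score, and the top n_results FULL
# documents are returned rather than individual chunks.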

@mcp.tool()
@requires_scopes(['admin', 'read'])
@trace_mcp_tool(tool_name="discover_context", record_args=True, record_result=False)
async def discover_context(
    ctx: Context,
    query: str = "",
    dataset_id: Optional[str] = None,
    dataset_name: Optional[str] = None,
    metric_name: Optional[str] = None,
    result_type: Optional[str] = None,
    max_results: int = 20,
    business_category_filter: Optional[str] = None,
    technical_category_filter: Optional[str] = None,
    interface_filter: Optional[str] = None
) -> str:
    """
    Unified discovery tool for datasets and metrics in the Observe platform.

    ⚠️ CRITICAL: 2-PHASE WORKFLOW REQUIRED ⚠️
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
    Phase 1: SEARCH MODE (lightweight browsing)
        discover_context("error service") → Returns names, IDs, purposes
        NO field names, NO dimensions shown - context efficient!

    Phase 2: DETAIL MODE (complete schema) - ⚠️ REQUIRED BEFORE QUERIES
        discover_context(dataset_id="...") → ALL fields with types and samples
        discover_context(metric_name="...") → ALL dimensions with cardinality
        This phase is MANDATORY before writing any queries!

    YOU MUST COMPLETE PHASE 2 BEFORE CALLING execute_opal_query()!
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

    WHY 2 PHASES?
    • Context efficiency: Browse 20+ options without bloat
    • Natural workflow: Search → Select → Detail → Query
    • Complete information: Get full schemas only when needed
    • Prevents errors: Field names verified before query construction

    WHAT YOU GET IN EACH MODE:
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
    SEARCH MODE (Phase 1):
        📊 Datasets: Names, IDs, categories, purposes
        📈 Metrics: Names, IDs, categories, purposes
        ⛔ NOT included: Fields, dimensions, schemas (use Phase 2!)

    DETAIL MODE (Phase 2):
        📊 Datasets: Complete field list with types, samples, nested paths
        📈 Metrics: ALL dimensions with cardinality, value ranges, examples
        ✅ Everything needed to write queries correctly

    Args:
        query: Search term (e.g., "error service", "kubernetes logs", "cpu usage")
        dataset_id: Exact dataset ID for detailed lookup
        dataset_name: Exact dataset name for lookup
        metric_name: Exact metric name for detailed lookup
        result_type: Filter results - "dataset", "metric", or None (both)
        max_results: Maximum results to return (default: 20)
        business_category_filter: Infrastructure, Application, Database, etc.
        technical_category_filter: Logs, Metrics, Traces, Events, etc.
        interface_filter: log, metric, otel_span, etc.

    Returns:
        Formatted results with clear sections for datasets and metrics

    Examples:
        # PHASE 1: Search mode (browse available options)
        discover_context("error service")                  # See what exists
        discover_context("latency", result_type="metric")  # Only metrics
        discover_context("kubernetes", business_category_filter="Infrastructure")

        # PHASE 2: Detail mode (REQUIRED before queries - get complete schema)
        discover_context(dataset_id="42161740")               # ALL fields for this dataset
        discover_context(metric_name="span_error_count_5m")  # ALL dimensions for this metric

        # Typical workflow:
        # 1. discover_context("errors") → browse options
        # 2. discover_context(dataset_id="42161740") → get field list
        # 3. execute_opal_query(...) → write query with correct fields

    Performance:
        - Search queries: 200-500ms, shows 10-20 results
        - Detail lookups: <100ms, shows complete schemas
    """
    # Validate input sizes to prevent DoS attacks (H-INPUT-2)
    validate_input_size(query, "query", 1024)
    validate_input_size(dataset_id, "dataset_id", 1024)
    validate_input_size(dataset_name, "dataset_name", 1024)
    validate_input_size(metric_name, "metric_name", 1024)
    validate_input_size(result_type, "result_type", 1024)
    validate_input_size(business_category_filter, "business_category_filter", 1024)
    validate_input_size(technical_category_filter, "technical_category_filter", 1024)
    validate_input_size(interface_filter, "interface_filter", 1024)

    try:
        import asyncpg
        import json
        from typing import List, Dict, Any

        # Log the discovery operation
        semantic_logger.info(
            f"unified discovery | query:'{query}' | dataset_id:{dataset_id} | dataset_name:{dataset_name} "
            f"| metric_name:{metric_name} | result_type:{result_type} | max_results:{max_results}"
        )

        # Database connection
        db_password = os.getenv('SEMANTIC_GRAPH_PASSWORD')
        if not db_password:
            raise ValueError("SEMANTIC_GRAPH_PASSWORD environment variable must be set")

        db_config = {
            'host': os.getenv('POSTGRES_HOST', 'localhost'),
            'port': int(os.getenv('POSTGRES_PORT', '5432')),
            'database': os.getenv('POSTGRES_DB', 'semantic_graph'),
            'user': os.getenv('POSTGRES_USER', 'semantic_graph'),
            'password': db_password
        }

        # Validate and normalize parameters
        max_results = min(max(1, max_results), 50)
        should_fetch_datasets = (result_type is None or result_type == "dataset")
        should_fetch_metrics = (result_type is None or result_type == "metric")

        # Connect to database
        conn = await asyncpg.connect(**db_config)

        try:
            dataset_results = []
            metric_results = []
            is_detail_mode = False

            # EXACT LOOKUPS (Detail Mode)
            if dataset_id is not None:
                is_detail_mode = True
                semantic_logger.info(f"exact dataset lookup | dataset_id:{dataset_id}")
                dataset_results = await conn.fetch("""
                    SELECT
                        di.dataset_id::TEXT,
                        di.dataset_name::TEXT,
                        di.inferred_purpose,
                        di.typical_usage,
                        di.business_categories,
                        di.technical_category,
                        di.interface_types,
                        di.key_fields,
                        di.query_patterns,
                        di.nested_field_paths,
                        di.nested_field_analysis,
                        di.common_use_cases,
                        di.data_frequency,
                        1.0::REAL as rank
                    FROM datasets_intelligence di
                    WHERE di.dataset_id::TEXT = $1 AND di.excluded = FALSE
                """, dataset_id)

            elif dataset_name is not None:
                is_detail_mode = True
                semantic_logger.info(f"exact dataset lookup | dataset_name:{dataset_name}")
                dataset_results = await conn.fetch("""
                    SELECT
                        di.dataset_id::TEXT,
                        di.dataset_name::TEXT,
                        di.inferred_purpose,
                        di.typical_usage,
                        di.business_categories,
                        di.technical_category,
                        di.interface_types,
                        di.key_fields,
                        di.query_patterns,
                        di.nested_field_paths,
                        di.nested_field_analysis,
                        di.common_use_cases,
                        di.data_frequency,
                        1.0::REAL as rank
                    FROM datasets_intelligence di
                    WHERE di.dataset_name = $1 AND di.excluded = FALSE
                """, dataset_name)

            elif metric_name is not None:
                is_detail_mode = True
                semantic_logger.info(f"exact metric lookup | metric_name:{metric_name}")
                metric_results = await conn.fetch("""
                    SELECT
                        mi.dataset_id::TEXT,
                        mi.metric_name,
                        mi.dataset_name,
                        mi.metric_type,
                        mi.description,
                        mi.common_dimensions,
                        mi.sample_dimensions,
                        mi.value_type,
                        mi.value_range,
                        mi.data_frequency,
                        mi.last_seen,
                        mi.inferred_purpose,
                        mi.typical_usage,
                        mi.business_categories,
                        mi.technical_category,
                        mi.query_patterns,
                        mi.common_fields,
                        mi.nested_field_paths,
                        1.0::REAL as rank
                    FROM metrics_intelligence mi
                    WHERE mi.metric_name = $1 AND mi.excluded = FALSE
                    LIMIT 1
                """, metric_name)

            # SEARCH MODE (query provided)
            elif query:
                # Search datasets if requested
                if should_fetch_datasets:
                    search_terms = query.lower().split()
                    search_conditions = []
                    params = []
                    param_idx = 1

                    for term in search_terms:
                        search_conditions.append(f"di.search_vector @@ plainto_tsquery('english', ${param_idx})")
                        params.append(term)
                        param_idx += 1

                    if not search_conditions:
                        search_conditions = [f"di.search_vector @@ plainto_tsquery('english', ${param_idx})"]
                        params.append(query)
                        param_idx += 1

                    where_clause = f"({' OR '.join(search_conditions)})"
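
                    # How the dynamic SQL is assembled: asyncpg uses positional
                    # placeholders ($1, $2, ...), so param_idx always points at
                    # the next free slot as conditions are appended. Search terms
                    # are OR-ed together, and each optional filter below claims
                    # the next placeholder before the final LIMIT parameter.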
                    # Add filters
                    if business_category_filter:
                        where_clause += f" AND di.business_categories ? ${param_idx}"
                        params.append(business_category_filter)
                        param_idx += 1

                    if technical_category_filter:
                        where_clause += f" AND di.technical_category = ${param_idx}"
                        params.append(technical_category_filter)
                        param_idx += 1

                    if interface_filter:
                        where_clause += f" AND ${param_idx} = ANY(di.interface_types)"
                        params.append(interface_filter)
                        param_idx += 1

                    params.append(max_results)
                    limit_param = param_idx

                    query_sql = f"""
                        SELECT
                            di.dataset_id::TEXT,
                            di.dataset_name::TEXT,
                            di.inferred_purpose,
                            di.typical_usage,
                            di.business_categories,
                            di.technical_category,
                            di.interface_types,
                            di.key_fields,
                            di.query_patterns,
                            di.nested_field_paths,
                            di.nested_field_analysis,
                            di.common_use_cases,
                            di.data_frequency,
                            ts_rank(di.search_vector, plainto_tsquery('english', $1))::REAL as rank
                        FROM datasets_intelligence di
                        WHERE di.excluded = FALSE AND {where_clause}
                        ORDER BY rank DESC
                        LIMIT ${limit_param}
                    """

                    dataset_results = await conn.fetch(query_sql, *params)

                # Search metrics if requested
                if should_fetch_metrics:
                    metric_results = await conn.fetch("""
                        SELECT * FROM search_metrics_enhanced($1, $2, $3, $4, $5)
                    """, query, max_results, business_category_filter, technical_category_filter, 0.2)

            else:
                return """# Discovery Error

**Issue**: No search criteria provided

**Required**: At least one of the following:
- `query`: Search term
- `dataset_id`: Exact dataset ID
- `dataset_name`: Exact dataset name
- `metric_name`: Exact metric name

**Examples**:
```python
discover_context("error service")
discover_context(dataset_id="42161740")
discover_context(metric_name="span_error_count_5m")
```"""

            # Check if we found anything
            if not dataset_results and not metric_results:
                search_term = query or dataset_id or dataset_name or metric_name
                total_datasets = await conn.fetchval("SELECT COUNT(*) FROM datasets_intelligence WHERE excluded = FALSE")
                total_metrics = await conn.fetchval("SELECT COUNT(*) FROM metrics_intelligence WHERE excluded = FALSE")

                return f"""# Discovery Results

**Search**: "{search_term}"
**Found**: 0 results

**Suggestions**:
- Try broader search terms
- Remove filters to see all results
- Check spelling and try alternative terms

**Available in database**:
- {total_datasets} datasets
- {total_metrics} metrics

**Examples**:
```python
discover_context("error")       # Broad search
discover_context("kubernetes")  # Infrastructure search
discover_context("latency")     # Performance metrics
```"""

            # Format results
            output_parts = []

            # Header
            mode_indicator = (
                "**Mode**: Detail (Complete Schema)"
                if is_detail_mode
                else "**Mode**: Search (Lightweight Browsing - NO schemas shown)"
            )
            if query:
                output_parts.append(f"# Discovery Results for \"{query}\"\n")
            else:
                output_parts.append("# Discovery Results\n")
            output_parts.append(f"{mode_indicator}\n")
            output_parts.append(f"**Found**: {len(dataset_results)} datasets, {len(metric_results)} metrics\n")

            # DATASETS SECTION
            if dataset_results:
                output_parts.append("\n" + "=" * 80)
                output_parts.append("\n## 📊 Datasets (LOG/SPAN/RESOURCE Interfaces)\n")
                output_parts.append("**Query Pattern**: Standard OPAL (filter, make_col, statsby)\n")

                for i, row in enumerate(dataset_results, 1):
                    if is_detail_mode:
                        output_parts.append(_format_dataset_detail(row, i, json))
                    else:
                        output_parts.append(_format_dataset_summary(row, i, json))

            # METRICS SECTION
            if metric_results:
                output_parts.append("\n" + "=" * 80)
                output_parts.append("\n## 📈 Metrics (METRIC Interface)\n")
                output_parts.append("**Query Pattern**: align + m() + aggregate (REQUIRED!)\n")

                for i, row in enumerate(metric_results, 1):
                    if is_detail_mode:
                        output_parts.append(_format_metric_detail(row, i, json))
                    else:
                        output_parts.append(_format_metric_summary(row, i, json))

            # NEXT STEPS
            output_parts.append("\n" + "=" * 80)
            output_parts.append("\n## Next Steps\n")

            if is_detail_mode:
                output_parts.append("""
**For Datasets**:
1. Use `execute_opal_query(query="...", primary_dataset_id="dataset_id")`
2. Copy field names exactly as shown (case-sensitive!)
3. Quote nested fields with dots: `resource_attributes."k8s.namespace.name"`

**For Metrics**:
1. Use `execute_opal_query()` with align + m() + aggregate pattern
2. Use dimensions shown above for group_by operations
3. See example queries in each metric's details
""")
            else:
                output_parts.append("""
💡 **Remember**: Get complete schema before querying →
`discover_context(dataset_id="...")` or `discover_context(metric_name="...")`
""")

            result = "\n".join(output_parts)
            semantic_logger.info(f"unified discovery complete | datasets:{len(dataset_results)} | metrics:{len(metric_results)}")
            return result

        finally:
            await conn.close()

    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        semantic_logger.error(f"discovery error | {str(e)} | {error_details}")

        return f"""# Discovery Error

**Error**: {str(e)}

**Troubleshooting**:
- Check database connectivity
- Verify SEMANTIC_GRAPH_PASSWORD is set
- Try simpler search terms
- Check the server logs for details

**Support**: If the issue persists, contact support with this error message."""


# Helper functions for formatting (defined at module level for use by discover tool)

def _format_dataset_summary(row: Dict, index: int, json) -> str:
    """Format lightweight dataset summary for search/discovery."""
    combined_score = row.get('rank', 0.0)
    interfaces_str = ', '.join(row['interface_types']) if row.get('interface_types') else 'unknown'
    business_cats = json.loads(row['business_categories']) if row.get('business_categories') else ['Unknown']

    return f"""
### {index}. {row['dataset_name']}
**ID**: `{row['dataset_id']}`
**Category**: {', '.join(business_cats)} / {row.get('technical_category', 'Unknown')}
**Interfaces**: {interfaces_str}
**Purpose**: {row.get('inferred_purpose', 'N/A')}
**Relevance**: {combined_score:.3f}
"""

def _format_dataset_detail(row: Dict, index: int, json) -> str:
    """Format complete dataset details with full schema."""
    try:
        query_patterns = json.loads(row.get('query_patterns', '[]')) if row.get('query_patterns') else []
        nested_field_paths = json.loads(row.get('nested_field_paths', '{}')) if row.get('nested_field_paths') else {}
        common_use_cases = row.get('common_use_cases', []) or []
    except (json.JSONDecodeError, TypeError):
        query_patterns = []
        nested_field_paths = {}
        common_use_cases = []

    interfaces_str = ', '.join(row['interface_types']) if row.get('interface_types') else 'unknown'
    business_cats = json.loads(row['business_categories']) if row.get('business_categories') else ['Unknown']

    # Build schema information
    schema_str = "\n**COMPLETE SCHEMA**:\n"
    all_fields_info = {}

    # Add top-level fields
    if row.get('key_fields'):
        for field in row['key_fields']:
            if not field.startswith('link_'):
                all_fields_info[field] = {"type": "unknown", "samples": []}

    # Add nested fields
    if nested_field_paths:
        for field_path, field_info in nested_field_paths.items():
            if not field_path.startswith('link_'):
                if isinstance(field_info, dict):
                    all_fields_info[field_path] = {
                        "type": field_info.get("type", "unknown"),
                        "samples": field_info.get("sample_values", [])[:2]
                    }

    # Format fields
    top_level = [f for f in all_fields_info.keys() if '.' not in f]
    nested = [f for f in all_fields_info.keys() if '.' in f]

    if top_level:
        schema_str += "\n**Top-Level Fields**:\n"
        for field in sorted(top_level):
            info = all_fields_info[field]
            type_str = f"({info['type']})" if info['type'] != 'unknown' else ""
            samples_str = f" → {info['samples']}" if info['samples'] else ""
            schema_str += f"  • `{field}` {type_str}{samples_str}\n"

    if nested:
        schema_str += "\n**Nested Fields (MUST QUOTE!)**:\n"
        for field in sorted(nested):
            info = all_fields_info[field]
            type_str = f"({info['type']})" if info['type'] != 'unknown' else ""
            samples_str = f" → {info['samples']}" if info['samples'] else ""
            schema_str += f"  • `{field}` {type_str}{samples_str}\n"

    # Query example
    query_ex = ""
    if query_patterns and len(query_patterns) > 0:
        pattern = query_patterns[0]
        if isinstance(pattern, dict) and pattern.get('pattern'):
            query_ex = f"\n**Query Example**:\n```\n{pattern['pattern']}\n```\n"

    # Usage scenarios
    usage_str = ""
    if common_use_cases:
        usage_str = f"\n**Common Uses**: {', '.join(common_use_cases[:2])}\n"

    return f"""
### {index}. {row['dataset_name']}
**ID**: `{row['dataset_id']}`
**Category**: {', '.join(business_cats)} / {row.get('technical_category', 'Unknown')}
**Interfaces**: {interfaces_str}
**Purpose**: {row.get('inferred_purpose', 'N/A')}
**Usage**: {row.get('typical_usage', 'N/A')}
{schema_str}{query_ex}{usage_str}**Frequency**: {row.get('data_frequency', 'unknown')}
"""


def _format_metric_summary(row: Dict, index: int, json) -> str:
    """Format minimal metric summary for search/discovery - NO dimensions shown."""
    combined_score = max(row.get('rank', 0.0), row.get('similarity_score', 0.0))
    business_cats = json.loads(row.get('business_categories', '[]')) if row.get('business_categories') else ['Unknown']

    return f"""
### {index}. {row['metric_name']}
**Dataset**: {row.get('dataset_name', 'Unknown')}
**ID**: `{row['dataset_id']}`
**Category**: {', '.join(business_cats)} / {row.get('technical_category', 'Unknown')}
**Purpose**: {row.get('inferred_purpose', 'N/A')}
**Relevance**: {combined_score:.3f}
"""


def _format_metric_detail(row: Dict, index: int, json) -> str:
    """Format complete metric details with full dimensions."""
    try:
        dimensions = json.loads(row.get('common_dimensions', '{}')) if row.get('common_dimensions') else {}
        value_range = json.loads(row.get('value_range', '{}')) if row.get('value_range') else {}
        query_patterns = json.loads(row.get('query_patterns', '[]')) if row.get('query_patterns') else []
    except (json.JSONDecodeError, TypeError):
        dimensions = {}
        value_range = {}
        query_patterns = []

    business_cats = json.loads(row.get('business_categories', '[]')) if row.get('business_categories') else ['Unknown']

    # Format dimensions with cardinality (CRITICAL - addresses #1 pain point!)
    dim_text = "\n**✨ AVAILABLE DIMENSIONS (for group_by)**:\n"
    dim_keys = [k for k in dimensions.keys() if not k.startswith('link_')]

    if dim_keys:
        for dim in sorted(dim_keys):
            dim_info = dimensions[dim]
            if isinstance(dim_info, dict):
                cardinality = dim_info.get('unique_count', 'unknown')
                dim_text += f"  • `{dim}` ({cardinality} unique values)\n"
            else:
                dim_text += f"  • `{dim}`\n"
    else:
        dim_text += "  • No dimensions (metric is pre-aggregated)\n"

    # Value range
    range_text = ""
    if value_range and isinstance(value_range, dict):
        if 'min' in value_range and 'max' in value_range:
            range_text = f"\n**Value Range**: {value_range.get('min', 'N/A')} - {value_range.get('max', 'N/A')}\n"

    # Query example
    query_ex = ""
    if query_patterns and len(query_patterns) > 0:
        pattern = query_patterns[0]
        pattern_text = pattern.get('pattern', '') if isinstance(pattern, dict) else str(pattern)
        if pattern_text:
            query_ex = f"\n**Query Example**:\n```\n{pattern_text}\n```\n"

    # Last seen
    last_seen = row.get('last_seen', 'Unknown')
    if hasattr(last_seen, 'strftime'):
        last_seen = last_seen.strftime('%Y-%m-%d %H:%M')

    metric_type = row.get('metric_type', 'unknown')

    return f"""
### {index}. {row['metric_name']}
**Dataset**: {row.get('dataset_name', 'Unknown')}
**ID**: `{row['dataset_id']}`
**Category**: {', '.join(business_cats)} / {row.get('technical_category', 'Unknown')}
**Type**: {metric_type}
**Purpose**: {row.get('inferred_purpose', 'N/A')}
**Usage**: {row.get('typical_usage', 'N/A')}
{dim_text}{range_text}{query_ex}**Frequency**: {row.get('data_frequency', 'unknown')} | **Last Seen**: {last_seen}
"""


if __name__ == "__main__":
    import signal
    import atexit

    # Register shutdown handler for telemetry
    def shutdown_handler():
        if telemetry_enabled:
            from src.telemetry.config import shutdown_telemetry
            shutdown_telemetry()

    # Register shutdown on exit and signal
    atexit.register(shutdown_handler)
    signal.signal(signal.SIGTERM, lambda signum, frame: shutdown_handler())
    signal.signal(signal.SIGINT, lambda signum, frame: shutdown_handler())

    # Run the MCP server
    mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)
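
# Example client call (illustrative; assumes FastMCP 2.x `Client` and the
# default streamable-http path - adjust to your deployment):
#
#     from fastmcp import Client
#
#     async with Client("http://localhost:8000/mcp") as client:
#         result = await client.call_tool("discover_context", {"query": "error service"})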
