
Observe Community MCP Server

by rustomax
observe_server.py (40.6 kB)
#!/usr/bin/env python3
"""
Observe MCP Server

A Model Context Protocol server that provides access to Observe API functionality
using organized modules for better maintainability and reusability.
"""

import os
import sys
from typing import Dict, Any, Optional, List, Union

try:
    from typing_extensions import TypedDict
except ImportError:
    from typing import TypedDict


# Type definitions for better type safety
class ErrorResponse(TypedDict):
    error: bool
    message: str


try:
    from dotenv import load_dotenv
except Exception:
    raise

# Load environment variables from .env file first
load_dotenv()

# Initialize OpenTelemetry instrumentation early
from src.telemetry import initialize_telemetry, initialize_metrics
from src.telemetry.decorators import trace_mcp_tool, trace_observe_api_call, trace_database_operation
from src.telemetry.utils import add_mcp_context, add_observe_context, add_database_context

telemetry_enabled = initialize_telemetry()

# Initialize metrics if telemetry is enabled
if telemetry_enabled:
    metrics_enabled = initialize_metrics()
else:
    metrics_enabled = False

# Import BM25 document search
try:
    from src.postgres.doc_search import search_docs_bm25 as search_docs
except ImportError as e:
    # Define fallback search function (async so callers can still await it)
    async def search_docs(query: str, n_results: int = 5) -> List[Dict[str, Any]]:
        return [{
            "text": f"Error: PostgreSQL BM25 search not available. The server cannot perform document search because the BM25 modules are not properly installed. Please ensure PostgreSQL is running and the documentation_chunks table exists. Your query was: {query}",
            "source": "error",
            "title": "BM25 Search Not Available",
            "score": 1.0
        }]

# Import organized Observe API modules
from src.observe import (
    execute_opal_query as observe_execute_opal_query
)

# Import organized auth modules
from src.auth import (
    create_authenticated_mcp,
    requires_scopes,
    initialize_auth_middleware,
    setup_auth_provider
)

# Import standardized logging
from src.logging import (
    get_logger,
    set_session_context,
    log_session_context,
    session_logger,
    semantic_logger,
    opal_logger
)

from fastmcp import Context

# Create FastMCP instance with authentication
mcp = create_authenticated_mcp(server_name="observe-community")

# Initialize auth middleware for statistics and logging
auth_provider = setup_auth_provider()
initialize_auth_middleware(auth_provider)

# Track sessions that have called get_system_prompt
session_prompt_status = {}


def check_system_prompt_called(ctx: Context, tool_name: str) -> Optional[str]:
    """Check if system prompt has been called for this session"""
    session_id = ctx.session_id
    if session_id not in session_prompt_status and tool_name != "get_system_prompt":
        return f"""🚨 CRITICAL: System prompt not loaded for this session!

⚡ You MUST call get_system_prompt() first to access specialized Observe platform expertise.

Without the system prompt, you'll lack:
- Verified OPAL syntax patterns
- Observe investigation methodology
- Performance optimization strategies
- Proper tool usage protocols

📝 Please run: get_system_prompt() before proceeding with {tool_name}.
""" return None # Configure FastAPI instrumentation if telemetry is enabled if telemetry_enabled: from src.telemetry.config import instrument_fastapi_app # Note: FastMCP wraps FastAPI, so we'll instrument the underlying app if hasattr(mcp, 'app'): instrument_fastapi_app(mcp.app) @mcp.tool() @requires_scopes(['admin', 'write', 'read']) @trace_mcp_tool(tool_name="execute_opal_query", record_args=True, record_result=False) async def execute_opal_query(ctx: Context, query: str, dataset_id: str = None, primary_dataset_id: str = None, secondary_dataset_ids: Optional[str] = None, dataset_aliases: Optional[str] = None, time_range: Optional[str] = "1h", start_time: Optional[str] = None, end_time: Optional[str] = None, format: Optional[str] = "csv", timeout: Optional[float] = None) -> str: """ Execute an OPAL query on single or multiple datasets. Args: query: The OPAL query to execute dataset_id: DEPRECATED: Use primary_dataset_id instead. Kept for backward compatibility. primary_dataset_id: The ID of the primary dataset to query secondary_dataset_ids: Optional JSON string list of secondary dataset IDs (e.g., '["44508111"]') dataset_aliases: Optional JSON string mapping of aliases to dataset IDs (e.g., '{"volumes": "44508111"}') time_range: Time range for the query (e.g., "1h", "1d", "7d"). Used if start_time and end_time are not provided. start_time: Optional start time in ISO format (e.g., "2023-04-20T16:20:00Z") end_time: Optional end time in ISO format (e.g., "2023-04-20T16:30:00Z") format: Output format, either "csv" or "ndjson" (default: "csv") timeout: Request timeout in seconds (default: uses client default of 30s) Examples: # Single dataset query (backward compatible) execute_opal_query(query="filter metric = 'CPUUtilization'", dataset_id="44508123") # Multi-dataset join query execute_opal_query( query="join on(instanceId=@volumes.instanceId), volume_size:@volumes.size", primary_dataset_id="44508123", # EC2 Instance Metrics secondary_dataset_ids='["44508111"]', # EBS Volumes (JSON string) dataset_aliases='{"volumes": "44508111"}' # Aliases (JSON string) ) """ import json # Check if system prompt has been called prompt_check = check_system_prompt_called(ctx, "execute_opal_query") if prompt_check: return prompt_check # Log the OPAL query operation with sanitized query (truncated for security) query_preview = query[:100] + "..." if len(query) > 100 else query dataset_info = primary_dataset_id or dataset_id opal_logger.info(f"query execution | dataset:{dataset_info} | query:'{query_preview}' | time_range:{time_range}") # Parse JSON string parameters if provided parsed_secondary_dataset_ids = None parsed_dataset_aliases = None if secondary_dataset_ids: try: parsed_secondary_dataset_ids = json.loads(secondary_dataset_ids) except (json.JSONDecodeError, TypeError) as e: return f"Error parsing secondary_dataset_ids: {e}. Expected JSON array like ['44508111']" if dataset_aliases: try: parsed_dataset_aliases = json.loads(dataset_aliases) except (json.JSONDecodeError, TypeError) as e: return f"Error parsing dataset_aliases: {e}. 
Expected JSON object like {{\"volumes\": \"44508111\"}}" return await observe_execute_opal_query( query=query, dataset_id=dataset_id, primary_dataset_id=primary_dataset_id, secondary_dataset_ids=parsed_secondary_dataset_ids, dataset_aliases=parsed_dataset_aliases, time_range=time_range, start_time=start_time, end_time=end_time, format=format, timeout=timeout ) @mcp.tool() @requires_scopes(['admin', 'read']) @trace_mcp_tool(tool_name="get_relevant_docs", record_args=True, record_result=False) async def get_relevant_docs(ctx: Context, query: str, n_results: int = 5) -> str: """Get relevant documentation for a query using PostgreSQL BM25 search""" try: # Import required modules import os from collections import defaultdict # Check if system prompt has been called prompt_check = check_system_prompt_called(ctx, "get_relevant_docs") if prompt_check: return prompt_check # Log the documentation search operation semantic_logger.info(f"docs search | query:'{query}' | n_results:{n_results}") chunk_results = await search_docs(query, n_results=max(n_results * 3, 15)) # Get more chunks to ensure we have enough from relevant docs if not chunk_results: return f"No relevant documents found for: '{query}'" # Group chunks by source document docs_by_source = defaultdict(list) for result in chunk_results: source = result.get("source", "") if source and source != "error": docs_by_source[source].append(result) # Calculate average score for each document doc_scores = {} for source, chunks in docs_by_source.items(): avg_score = sum(chunk.get("score", 0.0) for chunk in chunks) / len(chunks) doc_scores[source] = avg_score # Sort documents by average score and limit to requested number sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)[:n_results] if not sorted_docs: return f"No valid documents found for: '{query}'" response = f"Found {len(sorted_docs)} relevant documents for: '{query}'\\n\\n" # Read and format each full document for i, (source, score) in enumerate(sorted_docs, 1): try: # Read the entire document file with open(source, 'r', encoding='utf-8') as f: document_content = f.read() # Get metadata from the first chunk of this source first_chunk = docs_by_source[source][0] title = first_chunk.get("title", os.path.basename(source).replace(".md", "").replace("_", " ").title()) source_filename = os.path.basename(source) response += f"### Document {i}: {title}\\n" response += f"Source: {source_filename}\\n" response += f"Relevance Score: {score:.2f}\\n\\n" response += f"{document_content}\\n\\n\\n" response += "----------------------------------------\\n\\n" except Exception as e: # Use the chunk text as fallback if we can't read the file chunks_text = "\\n\\n".join([chunk.get("text", "") for chunk in docs_by_source[source]]) title = os.path.basename(source).replace(".md", "").replace("_", " ").title() response += f"### Document {i}: {title}\\n" response += f"Source: {os.path.basename(source)}\\n" response += f"Relevance Score: {score:.2f}\\n" response += f"Note: Could not read the full document file. Showing available chunks.\\n\\n" response += f"{chunks_text}\\n\\n\\n" response += "----------------------------------------\\n\\n" # Log successful documentation search semantic_logger.info(f"docs search complete | found:{len(sorted_docs)} documents | chunks:{len(chunk_results)}") return response except Exception as e: return f"Error retrieving relevant documents: {str(e)}. Make sure you've populated the BM25 index by running scripts/populate_docs_bm25.py." 
@mcp.tool()
@requires_scopes(['admin', 'write', 'read'])
@trace_mcp_tool(tool_name="get_system_prompt", record_args=False, record_result=False)
async def get_system_prompt(ctx: Context) -> Union[str, ErrorResponse]:
    """
    🚨 CRITICAL: MUST BE CALLED FIRST BEFORE ANY OTHER TOOLS 🚨

    This tool provides the specialized Observe platform expertise that transforms
    generic LLMs into expert Observe analysts. Without this prompt, LLMs will:
    - Use incorrect OPAL syntax
    - Make inefficient dataset queries
    - Provide generic instead of Observe-specific guidance
    - Query non-existent fields causing errors

    ⚡ MANDATORY WORKFLOW: get_system_prompt() → discover_datasets/metrics() → execute_opal_query()

    Returns the complete system prompt that defines:
    - Observe platform investigation methodology
    - Schema validation requirements (CRITICAL for query success)
    - Verified OPAL syntax patterns
    - Performance optimization strategies
    - Tool usage protocols

    Returns:
        Complete system prompt as plain text (ready for immediate adoption)
    """
    try:
        # Try to get the access token from the request for debugging purposes
        try:
            from fastmcp.server.dependencies import get_access_token, AccessToken

            access_token: Optional[AccessToken] = get_access_token()
            if access_token is None:
                session_logger.warning("no access token available in get_system_prompt")
            else:
                # Extract JWT payload if available
                jwt_payload = None
                if hasattr(access_token, 'token'):
                    raw_token = access_token.token
                    # Try to decode the token
                    try:
                        import base64
                        import json
                        parts = raw_token.split('.')
                        if len(parts) == 3:
                            # Decode payload
                            padded = parts[1] + '=' * (4 - len(parts[1]) % 4) if len(parts[1]) % 4 else parts[1]
                            decoded = base64.urlsafe_b64decode(padded)
                            jwt_payload = json.loads(decoded)
                    except Exception as e:
                        pass

                # Log session context for correlation
                effective_scopes = jwt_payload.get('scopes', []) if jwt_payload else access_token.scopes or []
                log_session_context(
                    user_id=access_token.client_id,
                    session_id=ctx.session_id,
                    scopes=effective_scopes,
                    action="system_prompt"
                )

            # Mark that this session has loaded the system prompt
            session_prompt_status[ctx.session_id] = True
        except Exception as e:
            set_session_context(ctx.session_id)
            session_logger.error(f"authentication failed | error:{str(e)[:50]}")

        # Get the directory where the script is located
        script_dir = os.path.dirname(os.path.abspath(__file__))
        # Construct the path to the prompt file
        prompt_file = os.path.join(script_dir, "prompts", "Observe MCP System Prompt.md")

        # Read the prompt from file
        with open(prompt_file, 'r', encoding='utf-8') as f:
            system_prompt = f.read().strip()

        if not system_prompt:
            raise ValueError("System prompt file is empty")

        # Return the system prompt directly from the file
        return system_prompt

    except Exception as e:
        session_logger.error(f"exception getting system prompt | error:{e}")
        import traceback
        traceback.print_exc(file=sys.stderr)
        return {"error": True, "message": f"Exception getting system prompt: {str(e)}"}


@mcp.tool()
@requires_scopes(['admin', 'read'])
@trace_mcp_tool(tool_name="discover_datasets", record_args=True, record_result=False)
async def discover_datasets(ctx: Context, query: str, max_results: int = 15,
                            business_category_filter: Optional[str] = None,
                            technical_category_filter: Optional[str] = None,
                            interface_filter: Optional[str] = None) -> str:
    """
    Discover datasets using fast full-text search on our dataset intelligence database.

    This tool searches through analyzed datasets with intelligent categorization and usage guidance.
    Perfect for finding datasets by name, purpose, business area, or technical type.

    CRITICAL: This tool returns essential schema information that MUST be analyzed before querying:
    - Key Fields: Exact field names available for filtering and selection
    - Nested Fields: JSON structure for complex field access
    - Dataset Type & Interface: Determines query patterns (log vs metric vs trace)

    Args:
        query: Search query (e.g., "kubernetes logs", "service metrics", "error traces", "user sessions")
        max_results: Maximum number of datasets to return (default: 15, max: 30)
        business_category_filter: Filter by business category (Infrastructure, Application, Database, User, Security, etc.)
        technical_category_filter: Filter by technical category (Logs, Metrics, Traces, Events, Resources, etc.)
        interface_filter: Filter by interface type (log, metric, otel_span, etc.)

    Returns:
        Formatted list of matching datasets with their purposes, usage guidance, and SCHEMA INFORMATION

    Examples:
        discover_datasets("kubernetes logs errors")
        discover_datasets("service metrics performance", business_category_filter="Application")
        discover_datasets("database traces", technical_category_filter="Traces", max_results=10)
        discover_datasets("infrastructure logs", interface_filter="log")
    """
    try:
        import asyncpg
        import json
        from typing import List, Dict, Any

        # Check if system prompt has been called
        prompt_check = check_system_prompt_called(ctx, "discover_datasets")
        if prompt_check:
            return prompt_check

        # Log the semantic search operation
        semantic_logger.info(f"dataset search | query:'{query}' | max_results:{max_results} | filters:{business_category_filter or technical_category_filter or interface_filter}")

        # Database connection using environment variables
        DATABASE_URL = f"postgresql://{os.getenv('POSTGRES_USER', 'semantic_graph')}:{os.getenv('SEMANTIC_GRAPH_PASSWORD', 'g83hbeyB32792r3Gsjnfwe0ihf2')}@{os.getenv('POSTGRES_HOST', 'localhost')}:{os.getenv('POSTGRES_PORT', '5432')}/{os.getenv('POSTGRES_DB', 'semantic_graph')}"

        # Validate parameters
        max_results = min(max(1, max_results), 30)  # Clamp between 1 and 30

        # Connect to database and search
        conn = await asyncpg.connect(DATABASE_URL)
        try:
            # Enhanced manual query with better search capabilities
            # Split complex queries into individual terms for better matching
            search_terms = query.lower().split()

            # Create individual search conditions for better matching
            search_conditions = []
            params = []
            param_idx = 1

            for term in search_terms:
                search_conditions.append(f"di.search_vector @@ plainto_tsquery('english', ${param_idx})")
                params.append(term)
                param_idx += 1

            # If no individual terms match, fall back to full query
            if not search_conditions:
                search_conditions = [f"di.search_vector @@ plainto_tsquery('english', ${param_idx})"]
                params.append(query)
                param_idx += 1

            # Combine search conditions with OR for better matching
            where_clause = f"({' OR '.join(search_conditions)})"

            # Add filters
            if business_category_filter:
                where_clause += f" AND di.business_categories ? ${param_idx}"
                params.append(business_category_filter)
                param_idx += 1

            if technical_category_filter:
                where_clause += f" AND di.technical_category = ${param_idx}"
                params.append(technical_category_filter)
                param_idx += 1

            if interface_filter:
                where_clause += f" AND ${param_idx} = ANY(di.interface_types)"
                params.append(interface_filter)
                param_idx += 1

            # Add limit parameter
            params.append(max_results)
            limit_param = param_idx

            query_sql = f"""
                SELECT
                    di.dataset_id::TEXT,
                    di.dataset_name::TEXT,
                    di.inferred_purpose,
                    di.typical_usage,
                    di.business_categories,
                    di.technical_category,
                    di.interface_types,
                    di.key_fields,
                    di.query_patterns,
                    di.nested_field_paths,
                    di.nested_field_analysis,
                    di.common_use_cases,
                    di.data_frequency,
                    FALSE as excluded,
                    ts_rank(di.search_vector, plainto_tsquery('english', $1))::REAL as rank,
                    0.0::REAL as similarity_score
                FROM datasets_intelligence di
                WHERE di.excluded = FALSE
                  AND {where_clause}
                ORDER BY rank DESC
                LIMIT ${limit_param}
            """

            results = await conn.fetch(query_sql, *params)

            if not results:
                return f"""# 🔍 Dataset Discovery Results

**Query**: "{query}"
**Found**: 0 datasets

**No matching datasets found.**

**Suggestions**:
- Try broader terms (e.g., "logs" instead of "error logs")
- Remove filters to see all results
- Check available categories: Infrastructure, Application, Database, User, Security, Monitoring

**Available datasets**: {await conn.fetchval("SELECT COUNT(*) FROM datasets_intelligence WHERE excluded = FALSE")} total datasets in the database.
"""

            # Format results
            formatted_results = []
            for i, row in enumerate(results, 1):
                # Parse JSON fields safely
                try:
                    query_patterns = json.loads(row.get('query_patterns', '[]')) if row.get('query_patterns') else []
                    nested_field_paths = json.loads(row.get('nested_field_paths', '{}')) if row.get('nested_field_paths') else {}
                    nested_field_analysis = json.loads(row.get('nested_field_analysis', '{}')) if row.get('nested_field_analysis') else {}
                    common_use_cases = row.get('common_use_cases', []) or []
                except (json.JSONDecodeError, TypeError):
                    query_patterns = []
                    nested_field_paths = {}
                    nested_field_analysis = {}
                    common_use_cases = []

                # Format interface types
                interfaces_str = ""
                if row['interface_types']:
                    interfaces_str = f"**Interfaces**: {', '.join(row['interface_types'])}\n"

                # Format ALL available fields with complete schema information
                schema_info_str = ""

                # Combine all fields from key_fields and nested_field_paths
                all_fields_info = {}

                # Add top-level fields from key_fields
                if row.get('key_fields'):
                    for field in row['key_fields']:
                        all_fields_info[field] = {"type": "unknown", "sample_values": []}

                # Add detailed nested field information
                if nested_field_paths:
                    for field_path, field_info in nested_field_paths.items():
                        if isinstance(field_info, dict):
                            all_fields_info[field_path] = {
                                "type": field_info.get("type", "unknown"),
                                "sample_values": field_info.get("sample_values", [])[:3]  # Show 3 samples max
                            }
                        else:
                            all_fields_info[field_path] = {"type": "unknown", "sample_values": []}

                if all_fields_info:
                    schema_info_str = "🚨 **COMPLETE SCHEMA - USE EXACT FIELD NAMES & TYPES**:\n"

                    # Sort fields: top-level first, then nested
                    top_level_fields = [f for f in all_fields_info.keys() if '.' not in f]
                    nested_fields = [f for f in all_fields_info.keys() if '.' in f]

                    for field_list, header in [(top_level_fields, "📋 **Top-Level Fields**"), (nested_fields, "📍 **Nested Fields**")]:
                        if field_list:
                            schema_info_str += f"\n{header}:\n"
                            for field in sorted(field_list)[:15]:  # Limit to 15 per section to manage size
                                field_info = all_fields_info[field]
                                type_info = f"({field_info['type']})" if field_info['type'] != 'unknown' else ""

                                # Show sample values with type hints for duration fields
                                samples_str = ""
                                if field_info['sample_values']:
                                    samples = field_info['sample_values'][:2]  # Show 2 samples max
                                    samples_str = f" → {samples}"

                                    # Add duration unit hints
                                    if any(keyword in field.lower() for keyword in ['time', 'elapsed', 'duration', 'timestamp']):
                                        if any(len(str(s)) >= 15 for s in samples if str(s).isdigit()):
                                            samples_str += " (⏱️ likely nanoseconds)"
                                        elif any(len(str(s)) == 13 for s in samples if str(s).isdigit()):
                                            samples_str += " (⏱️ likely milliseconds)"

                                schema_info_str += f"  • `{field}` {type_info}{samples_str}\n"

                            if len(field_list) > 15:
                                schema_info_str += f"  • ... (+{len(field_list)-15} more {header.lower()} fields)\n"

                    schema_info_str += "\n"

                # Format query guidance
                query_guidance_str = ""
                if query_patterns and len(query_patterns) > 0:
                    primary_pattern = query_patterns[0]
                    if isinstance(primary_pattern, dict) and primary_pattern.get('pattern'):
                        query_guidance_str = f"**Query Example**: `{primary_pattern['pattern']}`\n"

                # Format usage scenarios
                usage_str = ""
                if common_use_cases:
                    usage_scenarios = common_use_cases[:2]  # Show top 2
                    usage_str = f"**Common Uses**: {', '.join(usage_scenarios)}\n"

                # Calculate combined relevance score
                combined_score = max(row['rank'], row.get('similarity_score', 0))
                score_details = []
                if row['rank'] > 0:
                    score_details.append(f"text-match: {row['rank']:.3f}")
                if row.get('similarity_score', 0) > 0:
                    score_details.append(f"similarity: {row['similarity_score']:.3f}")

                result_text = f"""## {i}. {row['dataset_name']}
**Dataset ID**: `{row['dataset_id']}`
**Category**: {', '.join(json.loads(row['business_categories']) if row['business_categories'] else ['Unknown'])} / {row['technical_category']}
{interfaces_str}**Purpose**: {row['inferred_purpose']}
**Usage**: {row.get('typical_usage', 'Not specified')}
{schema_info_str}{query_guidance_str}{usage_str}**Frequency**: {row.get('data_frequency', 'unknown')}
**Relevance Score**: {combined_score:.3f} ({', '.join(score_details) if score_details else 'fuzzy-match'})

"""
                formatted_results.append(result_text)

            # Get summary stats
            total_datasets = await conn.fetchval("SELECT COUNT(*) FROM datasets_intelligence WHERE excluded = FALSE")
            category_counts = await conn.fetch("""
                SELECT jsonb_array_elements_text(business_categories) as business_category, COUNT(*) as count
                FROM datasets_intelligence
                WHERE excluded = FALSE
                GROUP BY jsonb_array_elements_text(business_categories)
                ORDER BY count DESC
                LIMIT 5
            """)
            category_summary = ", ".join([f"{row['business_category']} ({row['count']})" for row in category_counts[:3]])

            # Log successful results
            semantic_logger.info(f"dataset search complete | found:{len(results)} datasets | total_available:{total_datasets}")

            return f"""# 🎯 Dataset Discovery Results

**Query**: "{query}"
**Found**: {len(results)} datasets (showing top {max_results})
**Search Scope**: {total_datasets} total datasets | Top categories: {category_summary}

{chr(10).join(formatted_results)}

---
💡 **Next Steps**:
- Use `execute_opal_query()` with the dataset ID to query the data
- Use `discover_metrics()` to find related metrics for analysis
"""

        finally:
            await conn.close()

    except ImportError as e:
        return f"""# ❌ Dataset Discovery Error

**Issue**: Required database library not available
**Error**: {str(e)}

**Solution**: The dataset intelligence system requires asyncpg. Please install it with: pip install asyncpg"""

    except Exception as e:
        import traceback
        tb = traceback.format_exc()
        return f"""# ❌ Dataset Discovery Error

**Issue**: Database query failed
**Error**: {str(e)}
**Type**: {type(e).__name__}

**Traceback**:
```
{tb[:1000]}
```

**Query Params**: query='{query}', business_filter='{business_category_filter}', max_results={max_results}

**Solution**: Check database connection and ensure dataset intelligence has been populated."""


@mcp.tool()
@requires_scopes(['admin', 'read'])
@trace_mcp_tool(tool_name="discover_metrics", record_args=True, record_result=False)
async def discover_metrics(ctx: Context, query: str, max_results: int = 20,
                           category_filter: Optional[str] = None,
                           technical_filter: Optional[str] = None) -> str:
    """
    Discover observability metrics using fast full-text search on our metrics intelligence database.

    This tool searches through 491+ analyzed metrics with intelligent categorization and usage guidance.
    Perfect for finding metrics by name, purpose, dimensions, or use case.

    IMPORTANT: This tool provides error FREQUENCIES and performance metrics. For complete error analysis
    (actual error messages, stack traces), follow up with log dataset queries using discover_datasets().

    Args:
        query: Search query (e.g., "error rate", "cpu usage", "database latency", "service performance")
        max_results: Maximum number of metrics to return (default: 20, max: 50)
        category_filter: Filter by business category (Infrastructure, Application, Database, Storage, Network, Monitoring)
        technical_filter: Filter by technical category (Error, Latency, Count, Performance, Resource, Throughput, Availability)

    Returns:
        Formatted list of matching metrics with their datasets, purposes, and usage guidance

    Examples:
        discover_metrics("error rate service")  # Gets error counts - follow with logs for error details
        discover_metrics("cpu memory usage", category_filter="Infrastructure")
        discover_metrics("latency duration", technical_filter="Latency", max_results=10)
    """
    try:
        import asyncpg
        import json
        from typing import List, Dict, Any

        # Check if system prompt has been called
        prompt_check = check_system_prompt_called(ctx, "discover_metrics")
        if prompt_check:
            return prompt_check

        # Log the semantic search operation
        semantic_logger.info(f"metrics search | query:'{query}' | max_results:{max_results} | filters:{category_filter or technical_filter}")

        # Database connection using environment variables
        DATABASE_URL = f"postgresql://{os.getenv('POSTGRES_USER', 'semantic_graph')}:{os.getenv('SEMANTIC_GRAPH_PASSWORD', 'g83hbeyB32792r3Gsjnfwe0ihf2')}@{os.getenv('POSTGRES_HOST', 'localhost')}:{os.getenv('POSTGRES_PORT', '5432')}/{os.getenv('POSTGRES_DB', 'semantic_graph')}"

        # Validate parameters
        max_results = min(max(1, max_results), 50)  # Clamp between 1 and 50

        # Connect to database and search
        conn = await asyncpg.connect(DATABASE_URL)
        try:
            # Use the enhanced search function with trigram similarity
            results = await conn.fetch("""
                SELECT * FROM search_metrics_enhanced($1, $2, $3, $4, $5)
            """, query, max_results, category_filter, technical_filter, 0.2)

            if not results:
                return f"""# 🔍 Metrics Discovery Results

**Query**: "{query}"
**Results**: No metrics found

**Suggestions**:
- Try broader terms (e.g., "error" instead of "error_rate")
- Check available categories: Infrastructure, Application, Database, Storage, Network, Monitoring
- Use technical categories: Error, Latency, Count, Performance, Resource, Throughput, Availability

**Available metrics**: {await conn.fetchval("SELECT COUNT(*) FROM metrics_intelligence WHERE excluded = FALSE")} total metrics in the database.
""" # Format results formatted_results = [] for i, row in enumerate(results, 1): # Parse JSON fields safely try: dimensions = json.loads(row['common_dimensions']) if row['common_dimensions'] else {} value_range = json.loads(row['value_range']) if row['value_range'] else {} query_patterns = json.loads(row.get('query_patterns', '[]')) if row.get('query_patterns') else [] nested_field_paths = json.loads(row.get('nested_field_paths', '{}')) if row.get('nested_field_paths') else {} nested_field_analysis = json.loads(row.get('nested_field_analysis', '{}')) if row.get('nested_field_analysis') else {} except (json.JSONDecodeError, TypeError): dimensions = {} value_range = {} query_patterns = [] nested_field_paths = {} nested_field_analysis = {} # Format dimension keys dim_keys = list(dimensions.keys()) if dimensions else [] dim_text = f"**Dimensions**: {', '.join(dim_keys[:5])}" if dim_keys else "**Dimensions**: None" if len(dim_keys) > 5: dim_text += f" (+{len(dim_keys)-5} more)" # Format value range range_text = "" if value_range and isinstance(value_range, dict): if 'min' in value_range and 'max' in value_range: range_text = f"**Range**: {value_range.get('min', 'N/A')} - {value_range.get('max', 'N/A')}" # Format last seen last_seen = row['last_seen'].strftime('%Y-%m-%d %H:%M') if row['last_seen'] else 'Unknown' # Format metric type and query patterns metric_type = row.get('metric_type', 'unknown') common_fields = row.get('common_fields', []) # Create enhanced query guidance section query_guidance = "" if query_patterns and len(query_patterns) > 0: # Show primary query pattern primary_pattern = query_patterns[0] pattern_text = primary_pattern.get('pattern', '') if isinstance(primary_pattern, dict) else str(primary_pattern) if pattern_text: query_guidance = f"**Query Pattern**: `{pattern_text}`\n" # Show use case if available if isinstance(primary_pattern, dict) and primary_pattern.get('use_case'): query_guidance += f"**Use Case**: {primary_pattern['use_case']}\n" # Add nested field information with visual prominence if nested_field_paths: important_fields = nested_field_analysis.get('important_fields', []) if nested_field_analysis else [] if important_fields: nested_text = ', '.join(important_fields[:4]) # Show 4 instead of 3 if len(important_fields) > 4: nested_text += f" (+{len(important_fields)-4} more)" query_guidance += f"📍 **Key Nested Fields (EXACT PATHS)**: {nested_text}\n" if common_fields: field_list = ', '.join(common_fields[:4]) # Show 4 instead of 3 if len(common_fields) > 4: field_list += f" (+{len(common_fields)-4} more)" query_guidance += f"🚨 **Common Fields (USE EXACT NAMES)**: {field_list}\n" # Calculate combined relevance score combined_score = max(row['rank'], row.get('similarity_score', 0)) score_details = [] if row['rank'] > 0: score_details.append(f"text-match: {row['rank']:.3f}") if row.get('similarity_score', 0) > 0: score_details.append(f"similarity: {row['similarity_score']:.3f}") result_text = f"""## {i}. 
{row['metric_name']} **Dataset**: {row['dataset_name']} **Dataset ID**: `{row['dataset_id']}` **Category**: {', '.join(json.loads(row['business_categories']) if row['business_categories'] else ['Unknown'])} / {row['technical_category']} **Type**: {metric_type} **Purpose**: {row['inferred_purpose']} **Usage**: {row['typical_usage']} {dim_text} {query_guidance}{range_text} **Frequency**: {row['data_frequency']} | **Last Seen**: {last_seen} **Relevance Score**: {combined_score:.3f} ({', '.join(score_details) if score_details else 'fuzzy-match'}) """ formatted_results.append(result_text) # Get summary stats total_metrics = await conn.fetchval("SELECT COUNT(*) FROM metrics_intelligence WHERE excluded = FALSE") category_counts = await conn.fetch(""" SELECT jsonb_array_elements_text(business_categories) as business_category, COUNT(*) as count FROM metrics_intelligence WHERE excluded = FALSE GROUP BY jsonb_array_elements_text(business_categories) ORDER BY count DESC """) category_summary = ", ".join([f"{row['business_category']} ({row['count']})" for row in category_counts[:3]]) # Log successful results semantic_logger.info(f"metrics search complete | found:{len(results)} metrics | total_available:{total_metrics}") return f"""# 🎯 Metrics Discovery Results **Query**: "{query}" **Found**: {len(results)} metrics (showing top {max_results}) **Search Scope**: {total_metrics} total metrics | Top categories: {category_summary} {chr(10).join(formatted_results)} --- 💡 **Next Steps**: - Use `execute_opal_query()` with the dataset ID to query specific metrics - Use `discover_datasets()` to find related datasets for comprehensive analysis """ finally: await conn.close() except ImportError as e: return f"""# ❌ Metrics Discovery Error **Issue**: Required database library not available **Error**: {str(e)} **Solution**: The metrics intelligence system requires asyncpg. Please install it: ```bash pip install asyncpg ``` """ except Exception as e: return f"""# ❌ Metrics Discovery Error **Query**: "{query}" **Error**: {str(e)} **Possible Causes**: - Database connection failed - Metrics intelligence table not initialized - Invalid search parameters **Solution**: Ensure the metrics intelligence system is running and database is accessible. """ if __name__ == "__main__": import signal import atexit # Register shutdown handler for telemetry def shutdown_handler(): if telemetry_enabled: from src.telemetry.config import shutdown_telemetry shutdown_telemetry() # Register shutdown on exit and signal atexit.register(shutdown_handler) signal.signal(signal.SIGTERM, lambda signum, frame: shutdown_handler()) signal.signal(signal.SIGINT, lambda signum, frame: shutdown_handler()) # Run the MCP server mcp.run(transport="sse", host="0.0.0.0", port=8000)
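
The tool docstrings above prescribe a fixed client workflow: call get_system_prompt() first, then discover_datasets() or discover_metrics(), and only then execute_opal_query(). A minimal client-side sketch of that flow follows, assuming FastMCP's Python Client, the SSE endpoint implied by mcp.run(transport="sse", host="0.0.0.0", port=8000) above, and a bearer token accepted by the server's auth middleware; the endpoint path, auth style, and dataset ID below are illustrative assumptions, not part of the server code.

# Hypothetical client sketch - not part of observe_server.py.
# Assumes a recent fastmcp release whose Client accepts a bearer token via `auth`.
import asyncio
from fastmcp import Client

async def main() -> None:
    # Endpoint path and token handling are assumptions; adjust to your deployment.
    async with Client("http://localhost:8000/sse", auth="YOUR_BEARER_TOKEN") as client:
        # 1. Load the system prompt first - the other tools refuse to run without it
        await client.call_tool("get_system_prompt", {})

        # 2. Discover candidate datasets for the investigation
        datasets = await client.call_tool("discover_datasets", {"query": "kubernetes logs errors"})
        print(datasets)

        # 3. Query a dataset ID surfaced in step 2 (query and ID taken from the
        #    execute_opal_query docstring example; replace with your own)
        result = await client.call_tool("execute_opal_query", {
            "query": "filter metric = 'CPUUtilization'",
            "primary_dataset_id": "44508123",
            "time_range": "1h",
        })
        print(result)

asyncio.run(main())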
