Skip to main content
Glama
datasets.py12.9 kB
""" Observe dataset operations Provides functions for listing datasets and retrieving dataset information from the Observe platform. """ import sys from typing import List, Dict, Any, Optional from src.logging import get_logger from .client import make_observe_request from .config import validate_observe_config logger = get_logger('DATASETS') async def list_datasets( match: Optional[str] = None, workspace_id: Optional[str] = None, type: Optional[str] = None, interface: Optional[str] = None ) -> str: """ List available datasets in Observe. Args: match: Optional substring to match dataset names workspace_id: Optional workspace ID to filter by type: Optional dataset type to filter by (e.g., 'Event') interface: Optional interface to filter by (e.g., 'metric', 'log') Returns: Formatted string with dataset information """ # Validate configuration config_error = validate_observe_config() if config_error: return config_error # Set up query parameters params = {} if match: params["match"] = match if workspace_id: params["workspaceId"] = workspace_id if type: params["type"] = type if interface: # The API requires this parameter to be passed as a list params["interface"] = [interface] try: # Log the request we're about to make logger.debug(f"requesting datasets | params:{params}") # Make the API call to fetch datasets response = await make_observe_request( method="GET", endpoint="v1/dataset", params=params ) # Handle error responses if isinstance(response, dict) and response.get("error"): return f"Error listing datasets: {response.get('message')}" # Process successful response according to the OpenAPI spec if not isinstance(response, dict): return f"Unexpected response format: {type(response)}. Expected a dictionary." # Extract datasets from the response datasets = [] if response.get("ok") and "data" in response and isinstance(response["data"], list): datasets = response["data"] else: return f"Unexpected response structure: {list(response.keys())}. Expected 'ok' and 'data' fields." if not datasets: return "No datasets found." # Format the response in a user-friendly way return _format_datasets_response(datasets) except Exception as e: import traceback traceback.print_exc(file=sys.stderr) return f"Error in list_datasets function: {str(e)}" async def get_dataset_info(dataset_id: str) -> str: """ Get detailed information about a specific dataset. Args: dataset_id: The ID of the dataset Returns: Formatted string with detailed dataset information """ # Validate configuration config_error = validate_observe_config() if config_error: return config_error try: # Log the request we're about to make logger.debug(f"requesting dataset info | id:{dataset_id}") # Make the API call to fetch dataset information response = await make_observe_request( method="GET", endpoint=f"v1/dataset/{dataset_id}" ) # Handle error responses if isinstance(response, dict) and response.get("error"): return f"Error getting dataset info: {response.get('message')}" # Process successful response according to the OpenAPI spec if not isinstance(response, dict): return f"Unexpected response format: {type(response)}. Expected a dictionary." # Extract dataset from the response if not response.get("ok") or "data" not in response: return f"Unexpected response structure: {list(response.keys())}. Expected 'ok' and 'data' fields." dataset = response["data"] if not dataset: return f"Dataset with ID {dataset_id} not found." # Log the raw dataset for debugging logger.debug(f"processing dataset | data:{dataset}") return _format_dataset_info(dataset, dataset_id) except Exception as e: import traceback traceback.print_exc(file=sys.stderr) return f"Error in get_dataset_info function: {str(e)}" def _format_datasets_response(datasets: List[Dict[str, Any]]) -> str: """ Format the list of datasets into a user-friendly string. Args: datasets: List of dataset dictionaries Returns: Formatted string representation """ result = "Available Datasets:\\n\\n" for i, dataset in enumerate(datasets): try: # Log the raw dataset for debugging logger.debug(f"processing dataset {i+1} | data:{dataset}") # Extract dataset information with robust error handling dataset_id = _extract_dataset_field(dataset, "id", ["id", "meta.id"]) name = _extract_dataset_field(dataset, "name", ["name", "config.name"]) kind = _extract_dataset_field(dataset, "type", ["type", "kind", "state.kind", "state.type"]) workspace_id = _extract_dataset_field(dataset, "workspaceId", ["workspaceId", "meta.workspaceId"]) # Handle interfaces with robust error handling interface_str = _format_dataset_interfaces(dataset) # Add dataset information to the result result += f"Dataset {i+1}:\\n" result += f"ID: {dataset_id}\\n" result += f"Name: {name}\\n" result += f"Type: {kind}\\n" result += f"Workspace ID: {workspace_id}\\n" result += f"Interfaces: {interface_str}\\n" result += "-" * 40 + "\\n" # Limit to 10 datasets to avoid overwhelming output if i >= 9: result += "\\n(Showing first 10 datasets only. Use 'match' parameter to filter results.)\\n" break except Exception as e: logger.error(f"error processing dataset {i+1} | error:{e}") result += f"Error processing dataset {i+1}: {str(e)}\\n" result += "-" * 40 + "\\n" return result def _format_dataset_info(dataset: Dict[str, Any], dataset_id: str) -> str: """ Format detailed dataset information into a user-friendly string. Args: dataset: Dataset dictionary dataset_id: Dataset ID Returns: Formatted string representation """ # Extract dataset information meta = dataset.get("meta", {}) config = dataset.get("config", {}) state = dataset.get("state", {}) # Format the response in a user-friendly way result = f"Dataset Information for ID: {dataset_id}\\n\\n" # Basic information name = config.get("name", "Unnamed dataset") kind = state.get("kind", "Unknown type") workspace_id = meta.get("workspaceId", "Unknown") customer_id = meta.get("customerId", "Unknown") result += f"Name: {name}\\n" result += f"Type: {kind}\\n" result += f"Workspace ID: {workspace_id}\\n" result += f"Customer ID: {customer_id}\\n" # Creation and update information created_by = state.get("createdBy") created_date = state.get("createdDate") updated_by = state.get("updatedBy") updated_date = state.get("updatedDate") if created_by and created_date: result += f"Created: {created_date} (by {created_by})\\n" if updated_by and updated_date: result += f"Updated: {updated_date} (by {updated_by})\\n" # URL path url_path = state.get("urlPath") if url_path: result += f"URL Path: {url_path}\\n" # Interface information interfaces = state.get("interfaces") if interfaces: result += "\\nInterfaces:\\n" result += _format_detailed_interfaces(interfaces) # Schema information columns = state.get("columns") if columns and isinstance(columns, list): result += "\\nSchema:\\n" for column in columns: if isinstance(column, dict): col_name = column.get("name", "Unknown") col_type = column.get("type", "Unknown") result += f" - {col_name} ({col_type})\\n" # Additional configuration label_field = config.get("labelField") if label_field: result += f"\\nLabel Field: {label_field}\\n" primary_key = config.get("primaryKey") if primary_key and isinstance(primary_key, list): result += f"Primary Key: {', '.join(primary_key)}\\n" return result def _extract_dataset_field(dataset: Dict[str, Any], field_name: str, paths: List[str]) -> str: """ Extract a field value from a dataset using multiple possible paths. Args: dataset: Dataset dictionary field_name: Field name for default value paths: List of dotted paths to try Returns: Extracted value or default """ for path in paths: try: value = dataset for part in path.split('.'): if isinstance(value, dict) and part in value: value = value[part] else: break else: # Path succeeded return str(value) if value is not None else f"Unknown {field_name}" except (KeyError, TypeError, AttributeError): continue return f"Unknown {field_name}" def _format_dataset_interfaces(dataset: Dict[str, Any]) -> str: """ Format dataset interfaces for display. Args: dataset: Dataset dictionary Returns: Formatted interfaces string """ # Try different possible locations for interfaces interfaces = None if "interfaces" in dataset and dataset["interfaces"] is not None: interfaces = dataset["interfaces"] elif "state" in dataset and isinstance(dataset["state"], dict) and "interfaces" in dataset["state"] and dataset["state"]["interfaces"] is not None: interfaces = dataset["state"]["interfaces"] if interfaces is None: return "None" if isinstance(interfaces, list): # Handle list of interfaces interface_strings = [] for interface in interfaces: if isinstance(interface, dict): # Extract meaningful value from interface dict if 'name' in interface: interface_strings.append(str(interface['name'])) elif 'type' in interface: interface_strings.append(str(interface['type'])) elif 'path' in interface: interface_strings.append(str(interface['path'])) else: interface_strings.append(str(interface)) elif interface is not None: interface_strings.append(str(interface)) return ", ".join(interface_strings) if interface_strings else "None" elif isinstance(interfaces, dict): return str(interfaces) else: return str(interfaces) def _format_detailed_interfaces(interfaces) -> str: """ Format detailed interface information. Args: interfaces: Interface data structure Returns: Formatted interfaces string """ result = "" if isinstance(interfaces, list): for i, interface in enumerate(interfaces): if isinstance(interface, dict): # If interface is a complex object with path and mapping path = interface.get("path", "Unknown") result += f" {i+1}. {path}\\n" # Show mapping if available mapping = interface.get("mapping") if mapping and isinstance(mapping, list): result += " Mapping:\\n" for map_item in mapping: if isinstance(map_item, dict): interface_field = map_item.get("interfaceField", "Unknown") field = map_item.get("field", "Unknown") result += f" - {interface_field} → {field}\\n" else: # If interface is a simple string result += f" {i+1}. {interface}\\n" else: # Handle case where interfaces is not a list result += f" {interfaces}\\n" return result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rustomax/observe-experimental-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server