client.py
from __future__ import annotations

import time
from typing import Any, Dict, Optional, List

import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type


class SSBError(Exception):
    pass


class SSBClient:
    def __init__(self, base_url: str, session: requests.Session, timeout_seconds: int = 30,
                 proxy_context_path: Optional[str] = None):
        self.base_url = base_url.rstrip("/")
        self.session = session
        self.timeout = timeout_seconds
        self.proxy_context_path = proxy_context_path
        # Add CDP proxy headers if configured
        if self.proxy_context_path:
            self.session.headers.update({'X-ProxyContextPath': self.proxy_context_path})

    def _url(self, path: str) -> str:
        return f"{self.base_url}/{path.lstrip('/')}"

    @retry(
        retry=retry_if_exception_type((requests.HTTPError, requests.ConnectionError, requests.Timeout)),
        wait=wait_exponential(multiplier=0.5, min=0.5, max=5),
        stop=stop_after_attempt(3),
        reraise=True,
    )
    def _get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        resp = self.session.get(self._url(path), params=params, timeout=self.timeout)
        if resp.status_code == 401:
            raise requests.HTTPError("Unauthorized", response=resp)
        if resp.status_code == 403:
            raise requests.HTTPError("Forbidden", response=resp)
        if not resp.ok:
            # Try to get a detailed error message from the response
            try:
                error_data = resp.json()
                error_message = error_data.get('error_message', f'HTTP {resp.status_code} Error')
            except Exception:
                error_message = f'HTTP {resp.status_code} Error: {resp.text}'
            raise SSBError(f"{error_message} for {path}")
        return resp.json()

    @retry(
        retry=retry_if_exception_type((requests.HTTPError, requests.ConnectionError, requests.Timeout)),
        wait=wait_exponential(multiplier=0.5, min=0.5, max=5),
        stop=stop_after_attempt(3),
        reraise=True,
    )
    def _post(self, path: str, data: Optional[Dict[str, Any]] = None,
              json_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        resp = self.session.post(self._url(path), data=data, json=json_data, timeout=self.timeout)
        if resp.status_code == 401:
            raise requests.HTTPError("Unauthorized", response=resp)
        if resp.status_code == 403:
            raise requests.HTTPError("Forbidden", response=resp)
        if not resp.ok:
            # Try to get a detailed error message from the response
            try:
                error_data = resp.json()
                error_message = error_data.get('error_message', f'HTTP {resp.status_code} Error')
            except Exception:
                error_message = f'HTTP {resp.status_code} Error: {resp.text}'
            raise SSBError(f"{error_message} for {path}")
        return resp.json()

    @retry(
        retry=retry_if_exception_type((requests.HTTPError, requests.ConnectionError, requests.Timeout)),
        wait=wait_exponential(multiplier=0.5, min=0.5, max=5),
        stop=stop_after_attempt(3),
        reraise=True,
    )
    def _put(self, path: str, data: Optional[Dict[str, Any]] = None,
             json_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        resp = self.session.put(self._url(path), data=data, json=json_data, timeout=self.timeout)
        if resp.status_code == 401:
            raise requests.HTTPError("Unauthorized", response=resp)
        if resp.status_code == 403:
            raise requests.HTTPError("Forbidden", response=resp)
        resp.raise_for_status()
        return resp.json()

    @retry(
        retry=retry_if_exception_type((requests.HTTPError, requests.ConnectionError, requests.Timeout)),
        wait=wait_exponential(multiplier=0.5, min=0.5, max=5),
        stop=stop_after_attempt(3),
        reraise=True,
    )
    def _delete(self, path: str) -> Dict[str, Any]:
        resp = self.session.delete(self._url(path), timeout=self.timeout)
        if resp.status_code == 401:
            raise requests.HTTPError("Unauthorized", response=resp)
        if resp.status_code == 403:
            raise requests.HTTPError("Forbidden", response=resp)
        resp.raise_for_status()
        return resp.json()

    # SSB API Methods

    def get_ssb_info(self) -> Dict[str, Any]:
        """Get SSB version and system information."""
        # Use the jobs endpoint to verify connectivity and gather basic information
        jobs = self._get("jobs")
        return {
            "status": "connected",
            "jobs_count": len(jobs.get("jobs", [])),
            "message": "SSB MCP Server connected successfully"
        }

    def list_streams(self) -> Dict[str, Any]:
        """List all SQL streams (jobs)."""
        return self._get("jobs")

    def get_stream(self, stream_name: str) -> Dict[str, Any]:
        """Get details of a specific stream (job)."""
        # For now, fetch the job list and filter by name
        jobs = self._get("jobs")
        for job in jobs.get("jobs", []):
            if job.get("name") == stream_name:
                return job
        raise SSBError(f"Stream '{stream_name}' not found")

    def create_stream(self, stream_name: str, sql_query: str, description: Optional[str] = None) -> Dict[str, Any]:
        """Create a new SQL stream (job)."""
        data = {
            "sql": sql_query,
            "job_config": {
                "job_name": stream_name
            }
        }
        return self._post("jobs", json_data=data)

    def update_stream(self, stream_name: str, sql_query: str, description: Optional[str] = None) -> Dict[str, Any]:
        """Update an existing SQL stream."""
        data = {
            "name": stream_name,
            "sql": sql_query,
            "description": description or ""
        }
        return self._put(f"streams/{stream_name}", json_data=data)

    def delete_stream(self, stream_name: str) -> Dict[str, Any]:
        """Delete a SQL stream."""
        return self._delete(f"streams/{stream_name}")

    def start_stream(self, stream_name: str) -> Dict[str, Any]:
        """Start a SQL stream."""
        return self._post(f"streams/{stream_name}/start")

    def stop_stream(self, stream_name: str) -> Dict[str, Any]:
        """Stop a SQL stream."""
        return self._post(f"streams/{stream_name}/stop")

    def get_stream_status(self, stream_name: str) -> Dict[str, Any]:
        """Get the status of a SQL stream."""
        return self._get(f"streams/{stream_name}/status")

    def get_stream_metrics(self, stream_name: str) -> Dict[str, Any]:
        """Get metrics for a SQL stream."""
        return self._get(f"streams/{stream_name}/metrics")

    def list_tables(self) -> Dict[str, Any]:
        """List all available tables."""
        # Use the tables endpoint for SSB
        return self._get("tables")

    def get_table_schema(self, table_name: str) -> Dict[str, Any]:
        """Get schema for a specific table."""
        # Placeholder: SSB doesn't expose a direct schema endpoint
        return {"message": f"Schema for table '{table_name}' not available via API", "table_name": table_name}

    def execute_query(self, sql_query: str, limit: Optional[int] = None, sample_interval: int = 1000,
                      sample_count: int = 100, window_size: int = 100,
                      sample_all_messages: bool = False) -> Dict[str, Any]:
        """Execute a SQL query."""
        # Ensure the SQL statement ends with a semicolon
        sql_query = sql_query.strip()
        if not sql_query.endswith(';'):
            sql_query += ';'
        data = {"sql": sql_query}
        # Add job configuration for sampling
        if sample_all_messages:
            # For sampling all messages, use very frequent sampling
            data["job_config"] = {
                "job_name": f"job_by_admin_at_{int(time.time() * 1000)}",
                "runtime_config": {
                    "execution_mode": "SESSION",
                    "parallelism": 1,
                    "sample_interval": 0,      # Sample immediately
                    "sample_count": 10000,     # High count
                    "window_size": 10000,      # Large window
                    "start_with_savepoint": False
                }
            }
        elif sample_interval != 1000 or sample_count != 100 or window_size != 100:
            # Custom sampling configuration
            data["job_config"] = {
                "job_name": f"job_by_admin_at_{int(time.time() * 1000)}",
                "runtime_config": {
                    "execution_mode": "SESSION",
                    "parallelism": 1,
                    "sample_interval": sample_interval,
                    "sample_count": sample_count,
                    "window_size": window_size,
                    "start_with_savepoint": False
                }
            }
        response = self._post("sql/execute", json_data=data)
        # Enhance the response with more context
        if response.get("type") == "job":
            response["message"] = "SQL query executed successfully! A new SSB job has been created."
            response["job_url"] = f"http://localhost:8081/#/job/{response.get('flink_job_id', 'unknown')}"
            response["status"] = "success"
            if sample_all_messages:
                response["sampling_mode"] = "sample_all_messages"
                response["sample_interval"] = 0
                response["sample_count"] = 10000
                response["window_size"] = 10000
        else:
            response["status"] = "completed"
        return response

    def execute_query_with_sampling(self, sql_query: str, sample_interval: int = 1000, sample_count: int = 100,
                                    window_size: int = 100, sample_all_messages: bool = False) -> Dict[str, Any]:
        """Execute a SQL query with proper sampling configuration."""
        # Execute the query with the sampling configuration
        response = self.execute_query(sql_query, sample_interval=sample_interval, sample_count=sample_count,
                                      window_size=window_size, sample_all_messages=sample_all_messages)
        # Add sampling information to the response
        if response.get("type") == "job":
            response["sampling_configured"] = True
            response["sample_interval"] = 0 if sample_all_messages else sample_interval
            response["sample_count"] = 10000 if sample_all_messages else sample_count
            response["window_size"] = 10000 if sample_all_messages else window_size
            response["sample_all_messages"] = sample_all_messages
        return response

    def restart_job_with_sampling(self, job_id: int, sql_query: str, sample_interval: int = 1000,
                                  sample_all_messages: bool = False) -> Dict[str, Any]:
        """Restart a job with new SQL and proper sampling configuration."""
        # Ensure the SQL statement ends with a semicolon
        sql_query = sql_query.strip()
        if not sql_query.endswith(';'):
            sql_query += ';'
        # First try to stop the job; if the stop fails, continue anyway
        try:
            self.stop_job(job_id, savepoint=True)
        except Exception:
            pass
        # Create a new job with the same SQL
        response = self.execute_query_with_sampling(sql_query, sample_interval,
                                                    sample_all_messages=sample_all_messages)
        # Add information about the restart
        response["restarted_from_job_id"] = job_id
        response["message"] = f"Job {job_id} restarted with new configuration"
        if sample_all_messages:
            response["message"] += " (sampling all messages)"
        return response

    def list_udfs(self) -> Dict[str, Any]:
        """List all available user-defined functions."""
        # SSB doesn't have a direct UDFs endpoint; return an empty list
        return {"udfs": []}

    def get_udf(self, udf_name: str) -> Dict[str, Any]:
        """Get details of a specific UDF."""
        return {"message": f"UDF '{udf_name}' not found", "udf_name": udf_name}

    def list_connectors(self) -> Dict[str, Any]:
        """List all available connectors."""
        return self._get("ddl/connectors")

    def get_connector(self, connector_name: str) -> Dict[str, Any]:
        """Get details of a specific connector."""
        # Filter the connector from the full list
        connectors = self._get("ddl/connectors")
        for connector in connectors:
            if connector.get("type") == connector_name:
                return connector
        return {"message": f"Connector '{connector_name}' not found", "connector_name": connector_name}

    def list_topics(self) -> Dict[str, Any]:
        """List all Kafka topics."""
        # SSB doesn't directly manage Kafka topics; return an empty list
        return {"topics": [], "message": "Kafka topics are managed by the Kafka service, not SSB"}

    def get_topic(self, topic_name: str) -> Dict[str, Any]:
        """Get details of a specific Kafka topic."""
        return {"message": f"Topic '{topic_name}' details not available via SSB API", "topic_name": topic_name}

    def get_cluster_info(self) -> Dict[str, Any]:
        """Get cluster information."""
        return self._get("cluster/info")

    def get_job_status(self, job_id: int) -> Dict[str, Any]:
        """Get status of a specific job."""
        jobs = self._get("jobs")
        for job in jobs.get("jobs", []):
            if job.get("job_id") == job_id:
                return job
        return {"message": f"Job {job_id} not found", "job_id": job_id}

    def get_job_sample(self, sample_id: str) -> Dict[str, Any]:
        """Get sample data from a job execution."""
        try:
            response = self._get(f"samples/{sample_id}")
            # Add helpful context to the response
            if response.get("records"):
                response["message"] = f"Retrieved {len(response['records'])} sample records"
            else:
                response["message"] = ("No sample data available yet. The job may still be processing "
                                       "or hasn't produced data.")
            return response
        except Exception as e:
            return {
                "error": str(e),
                "message": f"Failed to retrieve sample data for {sample_id}",
                "sample_id": sample_id
            }

    def get_job_sample_by_id(self, job_id: int) -> Dict[str, Any]:
        """Get sample data from a job by job ID."""
        job = self.get_job_status(job_id)
        if "sample_id" in job:
            return self.get_job_sample(job["sample_id"])
        return {"message": f"No sample data available for job {job_id}", "job_id": job_id}

    def get_cluster_health(self) -> Dict[str, Any]:
        """Get cluster health status."""
        return self._get("cluster/health")

    def stop_job(self, job_id: int, savepoint: bool = True) -> Dict[str, Any]:
        """Stop a specific SSB job."""
        data = {"savepoint": savepoint}
        return self._post(f"jobs/{job_id}/stop", json_data=data)

    def execute_job(self, job_id: int, sql_query: str) -> Dict[str, Any]:
        """Execute/restart a specific SSB job with new SQL."""
        # Ensure the SQL statement ends with a semicolon
        sql_query = sql_query.strip()
        if not sql_query.endswith(';'):
            sql_query += ';'
        data = {"sql": sql_query}
        return self._post(f"jobs/{job_id}/execute", json_data=data)

    def configure_sampling(self, sample_id: str, sample_interval: int = 1000, sample_count: int = 100,
                           window_size: int = 100, sample_all_messages: bool = False) -> Dict[str, Any]:
        """Configure sampling parameters for a job."""
        data = {
            "position": "latest",  # Start from the latest data
            "sample_interval": sample_interval,
            "sample_count": sample_count,
            "window_size": window_size
        }
        # If sampling all messages, set the interval to 0 and use a high count
        if sample_all_messages:
            data["sample_interval"] = 0
            data["sample_count"] = 10000  # High count to capture all messages
            data["window_size"] = 10000
        return self._post(f"samples/{sample_id}/configure", json_data=data)

    def list_jobs_with_samples(self) -> Dict[str, Any]:
        """List all jobs with their sample information."""
        jobs = self._get("jobs")
        job_list = []
        for job in jobs.get("jobs", []):
            job_info = {
                "job_id": job.get("job_id"),
                "name": job.get("name"),
                "state": job.get("state"),
                "sample_id": job.get("sample_id"),
                "created_at": job.get("created_at"),
                "flink_job_id": job.get("flink_job_id")
            }
            # Try to get sample data for each job
            if job.get("sample_id"):
                try:
                    sample_data = self.get_job_sample(job["sample_id"])
                    job_info["sample_records_count"] = len(sample_data.get("records", []))
                    job_info["sample_status"] = sample_data.get("job_status", "unknown")
                except Exception:
                    job_info["sample_records_count"] = 0
                    job_info["sample_status"] = "error"
            else:
                job_info["sample_records_count"] = 0
                job_info["sample_status"] = "no_sample_id"
            job_list.append(job_info)
        return {
            "jobs": job_list,
            "total_jobs": len(job_list),
            "running_jobs": len([j for j in job_list if j["state"] == "RUNNING"]),
            "message": f"Found {len(job_list)} jobs with sample information"
        }

    def create_kafka_table(self, table_name: str, topic: str, kafka_connector_type: str = "local-kafka",
                           bootstrap_servers: str = "localhost:9092", format_type: str = "json",
                           scan_startup_mode: str = "latest-offset",
                           additional_properties: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Create a new table that only uses the local-kafka connector."""
        # Enforce that only the local-kafka connector is used
        if kafka_connector_type != "local-kafka":
            raise SSBError(f"Only 'local-kafka' connector is allowed for virtual tables. "
                           f"Provided: '{kafka_connector_type}'")
        # Build properties for the data source
        properties = {
            "connector": kafka_connector_type,
            "topic": topic,
            "properties.bootstrap.servers": bootstrap_servers,
            "scan.startup.mode": scan_startup_mode
        }
        # Add format configuration (normalize the known formats, pass others through)
        if format_type.lower() in ("json", "csv", "avro"):
            properties["format"] = format_type.lower()
        else:
            properties["format"] = format_type
        # Add any additional properties
        if additional_properties:
            properties.update(additional_properties)
        # Create the data source
        data_source = {
            "name": table_name,
            "type": kafka_connector_type,
            "properties": properties
        }
        try:
            response = self._post("data-sources", json_data=data_source)
            response["message"] = (f"Kafka table '{table_name}' created successfully "
                                   f"with connector '{kafka_connector_type}'")
            response["kafka_topic"] = topic
            response["bootstrap_servers"] = bootstrap_servers
            response["format"] = format_type
            response["connector_type"] = kafka_connector_type
            return response
        except Exception as e:
            raise SSBError(f"Failed to create Kafka table '{table_name}': {str(e)}")

    def validate_kafka_connector(self, kafka_connector_type: str) -> Dict[str, Any]:
        """Validate that a connector type is the local-kafka connector."""
        allowed_connector = "local-kafka"
        if kafka_connector_type != allowed_connector:
            return {
                "valid": False,
                "message": (f"Only '{allowed_connector}' connector is allowed for virtual tables. "
                            f"Provided: '{kafka_connector_type}'"),
                "allowed_connector": allowed_connector
            }
        # Get connector details
        try:
            connectors = self._get("ddl/connectors")
            kafka_connector = None
            for connector in connectors:
                if connector.get("type") == kafka_connector_type:
                    kafka_connector = connector
                    break
            if kafka_connector:
                return {
                    "valid": True,
                    "message": f"Connector '{kafka_connector_type}' is a valid Kafka connector",
                    "connector_type": kafka_connector_type,
                    "properties": kafka_connector.get("properties", []),
                    "supported_formats": kafka_connector.get("supported_formats", [])
                }
            return {
                "valid": False,
                "message": f"Kafka connector '{kafka_connector_type}' not found in available connectors"
            }
        except Exception as e:
            return {
                "valid": False,
                "message": f"Error validating connector: {str(e)}"
            }

    def register_kafka_table(self, table_name: str, topic: str,
                             schema_fields: Optional[List[Dict[str, str]]] = None,
                             use_ssb_prefix: bool = True, catalog: str = "ssb",
                             database: str = "ssb_default") -> Dict[str, Any]:
        """Register a Kafka table in the Flink catalog using DDL with the specified template."""
        # Check whether the requested catalog exists; fall back to default_catalog if not
        try:
            catalogs_result = self._post("sql/execute", json_data={"sql": "SHOW CATALOGS;"})
            available_catalogs = []
            if catalogs_result.get("table_data"):
                available_catalogs = [cat.get("catalog name", "") for cat in catalogs_result["table_data"]["data"]]
            if catalog not in available_catalogs:
                original_catalog = catalog
                catalog = "default_catalog"
                print(f"Warning: Catalog '{original_catalog}' not available, using '{catalog}' instead")
        except Exception:
            # If we can't check catalogs, use default_catalog as a fallback
            if catalog == "ssb":
                catalog = "default_catalog"
        # Add the ssb_ prefix if requested and not already present
        if use_ssb_prefix and not table_name.startswith('ssb_'):
            full_table_name = f"ssb_{table_name}"
        else:
            full_table_name = table_name
        # Default schema for NVDA-like data if not provided
        if schema_fields is None:
            schema_fields = [
                {"name": "___open", "type": "VARCHAR(2147483647)"},
                {"name": "___high", "type": "VARCHAR(2147483647)"},
                {"name": "___low", "type": "VARCHAR(2147483647)"},
                {"name": "___close", "type": "VARCHAR(2147483647)"},
                {"name": "___volume", "type": "VARCHAR(2147483647)"},
                {"name": "eventTimestamp", "type": "TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'"}
            ]
        # Build the DDL statement using the specified template
        columns = []
        for field in schema_fields:
            field_name = field.get("name", "unknown")
            field_type = field.get("type", "VARCHAR(2147483647)")
            columns.append(f"  `{field_name}` {field_type}")
        # Add a watermark for eventTimestamp if it exists
        watermark_clause = ""
        has_event_timestamp = any(field.get("name") == "eventTimestamp" for field in schema_fields)
        if has_event_timestamp:
            watermark_clause = ",\n  WATERMARK FOR `eventTimestamp` AS `eventTimestamp` - INTERVAL '3' SECOND"
        # Join the columns outside the f-string: backslashes are not allowed inside
        # f-string expressions before Python 3.12
        columns_sql = ",\n".join(columns)
        ddl_sql = f"""CREATE TABLE `{catalog}`.`{database}`.`{full_table_name}` (
{columns_sql}{watermark_clause}
) WITH (
  'scan.startup.mode' = 'earliest-offset',
  'properties.request.timeout.ms' = '120000',
  'properties.auto.offset.reset' = 'earliest',
  'format' = 'json',
  'properties.bootstrap.servers' = 'kafka:9092',
  'connector' = 'kafka',
  'properties.transaction.timeout.ms' = '900000',
  'topic' = '{topic}'
);"""
        try:
            # Execute the DDL
            response = self._post("sql/execute", json_data={"sql": ddl_sql})
            # Check whether the table is now available by switching to the target database
            try:
                # Switch to the target database
                self._post("sql/execute", json_data={"sql": f"USE {catalog}.{database};"})
                # Check tables in the target database
                available_tables = self._post("sql/execute", json_data={"sql": "SHOW TABLES;"})
                table_available = False
                if available_tables.get("table_data"):
                    for table in available_tables["table_data"]["data"]:
                        if table.get("table name") == full_table_name:
                            table_available = True
                            break
                # Switch back to the default database
                self._post("sql/execute", json_data={"sql": "USE default_catalog.default_database;"})
            except Exception:
                table_available = False
            response["message"] = f"Table '{full_table_name}' registered in Flink catalog successfully using template"
            response["table_name"] = full_table_name
            response["original_name"] = table_name
            response["topic"] = topic
            response["connector"] = "kafka"
            response["catalog"] = catalog
            response["database"] = database
            response["full_namespace"] = f"{catalog}.{database}"
            response["available_for_querying"] = table_available
            response["ddl_used"] = ddl_sql
            response["ssb_prefix_applied"] = use_ssb_prefix and not table_name.startswith('ssb_')
            response["template_used"] = "ssb.ssb_default template with watermark and kafka connector"
            return response
        except Exception as e:
            raise SSBError(f"Failed to register table '{full_table_name}' in Flink catalog: {str(e)}")

    # ============================================================================
    # HIGH-PRIORITY ADDITIONS - ADVANCED JOB MANAGEMENT
    # ============================================================================

    def get_job_events(self, job_id: int) -> Dict[str, Any]:
        """Get detailed job event history and timeline."""
        return self._get(f"jobs/{job_id}/events")

    def get_job_state(self, job_id: int) -> Dict[str, Any]:
        """Get comprehensive job state information."""
        return self._get(f"jobs/{job_id}/state")

    def get_job_mv_endpoints(self, job_id: int) -> Dict[str, Any]:
        """Get materialized view endpoints for a job."""
        return self._get(f"jobs/{job_id}/mv")

    def create_job_mv_endpoint(self, job_id: int, mv_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create or update a materialized view endpoint for a job."""
        return self._post(f"jobs/{job_id}/mv", json_data=mv_config)

    def copy_job(self, job_id: int) -> Dict[str, Any]:
        """Duplicate an existing job."""
        return self._post(f"jobs/{job_id}/copy")

    def copy_data_source(self, data_source_id: str) -> Dict[str, Any]:
        """Clone a data source."""
        return self._post(f"data-sources/{data_source_id}/copy")

    # ============================================================================
    # HIGH-PRIORITY ADDITIONS - MONITORING & DIAGNOSTICS
    # ============================================================================

    def get_diagnostic_counters(self) -> Dict[str, Any]:
        """Get system performance counters and diagnostics."""
        return self._get("diag/counters")

    def get_heartbeat(self) -> Dict[str, Any]:
        """Check system health and connectivity."""
        return self._get("heartbeat")

    def analyze_sql(self, sql_query: str) -> Dict[str, Any]:
        """Analyze a SQL query without executing it (syntax, performance analysis)."""
        return self._post("sql/analyze", json_data={"sql": sql_query})

    # ============================================================================
    # HIGH-PRIORITY ADDITIONS - ENHANCED TABLE MANAGEMENT
    # ============================================================================

    def list_tables_detailed(self) -> Dict[str, Any]:
        """Get comprehensive table information."""
        result = self._get("tables")
        # Handle both list and dict responses
        if isinstance(result, list):
            return {"tables": result}
        return result

    def get_table_tree(self) -> Dict[str, Any]:
        """Get the hierarchical table structure organized by catalog."""
        return self._get("tables/tree")

    def validate_data_source(self, data_source_config: Dict[str, Any]) -> Dict[str, Any]:
        """Validate a data source configuration."""
        return self._post("data-sources/validate", json_data=data_source_config)

    def create_table_detailed(self, table_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a table with full configuration."""
        return self._post("tables", json_data=table_config)

    def get_table_details(self, table_id: str) -> Dict[str, Any]:
        """Get detailed information about a specific table."""
        return self._get(f"tables/{table_id}")

    # ============================================================================
    # HIGH-PRIORITY ADDITIONS - CONNECTOR & FORMAT MANAGEMENT
    # ============================================================================

    def list_data_formats(self) -> Dict[str, Any]:
        """List all available data formats."""
        result = self._get("ddl/data-formats")
        # Handle both list and dict responses
        if isinstance(result, list):
            return {"dataFormats": result}
        return result

    def get_data_format_details(self, format_id: str) -> Dict[str, Any]:
        """Get detailed information about a specific data format."""
        return self._get(f"ddl/data-formats/{format_id}")

    def create_data_format(self, format_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new data format."""
        return self._post("ddl/data-formats", json_data=format_config)

    def get_connector_jar(self, connector_type: str) -> Dict[str, Any]:
        """Get connector JAR information."""
        return self._get(f"ddl/connectors/jar/{connector_type}")

    def get_connector_type_details(self, connector_type: str) -> Dict[str, Any]:
        """Get detailed connector type information."""
        return self._get(f"ddl/connectors/type/{connector_type}")

    def get_connector_details(self, connector_id: str) -> Dict[str, Any]:
        """Get detailed connector information."""
        return self._get(f"ddl/connectors/{connector_id}")

    # ============================================================================
    # HIGH-PRIORITY ADDITIONS - USER & PROJECT MANAGEMENT
    # ============================================================================

    def get_user_settings(self) -> Dict[str, Any]:
        """Get user preferences and settings."""
        return self._get("user/settings")

    def update_user_settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
        """Update user configuration."""
        return self._put("user/settings", json_data=settings)

    def list_projects(self) -> Dict[str, Any]:
        """List available projects."""
        result = self._get("projects")
        # Handle both list and dict responses
        if isinstance(result, list):
            return {"projects": result}
        return result

    def get_project_details(self, project_id: str) -> Dict[str, Any]:
        """Get project information."""
        return self._get(f"projects/{project_id}")

    def create_project(self, project_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new project."""
        return self._post("projects", json_data=project_config)

    def get_user_info(self) -> Dict[str, Any]:
        """Get current user information."""
        return self._get("user")

    # ============================================================================
    # HIGH-PRIORITY ADDITIONS - API KEY MANAGEMENT
    # ============================================================================

    def list_api_keys(self) -> Dict[str, Any]:
        """List user API keys."""
        return self._get("api-keys")

    def create_api_key(self, key_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new API key."""
        return self._post("api-keys", json_data=key_config)

    def delete_api_key(self, key_id: str) -> Dict[str, Any]:
        """Delete an API key."""
        return self._delete(f"api-keys/{key_id}")

    def get_api_key_details(self, key_id: str) -> Dict[str, Any]:
        """Get API key information."""
        return self._get(f"api-keys/{key_id}")

    # ============================================================================
    # HIGH-PRIORITY ADDITIONS - ENVIRONMENT MANAGEMENT
    # ============================================================================

    def list_environments(self) -> Dict[str, Any]:
        """List available environments."""
        result = self._get("environments")
        # Handle both list and dict responses
        if isinstance(result, list):
            return {"environments": result}
        return result

    def activate_environment(self, env_id: str) -> Dict[str, Any]:
        """Activate/switch to an environment."""
        return self._post(f"environments/{env_id}/activate")

    def get_environment_details(self, env_id: str) -> Dict[str, Any]:
        """Get environment configuration."""
        return self._get(f"environments/{env_id}")

    def create_environment(self, env_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new environment."""
        return self._post("environments", json_data=env_config)

    def deactivate_environment(self) -> Dict[str, Any]:
        """Deactivate the current environment."""
        return self._post("environments/deactivate")

    # ============================================================================
    # HIGH-PRIORITY ADDITIONS - SYNC & CONFIGURATION
    # ============================================================================

    def get_sync_config(self) -> Dict[str, Any]:
        """Get sync configuration."""
        return self._get("sync/config")

    def update_sync_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Update sync configuration."""
        return self._post("sync/config", json_data=config)

    def delete_sync_config(self) -> Dict[str, Any]:
        """Delete sync configuration."""
        return self._delete("sync/config")

    def validate_sync_config(self, project: str) -> Dict[str, Any]:
        """Validate sync configuration for a project."""
        return self._post(f"sync/config/validate/{project}")

    def export_project(self, project: str) -> Dict[str, Any]:
        """Export project configuration."""
        return self._get(f"sync/git/export/{project}")

    def import_project(self, project: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Import project configuration."""
        return self._post(f"sync/git/import/{project}", json_data=config)

    # ============================================================================
    # HIGH-PRIORITY ADDITIONS - UDF MANAGEMENT
    # ============================================================================

    def list_udfs_detailed(self) -> Dict[str, Any]:
        """Get comprehensive UDF information."""
        result = self._get("udfs")
        # Handle both list and dict responses
        if isinstance(result, list):
            return {"udfs": result}
        return result

    def run_udf(self, udf_id: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a UDF function."""
        return self._post("udfs/run", json_data={"udfId": udf_id, "parameters": parameters})

    def get_udf_artifacts(self) -> Dict[str, Any]:
        """Get UDF artifacts and dependencies."""
        return self._get("udfs/artifact")

    def create_udf(self, udf_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a custom UDF."""
        return self._post("udfs", json_data=udf_config)

    def update_udf(self, udf_id: str, udf_config: Dict[str, Any]) -> Dict[str, Any]:
        """Update UDF configuration."""
        return self._put(f"udfs/{udf_id}", json_data=udf_config)

    def get_udf_details(self, udf_id: str) -> Dict[str, Any]:
        """Get detailed UDF information."""
        return self._get(f"udfs/{udf_id}")

    def get_udf_artifact_details(self, artifact_id: str) -> Dict[str, Any]:
        """Get UDF artifact details."""
        return self._get(f"udfs/artifact/{artifact_id}")

    def get_udf_artifact_by_type(self, artifact_type: str) -> Dict[str, Any]:
        """Get UDF artifacts by type."""
        return self._get(f"udfs/artifact/type/{artifact_type}")
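
A minimal usage sketch (not part of client.py) showing how the client is wired together: construct a requests.Session, pass it to SSBClient, then run a sampled query. The base URL, credentials, and table name below are illustrative placeholders, not values confirmed by this repository; adjust them to your SSB deployment.

# Example usage (a minimal sketch; URL, credentials, and table are assumptions)
import requests

session = requests.Session()
session.auth = ("admin", "admin")  # placeholder credentials

client = SSBClient(base_url="http://localhost:18121/api/v1", session=session)

# Verify connectivity via the jobs endpoint
print(client.get_ssb_info())

# Run a query, sampling every message instead of the default interval
result = client.execute_query_with_sampling(
    "SELECT * FROM ssb_example_table",  # hypothetical table name
    sample_all_messages=True,
)
print(result.get("message"))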
