Skip to main content
Glama
setup_sample_data.py10.7 kB
#!/usr/bin/env python3 """ Setup script for the DeFi DEX trading sample dataset. This script installs the sample dataset structure and catalog metadata to demonstrate igloo-mcp capabilities with real-world data patterns. """ import json import logging import sys from pathlib import Path from typing import Dict # Add src to path for imports sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src")) from igloo_mcp.config import get_config from igloo_mcp.snow_cli import SnowCLI, SnowCLIError def setup_logging(): """Configure logging for setup script.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) return logging.getLogger(__name__) def load_ddl_files(ddl_dir: Path) -> Dict[str, str]: """Load all SQL DDL files from the ddl directory.""" ddl_files = {} sql_files = [ "dex_trades_stable.sql", "coin_info.sql", "btc_dex_trades_usd_dt.sql", "filtered_dex_trades_view.sql", ] for sql_file in sql_files: file_path = ddl_dir / sql_file if file_path.exists(): with open(file_path, "r") as f: ddl_files[sql_file] = f.read() else: logging.warning(f"DDL file not found: {file_path}") return ddl_files def create_database_structure(snow_cli: SnowCLI, logger: logging.Logger) -> bool: """Create the sample database and schema structure.""" try: logger.info("Creating sample database structure...") # Create database and schemas queries = [ "CREATE DATABASE IF NOT EXISTS DEFI_SAMPLE_DB", "CREATE SCHEMA IF NOT EXISTS DEFI_SAMPLE_DB.RAW", "CREATE SCHEMA IF NOT EXISTS DEFI_SAMPLE_DB.PROCESSED", "CREATE SCHEMA IF NOT EXISTS DEFI_SAMPLE_DB.ANALYTICS", "USE DATABASE DEFI_SAMPLE_DB", ] for query in queries: logger.info(f"Executing: {query}") result = snow_cli.run_query(query, output_format="table") if result.returncode != 0: logger.error(f"Failed to execute: {query}") logger.error(f"Error: {result.raw_stderr}") return False logger.info("✅ Database structure created successfully") return True except SnowCLIError as e: logger.error(f"Failed to create database structure: {e}") return False def create_sample_tables( snow_cli: SnowCLI, ddl_files: Dict[str, str], logger: logging.Logger ) -> bool: """Create sample tables using the DDL files.""" try: logger.info("Creating sample tables...") # Modify DDL to use sample database for filename, ddl in ddl_files.items(): logger.info(f"Creating objects from {filename}...") # Replace references to use sample database modified_ddl = ddl.replace( "PIPELINE_V2_GROOT_DB.PIPELINE_V2_GROOT_SCHEMA", "DEFI_SAMPLE_DB.PROCESSED", ).replace("PIPELINE_V2_GROOT_DB", "DEFI_SAMPLE_DB") # Execute the DDL try: result = snow_cli.run_query(modified_ddl, output_format="table") if result.returncode != 0: logger.warning(f"DDL execution had issues: {result.raw_stderr}") # Continue anyway - some errors might be expected (missing source tables) else: logger.info(f"✅ Successfully processed {filename}") except SnowCLIError as e: logger.warning( f"Expected error for {filename} (missing source data): {e}" ) # This is expected since we don't have the raw data continue logger.info("✅ Sample table structure created") return True except Exception as e: logger.error(f"Failed to create sample tables: {e}") return False def generate_sample_catalog(snow_cli: SnowCLI, logger: logging.Logger) -> bool: """Generate a sample catalog for the created objects.""" try: logger.info("Generating sample catalog...") # Use igloo-mcp to catalog the sample database from igloo_mcp.catalog import build_catalog catalog_dir = Path(__file__).parent / "catalog" catalog_dir.mkdir(exist_ok=True) # Build catalog for our sample database totals = build_catalog( str(catalog_dir), database="DEFI_SAMPLE_DB", account_scope=False, incremental=False, output_format="jsonl", include_ddl=True, max_ddl_concurrency=4, catalog_concurrency=8, export_sql=False, ) logger.info(f"✅ Catalog generated successfully: {totals}") return True except Exception as e: logger.error(f"Failed to generate catalog: {e}") return False def create_lineage_example(snow_cli: SnowCLI, logger: logging.Logger) -> bool: """Create lineage analysis for the sample data.""" try: logger.info("Generating lineage examples...") from igloo_mcp.lineage import LineageQueryService # Set up lineage service catalog_dir = str(Path(__file__).parent / "catalog") lineage_dir = str(Path(__file__).parent / "lineage") Path(lineage_dir).mkdir(exist_ok=True) service = LineageQueryService(catalog_dir, lineage_dir) # Try to build lineage graph try: # This might fail if we don't have enough data, but that's okay result = service.object_subgraph( "DEFI_SAMPLE_DB.PROCESSED.DEX_TRADES_STABLE", direction="both", depth=2 ) logger.info( f"✅ Lineage example created with {len(result.graph.nodes)} nodes" ) except KeyError as e: logger.info(f"Lineage generation skipped (expected): {e}") # Create a simple example file instead example_lineage = { "object": "DEFI_SAMPLE_DB.PROCESSED.DEX_TRADES_STABLE", "description": "Main fact table for DEX trading data", "upstream": [ "DEFI_SAMPLE_DB.RAW.OBJECT_CHANGES", "DEFI_SAMPLE_DB.RAW.DEX_EVENTS", ], "downstream": [ "DEFI_SAMPLE_DB.ANALYTICS.FILTERED_DEX_TRADES_VIEW", "DEFI_SAMPLE_DB.ANALYTICS.BTC_DEX_TRADES_USD_DT", ], } with open(Path(lineage_dir) / "example_lineage.json", "w") as f: json.dump(example_lineage, f, indent=2) return True except Exception as e: logger.error(f"Failed to create lineage examples: {e}") return False def print_usage_instructions(logger: logging.Logger): """Print instructions for using the sample dataset.""" logger.info("\n" + "=" * 60) logger.info("🎉 DeFi Sample Dataset Setup Complete!") logger.info("=" * 60) logger.info("") logger.info("The sample dataset includes:") logger.info("• DEX_TRADES_STABLE - Main fact table (structure only)") logger.info("• COIN_INFO - Dynamic table for crypto metadata") logger.info("• FILTERED_DEX_TRADES_VIEW - Business logic view") logger.info("• BTC_DEX_TRADES_USD_DT - BTC-focused analytics") logger.info("") logger.info("Try these igloo-mcp MCP tools via your AI assistant:") logger.info("") logger.info("📊 Catalog Commands:") logger.info(' "Build a catalog for DEFI_SAMPLE_DB"') logger.info(' "Build a catalog for DEFI_SAMPLE_DB.ANALYTICS schema"') logger.info("") logger.info("🔗 Lineage Commands:") logger.info(' "Query lineage for DEX_TRADES_STABLE"') logger.info(' "Show upstream lineage for FILTERED_DEX_TRADES_VIEW"') logger.info("") logger.info("🕸️ Dependency Graph:") logger.info(' "Build dependency graph for DEFI_SAMPLE_DB"') logger.info(' "Generate dependency graph as JSON"') logger.info("") logger.info("🤖 MCP Server Examples:") logger.info(" 'Show me the schema of DEX_TRADES_STABLE'") logger.info(" 'What feeds into the BTC analytics table?'") logger.info(" 'Build a catalog for the DeFi sample database'") logger.info("") logger.info("📚 Documentation:") logger.info(" See examples/sample_data/pipeline_documentation.md") logger.info(" See examples/sample_data/README.md") logger.info("") def main(): """Main setup function.""" logger = setup_logging() logger.info("🚀 Setting up DeFi DEX Trading Sample Dataset") logger.info("=" * 50) try: # Initialize SnowCLI logger.info("Initializing Snowflake connection...") snow_cli = SnowCLI() get_config() # Initialize configuration # Test connection if not snow_cli.test_connection(): logger.error( "❌ Failed to connect to Snowflake. Please check your configuration." ) return 1 logger.info("✅ Snowflake connection successful") # Load DDL files ddl_dir = Path(__file__).parent / "ddl" ddl_files = load_ddl_files(ddl_dir) if not ddl_files: logger.error("❌ No DDL files found. Please ensure DDL files exist.") return 1 logger.info(f"✅ Loaded {len(ddl_files)} DDL files") # Create database structure if not create_database_structure(snow_cli, logger): logger.error("❌ Failed to create database structure") return 1 # Create sample tables if not create_sample_tables(snow_cli, ddl_files, logger): logger.error("❌ Failed to create sample tables") return 1 # Generate catalog if not generate_sample_catalog(snow_cli, logger): logger.warning("⚠️ Catalog generation had issues (this might be expected)") # Create lineage examples if not create_lineage_example(snow_cli, logger): logger.warning( "⚠️ Lineage example creation had issues (this might be expected)" ) # Print usage instructions print_usage_instructions(logger) return 0 except KeyboardInterrupt: logger.info("\n❌ Setup interrupted by user") return 1 except Exception as e: logger.error(f"❌ Unexpected error during setup: {e}") return 1 if __name__ == "__main__": exit_code = main() sys.exit(exit_code)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/igloo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server