Skip to main content
Glama

mcp-snowflake-server-no-write-system-prompt

db_client.py3.22 kB
import asyncio import logging import time import uuid from typing import Any from snowflake.snowpark import Session # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler()], ) logger = logging.getLogger("mcp_snowflake_server") class SnowflakeDB: AUTH_EXPIRATION_TIME = 1800 def __init__(self, connection_config: dict): self.connection_config = connection_config self.session = None self.insights: list[str] = [] self.auth_time = 0 self.init_task = None # To store the task reference async def _init_database(self): """Initialize connection to the Snowflake database""" try: # Create session without setting specific database and schema self.session = Session.builder.configs(self.connection_config).create() # Set initial warehouse if provided, but don't set database or schema if "warehouse" in self.connection_config: self.session.sql(f"USE WAREHOUSE {self.connection_config['warehouse'].upper()}") self.auth_time = time.time() except Exception as e: raise ValueError(f"Failed to connect to Snowflake database: {e}") def start_init_connection(self): """Start database initialization in the background""" # Create a task that runs in the background loop = asyncio.get_event_loop() self.init_task = loop.create_task(self._init_database()) return self.init_task async def execute_query(self, query: str) -> tuple[list[dict[str, Any]], str]: """Execute a SQL query and return results as a list of dictionaries""" # If init_task exists and isn't done, wait for it to complete if self.init_task and not self.init_task.done(): await self.init_task # If session doesn't exist or has expired, initialize it and wait elif not self.session or time.time() - self.auth_time > self.AUTH_EXPIRATION_TIME: await self._init_database() logger.debug(f"Executing query: {query}") try: result = self.session.sql(query).to_pandas() result_rows = result.to_dict(orient="records") data_id = str(uuid.uuid4()) return result_rows, data_id except Exception as e: logger.error(f'Database error executing "{query}": {e}') raise def add_insight(self, insight: str) -> None: """Add a new insight to the collection""" self.insights.append(insight) def get_memo(self) -> str: """Generate a formatted memo from collected insights""" if not self.insights: return "No data insights have been discovered yet." memo = "📊 Data Intelligence Memo 📊\n\n" memo += "Key Insights Discovered:\n\n" memo += "\n".join(f"- {insight}" for insight in self.insights) if len(self.insights) > 1: memo += f"\n\nSummary:\nAnalysis has revealed {len(self.insights)} key data insights that suggest opportunities for strategic optimization and growth." return memo

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sgriffin-magnoliacap/mcp-snowflake-server-no-write-system-prompt'

If you have feedback or need assistance with the MCP directory API, please join our Discord server