Skip to main content
Glama
jaredlang
by jaredlang
connection.py4.14 kB
""" Cloud SQL connection management. Provides secure connections to Google Cloud SQL PostgreSQL using the Cloud SQL Python Connector with IAM authentication support. """ import os from dotenv import load_dotenv from typing import Optional from google.cloud.sql.connector import Connector import sqlalchemy from sqlalchemy import create_engine, text from sqlalchemy.engine import Engine load_dotenv() # Cloud SQL configuration from environment variables PROJECT_ID = os.getenv("GCP_PROJECT_ID") REGION = os.getenv("CLOUD_SQL_REGION", "us-central1") INSTANCE_NAME = os.getenv("CLOUD_SQL_INSTANCE", "weather-forecasts") DB_NAME = os.getenv("CLOUD_SQL_DB", "weather") DB_USER = os.getenv("CLOUD_SQL_USER", "postgres") DB_PASSWORD = os.getenv("CLOUD_SQL_PASSWORD") # Build instance connection name if PROJECT_ID and REGION and INSTANCE_NAME: INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{INSTANCE_NAME}" else: INSTANCE_CONNECTION_NAME = None # Global connector instance (initialized on first use) _connector: Optional[Connector] = None def get_connector() -> Connector: """ Get or create the global Cloud SQL connector instance. Returns: Connector instance """ global _connector if _connector is None: _connector = Connector() return _connector def get_connection(): """ Get a connection to Cloud SQL PostgreSQL using pg8000. Returns a raw pg8000 DB-API connection suitable for reading/writing binary data (BYTEA columns). Returns: pg8000 connection object Raises: ValueError: If required environment variables are not set Exception: If connection fails Example: >>> conn = get_connection() >>> cursor = conn.cursor() >>> cursor.execute("SELECT * FROM forecasts") >>> results = cursor.fetchall() >>> conn.close() """ if not INSTANCE_CONNECTION_NAME: raise ValueError( "Missing required Cloud SQL configuration. " "Please set GCP_PROJECT_ID, CLOUD_SQL_REGION, and CLOUD_SQL_INSTANCE " "environment variables." ) if not DB_PASSWORD: raise ValueError( "Missing CLOUD_SQL_PASSWORD environment variable. " "Please set the database password." ) connector = get_connector() try: conn = connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASSWORD, db=DB_NAME ) return conn except Exception as e: raise Exception( f"Failed to connect to Cloud SQL instance {INSTANCE_CONNECTION_NAME}: {e}" ) def close_connector(): """ Close the global connector and cleanup resources. Should be called when shutting down the application. """ global _connector if _connector: _connector.close() _connector = None def test_connection() -> dict: """ Test the database connection and return status. Returns: Dictionary with connection status and details """ try: conn = get_connection() cursor = conn.cursor() # Test query cursor.execute("SELECT version()") result = cursor.fetchone() version = result[0] if result else "Unknown" # Check if forecasts table exists cursor.execute(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_name = 'forecasts' ) """) table_exists = cursor.fetchone()[0] cursor.close() conn.close() return { "status": "success", "connected": True, "instance": INSTANCE_CONNECTION_NAME, "database": DB_NAME, "version": version, "forecasts_table_exists": table_exists } except Exception as e: return { "status": "error", "connected": False, "error": str(e), "instance": INSTANCE_CONNECTION_NAME or "Not configured" }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jaredlang/forecast_storage_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server