Skip to main content
Glama
chayasin
by chayasin
database.py4.8 kB
""" Database connection management for MCP PostgreSQL server. """ import os import psycopg2 from psycopg2.extras import RealDictCursor from psycopg2 import pool from typing import List, Dict, Any, Optional from dotenv import load_dotenv # Load environment variables load_dotenv() class DatabaseManager: """Manages PostgreSQL database connections and queries.""" def __init__(self): """Initialize database connection pool.""" self.connection_pool = psycopg2.pool.SimpleConnectionPool( 1, # minconn 10, # maxconn host=os.getenv('DB_HOST', 'localhost'), port=os.getenv('DB_PORT', '5432'), database=os.getenv('DB_NAME', 'mcp_database'), user=os.getenv('DB_USER', 'postgres'), password=os.getenv('DB_PASSWORD', 'postgres') ) def get_connection(self): """Get a connection from the pool.""" return self.connection_pool.getconn() def release_connection(self, conn): """Release a connection back to the pool.""" self.connection_pool.putconn(conn) def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]: """ Execute a SQL query and return results as a list of dictionaries. Args: query: SQL query string params: Optional query parameters for parameterized queries Returns: List of dictionaries containing query results """ conn = None try: conn = self.get_connection() with conn.cursor(cursor_factory=RealDictCursor) as cursor: cursor.execute(query, params) # For SELECT queries, fetch results if cursor.description: results = cursor.fetchall() return [dict(row) for row in results] # For INSERT/UPDATE/DELETE, commit and return affected rows conn.commit() return [{"affected_rows": cursor.rowcount}] except Exception as e: if conn: conn.rollback() raise Exception(f"Database error: {str(e)}") finally: if conn: self.release_connection(conn) def list_schemas(self) -> List[Dict[str, Any]]: """List all non-system schemas in the database.""" query = """ SELECT schema_name FROM information_schema.schemata WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast') ORDER BY schema_name; """ return self.execute_query(query) def list_tables(self, schema: Optional[str] = None) -> List[Dict[str, Any]]: """ List all tables, optionally filtered by schema. Args: schema: Optional schema name to filter tables Returns: List of dictionaries with table information """ if schema: query = """ SELECT table_schema, table_name, table_type FROM information_schema.tables WHERE table_schema = %s ORDER BY table_schema, table_name; """ return self.execute_query(query, (schema,)) else: query = """ SELECT table_schema, table_name, table_type FROM information_schema.tables WHERE table_schema NOT IN ('pg_catalog', 'information_schema') ORDER BY table_schema, table_name; """ return self.execute_query(query) def describe_table(self, table_name: str, schema: str = 'public') -> List[Dict[str, Any]]: """ Get detailed information about table columns. Args: table_name: Name of the table schema: Schema name (default: 'public') Returns: List of dictionaries with column information """ query = """ SELECT column_name, data_type, character_maximum_length, is_nullable, column_default FROM information_schema.columns WHERE table_schema = %s AND table_name = %s ORDER BY ordinal_position; """ return self.execute_query(query, (schema, table_name)) def close(self): """Close all connections in the pool.""" if self.connection_pool: self.connection_pool.closeall()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chayasin/mcp-database'

If you have feedback or need assistance with the MCP directory API, please join our Discord server