import os
import json
import logging
from typing import Any, Dict, Optional, List
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import SimpleConnectionPool
from contextlib import contextmanager
class DatabaseService:
"""
PostgreSQL database service for managing schema and data storage.
Handles connection pooling, queries, and data management operations.
"""
def __init__(self):
"""
Initialize the database service with environment configuration.
"""
self.logger = logging.getLogger(__name__)
self.connection_pool = None
self._initialize_connection_pool()
def _get_database_config(self) -> Dict[str, Any]:
"""
Get database configuration from environment variables.
Returns:
Dictionary with database connection parameters
Raises:
ValueError: If required environment variables are missing
"""
config = {
'host': os.getenv('POSTGRES_HOST', 'localhost'),
'port': os.getenv('POSTGRES_PORT', '5432'),
'database': os.getenv('POSTGRES_DATABASE', 'jsonschema_mcp'),
'user': os.getenv('POSTGRES_USER'),
'password': os.getenv('POSTGRES_PASSWORD'),
'auto_reset': os.getenv('POSTGRES_AUTO_RESET', 'false').lower() == 'true',
'auto_create_schema': os.getenv('POSTGRES_AUTO_CREATE_SCHEMA', 'true').lower() == 'true',
}
# Validate required fields
required_fields = ['user', 'password']
missing_fields = [field for field in required_fields if not config[field]]
if missing_fields:
raise ValueError(f"Missing required environment variables: {missing_fields}")
return config
def _initialize_connection_pool(self):
"""
        Initialize the PostgreSQL connection pool and set up the database schema.
"""
try:
config = self._get_database_config()
# Create connection pool
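            # RealDictCursor makes every fetched row a dict keyed by column name.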
self.connection_pool = SimpleConnectionPool(
minconn=1,
maxconn=10,
host=config['host'],
port=config['port'],
database=config['database'],
user=config['user'],
password=config['password'],
cursor_factory=RealDictCursor
)
# Test connection
with self.get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT 1")
self.logger.info(f"Database connection pool initialized successfully for {config['database']}")
# Initialize database schema
self._initialize_database_schema(config)
except Exception as e:
self.logger.warning(f"Failed to initialize database connection pool: {e}")
self.connection_pool = None
@contextmanager
def get_connection(self):
"""
Context manager for database connections.
Yields:
            psycopg2.extensions.connection: Database connection from the pool
Raises:
RuntimeError: If connection pool is not available
"""
if not self.connection_pool:
raise RuntimeError("Database connection pool not available")
conn = None
try:
conn = self.connection_pool.getconn()
yield conn
        except Exception:
            if conn:
                conn.rollback()
            raise
finally:
if conn:
self.connection_pool.putconn(conn)
def _initialize_database_schema(self, config: Dict[str, Any]):
"""
Initialize database schema based on configuration.
Args:
config: Database configuration including auto_reset and auto_create_schema flags
"""
try:
# Check if schema initialization is needed
if config.get('auto_reset', False):
self.logger.info("Auto-reset enabled: Dropping and recreating all tables")
self._reset_database_schema()
elif config.get('auto_create_schema', True):
# Check if tables exist and create if needed
if not self._tables_exist():
self.logger.info("Tables not found: Creating database schema")
self.create_tables()
else:
self.logger.debug("Database tables already exist")
else:
self.logger.debug("Auto schema creation disabled")
except Exception as e:
self.logger.error(f"Failed to initialize database schema: {e}")
raise
def _tables_exist(self) -> bool:
"""
Check if required database tables exist.
Returns:
True if all required tables exist, False otherwise
"""
required_tables = ['schemas', 'data_storage']
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
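                    # psycopg2 adapts the Python list to a SQL array, so ANY(%s)
                    # matches all required table names in a single query.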
cursor.execute("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
AND table_name = ANY(%s)
""", (required_tables,))
existing_tables = [row['table_name'] for row in cursor.fetchall()]
missing_tables = set(required_tables) - set(existing_tables)
if missing_tables:
self.logger.info(f"Missing tables: {list(missing_tables)}")
return False
return True
except Exception as e:
self.logger.error(f"Failed to check table existence: {e}")
return False
def _reset_database_schema(self):
"""
Drop all tables and recreate the database schema.
WARNING: This will delete all data!
"""
drop_tables = [
"DROP TABLE IF EXISTS schemas CASCADE;",
"DROP TABLE IF EXISTS data_storage CASCADE;"
]
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
# Drop existing tables
for drop_sql in drop_tables:
cursor.execute(drop_sql)
conn.commit()
self.logger.info("Existing tables dropped successfully")
# Create new tables
self.create_tables()
self.logger.info("Database schema reset completed successfully")
except Exception as e:
self.logger.error(f"Failed to reset database schema: {e}")
raise
def create_tables(self):
"""
Create necessary database tables and indexes.
"""
create_schemas_table = """
CREATE TABLE schemas (
id SERIAL PRIMARY KEY,
collection VARCHAR(100) NOT NULL,
schema_id VARCHAR(255) NOT NULL,
data JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT unique_schema_collection_id UNIQUE(collection, schema_id)
);
"""
create_data_table = """
CREATE TABLE data_storage (
id SERIAL PRIMARY KEY,
collection VARCHAR(100) NOT NULL,
data_id VARCHAR(255) NOT NULL,
data JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT unique_data_collection_id UNIQUE(collection, data_id)
);
"""
create_update_trigger_function = """
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ language 'plpgsql';
"""
create_triggers = [
"""
CREATE TRIGGER update_schemas_updated_at
BEFORE UPDATE ON schemas
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
""",
"""
CREATE TRIGGER update_data_storage_updated_at
BEFORE UPDATE ON data_storage
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
"""
]
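        # GIN indexes on the JSONB columns speed up containment queries (e.g. data @> '{...}').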
create_indexes = [
"CREATE INDEX idx_schemas_collection ON schemas(collection);",
"CREATE INDEX idx_schemas_schema_id ON schemas(schema_id);",
"CREATE INDEX idx_schemas_created_at ON schemas(created_at);",
"CREATE INDEX idx_schemas_data_gin ON schemas USING GIN(data);",
"CREATE INDEX idx_data_collection ON data_storage(collection);",
"CREATE INDEX idx_data_data_id ON data_storage(data_id);",
"CREATE INDEX idx_data_created_at ON data_storage(created_at);",
"CREATE INDEX idx_data_storage_data_gin ON data_storage USING GIN(data);"
]
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
# Create tables
cursor.execute(create_schemas_table)
cursor.execute(create_data_table)
# Create trigger function
cursor.execute(create_update_trigger_function)
# Create triggers
for trigger_sql in create_triggers:
cursor.execute(trigger_sql)
# Create indexes
for index_sql in create_indexes:
cursor.execute(index_sql)
conn.commit()
self.logger.info("Database tables, triggers, and indexes created successfully")
except Exception as e:
self.logger.error(f"Failed to create database tables: {e}")
raise
def load_data(self, collection: str, data_id: str) -> Optional[Dict[str, Any]]:
"""
Load data from the database.
Args:
collection: Collection name
data_id: Data identifier
        Returns:
            Dictionary with loaded data, or None if not found or the
            connection pool is unavailable
"""
if not self.connection_pool:
return None
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
# Try schemas table first
cursor.execute(
"SELECT data FROM schemas WHERE collection = %s AND schema_id = %s",
(collection, data_id)
)
result = cursor.fetchone()
if result:
self.logger.debug(f"Found data in schemas table: {collection}/{data_id}")
return dict(result['data'])
# Try data_storage table
cursor.execute(
"SELECT data FROM data_storage WHERE collection = %s AND data_id = %s",
(collection, data_id)
)
result = cursor.fetchone()
if result:
self.logger.debug(f"Found data in data_storage table: {collection}/{data_id}")
return dict(result['data'])
return None
except Exception as e:
self.logger.error(f"Failed to load data from database {collection}/{data_id}: {e}")
return None
def save_data(self, collection: str, data_id: str, data: Dict[str, Any], table: str = "data_storage") -> bool:
"""
Save data to the database.
Args:
collection: Collection name
data_id: Data identifier
data: Data to save
table: Table to save to ("schemas" or "data_storage")
Returns:
True if successful, False otherwise
Raises:
ValueError: If table name is invalid
"""
if table not in ["schemas", "data_storage"]:
raise ValueError(f"Invalid table name: {table}. Must be 'schemas' or 'data_storage'")
if not self.connection_pool:
self.logger.warning("Database connection not available for save operation")
return False
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
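                    # Upsert: insert the row, or replace the JSON payload on a
                    # (collection, id) conflict. json.dumps() is required because
                    # psycopg2 does not adapt plain dicts to JSONB by default.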
if table == "schemas":
cursor.execute("""
INSERT INTO schemas (collection, schema_id, data)
VALUES (%s, %s, %s)
ON CONFLICT (collection, schema_id)
DO UPDATE SET
data = EXCLUDED.data,
updated_at = CURRENT_TIMESTAMP
""", (collection, data_id, json.dumps(data)))
else: # data_storage
cursor.execute("""
INSERT INTO data_storage (collection, data_id, data)
VALUES (%s, %s, %s)
ON CONFLICT (collection, data_id)
DO UPDATE SET
data = EXCLUDED.data,
updated_at = CURRENT_TIMESTAMP
""", (collection, data_id, json.dumps(data)))
conn.commit()
self.logger.info(f"Successfully saved data to {table}: {collection}/{data_id}")
return True
except Exception as e:
self.logger.error(f"Failed to save data to database {table} {collection}/{data_id}: {e}")
return False
def delete_data(self, collection: str, data_id: str, table: str = "data_storage") -> bool:
"""
Delete data from the database.
Args:
collection: Collection name
data_id: Data identifier
table: Table to delete from ("schemas" or "data_storage")
        Returns:
            True if a matching row was deleted, False otherwise
        Raises:
            ValueError: If table name is invalid
"""
if table not in ["schemas", "data_storage"]:
raise ValueError(f"Invalid table name: {table}. Must be 'schemas' or 'data_storage'")
if not self.connection_pool:
self.logger.warning("Database connection not available for delete operation")
return False
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
if table == "schemas":
cursor.execute(
"DELETE FROM schemas WHERE collection = %s AND schema_id = %s",
(collection, data_id)
)
else: # data_storage
cursor.execute(
"DELETE FROM data_storage WHERE collection = %s AND data_id = %s",
(collection, data_id)
)
rows_affected = cursor.rowcount
conn.commit()
if rows_affected > 0:
self.logger.info(f"Successfully deleted data from {table}: {collection}/{data_id}")
return True
else:
self.logger.debug(f"No data found to delete in {table}: {collection}/{data_id}")
return False
except Exception as e:
self.logger.error(f"Failed to delete data from database {table} {collection}/{data_id}: {e}")
return False
def data_exists(self, collection: str, data_id: str, table: str = "data_storage") -> bool:
"""
Check if data exists in the database without loading it.
Args:
collection: Collection name
data_id: Data identifier
table: Table to check ("schemas" or "data_storage")
Returns:
True if data exists, False otherwise
"""
if table not in ["schemas", "data_storage"]:
raise ValueError(f"Invalid table name: {table}. Must be 'schemas' or 'data_storage'")
if not self.connection_pool:
self.logger.debug("Database connection not available for exists check")
return False
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
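                    # SELECT 1 ... LIMIT 1 checks existence without transferring the JSONB payload.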
if table == "schemas":
cursor.execute(
"SELECT 1 FROM schemas WHERE collection = %s AND schema_id = %s LIMIT 1",
(collection, data_id)
)
else: # data_storage
cursor.execute(
"SELECT 1 FROM data_storage WHERE collection = %s AND data_id = %s LIMIT 1",
(collection, data_id)
)
result = cursor.fetchone()
exists = result is not None
self.logger.debug(f"Data exists check for {table} {collection}/{data_id}: {exists}")
return exists
except Exception as e:
self.logger.debug(f"Failed to check data existence in database {table} {collection}/{data_id}: {e}")
return False
def list_collections(self, table: str = "data_storage") -> List[str]:
"""
List all collections in the specified table.
Args:
table: Table to query ("schemas" or "data_storage")
Returns:
List of collection names
"""
if table not in ["schemas", "data_storage"]:
raise ValueError(f"Invalid table name: {table}. Must be 'schemas' or 'data_storage'")
if not self.connection_pool:
return []
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
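                    # Interpolating the table name is safe here: it was validated
                    # against the whitelist above and never comes from user input.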
cursor.execute(f"SELECT DISTINCT collection FROM {table} ORDER BY collection")
results = cursor.fetchall()
return [row['collection'] for row in results]
except Exception as e:
self.logger.error(f"Failed to list collections from {table}: {e}")
return []
def list_data_ids(self, collection: str, table: str = "data_storage") -> List[str]:
"""
List all data IDs in a specific collection.
Args:
collection: Collection name
table: Table to query ("schemas" or "data_storage")
Returns:
List of data IDs
"""
if table not in ["schemas", "data_storage"]:
raise ValueError(f"Invalid table name: {table}. Must be 'schemas' or 'data_storage'")
if not self.connection_pool:
return []
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
if table == "schemas":
cursor.execute(
"SELECT schema_id FROM schemas WHERE collection = %s ORDER BY schema_id",
(collection,)
)
results = cursor.fetchall()
return [row['schema_id'] for row in results]
else: # data_storage
cursor.execute(
"SELECT data_id FROM data_storage WHERE collection = %s ORDER BY data_id",
(collection,)
)
results = cursor.fetchall()
return [row['data_id'] for row in results]
except Exception as e:
self.logger.error(f"Failed to list data IDs from {table}/{collection}: {e}")
return []
def get_database_info(self) -> Dict[str, Any]:
"""
Get information about the database connection and tables.
Returns:
Dictionary with database information
"""
info = {
"connected": bool(self.connection_pool),
"tables": {},
"error": None
}
if not self.connection_pool:
info["error"] = "Database connection not available"
return info
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
# Get table counts
for table in ["schemas", "data_storage"]:
cursor.execute(f"SELECT COUNT(*) as count FROM {table}")
result = cursor.fetchone()
info["tables"][table] = {
"total_records": result['count'] if result else 0
}
# Get collection counts
cursor.execute(f"SELECT collection, COUNT(*) as count FROM {table} GROUP BY collection")
collections = cursor.fetchall()
info["tables"][table]["collections"] = {
row['collection']: row['count'] for row in collections
}
except Exception as e:
info["error"] = str(e)
self.logger.error(f"Failed to get database info: {e}")
return info
def close(self):
"""
Close the database connection pool.
"""
if self.connection_pool:
self.connection_pool.closeall()
self.connection_pool = None
self.logger.info("Database connection pool closed")
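
if __name__ == "__main__":
    # Minimal usage sketch (an illustrative assumption, not part of the service API):
    # requires POSTGRES_USER and POSTGRES_PASSWORD in the environment and a reachable
    # PostgreSQL server; the "examples" collection and "greeting" id are placeholders.
    logging.basicConfig(level=logging.INFO)
    db = DatabaseService()
    if db.save_data("examples", "greeting", {"message": "hello"}):
        print(db.load_data("examples", "greeting"))
        print(db.list_collections())
    db.close()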