"""
MCP Resources Service
Provides PostgreSQL database information as MCP Resources (static data)
"""
import json
import logging
from typing import Dict, Any, List, Optional
from pathlib import Path
from urllib.parse import urlparse
from sqlalchemy import create_engine
from postgres_integration import PostgreSQLIntegration
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class MCPResourcesService:
    """Service that exposes PostgreSQL data as MCP Resources.

    Two URI schemes are understood:

    * ``database://<name>`` where ``<name>`` is one of ``tables``, ``schema``,
      ``stats``, ``semantics``, ``config`` or ``pgconfig``.
    * ``table://<table_name>/<kind>`` where ``<kind>`` is one of ``schema``,
      ``count`` or ``size``.

    Every resource is read from ``DEFAULT_DATABASE``; the URIs carry no
    database selector.
    """

    # Database used for every resource read and for table enumeration.
    DEFAULT_DATABASE = "db3"

    # NOTE(review): these fallbacks embed plaintext credentials and are only
    # used when the project's ``config`` module cannot be imported. Consider
    # moving them to environment variables.
    _FALLBACK_CONNECTION_STRINGS = {
        'db1': 'postgresql://admin:password@192.168.230.101/defaultdb?connect_timeout=1',
        'db2': 'postgresql://admin:password@192.168.230.102/defaultdb?connect_timeout=1',
        'db3': 'postgresql://postgres:postgres@localhost/postgres?connect_timeout=1',
    }

    # database://<name> -> (payload key, integration method producing the value)
    _DATABASE_RESOURCES = {
        "tables": ("tables", "get_all_table_names"),
        "schema": ("schema", "get_database_schema"),
        "stats": ("statistics", "get_database_stats"),
        "semantics": ("semantics", "_discover_table_semantics"),
        "pgconfig": ("postgresql_configuration", "get_postgresql_config"),
    }

    # table://<table>/<kind> -> (payload key, integration method taking the table name)
    _TABLE_RESOURCES = {
        "schema": ("schema", "get_table_schema"),
        "count": ("row_count", "get_table_row_count"),
        "size": ("size_info", "get_table_size"),
    }

    def __init__(self, config: Dict[str, Any]):
        """Store *config* and load the resource definitions from disk.

        Args:
            config: Arbitrary service configuration; stored on ``self.config``.
        """
        self.config = config
        # Cache of integrations keyed by database name.
        self.db_integrations: Dict[str, PostgreSQLIntegration] = {}
        self.resource_definitions: Dict[str, Any] = {}
        self._load_resource_definitions()

    def _load_resource_definitions(self):
        """Load MCP resource definitions from configuration.

        Reads ``resources/lists/mcp_resources_and_tools.json`` (relative to
        the parent of this file's directory) and stores its ``mcp_resources``
        mapping on ``self.resource_definitions``. Loading is best-effort: an
        unreadable or malformed file logs a warning and leaves an empty dict.
        """
        resources_file = (
            Path(__file__).parent.parent
            / "resources" / "lists" / "mcp_resources_and_tools.json"
        )
        try:
            with open(resources_file, 'r', encoding="utf-8") as f:
                definitions = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSON and text-decoding failures.
            logger.warning("Could not load resource definitions: %s", e)
            self.resource_definitions = {}
            return

        resources = (
            definitions.get("mcp_resources", {})
            if isinstance(definitions, dict) else None
        )
        if not isinstance(resources, dict):
            logger.warning(
                "Could not load resource definitions: %s",
                "unexpected JSON structure",
            )
            resources = {}
        self.resource_definitions = resources

    @staticmethod
    def _import_app_config() -> Optional[Any]:
        """Return the project's ``Config`` class, or ``None`` if it cannot be imported.

        The parent of this file's directory is added to ``sys.path`` (and
        left there) so that ``config`` can be found.
        """
        import sys

        parent_dir = str(Path(__file__).parent.parent)
        if parent_dir not in sys.path:
            sys.path.insert(0, parent_dir)
        try:
            from config import Config
        except ImportError:
            return None
        return Config

    def get_database_integration(
        self, database: str = DEFAULT_DATABASE
    ) -> "PostgreSQLIntegration":
        """Get or create PostgreSQL integration for a database.

        Args:
            database: Key of the database binding to use.

        Returns:
            The cached integration for *database*, created on first use.

        Raises:
            ValueError: If no connection string is configured for *database*.
        """
        if database not in self.db_integrations:
            app_config = self._import_app_config()
            if app_config is not None:
                connection_string = app_config.SQLALCHEMY_BINDS.get(database)
            else:
                # Fallback to default connection strings
                connection_string = self._FALLBACK_CONNECTION_STRINGS.get(database)
            if not connection_string:
                raise ValueError(f"Database '{database}' not configured")
            engine = create_engine(connection_string)
            self.db_integrations[database] = PostgreSQLIntegration(engine, database)
        return self.db_integrations[database]

    def list_resources(self) -> List[Dict[str, Any]]:
        """List all available MCP resources.

        Returns:
            One descriptor (``uri``, ``name``, ``description``, ``mimeType``)
            per static database resource, followed by one per table and
            table-resource template. If table enumeration fails, a warning is
            logged and whatever was collected so far is returned.
        """
        # Database-level resources (static URIs)
        resources = [
            {
                "uri": resource["uri"],
                "name": resource["name"],
                "description": resource["description"],
                "mimeType": resource["mimeType"],
            }
            for resource in self.resource_definitions.get("database_info", [])
        ]

        # Table-level resources (require enumeration)
        table_templates = self.resource_definitions.get("table_info", [])
        try:
            integration = self.get_database_integration(self.DEFAULT_DATABASE)
            for table_name in integration.get_all_table_names():
                for template in table_templates:
                    resources.append({
                        "uri": template["uri"].format(table_name=table_name),
                        "name": f"{template['name']} - {table_name}",
                        "description": f"{template['description']} for table '{table_name}'",
                        "mimeType": template["mimeType"],
                    })
        except Exception as e:
            # Deliberately broad: listing must still succeed when the
            # database is unreachable or a template is malformed.
            logger.warning("Could not enumerate table resources: %s", e)
        return resources

    async def read_resource(self, uri: str) -> Dict[str, Any]:
        """Read a specific MCP resource by URI.

        Args:
            uri: Resource URI; it is converted with ``str()`` before parsing,
                so URL-like objects are accepted as well as plain strings.

        Returns:
            A dict with ``uri``, ``mimeType`` and ``text`` (a JSON document).
            Failures never raise: they are logged and returned as a JSON
            document containing ``error`` and ``uri``.
        """
        uri_text = str(uri)
        try:
            parsed = urlparse(uri_text)
            scheme = parsed.scheme
            if scheme == "database":
                # For database://tables, netloc is 'tables'; fall back to the
                # path for the database:///tables spelling.
                resource_name = parsed.netloc or parsed.path.lstrip('/')
                result = await self._read_database_resource(resource_name)
            elif scheme == "table":
                # For table://tablename/schema, netloc is 'tablename' and
                # path is '/schema'.
                resource_type = parsed.path.lstrip('/')
                result = await self._read_table_resource(
                    f"{parsed.netloc}/{resource_type}"
                )
            else:
                raise ValueError(f"Unknown resource scheme: {scheme}")
        except Exception as e:
            # Boundary handler: report every failure to the caller as data.
            logger.error("Error reading resource %s: %s", uri, e)
            return {
                "uri": uri_text,
                "mimeType": "application/json",
                "text": json.dumps({
                    "error": str(e),
                    "uri": uri_text
                }, default=str)
            }
        # Keep success payloads consistent with the error payload above.
        result.setdefault("uri", uri_text)
        return result

    def _json_resource(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Stamp *payload* with ``generated_at`` and wrap it as a JSON resource."""
        payload["generated_at"] = self._get_timestamp()
        return {
            "mimeType": "application/json",
            # default=str is deliberate: values that JSON cannot encode
            # natively (e.g. Decimal, datetime) are rendered as strings
            # instead of failing the whole read.
            "text": json.dumps(payload, indent=2, default=str),
        }

    async def _read_database_resource(self, path: str) -> Dict[str, Any]:
        """Read database-level resources.

        Args:
            path: Resource name (``tables``, ``schema``, ``stats``,
                ``semantics``, ``config`` or ``pgconfig``).

        Returns:
            A dict with ``mimeType`` and ``text`` (a JSON document).

        Raises:
            ValueError: If *path* is not a known database resource.
        """
        # NOTE(review): the integration calls below are synchronous and run
        # on the event loop thread.
        database = self.DEFAULT_DATABASE  # could be parameterized

        if path == "config":
            app_config = self._import_app_config()
            if app_config is not None:
                databases = list(app_config.SQLALCHEMY_BINDS.keys())
            else:
                # Fallback to default database configuration
                databases = list(self._FALLBACK_CONNECTION_STRINGS)
            return self._json_resource({
                "available_databases": databases,
                "default_database": self.DEFAULT_DATABASE,
            })

        if path not in self._DATABASE_RESOURCES:
            raise ValueError(f"Unknown database resource: {path}")

        payload_key, method_name = self._DATABASE_RESOURCES[path]
        integration = self.get_database_integration(database)
        value = getattr(integration, method_name)()
        payload = {"database": database, payload_key: value}
        if path == "tables":
            payload["count"] = len(value)
        return self._json_resource(payload)

    async def _read_table_resource(self, path: str) -> Dict[str, Any]:
        """Read table-level resources.

        Args:
            path: ``{table_name}/{resource_type}`` where the resource type is
                ``schema``, ``count`` or ``size``. Any further ``/`` segments
                are ignored.

        Returns:
            A dict with ``mimeType`` and ``text`` (a JSON document).

        Raises:
            ValueError: If *path* is malformed, names no table, or names an
                unknown resource type.
        """
        database = self.DEFAULT_DATABASE

        parts = path.split('/')
        if len(parts) < 2 or not parts[0]:
            raise ValueError(f"Invalid table resource path: {path}")
        table_name, resource_type = parts[0], parts[1]

        # Validate before touching the database so bad URIs fail fast.
        if resource_type not in self._TABLE_RESOURCES:
            raise ValueError(f"Unknown table resource type: {resource_type}")

        payload_key, method_name = self._TABLE_RESOURCES[resource_type]
        integration = self.get_database_integration(database)
        value = getattr(integration, method_name)(table_name)
        return self._json_resource({
            "database": database,
            "table_name": table_name,
            payload_key: value,
        })

    def _get_timestamp(self) -> str:
        """Get current timestamp for resource metadata.

        Returns:
            The current local time (naive, no UTC offset) in ISO 8601 format.
        """
        from datetime import datetime

        return datetime.now().isoformat()

    def get_resource_info(self) -> Dict[str, Any]:
        """Get information about available resources.

        Returns:
            Human-readable descriptions of the database and table resource
            URIs, plus the loaded resource definitions under
            ``resource_categories``.
        """
        return {
            "database_resources": [
                "database://tables - List of all database tables",
                "database://schema - Complete database schema",
                "database://stats - Database performance statistics",
                "database://semantics - Table relationships and context",
                "database://config - Available database connections",
                "database://pgconfig - PostgreSQL server configuration settings"
            ],
            "table_resources": [
                "table://{table_name}/schema - Table structure and columns",
                "table://{table_name}/count - Current row count",
                "table://{table_name}/size - Storage size information"
            ],
            "resource_categories": self.resource_definitions
        }