#!/usr/bin/env python3
"""
DP-MCP Server - Main server implementation.
This module implements a comprehensive Data Platform MCP (Model Context Protocol) server
that provides seamless integration between PostgreSQL databases and MinIO object storage.
Built with FastMCP 2.0 for modern AI applications.
The server provides 11 tools organized into three categories:
- PostgreSQL Tools: Database operations and introspection
- MinIO Tools: Object storage management
- Combined Tools: Cross-platform operations including database backup to object storage
Key Features:
- FastMCP integration with HTTP transport
- Comprehensive error handling and logging
- Type-safe parameter validation
- Production-ready configuration management
- Docker support for development and deployment
Author: DP-MCP Development Team
Version: 0.1.0
License: MIT
"""
import asyncio
import json
import logging
import os
import sys
from typing import Any, Dict, Optional

from dotenv import load_dotenv
from fastmcp import FastMCP
# Load environment variables
load_dotenv()
# Configure logging first
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Import our tools
from dp_mcp.tools.postgres_tools import (
execute_query,
list_tables,
describe_table,
export_table_to_csv,
get_db_connection,
)
from dp_mcp.tools.minio_tools import (
list_buckets,
list_objects,
upload_object,
download_object,
delete_object,
create_bucket,
get_minio_client,
)
from dp_mcp.utils.config import get_config
# Import AI tools
try:
from dp_mcp.ai.ai_tools import initialize_ai_tools, get_ai_tools
AI_AVAILABLE = True
logger.info("AI tools module loaded successfully")
except ImportError as e:
AI_AVAILABLE = False
logger.warning(f"AI tools not available: {e}")
get_ai_tools = lambda: None
# Initialize FastMCP server
mcp = FastMCP("dp-mcp")
# PostgreSQL Tools
@mcp.tool()
async def execute_sql_query(query: str, limit: int = 1000) -> str:
    """Run an arbitrary SQL statement against the configured PostgreSQL database.

    Thin MCP wrapper around the ``execute_query`` helper. SELECT statements
    come back as a formatted result table (a LIMIT clause is appended
    automatically when the caller did not supply one); DML/DDL statements
    yield a status message instead.

    Args:
        query: SQL text to run (SELECT, INSERT, UPDATE, DELETE, DDL, ...).
        limit: Row cap applied to SELECT results. Defaults to 1000.

    Returns:
        Human-readable result table or an execution status string.

    Raises:
        Exception: Propagated connection, syntax, or permission errors.
    """
    formatted = await execute_query(query, limit)
    return formatted
@mcp.tool()
async def list_db_tables(schema: str = "public") -> str:
    """Enumerate every table and view in one PostgreSQL schema.

    Delegates to the ``list_tables`` helper, which produces a bulleted
    listing that includes each relation's kind (TABLE, VIEW,
    MATERIALIZED VIEW, ...).

    Args:
        schema: Schema to inspect. Defaults to "public".

    Returns:
        Formatted, human-readable listing of the schema's relations.
    """
    listing = await list_tables(schema)
    return listing
@mcp.tool()
async def describe_db_table(table_name: str, schema: str = "public") -> str:
    """Report the column-level structure of one PostgreSQL table.

    Delegates to the ``describe_table`` helper, whose output covers column
    names, data types, NULL constraints, and default values.

    Args:
        table_name: Relation whose structure should be described.
        schema: Schema that contains the relation. Defaults to "public".

    Returns:
        Multi-line text describing the table's columns and constraints.
    """
    structure = await describe_table(table_name, schema)
    return structure
@mcp.tool()
async def export_table_csv(table_name: str, limit: int = 10000, where_clause: Optional[str] = None) -> str:
    """Export table data in CSV format for analysis or backup purposes.

    Extracts rows from a PostgreSQL table via ``export_table_to_csv`` and
    returns them as CSV text with a header row. An optional WHERE clause
    filters the export; the row cap keeps large tables from exhausting memory.

    Args:
        table_name: Name of the table to export.
        limit: Maximum rows to export. Defaults to 10000.
        where_clause: SQL WHERE clause (without the ``WHERE`` keyword,
            presumably — confirm against the helper) used to filter rows.
            ``None`` exports unfiltered.

    Returns:
        str: CSV-formatted data with headers.

    Examples:
        >>> await export_table_csv("orders", 1000, "order_date >= '2025-01-01'")
    """
    # Fix: the parameter was annotated plain `str` while defaulting to None;
    # Optional[str] is the correct contract and what schema generation expects.
    return await export_table_to_csv(table_name, limit, where_clause)
# MinIO Tools
@mcp.tool()
async def list_minio_buckets() -> str:
    """Return a formatted listing of every bucket in the MinIO deployment."""
    buckets = await list_buckets()
    return buckets
@mcp.tool()
async def list_bucket_objects(bucket_name: str, prefix: Optional[str] = None, max_keys: int = 1000) -> str:
    """List objects stored in a MinIO bucket.

    Args:
        bucket_name: Bucket to enumerate.
        prefix: Optional key prefix used to narrow the listing; ``None``
            lists everything.
        max_keys: Maximum number of objects to return. Defaults to 1000.

    Returns:
        str: Formatted object listing produced by ``list_objects``.
    """
    # Fix: `prefix` defaulted to None but was annotated plain `str`;
    # Optional[str] is the correct, schema-friendly annotation.
    return await list_objects(bucket_name, prefix, max_keys)
@mcp.tool()
async def upload_to_minio(bucket_name: str, object_name: str, data: str, content_type: str = "text/plain") -> str:
    """Store a string payload as an object in a MinIO bucket.

    Args:
        bucket_name: Target bucket.
        object_name: Key under which the payload is stored.
        data: Object contents as text.
        content_type: MIME type recorded with the object.
            Defaults to "text/plain".

    Returns:
        Status message from the underlying upload helper.
    """
    outcome = await upload_object(bucket_name, object_name, data, content_type)
    return outcome
@mcp.tool()
async def download_from_minio(bucket_name: str, object_name: str) -> str:
    """Fetch one object's contents from a MinIO bucket.

    Args:
        bucket_name: Bucket that holds the object.
        object_name: Key of the object to retrieve.

    Returns:
        The object payload (or status text) from the download helper.
    """
    payload = await download_object(bucket_name, object_name)
    return payload
@mcp.tool()
async def create_minio_bucket(bucket_name: str, region: Optional[str] = None) -> str:
    """Create a new MinIO bucket.

    Args:
        bucket_name: Name of the bucket to create.
        region: Optional region hint forwarded to ``create_bucket``;
            ``None`` uses the server default.

    Returns:
        str: Status message from the bucket-creation helper.
    """
    # Fix: `region` defaulted to None but was annotated plain `str`;
    # Optional[str] is the correct, schema-friendly annotation.
    return await create_bucket(bucket_name, region)
@mcp.tool()
async def delete_minio_object(bucket_name: str, object_name: str) -> str:
    """Remove one object from a MinIO bucket.

    Args:
        bucket_name: Bucket that holds the object.
        object_name: Key of the object to delete.

    Returns:
        Status message from the deletion helper.
    """
    outcome = await delete_object(bucket_name, object_name)
    return outcome
# Combined Operations
@mcp.tool()
async def backup_table_to_minio(table_name: str, bucket_name: str = "backups",
                                schema: str = "public", limit: int = 10000,
                                where_clause: Optional[str] = None) -> str:
    """Backup a PostgreSQL table to MinIO as a timestamped CSV object.

    Exports the table (optionally filtered and row-capped) to CSV, then
    uploads it as ``{schema}_{table}_{timestamp}.csv`` so successive backups
    never overwrite each other.

    Args:
        table_name: Table to back up.
        bucket_name: Destination bucket. Defaults to "backups".
        schema: Schema the table lives in (used only in the object name here;
            confirm whether ``export_table_to_csv`` also needs it).
        limit: Maximum rows to export. Defaults to 10000.
        where_clause: Optional SQL WHERE clause filtering the export.

    Returns:
        str: Success message with the object path, or an "Error: ..." string
        describing the failure (exceptions are caught, not raised).
    """
    # Fix: `where_clause` defaulted to None but was annotated plain `str`.
    try:
        csv_data = await export_table_to_csv(table_name, limit, where_clause)

        # Timestamp the object key so repeated backups are all retained.
        # NOTE(review): datetime.now() is naive local time — confirm whether
        # UTC timestamps are expected for backup names.
        from datetime import datetime
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        object_name = f"{schema}_{table_name}_{timestamp}.csv"

        upload_result = await upload_object(bucket_name, object_name, csv_data, "text/csv")
        return f"Table {schema}.{table_name} backed up successfully to {bucket_name}/{object_name}. {upload_result}"
    except Exception as e:
        logger.error(f"Error backing up table {table_name}: {e}")
        return f"Error: Failed to backup table {table_name} - {str(e)}"
# AI-Enhanced Tools (available only if AI module is loaded)
@mcp.tool()
async def ask_natural_language_query(question: str, schema: str = "public", model_name: Optional[str] = None) -> str:
    """Convert a natural language question into SQL, run it, and analyze the results.

    Uses the AI toolchain to generate an appropriate SQL query for the
    question, execute it, and produce an analysis of the results. Returns a
    plain "Error: ..." string (rather than raising) when AI is unavailable or
    the request fails.

    Args:
        question: Natural language question about the data.
        schema: Database schema to query. Defaults to "public".
        model_name: Specific AI model to use; ``None`` selects the default.

    Returns:
        str: JSON (indent=2) with the generated SQL, results, and analysis,
        or an "Error: ..." message.

    Examples:
        >>> await ask_natural_language_query("How many users signed up last month?")
    """
    # Fix: `model_name` defaulted to None but was annotated plain `str`.
    if not AI_AVAILABLE:
        return "Error: AI features are not available. Please check AI model configuration."
    try:
        ai_tools = get_ai_tools()
        if not ai_tools:
            return "Error: AI tools not initialized. Please check server configuration."
        result = await ai_tools.natural_language_query(question, schema, model_name)
        return json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"Natural language query failed: {e}")
        return f"Error: {str(e)}"
@mcp.tool()
async def explain_query_with_ai(sql_query: str, limit: int = 100, model_name: Optional[str] = None) -> str:
    """Execute a SQL query and return an AI-generated explanation of the results.

    Runs the query, then asks the AI layer to explain the result set and
    surface patterns. Returns a plain "Error: ..." string (rather than
    raising) when AI is unavailable or the request fails.

    Args:
        sql_query: SQL query to execute and explain.
        limit: Maximum rows to analyze. Defaults to 100.
        model_name: Specific AI model to use; ``None`` selects the default.

    Returns:
        str: JSON (indent=2) with query results and the AI explanation,
        or an "Error: ..." message.

    Examples:
        >>> await explain_query_with_ai("SELECT * FROM orders WHERE total > 1000")
    """
    # Fix: `model_name` defaulted to None but was annotated plain `str`.
    if not AI_AVAILABLE:
        return "Error: AI features are not available. Please check AI model configuration."
    try:
        ai_tools = get_ai_tools()
        if not ai_tools:
            return "Error: AI tools not initialized. Please check server configuration."
        result = await ai_tools.explain_query_results(sql_query, limit, model_name)
        return json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"Query explanation failed: {e}")
        return f"Error: {str(e)}"
@mcp.tool()
async def get_ai_data_insights(schema: str = "public", model_name: Optional[str] = None) -> str:
    """Generate AI-powered suggestions for analyzing a database schema.

    Asks the AI layer to inspect the schema and propose queries/analyses
    that could be valuable. Returns a plain "Error: ..." string (rather than
    raising) when AI is unavailable or the request fails.

    Args:
        schema: Database schema to analyze. Defaults to "public".
        model_name: Specific AI model to use; ``None`` selects the default.

    Returns:
        str: JSON (indent=2) with AI-generated analysis suggestions,
        or an "Error: ..." message.

    Examples:
        >>> await get_ai_data_insights("public")
    """
    # Fix: `model_name` defaulted to None but was annotated plain `str`.
    if not AI_AVAILABLE:
        return "Error: AI features are not available. Please check AI model configuration."
    try:
        ai_tools = get_ai_tools()
        if not ai_tools:
            return "Error: AI tools not initialized. Please check server configuration."
        result = await ai_tools.suggest_database_insights(schema, model_name)
        return json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"AI insights generation failed: {e}")
        return f"Error: {str(e)}"
@mcp.tool()
async def analyze_table_patterns(table_name: str, schema: str = "public",
                                 sample_size: int = 1000, model_name: Optional[str] = None) -> str:
    """Use AI to analyze data patterns and quality in a specific table.

    Samples rows from the table and asks the AI layer to identify patterns
    and data-quality issues. Returns a plain "Error: ..." string (rather
    than raising) when AI is unavailable or the request fails.

    Args:
        table_name: Table to analyze.
        schema: Schema that contains the table. Defaults to "public".
        sample_size: Number of rows to sample. Defaults to 1000.
        model_name: Specific AI model to use; ``None`` selects the default.

    Returns:
        str: JSON (indent=2) with the AI pattern analysis,
        or an "Error: ..." message.

    Examples:
        >>> await analyze_table_patterns("orders", "public", 500)
    """
    # Fix: `model_name` defaulted to None but was annotated plain `str`.
    if not AI_AVAILABLE:
        return "Error: AI features are not available. Please check AI model configuration."
    try:
        ai_tools = get_ai_tools()
        if not ai_tools:
            return "Error: AI tools not initialized. Please check server configuration."
        result = await ai_tools.analyze_data_patterns(table_name, schema, sample_size, model_name)
        return json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"Pattern analysis failed: {e}")
        return f"Error: {str(e)}"
@mcp.tool()
async def generate_ai_data_report(title: str, tables: str, schema: str = "public",
                                  model_name: Optional[str] = None) -> str:
    """Generate a comprehensive AI-powered data report spanning multiple tables.

    Parses the comma-separated table list and asks the AI layer to produce a
    report with summaries, findings, and recommendations. Returns a plain
    "Error: ..." string (rather than raising) when AI is unavailable or the
    request fails.

    Args:
        title: Report title/description.
        tables: Comma-separated list of table names to include.
        schema: Schema the tables live in. Defaults to "public".
        model_name: Specific AI model to use; ``None`` selects the default.

    Returns:
        str: JSON (indent=2) with the generated report,
        or an "Error: ..." message.

    Examples:
        >>> await generate_ai_data_report("Sales Performance", "orders,products,customers")
    """
    # Fix: `model_name` defaulted to None but was annotated plain `str`.
    if not AI_AVAILABLE:
        return "Error: AI features are not available. Please check AI model configuration."
    try:
        ai_tools = get_ai_tools()
        if not ai_tools:
            return "Error: AI tools not initialized. Please check server configuration."
        # Robustness: drop empty entries so "users,,orders" or a trailing
        # comma never yields a blank table name.
        table_list = [t.strip() for t in tables.split(",") if t.strip()]
        result = await ai_tools.generate_data_report(title, table_list, schema, model_name)
        return json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"Report generation failed: {e}")
        return f"Error: {str(e)}"
@mcp.tool()
async def get_ai_system_status() -> str:
    """Report the AI subsystem's availability and configuration.

    Returns a JSON document describing whether AI is usable; when the AI
    layer is loaded and initialized, its own status dict is returned with
    ``ai_available`` forced to True. All failure modes are reported in-band
    as JSON rather than raised.

    Returns:
        str: JSON (indent=2) with AI availability and configuration details.
    """
    def unavailable(reason: str) -> str:
        # Every "AI is off" response shares this exact shape.
        return json.dumps({
            "ai_available": False,
            "reason": reason,
            "features_enabled": False
        }, indent=2)

    if not AI_AVAILABLE:
        return unavailable("AI module not loaded")
    try:
        ai_tools = get_ai_tools()
        if not ai_tools:
            return unavailable("AI tools not initialized")
        status = ai_tools.get_ai_status()
        status["ai_available"] = True
        return json.dumps(status, indent=2)
    except Exception as e:
        logger.error(f"AI status check failed: {e}")
        return unavailable(f"Error: {str(e)}")
async def test_connections():
    """Probe both backing services before the server starts taking traffic.

    Opens and immediately closes a PostgreSQL connection, then lists MinIO
    buckets, logging the outcome of each probe.

    Returns:
        bool: True when both probes succeed, False on the first failure.
    """
    try:
        db_conn = get_db_connection()
        db_conn.close()
        logger.info("✓ PostgreSQL connection successful")

        minio = get_minio_client()
        # Force the lazy listing so a bad endpoint/credentials fail here.
        list(minio.list_buckets())
        logger.info("✓ MinIO connection successful")
    except Exception as e:
        logger.error(f"Connection test failed: {e}")
        return False
    return True
def main():
    """CLI entry point: parse args, verify connectivity, configure AI, serve.

    Exits with status 1 if either the PostgreSQL or MinIO connectivity
    probe fails. AI configuration is auto-detected; the server still starts
    when AI initialization fails.
    """
    # argparse is only needed by the CLI path, hence the local import.
    import argparse
    parser = argparse.ArgumentParser(description="DP-MCP Server with AI Integration")
    parser.add_argument("--port", type=int, default=8888, help="HTTP server port")
    parser.add_argument("--host", default="127.0.0.1", help="HTTP server host")
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    # Removed --ai-env parameter - AI configuration is now auto-detected from .env.ai
    args = parser.parse_args()
    if args.debug:
        # Raise the root logger so all module loggers emit debug output.
        logging.getLogger().setLevel(logging.DEBUG)
    # Test connections on startup; refuse to serve with broken backends.
    if not asyncio.run(test_connections()):
        logger.error("Connection tests failed. Please check your configuration.")
        sys.exit(1)
    # Initialize AI tools with auto-detection
    if AI_AVAILABLE:
        try:
            # Imported lazily so a broken AI package can't prevent startup.
            from dp_mcp.ai.auto_config import auto_configure_ai, get_ai_summary
            # Auto-configure AI based on available resources
            ai_manager = auto_configure_ai()
            if ai_manager:
                from dp_mcp.ai.ai_tools import AIEnhancedTools
                # NOTE(review): `ai_tools` is a local that is never used again,
                # and `initialize_ai_tools` (imported at module top) is never
                # called — unless AIEnhancedTools.__init__ registers itself
                # globally, get_ai_tools() in the MCP tools may still return
                # None. Confirm the registration path.
                ai_tools = AIEnhancedTools(ai_manager=ai_manager)
                logger.info("✓ AI tools initialized with auto-detected configuration")
                # Log AI status
                summary = get_ai_summary()
                if summary["total_models"] > 0:
                    # Build a human-readable model inventory for the log line.
                    models = []
                    if summary["ollama_models"]: models.extend([f"{m} (local)" for m in summary["ollama_models"]])
                    if summary["claude_configured"]: models.append("claude (cloud)")
                    if summary["openai_configured"]: models.append("openai (cloud)")
                    logger.info(f"✓ Available AI models: {', '.join(models)}")
                    logger.info(f"✓ Privacy level: {summary['privacy_level']}")
                else:
                    logger.warning("⚠️ No AI models available. AI features will return mock responses.")
            else:
                ai_tools = None
                logger.info("ℹ️ AI auto-configuration found no available models")
        except Exception as e:
            # AI is best-effort: log and continue without it.
            logger.warning(f"⚠️ AI initialization failed: {e}")
            logger.info("Server will start without AI features")
    else:
        logger.info("AI features disabled (dependencies not available)")
    logger.info(f"Starting DP-MCP Server on port {args.port}")
    logger.info(f"Server will be available at: http://{args.host}:{args.port}/mcp")
    # Run FastMCP server with HTTP transport; blocks until shutdown.
    mcp.run(
        transport="http",
        host=args.host,
        port=args.port,
        log_level="debug" if args.debug else "info"
    )
# Run the CLI entry point only when executed as a script (not on import).
if __name__ == "__main__":
    main()