"""
AI-powered tools for the DP-MCP server.
This module provides secure AI-enhanced database and data analysis tools
that work with the existing PostgreSQL and MinIO infrastructure.
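
Typical usage (illustrative sketch; assumes a configured environment and that the
backing database and AI model services are reachable):

    import asyncio

    tools = initialize_ai_tools("development")  # helper defined at the bottom of this module
    print(tools.get_ai_status()["available_models"])
    answer = asyncio.run(
        tools.natural_language_query("How many tables contain customer data?")
    )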
"""
import json
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime
from .models import AIModelManager
from .config_templates import get_setup_for_environment
from .data_privacy import DataPrivacyManager, PrivacyLevel
from ..tools.postgres_tools import (
list_tables, describe_table, execute_query, export_table_to_csv
)
from ..tools.minio_tools import (
list_buckets, list_objects, upload_object, download_object
)
logger = logging.getLogger(__name__)
class AIEnhancedTools:
"""AI-enhanced tools for database and data operations."""
    def __init__(self, environment: str = "development",
                 ai_manager: Optional[AIModelManager] = None):
"""
Initialize AI tools with appropriate privacy and model configuration.
Args:
            environment: Environment type (deprecated; prefer passing ai_manager)
ai_manager: Pre-configured AIModelManager instance (preferred)
"""
if ai_manager is not None:
self.ai_manager = ai_manager
self.environment = "auto-detected"
else:
# Fallback to old behavior for backward compatibility
self.ai_manager = get_setup_for_environment(environment)
self.environment = environment
self.logger = logging.getLogger(f"{__name__}.AIEnhancedTools")
# Validate setup
status = self._validate_ai_setup()
if not status["working"]:
self.logger.warning("No working AI models available. AI features will be limited.")
else:
self.logger.info(f"AI tools initialized with {len(status['working'])} working models")
def _validate_ai_setup(self) -> Dict[str, List[str]]:
"""Validate AI model setup."""
from .config_templates import validate_setup
return validate_setup(self.ai_manager)
async def natural_language_query(self, question: str, schema: Optional[str] = "public",
model_name: Optional[str] = None) -> Dict[str, Any]:
"""
        Convert a natural-language question to a SQL query and execute it.
Args:
question: Natural language question about the data
schema: Database schema to search (default: public)
model_name: Specific AI model to use (optional)
Returns:
Dictionary with query, results, and AI analysis
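
        Example:
            Illustrative sketch (requires a reachable database and at least one
            working AI model; the question text is a placeholder):

                import asyncio

                tools = AIEnhancedTools(environment="development")
                result = asyncio.run(
                    tools.natural_language_query("Which customers placed the most orders?")
                )
                print(result.get("generated_sql"))
                print(result.get("ai_analysis"))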
"""
try:
# Get database schema information
tables = await list_tables(schema)
schema_info = {"tables": []}
            # Get detailed schema for the first few tables (to keep the prompt manageable)
            table_list = tables.split("\n")[2:12] if "\n" in tables else []  # Skip header rows, limit to 10 tables
for table_line in table_list:
if "|" in table_line:
table_name = table_line.split("|")[1].strip()
if table_name and table_name != "Table Name":
try:
table_desc = await describe_table(table_name, schema)
schema_info["tables"].append({
"name": table_name,
"description": table_desc
})
except Exception as e:
self.logger.warning(f"Could not describe table {table_name}: {e}")
# Generate SQL query using AI
self.logger.info(f"Generating SQL for question: {question[:100]}...")
sql_query = await self.ai_manager.generate_sql_query(
natural_language=question,
schema_info=schema_info,
model_name=model_name
)
            # Clean up the SQL query (strip Markdown code fences if present)
            sql_query = sql_query.strip()
            if sql_query.startswith("```sql"):
                sql_query = sql_query[6:]
            elif sql_query.startswith("```"):
                sql_query = sql_query[3:]
            if sql_query.endswith("```"):
                sql_query = sql_query[:-3]
            sql_query = sql_query.strip()
# Execute the query
self.logger.info(f"Executing generated SQL: {sql_query[:100]}...")
query_results = await execute_query(sql_query, limit=100)
# Parse results for AI analysis
try:
# Try to parse as JSON for structured analysis
if query_results.startswith("[") or query_results.startswith("{"):
results_data = json.loads(query_results)
else:
# Fallback to text results
results_data = {"raw_results": query_results}
except json.JSONDecodeError:
results_data = {"raw_results": query_results}
# Generate AI analysis of results
self.logger.info("Generating AI analysis of results...")
analysis = await self.ai_manager.analyze_query_results(
query=sql_query,
results=results_data if isinstance(results_data, list) else [results_data],
model_name=model_name
)
return {
"question": question,
"generated_sql": sql_query,
"query_results": query_results,
"ai_analysis": analysis,
"model_used": model_name or self.ai_manager.default_model,
"timestamp": datetime.now().isoformat(),
"privacy_level": self.ai_manager.privacy_manager.config.privacy_level.value
}
except Exception as e:
self.logger.error(f"Natural language query failed: {e}")
return {
"question": question,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
async def explain_query_results(self, sql_query: str, limit: int = 100,
model_name: Optional[str] = None) -> Dict[str, Any]:
"""
        Execute a SQL query and get an AI explanation of the results.
Args:
sql_query: SQL query to execute and explain
limit: Maximum number of rows to analyze
model_name: Specific AI model to use (optional)
Returns:
Dictionary with results and AI explanation
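
        Example:
            Illustrative sketch (the table name is a placeholder and a reachable
            database is assumed):

                import asyncio

                tools = AIEnhancedTools(environment="development")
                result = asyncio.run(
                    tools.explain_query_results(
                        "SELECT status, COUNT(*) AS n FROM orders GROUP BY status"
                    )
                )
                print(result.get("ai_explanation"))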
"""
try:
# Execute the query
self.logger.info(f"Executing query for explanation: {sql_query[:100]}...")
query_results = await execute_query(sql_query, limit=limit)
# Parse results
try:
if query_results.startswith("[") or query_results.startswith("{"):
results_data = json.loads(query_results)
else:
results_data = {"raw_results": query_results}
except json.JSONDecodeError:
results_data = {"raw_results": query_results}
# Generate explanation
self.logger.info("Generating AI explanation...")
explanation = await self.ai_manager.analyze_query_results(
query=sql_query,
results=results_data if isinstance(results_data, list) else [results_data],
model_name=model_name
)
return {
"sql_query": sql_query,
"query_results": query_results,
"ai_explanation": explanation,
"model_used": model_name or self.ai_manager.default_model,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
self.logger.error(f"Query explanation failed: {e}")
return {
"sql_query": sql_query,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
async def suggest_database_insights(self, schema: Optional[str] = "public",
model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Generate AI suggestions for database analysis and insights.
Args:
schema: Database schema to analyze
model_name: Specific AI model to use (optional)
Returns:
Dictionary with suggested queries and analysis
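
        Example:
            Illustrative sketch (assumes a configured AI model and a reachable
            database):

                import asyncio

                tools = AIEnhancedTools(environment="development")
                insights = asyncio.run(tools.suggest_database_insights(schema="public"))
                print(insights.get("ai_suggestions"))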
"""
try:
# Get database overview
tables = await list_tables(schema)
# Get schema information for a few key tables
schema_info = {"schema": schema, "tables": []}
table_list = tables.split("\n")[2:7] if "\n" in tables else [] # First 5 tables
for table_line in table_list:
if "|" in table_line:
table_name = table_line.split("|")[1].strip()
if table_name and table_name != "Table Name":
try:
table_desc = await describe_table(table_name, schema)
schema_info["tables"].append({
"name": table_name,
"structure": table_desc
})
                        except Exception as e:
                            self.logger.warning(f"Could not describe table {table_name}: {e}")
# Generate insights prompt
prompt = f"""
Based on the following database schema, suggest 5 interesting analytical queries
that could provide business insights:
Database Schema:
{json.dumps(schema_info, indent=2)}
For each suggestion, provide:
1. A descriptive title
2. The business question it answers
3. The suggested SQL query
4. Expected insights
Format as JSON with this structure:
{{
"suggestions": [
{{
"title": "...",
"business_question": "...",
"suggested_sql": "...",
"expected_insights": "..."
}}
]
}}
"""
# Get AI suggestions
self.logger.info("Generating database insight suggestions...")
suggestions = await self.ai_manager.generate_response(
prompt=prompt,
model_name=model_name
)
return {
"schema": schema,
"table_count": len(schema_info["tables"]),
"ai_suggestions": suggestions,
"model_used": model_name or self.ai_manager.default_model,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
self.logger.error(f"Database insights generation failed: {e}")
return {
"schema": schema,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
async def analyze_data_patterns(self, table_name: str, schema: str = "public",
sample_size: int = 1000, model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Analyze data patterns in a specific table using AI.
Args:
table_name: Name of the table to analyze
schema: Database schema
sample_size: Number of rows to sample for analysis
model_name: Specific AI model to use (optional)
Returns:
Dictionary with data pattern analysis
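
        Example:
            Illustrative sketch ("customers" is a placeholder table name):

                import asyncio

                tools = AIEnhancedTools(environment="development")
                report = asyncio.run(
                    tools.analyze_data_patterns("customers", schema="public", sample_size=500)
                )
                print(report.get("data_pattern_analysis"))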
"""
try:
# Get table structure
table_structure = await describe_table(table_name, schema)
            # Get sample data (schema and table names are interpolated directly into SQL,
            # so callers must supply trusted identifiers)
            sample_query = f"SELECT * FROM {schema}.{table_name} LIMIT {sample_size}"
sample_data = await execute_query(sample_query, limit=sample_size)
# Prepare analysis prompt
prompt = f"""
Analyze the data patterns in this database table and provide insights:
Table: {schema}.{table_name}
Structure: {table_structure}
            Sample Data (truncated): {sample_data[:2000]}...
Please provide:
1. Data quality assessment
2. Pattern identification
3. Potential data issues
4. Suggestions for analysis
5. Recommended next steps
Focus on actionable insights and potential business value.
"""
# Generate analysis
self.logger.info(f"Analyzing data patterns for table: {table_name}")
analysis = await self.ai_manager.generate_response(
prompt=prompt,
model_name=model_name
)
return {
"table_name": f"{schema}.{table_name}",
"sample_size": sample_size,
"data_pattern_analysis": analysis,
"table_structure": table_structure,
"model_used": model_name or self.ai_manager.default_model,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
self.logger.error(f"Data pattern analysis failed: {e}")
return {
"table_name": f"{schema}.{table_name}",
"error": str(e),
"timestamp": datetime.now().isoformat()
}
async def generate_data_report(self, title: str, tables: List[str],
schema: str = "public", model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Generate a comprehensive data report using AI analysis.
Args:
title: Report title
tables: List of table names to include in report
schema: Database schema
model_name: Specific AI model to use (optional)
Returns:
Dictionary with comprehensive data report
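
        Example:
            Illustrative sketch (table names are placeholders):

                import asyncio

                tools = AIEnhancedTools(environment="development")
                report = asyncio.run(
                    tools.generate_data_report(
                        title="Quarterly Data Overview",
                        tables=["customers", "orders"],
                    )
                )
                print(report.get("report_content"))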
"""
try:
report_sections = []
# Analyze each table
for table_name in tables[:5]: # Limit to 5 tables for performance
try:
# Get basic info
table_desc = await describe_table(table_name, schema)
row_count_query = f"SELECT COUNT(*) as total_rows FROM {schema}.{table_name}"
row_count = await execute_query(row_count_query, limit=1)
# Get sample data
sample_query = f"SELECT * FROM {schema}.{table_name} LIMIT 10"
sample_data = await execute_query(sample_query, limit=10)
report_sections.append({
"table": table_name,
"structure": table_desc,
"row_count": row_count,
"sample_data": sample_data[:500] # Truncate for AI processing
})
except Exception as e:
self.logger.warning(f"Could not analyze table {table_name}: {e}")
# Generate comprehensive report
prompt = f"""
Generate a comprehensive data report titled "{title}" based on the following database analysis:
{json.dumps(report_sections, indent=2)}
Please provide:
1. Executive Summary
2. Data Overview (tables, row counts, key metrics)
3. Key Findings and Insights
4. Data Quality Assessment
5. Recommendations for further analysis
6. Potential business applications
Format as a professional report with clear sections and actionable insights.
"""
self.logger.info(f"Generating comprehensive data report: {title}")
report = await self.ai_manager.generate_response(
prompt=prompt,
model_name=model_name
)
return {
"title": title,
"tables_analyzed": tables,
"schema": schema,
"report_content": report,
"analysis_sections": report_sections,
"model_used": model_name or self.ai_manager.default_model,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
self.logger.error(f"Data report generation failed: {e}")
return {
"title": title,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
def get_available_models(self) -> List[str]:
"""Get list of available AI models."""
return self.ai_manager.list_models()
def get_privacy_level(self) -> str:
"""Get current privacy protection level."""
return self.ai_manager.privacy_manager.config.privacy_level.value
def get_ai_status(self) -> Dict[str, Any]:
"""Get comprehensive AI system status."""
status = self._validate_ai_setup()
return {
"environment": self.environment,
"privacy_level": self.get_privacy_level(),
"available_models": self.get_available_models(),
"default_model": self.ai_manager.default_model,
"working_models": status["working"],
"failed_models": status["failed"],
"recommendations": status["recommendations"],
"features_enabled": {
"natural_language_query": len(status["working"]) > 0,
"query_explanation": len(status["working"]) > 0,
"data_insights": len(status["working"]) > 0,
"pattern_analysis": len(status["working"]) > 0,
"report_generation": len(status["working"]) > 0
}
}
# Global AI tools instance (will be initialized by server)
ai_tools: Optional[AIEnhancedTools] = None
def initialize_ai_tools(environment: str = "development") -> AIEnhancedTools:
"""Initialize global AI tools instance."""
global ai_tools
ai_tools = AIEnhancedTools(environment)
return ai_tools
def get_ai_tools() -> Optional[AIEnhancedTools]:
"""Get the global AI tools instance."""
return ai_tools
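

# Minimal smoke-test sketch (illustrative). It assumes this package's model and
# configuration dependencies are importable and at least one AI backend is set up;
# run it as a module (e.g. `python -m <your_package>.<this_module>`) so the
# relative imports above resolve.
if __name__ == "__main__":
    tools = initialize_ai_tools("development")
    # Print the consolidated status so operators can see which models are working.
    print(json.dumps(tools.get_ai_status(), indent=2))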