# postgres_integration_example.py (10.6 kB)
"""
PostgreSQL Integration Example
This example shows how to use the new architecture with separated PostgreSQL
integration and vector search functionality.
"""
import asyncio
from sqlalchemy import create_engine
from server import MCPServer
from postgres_integration import PostgreSQLIntegration
from vector_search import VectorSearchEngine
async def comprehensive_example():
    """Walk through the MCP server's PostgreSQL and vector search integrations.

    Creates an ``MCPServer``, attaches a SQLAlchemy engine to it through
    ``initialize_database`` and prints the outcome of every demo step. The
    connection string is a placeholder and must be replaced with a real one
    before this is run.

    The body contains no ``await``: every call is made synchronously. The
    ``async`` signature is kept so that
    ``asyncio.run(comprehensive_example())`` keeps working.
    """
    print("PostgreSQL Integration & Vector Search Example")
    print("=" * 50)

    # Initialize MCP Server
    mcp_server = MCPServer()

    # Initialize database integrations
    # Replace with your actual database connection string
    engine = create_engine("postgresql://user:password@localhost/dbname")
    try:
        mcp_server.initialize_database(engine, "my_database")
        print("✓ MCP Server initialized with database integrations")
        print()

        # Direct PostgreSQL operations
        if mcp_server.postgres_integration:
            _run_postgres_examples(mcp_server.postgres_integration)

        # Vector search operations
        if mcp_server.vector_search:
            _run_vector_search_examples(mcp_server.vector_search)
    finally:
        # Release the engine's pooled connections even when a step raises.
        engine.dispose()


def _run_postgres_examples(postgres):
    """Print the connection, schema and SQL examples for ``postgres``.

    Stops after the connection test when it reports failure, because every
    later step goes through the same integration object.
    """
    print("PostgreSQL Integration Examples:")
    print("-" * 30)

    # Test connection
    connection_test = postgres.test_connection()
    if not connection_test.get('success'):
        print("✗ Database connection failed")
        print(f" Error: {connection_test.get('error')}")
        print()
        return
    print("✓ Database connection successful")
    print(f" Database version: {connection_test.get('database_version', 'Unknown')}")
    print()

    # Get all tables
    tables = postgres.get_all_table_names()
    print(f"Found {len(tables)} tables: {', '.join(tables[:5])}")
    if len(tables) > 5:
        print(f" ... and {len(tables) - 5} more")
    print()

    # Get schema for a specific table (if available)
    if tables:
        sample_table = tables[0]
        schema = postgres.get_table_schema(sample_table)
        print(f"Schema for '{sample_table}':")
        if 'error' not in schema:
            print(f" Columns: {len(schema.get('columns', []))}")
            print(f" Primary keys: {schema.get('primary_keys', [])}")
            print(f" Foreign keys: {len(schema.get('foreign_keys', []))}")
        else:
            print(f" Error: {schema['error']}")
        print()

    # SQL generation from natural language (auto-discovers table semantics)
    generated_sql = postgres.generate_sql("Show me the most recent entries")
    print(f"Generated SQL: {generated_sql}")
    print()

    # Safe SQL execution. Result keys are read with .get so that a missing
    # key prints None instead of raising KeyError in the middle of the demo.
    sql_result = postgres.safe_run_sql(generated_sql)
    if sql_result.get('success'):
        print("✓ Safe SQL execution successful")
        print(f" Rows returned: {sql_result.get('row_count')}")
        print(f" SQL executed: {sql_result.get('sql_executed')}")
    else:
        print("✗ Safe SQL execution failed")
        print(f" Error: {sql_result.get('error')}")
    print()


def _run_vector_search_examples(vector_search):
    """Print a semantic search example using ``vector_search``."""
    print("Vector Search Examples:")
    print("-" * 30)

    # Semantic search example
    search_results = vector_search.semantic_search(
        question="Find information about recent activities",
        k=3,
    )
    print(f"Semantic search returned {len(search_results)} results")
    # Only the first two hits are shown to keep the output short.
    for position, result in enumerate(search_results[:2], start=1):
        print(f" {position}. Table: {result.get('table_name')}")
        snippet = result.get('snippet', '')
        print(f" Snippet: {snippet[:80]}{'...' if len(snippet) > 80 else ''}")
    print()

    # Document population example (left disabled, as in the original demo):
    # populate_result = vector_search.populate_sample_docs()
    # print(f"Document population: {populate_result.get('documents_created', 0)} docs created")
    # print()
def mcp_tools_example():
    """Print the catalogue of MCP tools.

    Each tool is listed with its name, a one-line description and an example
    JSON invocation, followed by a blank line.
    """
    print("Available MCP Tools:")
    print("=" * 30)

    catalogue = [
        {
            "name": "safe_sql",
            "description": "Execute safe SQL queries (SELECT only)",
            "example": '{"name": "safe_sql", "arguments": {"sql": "SELECT * FROM users LIMIT 10"}}',
        },
        {
            "name": "semantic_search",
            "description": "Search documents using vector similarity",
            "example": '{"name": "semantic_search", "arguments": {"question": "recent trips", "k": 5}}',
        },
        {
            "name": "get_database_schema",
            "description": "Get comprehensive database schema information",
            "example": '{"name": "get_database_schema", "arguments": {}}',
        },
        {
            "name": "get_table_schema",
            "description": "Get detailed schema for a specific table",
            "example": '{"name": "get_table_schema", "arguments": {"table_name": "users"}}',
        },
        {
            "name": "test_db_connection",
            "description": "Test database connectivity",
            "example": '{"name": "test_db_connection", "arguments": {}}',
        },
        {
            "name": "get_database_stats",
            "description": "Get comprehensive database statistics and performance info",
            "example": '{"name": "get_database_stats", "arguments": {}}',
        },
        {
            "name": "get_table_size",
            "description": "Get detailed size information for a specific table",
            "example": '{"name": "get_table_size", "arguments": {"table_name": "users"}}',
        },
        {
            "name": "explain_query",
            "description": "Execute EXPLAIN ANALYZE for performance analysis",
            "example": '{"name": "explain_query", "arguments": {"sql": "SELECT * FROM users WHERE age > 25"}}',
        },
        {
            "name": "generate_sql",
            "description": "Generate SQL from natural language questions",
            "example": '{"name": "generate_sql", "arguments": {"question": "Show me the most recent trips"}}',
        },
    ]

    for entry in catalogue:
        name = entry["name"]
        description = entry["description"]
        example = entry["example"]
        print(f"• {name}")
        print(f" {description}")
        print(f" Example: {example}")
        print()
def architecture_overview():
    """Print the module layout as a tree diagram, then the list of benefits."""
    heading = ("Architecture Overview:", "=" * 30, "")
    diagram = (
        "┌─ server.py (MCPServer)",
        "│ ├─ llm_integration (LLMIntegrationSystem)",
        "│ ├─ chat_memory (ChatMemorySystem)",
        "│ ├─ postgres_integration (PostgreSQLIntegration) ← NEW",
        "│ └─ vector_search (VectorSearchEngine)",
        "│",
        "├─ postgres_integration.py",
        "│ ├─ safe_run_sql()",
        "│ ├─ get_database_schema()",
        "│ ├─ get_table_schema()",
        "│ ├─ test_connection()",
        "│ └─ Schema inspection utilities",
        "│",
        "└─ vector_search.py",
        " ├─ semantic_search()",
        " ├─ populate_sample_docs()",
        " └─ Uses postgres_integration for DB operations",
        "",
    )
    benefits = (
        "Benefits:",
        "• Clear separation of concerns",
        "• PostgreSQL operations centralized",
        "• Vector search focused on its core functionality",
        "• Easy to extend with new database operations",
        "• Reusable components",
    )

    # Empty strings stand in for the bare print() calls that emit blank lines.
    for section in (heading, diagram, benefits):
        for row in section:
            print(row)
def direct_usage_examples():
    """Print copy-pasteable snippets showing how to use each module directly.

    The snippets are emitted as plain text only; none of the code they
    contain is executed here.
    """
    title = ("Direct Module Usage Examples:", "=" * 40)

    # Direct PostgreSQL integration usage
    postgres_snippets = (
        "# Direct PostgreSQL Integration",
        "from postgres_integration import PostgreSQLIntegration",
        "from sqlalchemy import create_engine",
        "",
        "engine = create_engine('postgresql://user:pass@localhost/db')",
        "postgres = PostgreSQLIntegration(engine, 'my_db')",
        "",
        "# Configure for your domain (optional)",
        "from postgres_config_examples import create_e_commerce_config",
        "config = create_e_commerce_config()",
        "postgres = PostgreSQLIntegration(engine, 'my_db', config)",
        "",
        "# Or use auto-discovery (no configuration needed)",
        "postgres = PostgreSQLIntegration(engine, 'my_db') # Auto-discovers semantics",
        "",
        "# SQL generation from natural language",
        "sql = postgres.generate_sql('Show me recent entries')",
        "print(f'Generated: {sql}')",
        "",
        "# Safe SQL execution",
        "result = postgres.safe_run_sql(sql)",
        "",
        "# Schema inspection",
        "schema = postgres.get_database_schema()",
        "table_info = postgres.get_table_schema('users')",
        "",
        "# Performance analysis",
        "explain = postgres.execute_explain('SELECT * FROM main_table ORDER BY created_at')",
        "stats = postgres.get_database_stats()",
        "",
        "# Connection testing",
        "status = postgres.test_connection()",
        "",
    )

    # Direct vector search usage
    vector_snippets = (
        "# Direct Vector Search",
        "from vector_search import VectorSearchEngine",
        "",
        "vector_search = VectorSearchEngine(engine, 'my_db')",
        "results = vector_search.semantic_search('recent activities')",
        "",
    )

    # MCP Server usage
    mcp_snippets = (
        "# MCP Server Integration",
        "from server import MCPServer",
        "",
        "mcp = MCPServer()",
        "mcp.initialize_database(engine, 'my_db') # Initializes both integrations",
        "",
        "# Now available via MCP tools:",
        "# - safe_sql",
        "# - semantic_search",
        "# - get_database_schema",
        "# - get_table_schema",
        "# - test_db_connection",
        "# - get_database_stats",
        "# - get_table_size",
        "# - explain_query",
        "# - generate_sql ← NEW: Natural language to SQL",
    )

    # Empty strings stand in for the bare print() calls that emit blank lines.
    for section in (title, postgres_snippets, vector_snippets, mcp_snippets):
        for text in section:
            print(text)
if __name__ == "__main__":
    # Script entry point: print the offline overviews only. The database
    # walkthrough is not executed here; the final lines just tell the reader
    # how to start it.
    print("PostgreSQL Integration & Vector Search")
    print("=" * 50)
    print()

    # Architecture overview, available tools, then direct usage examples,
    # each followed by a blank separator line.
    for show_section in (
        architecture_overview,
        mcp_tools_example,
        direct_usage_examples,
    ):
        show_section()
        print()

    print("To run the comprehensive example:")
    print("asyncio.run(comprehensive_example())")