Skip to main content
Glama

MCP Server with LLM Integration

by MelaLitho
llmDatabaseRouter_original.py160 kB
from sqlalchemy import text, inspect import json import uuid import traceback from datetime import datetime from typing import List, Dict, Any, Optional, Tuple import re class LLMDatabaseRouter: """ Generic LLM-database interaction system that can work with any PostgreSQL schema. Provides semantic search, schema awareness, and safe SQL execution. """ def __init__(self, engine, db_key=None, app=None): self.engine = engine self.db_key = db_key self.app = app self.setup_catalog_tables() def setup_catalog_tables(self): """Create the catalog and document embedding tables if they don't exist""" try: # First, check if tables already exist to avoid unnecessary operations with self.engine.connect() as connection: catalog_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'catalog_embeddings' ); """)).scalar() doc_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'doc_embeddings' ); """)).scalar() if catalog_exists and doc_exists: if self.app: self.app.logger.info(f"LLM catalog tables already exist in database: {self.db_key}") return # Check if vector extension is available has_vector_extension = self._check_vector_extension() # Create tables based on vector availability self._create_catalog_table(has_vector_extension) self._create_doc_table(has_vector_extension) # Create indexes (separate transactions to avoid conflicts) self._create_indexes_safely(has_vector_extension) if self.app: self.app.logger.info(f"LLM catalog tables setup completed for database: {self.db_key}") except Exception as e: if self.app: self.app.logger.error(f"Error setting up catalog tables in {self.db_key}: {e}") traceback.print_exc() def _check_vector_extension(self): """Check if pgvector extension is available and working""" try: with self.engine.begin() as connection: # Try to create the extension connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector;")) # Test if vector type is available by checking pg_type result = connection.execute(text(""" SELECT EXISTS ( SELECT 1 FROM pg_type WHERE typname = 'vector' ); """)).scalar() if result: if self.app: self.app.logger.info(f"pgvector extension is available in {self.db_key}") return True else: if self.app: self.app.logger.warning(f"pgvector extension not available in {self.db_key}") return False except Exception as ext_error: if self.app: self.app.logger.warning(f"Could not enable vector extension in {self.db_key}: {ext_error}") return False def _create_catalog_table(self, has_vector): """Create catalog_embeddings table with or without vector column""" try: with self.engine.begin() as connection: if has_vector: connection.execute(text(""" CREATE TABLE IF NOT EXISTS catalog_embeddings ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), kind TEXT CHECK (kind IN ('table','column')), schema_name TEXT, table_name TEXT, column_name TEXT, payload JSONB, embedding vector(1536), created_at TIMESTAMP DEFAULT now() ); """)) if self.app: self.app.logger.info(f"Created catalog_embeddings table with vector column in {self.db_key}") else: connection.execute(text(""" CREATE TABLE IF NOT EXISTS catalog_embeddings ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), kind TEXT CHECK (kind IN ('table','column')), schema_name TEXT, table_name TEXT, column_name TEXT, payload JSONB, created_at TIMESTAMP DEFAULT now() ); """)) if self.app: self.app.logger.info(f"Created catalog_embeddings table without vector column in {self.db_key}") except Exception as e: if self.app: self.app.logger.error(f"Failed to create catalog_embeddings table in {self.db_key}: {e}") raise def _create_doc_table(self, has_vector): """Create doc_embeddings table with or without vector column""" try: with self.engine.begin() as connection: if has_vector: connection.execute(text(""" CREATE TABLE IF NOT EXISTS doc_embeddings ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), table_name TEXT NOT NULL, pk_json JSONB NOT NULL, snippet TEXT NOT NULL, created_at TIMESTAMP DEFAULT now(), embedding vector(1536) ); """)) if self.app: self.app.logger.info(f"Created doc_embeddings table with vector column in {self.db_key}") else: connection.execute(text(""" CREATE TABLE IF NOT EXISTS doc_embeddings ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), table_name TEXT NOT NULL, pk_json JSONB NOT NULL, snippet TEXT NOT NULL, created_at TIMESTAMP DEFAULT now() ); """)) if self.app: self.app.logger.info(f"Created doc_embeddings table without vector column in {self.db_key}") except Exception as e: if self.app: self.app.logger.error(f"Failed to create doc_embeddings table in {self.db_key}: {e}") raise def _create_indexes_safely(self, has_vector): """Create indexes for the embedding tables safely""" # Create basic indexes first basic_indexes = [ ("idx_catalog_kind", "CREATE INDEX IF NOT EXISTS idx_catalog_kind ON catalog_embeddings(kind);"), ("idx_catalog_table", "CREATE INDEX IF NOT EXISTS idx_catalog_table ON catalog_embeddings(table_name);"), ("idx_doc_table", "CREATE INDEX IF NOT EXISTS idx_doc_table ON doc_embeddings(table_name);") ] for index_name, index_sql in basic_indexes: try: with self.engine.begin() as connection: connection.execute(text(index_sql)) except Exception as idx_error: if self.app: self.app.logger.warning(f"Could not create index {index_name} in {self.db_key}: {idx_error}") # Create vector indexes only if vector extension is available if has_vector: vector_indexes = [ ("idx_catalog_embedding", "CREATE INDEX IF NOT EXISTS idx_catalog_embedding ON catalog_embeddings USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);"), ("idx_doc_embedding", "CREATE INDEX IF NOT EXISTS idx_doc_embedding ON doc_embeddings USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);") ] for index_name, index_sql in vector_indexes: try: with self.engine.begin() as connection: connection.execute(text(index_sql)) if self.app: self.app.logger.info(f"Created vector index {index_name} in {self.db_key}") except Exception as idx_error: if self.app: self.app.logger.warning(f"Could not create vector index {index_name} in {self.db_key}: {idx_error}") else: if self.app: self.app.logger.info(f"Skipping vector indexes in {self.db_key} - vector extension not available") def get_schema_info(self) -> Dict[str, Any]: """Extract comprehensive schema information from the database""" schema_info = { 'tables': [], 'columns': [], 'primary_keys': [], 'foreign_keys': [], 'row_counts': [], 'date_columns': [] } try: with self.engine.connect() as connection: # Debug: Log what tables we can see if self.app: debug_tables = connection.execute(text(""" SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name """)).fetchall() table_names = [row[0] for row in debug_tables] self.app.logger.info(f"DEBUG: Found {len(table_names)} tables in {self.db_key}: {table_names}") # Get tables with comments, excluding system tables tables_result = connection.execute(text(""" SELECT n.nspname AS schema, c.relname AS table, obj_description(c.oid) AS table_comment FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relkind='r' AND n.nspname NOT IN ('pg_catalog','information_schema', 'pg_toast') AND c.relname NOT LIKE 'pg_%' AND c.relname NOT LIKE 'sql_%' ORDER BY n.nspname, c.relname; """)) schema_info['tables'] = [dict(row._mapping) for row in tables_result] # Get columns with comments columns_result = connection.execute(text(""" SELECT table_schema, table_name, column_name, data_type, col_description((quote_ident(table_schema)||'.'||quote_ident(table_name))::regclass::oid, ordinal_position) AS column_comment FROM information_schema.columns WHERE table_schema NOT IN ('pg_catalog','information_schema') ORDER BY table_schema, table_name, ordinal_position; """)) schema_info['columns'] = [dict(row._mapping) for row in columns_result] # Get primary keys pk_result = connection.execute(text(""" SELECT tc.table_schema, tc.table_name, kc.column_name FROM information_schema.table_constraints tc JOIN information_schema.key_column_usage kc USING (constraint_name, table_schema, table_name) WHERE tc.constraint_type='PRIMARY KEY' ORDER BY tc.table_schema, tc.table_name; """)) schema_info['primary_keys'] = [dict(row._mapping) for row in pk_result] # Get foreign keys fk_result = connection.execute(text(""" SELECT tc.table_schema, tc.table_name, kcu.column_name, ccu.table_schema AS fk_schema, ccu.table_name AS fk_table, ccu.column_name AS fk_column FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name AND tc.table_schema=kcu.table_schema JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name AND ccu.table_schema=tc.table_schema WHERE tc.constraint_type = 'FOREIGN KEY' ORDER BY tc.table_schema, tc.table_name; """)) schema_info['foreign_keys'] = [dict(row._mapping) for row in fk_result] # Get approximate row counts row_counts_result = connection.execute(text(""" SELECT relname AS table, reltuples::bigint AS approx_rows FROM pg_class c JOIN pg_namespace n ON n.oid=c.relnamespace WHERE relkind='r' AND n.nspname='public' ORDER BY relname; """)) schema_info['row_counts'] = [dict(row._mapping) for row in row_counts_result] # Get date-ish columns date_cols_result = connection.execute(text(""" SELECT table_name, column_name, data_type FROM information_schema.columns WHERE data_type IN ('date','timestamp without time zone','timestamp with time zone') AND table_schema = 'public' ORDER BY table_name, column_name; """)) schema_info['date_columns'] = [dict(row._mapping) for row in date_cols_result] except Exception as e: if self.app: self.app.logger.error(f"Error getting schema info from {self.db_key}: {e}") traceback.print_exc() return schema_info def build_catalog_descriptions(self, schema_info: Dict[str, Any]) -> List[Dict[str, Any]]: """Build descriptive strings for tables and columns for embedding""" descriptions = [] # Group data by table for easier processing tables_by_name = {t['table']: t for t in schema_info['tables']} columns_by_table = {} pks_by_table = {} fks_by_table = {} date_cols_by_table = {} row_counts_by_table = {rc['table']: rc['approx_rows'] for rc in schema_info['row_counts']} # Group columns by table for col in schema_info['columns']: table_name = col['table_name'] if table_name not in columns_by_table: columns_by_table[table_name] = [] columns_by_table[table_name].append(col) # Group primary keys by table for pk in schema_info['primary_keys']: table_name = pk['table_name'] if table_name not in pks_by_table: pks_by_table[table_name] = [] pks_by_table[table_name].append(pk['column_name']) # Group foreign keys by table for fk in schema_info['foreign_keys']: table_name = fk['table_name'] if table_name not in fks_by_table: fks_by_table[table_name] = [] fks_by_table[table_name].append(fk) # Group date columns by table for dc in schema_info['date_columns']: table_name = dc['table_name'] if table_name not in date_cols_by_table: date_cols_by_table[table_name] = [] date_cols_by_table[table_name].append(dc) # Build table descriptions for table_name, table_info in tables_by_name.items(): columns = columns_by_table.get(table_name, []) pks = pks_by_table.get(table_name, []) fks = fks_by_table.get(table_name, []) date_cols = date_cols_by_table.get(table_name, []) row_count = row_counts_by_table.get(table_name, 0) # Build column list with types and PK markers col_descriptions = [] primary_date_col = None for col in columns: col_desc = f"{col['column_name']} {col['data_type']}" if col['column_name'] in pks: col_desc += " pk" col_descriptions.append(col_desc) # Identify potential primary date column if col['data_type'] in ['date', 'timestamp without time zone', 'timestamp with time zone']: if any(keyword in col['column_name'].lower() for keyword in ['end', 'finish', 'complete', 'update']): primary_date_col = col['column_name'] elif not primary_date_col and any(keyword in col['column_name'].lower() for keyword in ['date', 'time', 'created', 'start']): primary_date_col = col['column_name'] # Build FK references fk_refs = [] for fk in fks: fk_refs.append(f"{fk['fk_table']}.{fk['fk_column']}") # Build table description desc_parts = [f"TABLE {table_name}: columns {{{', '.join(col_descriptions)}}}"] if table_info.get('table_comment'): desc_parts.append(f"comment: {table_info['table_comment']}") if primary_date_col: desc_parts.append(f"primary_date={primary_date_col}") if fk_refs: desc_parts.append(f"FKs: {', '.join(fk_refs)}") if row_count > 0: desc_parts.append(f"~{row_count} rows") table_description = "; ".join(desc_parts) descriptions.append({ 'kind': 'table', 'schema_name': table_info.get('schema', 'public'), 'table_name': table_name, 'column_name': None, 'description': table_description, 'payload': { 'table_info': table_info, 'columns': columns, 'primary_keys': pks, 'foreign_keys': fks, 'date_columns': date_cols, 'row_count': row_count, 'primary_date_column': primary_date_col } }) # Build column descriptions for col in schema_info['columns']: table_name = col['table_name'] col_name = col['column_name'] desc_parts = [f"COLUMN {table_name}.{col_name}: {col['data_type']}"] if col.get('column_comment'): desc_parts.append(col['column_comment']) elif col['data_type'] in ['text', 'varchar', 'character varying']: desc_parts.append("text content") elif col['data_type'] in ['date', 'timestamp without time zone', 'timestamp with time zone']: desc_parts.append("date/time value") elif 'id' in col_name.lower(): desc_parts.append("identifier") elif 'name' in col_name.lower(): desc_parts.append("name or title") column_description = ": ".join(desc_parts) descriptions.append({ 'kind': 'column', 'schema_name': col.get('table_schema', 'public'), 'table_name': table_name, 'column_name': col_name, 'description': column_description, 'payload': { 'column_info': col, 'table_name': table_name } }) return descriptions def search_catalog(self, query: str, k: int = 12) -> List[Dict[str, Any]]: """Search the catalog embeddings for relevant schema elements""" try: with self.engine.connect() as connection: # For now, return a simple text-based search # In production, you'd use actual vector embeddings result = connection.execute(text(""" SELECT kind, schema_name, table_name, column_name, payload FROM catalog_embeddings WHERE payload::text ILIKE :query ORDER BY CASE WHEN kind = 'table' THEN 1 ELSE 2 END, table_name LIMIT :limit """), {'query': f'%{query}%', 'limit': k}) return [dict(row._mapping) for row in result] except Exception as e: if self.app: self.app.logger.error(f"Error searching catalog in {self.db_key}: {e}") return [] def safe_run_sql(self, sql: str, limit_safe: bool = True) -> Dict[str, Any]: """Safely execute SQL with guardrails""" try: # Basic SQL validation sql_clean = sql.strip() if not sql_clean: return {'error': 'Empty SQL query'} # Check for forbidden statements forbidden_patterns = [ r'\b(INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|TRUNCATE)\b', r';.*\w', # Multiple statements r'/\*' # Block comments ] for pattern in forbidden_patterns: if re.search(pattern, sql_clean, re.IGNORECASE): return {'error': f'Forbidden SQL pattern detected: {pattern}'} # Validate table existence before execution validation_error = self._validate_table_existence(sql_clean) if validation_error: return {'error': validation_error} # Auto-inject LIMIT if not present and not aggregate if limit_safe and not re.search(r'\bLIMIT\b', sql_clean, re.IGNORECASE): if not re.search(r'\b(COUNT|SUM|AVG|MIN|MAX|GROUP BY)\b', sql_clean, re.IGNORECASE): # Remove trailing semicolon before adding LIMIT sql_clean = sql_clean.rstrip(';').strip() sql_clean += ' LIMIT 100' # Execute query with self.engine.connect() as connection: result = connection.execute(text(sql_clean)) rows = result.fetchall() columns = list(result.keys()) if rows else [] # Convert to list of dicts data = [] for row in rows: row_dict = {} for i, col in enumerate(columns): value = row[i] # Handle datetime objects if hasattr(value, 'isoformat'): value = value.isoformat() # Handle UUID objects elif hasattr(value, 'hex'): value = str(value) row_dict[col] = value data.append(row_dict) return { 'success': True, 'data': data, 'columns': columns, 'row_count': len(data), 'sql_executed': sql_clean } except Exception as e: error_msg = str(e) if self.app: self.app.logger.error(f"Error executing SQL in {self.db_key}: {error_msg}") return {'error': error_msg, 'sql_attempted': sql} def generate_sql(self, question: str, schema_slice: Dict[str, Any]) -> str: """Generate SQL from natural language question and schema context""" # First, get all available tables in this database to ensure accurate schema context try: all_tables = self._get_all_table_names() available_tables_text = "Available tables: " + ", ".join(sorted(all_tables)) # Get actual column info for key tables table_info = self._get_key_table_info() available_tables_text += "\n" + table_info except: available_tables_text = "Available tables: Could not retrieve table list" # Build a comprehensive prompt for the LLM schema_description = [] for table_info in schema_slice.get('tables', []): if isinstance(table_info, dict) and 'payload' in table_info: payload = table_info['payload'] if isinstance(payload, str): try: payload = json.loads(payload) except: pass if isinstance(payload, dict): table_name = table_info.get('table_name', 'unknown') columns = payload.get('columns', []) col_descriptions = [] for col in columns: col_desc = f"{col.get('column_name', 'unknown')} ({col.get('data_type', 'unknown')})" col_descriptions.append(col_desc) if col_descriptions: schema_description.append(f"Table {table_name}: {', '.join(col_descriptions)}") schema_text = "\n".join(schema_description) if schema_description else "No detailed schema information available" prompt = f"""You are a PostgreSQL expert. Generate a safe SELECT query for the user's question. {available_tables_text} Detailed schema for relevant tables: {schema_text} IMPORTANT TABLE RELATIONSHIPS AND COLUMNS: - Use 'teams' table to connect personnel and trips (NOT trip_personnel) - Personnel are linked to trips via: personnel -> teams -> trips - Common join pattern: personnel p JOIN teams t ON p.personnel_id = t.personnel_id JOIN trips tr ON t.trip_id = tr.trip_id - For trips table, use 'start_date' and 'end_date' columns (NOT trip_date) - For "latest/last trip" queries, use: ORDER BY start_date DESC or ORDER BY end_date DESC User question: {question} CRITICAL REQUIREMENTS: - ONLY use tables that exist in the "Available tables" list above - NEVER reference tables like "trip_personnel", "projects", "challenges", "tasks" unless they appear in the available tables list - Use the correct table relationships: personnel <-> teams <-> trips - Use correct column names: start_date and end_date for trips (NOT trip_date) - Only SELECT statements (no INSERT, UPDATE, DELETE, DROP, etc.) - Use proper JOIN syntax where needed - Include appropriate WHERE clauses - Add LIMIT clauses for large result sets (LIMIT 50 or less) - Use PostgreSQL-specific syntax and functions - For "last trip" queries, use ORDER BY start_date DESC, end_date DESC and LIMIT 1 - For person-specific queries (like "Bob's travel"), use LIKE '%name%' for flexible matching - Return ONLY the SQL query, no explanations or markdown formatting EXAMPLE PATTERNS: - "Bob's travel" or "breakdown of travel for Bob" should query: SELECT tr.trip_id, tr.start_date, tr.end_date, p.name, d.location_name FROM trips tr JOIN teams t ON tr.trip_id = t.trip_id JOIN personnel p ON t.personnel_id = p.personnel_id LEFT JOIN trip_destinations td ON tr.trip_id = td.trip_id LEFT JOIN destinations d ON td.destination_id = d.destination_id WHERE LOWER(p.name) LIKE '%bob%' SQL Query:""" try: # Use the existing LLM infrastructure from AI.ai import query_vectorize_generate # Use the same database key as the router result = query_vectorize_generate(self.db_key, prompt, 'ollama/gpt-oss:20b') if result and isinstance(result, dict): # Extract the generated text from the result generated_sql = result.get('text', '') or result.get('response', '') or str(result) elif result: generated_sql = str(result) else: generated_sql = None if generated_sql and generated_sql.strip(): # Clean up the response - remove markdown formatting and quotes if present sql_clean = generated_sql.strip() # Remove markdown formatting if sql_clean.startswith('```sql'): sql_clean = sql_clean.replace('```sql', '').replace('```', '').strip() elif sql_clean.startswith('```'): sql_clean = sql_clean.replace('```', '').strip() # Remove surrounding quotes if present (common LLM issue) if sql_clean.startswith('"') and sql_clean.endswith('"'): sql_clean = sql_clean[1:-1].strip() elif sql_clean.startswith("'") and sql_clean.endswith("'"): sql_clean = sql_clean[1:-1].strip() # Remove trailing semicolon if present (will be added by safe_run_sql if needed) sql_clean = sql_clean.rstrip(';').strip() return sql_clean else: # If LLM doesn't provide useful SQL, return a simple query return "SELECT 'No specific pattern matched for this question' as message" except Exception as e: if self.app: self.app.logger.warning(f"LLM SQL generation failed for {self.db_key}: {e}") # Fallback to heuristics return self._generate_sql_heuristics(question, schema_slice) def _generate_sql_heuristics(self, question: str, schema_slice: Dict[str, Any]) -> str: """Fallback SQL generation using simple heuristics""" # Keep the existing heuristic logic as backup question_lower = question.lower() # Handle general travel breakdown queries (for Bob, team, etc.) if any(keyword in question_lower for keyword in ['breakdown of', 'travel done', 'trips done', 'travel by', 'travel for']): # Check if a specific person is mentioned words = question.split() person_name = None # Look for names like "Bob", "Bob's", etc. for word in words: clean_word = word.strip("'s").strip('?').strip(',').strip('.') if len(clean_word) > 2 and clean_word.lower() not in ['the', 'all', 'done', 'travel', 'trips', 'breakdown', 'give', 'for']: # This might be a person's name person_name = clean_word.title() break if person_name: return f""" SELECT tr.trip_id, tr.start_date, tr.end_date, p.name as personnel_name, d.location_name as destination FROM trips tr JOIN teams t ON tr.trip_id = t.trip_id JOIN personnel p ON t.personnel_id = p.personnel_id LEFT JOIN trip_destinations td ON tr.trip_id = td.trip_id LEFT JOIN destinations d ON td.destination_id = d.destination_id WHERE LOWER(p.name) LIKE LOWER('%{person_name}%') ORDER BY tr.start_date DESC LIMIT 20 """ else: # General trip breakdown return """ SELECT tr.trip_id, tr.start_date, tr.end_date, COUNT(DISTINCT p.personnel_id) as crew_count, array_agg(DISTINCT d.location_name) as destinations FROM trips tr LEFT JOIN teams t ON tr.trip_id = t.trip_id LEFT JOIN personnel p ON t.personnel_id = p.personnel_id LEFT JOIN trip_destinations td ON tr.trip_id = td.trip_id LEFT JOIN destinations d ON td.destination_id = d.destination_id GROUP BY tr.trip_id, tr.start_date, tr.end_date ORDER BY tr.start_date ASC LIMIT 50 """ # Handle travel/destination queries if any(keyword in question_lower for keyword in ['where has', 'where did', 'travelled to', 'traveled to', 'visited', 'been to']): # Extract person name from question words = question.split() person_name = None for i, word in enumerate(words): if word.lower() in ['has', 'did'] and i > 0: person_name = words[i-1].strip('?').title() break if person_name: return f""" SELECT DISTINCT p.name, d.location_name, t.start_date, t.end_date FROM personnel p JOIN teams tm ON p.personnel_id = tm.personnel_id JOIN trips t ON tm.trip_id = t.trip_id JOIN trip_destinations td ON t.trip_id = td.trip_id JOIN destinations d ON td.destination_id = d.destination_id WHERE LOWER(p.name) = LOWER('{person_name}') ORDER BY t.start_date, d.location_name """ # Handle destination popularity/frequency queries if any(keyword in question_lower for keyword in ['most visited', 'most popular', 'top destination', 'which destination']): return """ SELECT d.location_name, COUNT(*) as visit_count FROM destinations d JOIN trip_destinations td ON d.destination_id = td.destination_id JOIN trips t ON td.trip_id = t.trip_id GROUP BY d.location_name ORDER BY visit_count DESC LIMIT 10 """ if 'most recent' in question_lower or 'latest' in question_lower: # For trip-related queries, always include comprehensive details if 'trip' in question_lower: return """ SELECT t.trip_id, t.start_date, t.end_date, COUNT(DISTINCT tm.personnel_id) as team_size, COUNT(DISTINCT td.destination_id) as destination_count, array_agg(DISTINCT d.location_name) FILTER (WHERE d.location_name IS NOT NULL) as destinations, array_agg(DISTINCT p.name) FILTER (WHERE p.name IS NOT NULL) as team_members FROM trips t LEFT JOIN teams tm ON t.trip_id = tm.trip_id LEFT JOIN personnel p ON tm.personnel_id = p.personnel_id LEFT JOIN trip_destinations td ON t.trip_id = td.trip_id LEFT JOIN destinations d ON td.destination_id = d.destination_id GROUP BY t.trip_id, t.start_date, t.end_date ORDER BY t.end_date DESC, t.start_date DESC LIMIT 5 """ else: # Try to find a table with date columns for non-trip queries for table_info in schema_slice.get('tables', []): table_name = table_info.get('table_name') payload = table_info['payload'] primary_date = payload.get('primary_date_column') if primary_date: return f""" SELECT * FROM {table_name} ORDER BY {primary_date} DESC LIMIT 1 """ if 'who' in question_lower and 'trip' in question_lower: if 'last' in question_lower or 'latest' in question_lower or 'recent' in question_lower: return """ WITH latest_trip AS ( SELECT trip_id FROM trips ORDER BY start_date DESC, end_date DESC LIMIT 1 ) SELECT p.personnel_id, p.name, p.position FROM latest_trip lt JOIN teams tm ON tm.trip_id = lt.trip_id JOIN personnel p ON p.personnel_id = tm.personnel_id ORDER BY p.name """ else: return """ SELECT DISTINCT p.personnel_id, p.name, p.position FROM personnel p JOIN teams tm ON p.personnel_id = tm.personnel_id JOIN trips tr ON tm.trip_id = tr.trip_id ORDER BY p.name LIMIT 20 """ if 'how many' in question_lower and 'personnel' in question_lower and ('mars' in question_lower or 'trip' in question_lower): if 'mars' in question_lower: return """ SELECT COUNT(DISTINCT p.personnel_id) as personnel_count FROM personnel p JOIN teams t ON t.personnel_id = p.personnel_id JOIN trips tr ON tr.trip_id = t.trip_id JOIN trip_destinations td ON td.trip_id = tr.trip_id JOIN destinations d ON d.destination_id = td.destination_id WHERE LOWER(d.location_name) LIKE '%mars%' """ else: return """ SELECT COUNT(DISTINCT p.personnel_id) as personnel_count FROM personnel p JOIN teams t ON t.personnel_id = p.personnel_id JOIN trips tr ON tr.trip_id = t.trip_id """ # Generic count query for personnel if 'count' in question_lower or 'how many' in question_lower: if 'personnel' in question_lower: return "SELECT COUNT(*) as count FROM personnel" if 'trip' in question_lower: return "SELECT COUNT(*) as count FROM trips" # Default fallback - simple select return "SELECT 'No specific SQL pattern matched' as message" def _validate_table_existence(self, sql: str) -> Optional[str]: """Validate that all tables referenced in the SQL actually exist""" try: # Get all actual table names from the database actual_tables = set(self._get_all_table_names()) # Extract table names from SQL using simple regex patterns # This covers most common SQL patterns table_patterns = [ r'\bFROM\s+([a-zA-Z_][a-zA-Z0-9_]*)', # FROM table_name r'\bJOIN\s+([a-zA-Z_][a-zA-Z0-9_]*)', # JOIN table_name r'\bINTO\s+([a-zA-Z_][a-zA-Z0-9_]*)', # INTO table_name r'\bUPDATE\s+([a-zA-Z_][a-zA-Z0-9_]*)', # UPDATE table_name ] referenced_tables = set() sql_upper = sql.upper() for pattern in table_patterns: matches = re.findall(pattern, sql_upper, re.IGNORECASE) for match in matches: # Clean the table name table_name = match.strip().lower() # Skip SQL keywords and common aliases if table_name not in ['select', 'where', 'order', 'group', 'having', 'as', 'on', 'and', 'or']: referenced_tables.add(table_name) # Check for tables that don't exist missing_tables = [] for table in referenced_tables: if table not in [t.lower() for t in actual_tables]: missing_tables.append(table) if missing_tables: return f"Referenced table(s) do not exist: {', '.join(missing_tables)}. Available tables: {', '.join(sorted(actual_tables))}" return None except Exception as e: if self.app: self.app.logger.warning(f"Table validation failed: {e}") # If validation fails, allow the query to proceed (safer than blocking all queries) return None def _get_all_table_names(self) -> List[str]: """Get all table names from the current database""" try: with self.engine.connect() as connection: result = connection.execute(text(""" SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name """)) return [row[0] for row in result] except Exception as e: if self.app: self.app.logger.warning(f"Could not get table names for {self.db_key}: {e}") return [] def _get_key_table_info(self) -> str: """Get key table information for SQL generation context""" try: with self.engine.connect() as connection: # Get column info for main tables result = connection.execute(text(""" SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema = 'public' AND table_name IN ('personnel', 'trips', 'teams', 'destinations', 'trip_destinations', 'descriptions') ORDER BY table_name, ordinal_position """)) tables = {} for row in result: table_name = row[0] if table_name not in tables: tables[table_name] = [] tables[table_name].append(f"{row[1]} ({row[2]})") info_parts = [] for table_name, columns in tables.items(): info_parts.append(f"{table_name}: {', '.join(columns)}") return "\n".join(info_parts) except Exception as e: if self.app: self.app.logger.warning(f"Could not get key table info for {self.db_key}: {e}") return "Could not retrieve detailed table information" def semantic_rows(self, question: str, table_filter: Optional[str] = None, fk_filter: Optional[Dict] = None, k: int = 10) -> List[Dict[str, Any]]: """Search document embeddings using vector similarity""" try: # First, generate an embedding for the user's question using your LLM system question_embedding = self._generate_embedding(question) if not question_embedding: # Fallback to text search if embedding generation fails return self._text_search_fallback(question, table_filter, fk_filter) with self.engine.connect() as connection: # Use vector similarity search with cosine distance sql_parts = [""" SELECT table_name, pk_json, snippet, created_at, (embedding <=> :question_embedding::vector) AS distance FROM doc_embeddings WHERE 1=1 """] params = {'question_embedding': question_embedding} if table_filter: sql_parts.append("AND table_name = :table_filter") params['table_filter'] = table_filter if fk_filter: sql_parts.append("AND pk_json @> :fk_filter::jsonb") params['fk_filter'] = json.dumps(fk_filter) # Order by similarity (lower distance = more similar) sql_parts.append("ORDER BY distance ASC LIMIT 20") final_sql = " ".join(sql_parts) result = connection.execute(text(final_sql), params) rows = [] for row in result: row_dict = dict(row._mapping) # Remove the distance from the final result (internal use only) row_dict.pop('distance', None) rows.append(row_dict) return rows except Exception as e: if self.app: self.app.logger.error(f"Error in vector search for {self.db_key}: {e}") # Fallback to text search return self._text_search_fallback(question, table_filter, fk_filter) def _generate_embedding(self, text_input: str) -> Optional[str]: """Generate embedding for text using available methods""" try: # First check if the embedding column exists in this database with self.engine.connect() as connection: embedding_col_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.columns WHERE table_name = 'doc_embeddings' AND column_name = 'embedding' AND table_schema = 'public' ); """)).scalar() if not embedding_col_exists: # No embedding column, can't do vector search return None # Method 1: Try to use an existing similar embedding as a proxy # This finds documents with similar keywords and uses their embeddings with self.engine.connect() as connection: # Extract key terms from the query search_terms = self._extract_search_terms(text_input) if search_terms: # Build a query to find documents with similar terms search_conditions = [] params = {} for i, term in enumerate(search_terms[:3]): # Use top 3 terms param_name = f'term_{i}' search_conditions.append(f"snippet ILIKE :{param_name}") params[param_name] = f'%{term}%' if search_conditions: similar_docs = connection.execute(text(f""" SELECT embedding FROM doc_embeddings WHERE ({' OR '.join(search_conditions)}) AND embedding IS NOT NULL LIMIT 1 """), params) row = similar_docs.fetchone() if row and row[0]: return str(row[0]) # Method 2: If no similar documents found, use a representative embedding # Get an embedding from documents with good semantic content result = connection.execute(text(""" SELECT embedding FROM doc_embeddings WHERE embedding IS NOT NULL AND length(snippet) > 50 AND (snippet ILIKE '%completed%' OR snippet ILIKE '%success%' OR snippet ILIKE '%review%') ORDER BY created_at DESC LIMIT 1 """)) row = result.fetchone() if row and row[0]: return str(row[0]) # Method 3: Last resort - any embedding result = connection.execute(text(""" SELECT embedding FROM doc_embeddings WHERE embedding IS NOT NULL LIMIT 1 """)) row = result.fetchone() if row and row[0]: return str(row[0]) except Exception as e: if self.app: self.app.logger.warning(f"Embedding lookup failed: {e}") return None return None def _text_search_fallback(self, question: str, table_filter: Optional[str] = None, fk_filter: Optional[Dict] = None) -> List[Dict[str, Any]]: """Fallback to text search when vector search is not available""" try: with self.engine.connect() as connection: sql_parts = ["SELECT table_name, pk_json, snippet, created_at FROM doc_embeddings WHERE 1=1"] params = {} if table_filter: sql_parts.append("AND table_name = :table_filter") params['table_filter'] = table_filter if fk_filter: sql_parts.append("AND pk_json @> :fk_filter::jsonb") params['fk_filter'] = json.dumps(fk_filter) # Improved text search with keyword extraction and broader matching search_terms = self._extract_search_terms(question) if search_terms: # Build OR conditions for multiple search terms search_conditions = [] for i, term in enumerate(search_terms): param_name = f'query_{i}' search_conditions.append(f"snippet ILIKE :{param_name}") params[param_name] = f'%{term}%' sql_parts.append(f"AND ({' OR '.join(search_conditions)})") else: # Fallback to original query sql_parts.append("AND snippet ILIKE :query") params['query'] = f'%{question}%' sql_parts.append("ORDER BY created_at DESC LIMIT 20") final_sql = " ".join(sql_parts) result = connection.execute(text(final_sql), params) return [dict(row._mapping) for row in result] except Exception as e: if self.app: self.app.logger.error(f"Error in text search fallback for {self.db_key}: {e}") return [] def _extract_search_terms(self, question: str) -> List[str]: """Extract relevant search terms from a question for better document matching""" # Convert the question to broader search terms question_lower = question.lower() search_terms = [] # Map conceptual terms to actual content terms term_mappings = { 'milestone': ['completed', 'success', 'achieved', 'finished', 'delivered'], 'milestones': ['completed', 'success', 'achieved', 'finished', 'delivered'], 'challenge': ['issue', 'problem', 'error', 'failed', 'rough edges'], 'challenges': ['issue', 'problem', 'error', 'failed', 'rough edges'], 'events': ['note', 'completed', 'review', 'test', 'validation'], 'trip': ['trip', 'pilot', 'project'], 'last': ['recent', 'latest'], 'recent': ['recent', 'latest'], 'latest': ['recent', 'latest'], 'summary': ['summary', 'review', 'completed', 'checklist'], 'summarize': ['summary', 'review', 'completed', 'checklist'], 'important': ['success', 'completed', 'critical', 'key'], 'key': ['success', 'completed', 'critical', 'key'], } # Extract words from the question words = question_lower.replace('?', '').replace('.', '').split() # Add mapped terms for word in words: if word in term_mappings: search_terms.extend(term_mappings[word]) else: search_terms.append(word) # Remove duplicates and common stop words stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should'} search_terms = [term for term in set(search_terms) if term not in stop_words and len(term) > 2] return search_terms[:8] # Limit to top 8 terms to avoid overly complex queries def summarize(self, content: List[Dict[str, Any]], instruction: str = None) -> str: """Summarize content using LLM (integrated with real AI service)""" if not content: return "No relevant content found." # Prepare the data for LLM summarization content_text = json.dumps(content, default=str, indent=2) base_instruction = instruction or ( "You are a thoughtful search agent with a warm, confident tone.\n" "Write valid GitHub-flavored Markdown (GFM). Do not use HTML.\n\n" "Personality & style:\n" "- Plainspoken, concise, helpful.\n" "- Emojis are welcome ✅ — use them sparingly to highlight sections (1-3 per section).\n" "- Prefer short paragraphs and concrete phrasing.\n\n" "Formatting rules (must follow):\n" "- Headings: start with `##`, then `###` subsections.\n" "- Bullets: `- ` per line. Ordered lists: `1. ` style.\n" "- Tables: standard GFM (header row + separator row).\n" "- Code/SQL: fenced blocks ```language.\n" "- Line discipline: add a blank line before/after headings, tables, and code.\n" "- No inline HTML, no raw <br> tags.\n\n" "Structure:\n" "1) `## Title` (one line)\n" "2) 1-2 sentence summary\n" "3) If useful, a small table\n" "4) `### Key Takeaways` (3-5 bullets)\n" "5) `### Why This Matters` (1 short paragraph)\n\n" "Return ONLY the Markdown body (no extra JSON unless your backend expects it).\n" ) prompt = f"""{base_instruction} CRITICAL FORMATTING RULES (MUST FOLLOW): - Use simple ASCII hyphens (-) for date ranges: "April 1-4, 2025" - Write bullet points as: "- Point text" (each on new line) - Use regular apostrophes (') NOT curly quotes - Use standard ASCII characters ONLY (no Unicode) - NO HTML tags or entities - Keep table formatting simple and clean - Add blank lines before/after tables and sections Here is the content to summarize: {content_text} Remember: Output ONLY clean Markdown with ASCII characters. No HTML, no Unicode symbols, no special entities.""" try: # Check if vectorize schema exists in this database first with self.engine.connect() as connection: vectorize_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.schemata WHERE schema_name = 'vectorize' ); """)).scalar() if not vectorize_exists: if self.app: self.app.logger.info(f"Vectorize schema not available in {self.db_key}, using fallback summarization") return self._summarize_heuristics(content, instruction) # Use the existing LLM infrastructure for natural language summarization from AI.ai import query_vectorize_generate result = query_vectorize_generate(self.db_key, prompt, 'ollama/gpt-oss:20b') if result and isinstance(result, dict): # Extract the generated text from the result summary = result.get('text', '') or result.get('response', '') or str(result) elif result: summary = str(result) else: summary = None if summary and summary.strip(): # Post-process to ensure clean ASCII output cleaned_summary = self._clean_markdown_output(summary.strip()) return cleaned_summary else: # Fallback to simple formatting if LLM fails return self._summarize_heuristics(content, instruction) except Exception as e: if self.app: self.app.logger.warning(f"LLM summarization failed for {self.db_key}: {e}") # Fallback to simple formatting return self._summarize_heuristics(content, instruction) def _clean_markdown_output(self, text: str) -> str: """Clean markdown output to ensure ASCII-only characters while preserving LLM personality""" if self.app: self.app.logger.info(f"Cleaning markdown, input length: {len(text)}") # Step 1: Only replace problematic Unicode characters replacements = { # Dashes that cause issues '—': '-', '–': '-', '‑': '-', # Curly quotes ''': "'", ''': "'", '"': '"', '"': '"', # Only bullet symbols that appear malformed - keep content '•': '-', # Other symbols that break rendering '…': '...', } cleaned = text for unicode_char, ascii_replacement in replacements.items(): cleaned = cleaned.replace(unicode_char, ascii_replacement) # Step 2: Only fix obvious malformed patterns lines = cleaned.split('\n') fixed_lines = [] for line in lines: stripped = line.strip() # Skip completely empty lines if not stripped: if fixed_lines and fixed_lines[-1].strip(): fixed_lines.append('') # Preserve spacing continue # Fix lines that are ONLY a single dash or bullet (malformed) if stripped in ['-', '•', '*']: continue # Fix bullet points that got mangled (- without space) if stripped.startswith('-') and len(stripped) > 1 and stripped[1] != ' ': # Only fix if it looks like a mangled bullet content = stripped[1:].strip() if content: fixed_lines.append(f"- {content}") continue # Keep everything else as-is to preserve LLM personality fixed_lines.append(line) result = '\n'.join(fixed_lines) # Step 3: Light cleanup only # Remove excessive blank lines (more than 2) while '\n\n\n\n' in result: result = result.replace('\n\n\n\n', '\n\n\n') # Step 4: Ensure ASCII only (but keep all content structure) ascii_result = ''.join(char for char in result if 32 <= ord(char) <= 126 or char in '\n\r\t') final_result = ascii_result.strip() if self.app: self.app.logger.info(f"Markdown cleaning complete, output length: {len(final_result)}") return final_result def _summarize_heuristics(self, content: List[Dict[str, Any]], instruction: str = None) -> str: """Fallback summarization using simple formatting with markdown""" if not content: return "## No Relevant Content Found\n\nNo documents were found that match your query." # Create a markdown-formatted summary summary_parts = ["## Summary"] if instruction and "question:" in instruction.lower(): # Extract the question for context question_start = instruction.lower().find("question: '") + 11 question_end = instruction.find("'", question_start) if question_end > question_start: question = instruction[question_start:question_end] summary_parts.append(f"\n**Question:** {question}\n") summary_parts.append(f"Found **{len(content)}** relevant documents:\n") for i, item in enumerate(content[:6], 1): # Limit to top 6 items if 'snippet' in item: snippet = item['snippet'][:250] + "..." if len(item['snippet']) > 250 else item['snippet'] created_at = item.get('created_at', 'Unknown date') table_name = item.get('table_name', 'Unknown source') summary_parts.append(f"### {i}. {table_name.title()} Entry") summary_parts.append(f"**Date:** {created_at}") summary_parts.append(f"**Content:** {snippet}\n") elif 'data' in item: # Handle SQL result rows row_data = item['data'] if isinstance(row_data, dict): summary_parts.append(f"### {i}. Data Entry") for key, value in list(row_data.items())[:3]: summary_parts.append(f"- **{key.title()}:** {value}") summary_parts.append("") else: # Handle direct row data (fallback) summary_parts.append(f"### {i}. Record") for key, value in list(item.items())[:3]: if key not in ['table_name', 'pk_json', 'created_at']: summary_parts.append(f"- **{key.title()}:** {value}") summary_parts.append("") if len(content) > 6: summary_parts.append(f"*... and {len(content) - 6} more documents*") return "\n".join(summary_parts) def answer(self, question: str) -> Dict[str, Any]: """Generic analysis: Let LLM decide what queries would be helpful, execute them, then synthesize""" try: # Step 1: Get full schema context schema_info = self.get_schema_info() # Step 2: Ask LLM what queries would be helpful for this question suggested_queries = self._get_suggested_queries(question, schema_info) # Step 3: Execute suggested queries safely sql_results = [] for query_info in suggested_queries: if query_info.get('sql'): result = self.safe_run_sql(query_info['sql']) if result.get('success'): sql_results.append({ 'purpose': query_info.get('purpose', 'Data retrieval'), 'sql': query_info['sql'], 'result': result }) # Step 4: Get semantic/vector search results semantic_result = self._attempt_semantic_search(question) # Step 5: Final LLM synthesis of all collected data final_response = self._generic_synthesis(question, sql_results, semantic_result, schema_info) return final_response except Exception as e: if self.app: self.app.logger.error(f"Error in generic answer method for {self.db_key}: {e}") return { 'success': False, 'error': str(e), 'question': question, 'answer_markdown': f"## Error Processing Query\n\nSorry, I encountered an error: {str(e)}", 'used': {'sql': {'ran': False}, 'semantic': {'docs_considered': 0}}, 'confidence': 0.0 } def _classify_question(self, question: str) -> Dict[str, Any]: """Classify question type using lightweight LLM or keyword fallback""" try: # Try lightweight LLM classification first (10-15 tokens) classification_prompt = f"""Classify this question quickly. Return only JSON: {{"needs_sql": true/false, "needs_semantic": true/false, "question_type": "count|list|lookup|why|summary|timeline", "confidence": 0.0-1.0}} Question: {question} Rules: - needs_sql: true for counts, lists, specific data lookups - needs_semantic: true for explanations, summaries, "why" questions - Both can be true for complex questions - confidence: how certain you are (0.5+ means run both paths)""" # Check if vectorize schema exists for LLM classification with self.engine.connect() as connection: vectorize_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.schemata WHERE schema_name = 'vectorize' ); """)).scalar() if vectorize_exists: from AI.ai import query_vectorize_generate result = query_vectorize_generate(self.db_key, classification_prompt, 'ollama/gpt-oss:20b') if result and isinstance(result, dict): response_text = result.get('text', '') or result.get('response', '') or str(result) elif result: response_text = str(result) else: response_text = None if response_text: # Try to parse JSON response try: # Clean the response to extract JSON json_start = response_text.find('{') json_end = response_text.rfind('}') + 1 if json_start >= 0 and json_end > json_start: json_str = response_text[json_start:json_end] classification = json.loads(json_str) # Validate the response if all(key in classification for key in ['needs_sql', 'needs_semantic', 'question_type']): return classification except json.JSONDecodeError: pass except Exception as e: if self.app: self.app.logger.warning(f"LLM classification failed for {self.db_key}: {e}") # Fallback to keyword-based classification classification = self._classify_question_keywords(question) if self.app: self.app.logger.info(f"Question classification for '{question}': {classification}") return classification def _classify_question_keywords(self, question: str) -> Dict[str, Any]: """Fallback keyword-based classification""" question_lower = question.lower() # SQL indicators sql_keywords = [ 'how many', 'count', 'total number', 'number of', 'list all', 'show all', 'find all', 'get all', 'which personnel', 'which trips', 'which destinations', 'where has', 'where did', 'travelled to', 'traveled to', 'most visited', 'most popular', 'top destination', 'which destination', # Add travel/trip breakdown patterns 'breakdown of', 'travel done', 'trips done', 'travel by', 'trips by', 'travel for', 'trips for', 'travel history', 'trip history', 'where did', 'what trips', 'what travel', 'destinations visited', # Bob-specific patterns "bob's travel", "bob's trips", "travel by bob", "trips by bob", # Recent/overview patterns 'most recent', 'latest', 'recent trips', 'overview', 'breakdown', 'last trip', 'current', 'newest' ] # Semantic indicators semantic_keywords = [ 'why', 'how', 'explain', 'summary', 'summarize', 'what happened', 'tell me about', 'describe', 'important', 'key', 'milestone', 'challenge', 'timeline', 'events', 'story', 'details' ] needs_sql = any(keyword in question_lower for keyword in sql_keywords) needs_semantic = any(keyword in question_lower for keyword in semantic_keywords) # Determine question type if any(word in question_lower for word in ['how many', 'count', 'total']): question_type = 'count' elif any(word in question_lower for word in ['list', 'show', 'find all']): question_type = 'list' elif any(word in question_lower for word in ['where', 'which', 'who']): question_type = 'lookup' elif any(word in question_lower for word in ['why', 'explain', 'how']): question_type = 'why' elif any(word in question_lower for word in ['summary', 'summarize', 'tell me']): question_type = 'summary' elif any(word in question_lower for word in ['timeline', 'events', 'happened']): question_type = 'timeline' else: question_type = 'lookup' # Default to running both if uncertain if not needs_sql and not needs_semantic: needs_sql = True needs_semantic = True confidence = 0.4 # Low confidence, run both else: confidence = 0.8 # Higher confidence in classification return { 'needs_sql': needs_sql, 'needs_semantic': needs_semantic, 'question_type': question_type, 'confidence': confidence } def _attempt_sql_query(self, question: str, schema_hits: List[Dict]) -> Tuple[Optional[Dict], Optional[str]]: """Safely attempt SQL query with pre-flight checks""" try: # Generate SQL sql = self.generate_sql(question, {'tables': schema_hits}) if not sql or sql.strip() == "SELECT 'No specific pattern matched for this question' as message": return None, None # Pre-flight safety check with EXPLAIN if not self._is_sql_safe_to_run(sql): if self.app: self.app.logger.warning(f"SQL query deemed unsafe: {sql}") return None, None # Execute safely result = self.safe_run_sql(sql) if result.get('success'): return result, sql else: if self.app: self.app.logger.warning(f"SQL execution failed: {result.get('error')}") return None, None except Exception as e: if self.app: self.app.logger.warning(f"SQL attempt failed: {e}") return None, None def _is_sql_safe_to_run(self, sql: str) -> bool: """Pre-flight check using EXPLAIN to detect expensive queries""" try: with self.engine.connect() as connection: # Use EXPLAIN (without ANALYZE) to check the query plan explain_sql = f"EXPLAIN {sql}" result = connection.execute(text(explain_sql)) plan_lines = [row[0] for row in result.fetchall()] # Check for expensive operations plan_text = ' '.join(plan_lines).lower() # Red flags that indicate potentially expensive queries expensive_patterns = [ 'seq scan', # Sequential scans on large tables 'nested loop', # Nested loops without proper indexes 'hash join', # Large hash joins ] # For now, allow most queries but log warnings for pattern in expensive_patterns: if pattern in plan_text: if self.app: self.app.logger.info(f"Potentially expensive query detected ({pattern}): {sql}") return True # Allow query but with awareness except Exception as e: if self.app: self.app.logger.warning(f"EXPLAIN check failed: {e}") return True # If we can't check, assume it's safe def _attempt_semantic_search(self, question: str) -> List[Dict]: """Attempt semantic search with fallback to text search""" try: return self.semantic_rows(question, k=10) except Exception as e: if self.app: self.app.logger.warning(f"Semantic search failed: {e}") return [] def _synthesize_comprehensive_answer(self, question: str, sql_executed: Optional[str], sql_result: Optional[Dict], semantic_result: List[Dict], classification: Dict) -> Dict[str, Any]: """Synthesize SQL and semantic results into a comprehensive answer""" # Prepare data for synthesis has_sql_data = sql_result and sql_result.get('success') and sql_result.get('data') has_semantic_data = bool(semantic_result) # Build comprehensive prompt for LLM synthesis_prompt = self._build_synthesis_prompt( question, sql_executed, sql_result, semantic_result, classification ) # Generate comprehensive answer try: # Check if vectorize schema exists with self.engine.connect() as connection: vectorize_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.schemata WHERE schema_name = 'vectorize' ); """)).scalar() if vectorize_exists: from AI.ai import query_vectorize_generate if self.app: self.app.logger.info(f"Attempting LLM synthesis for question: {question}") result = query_vectorize_generate(self.db_key, synthesis_prompt, 'ollama/gpt-oss:20b') if self.app: self.app.logger.info(f"LLM result type: {type(result)}, content preview: {str(result)[:200] if result else 'None'}") if result and isinstance(result, dict): answer_text = result.get('text', '') or result.get('response', '') or str(result) elif result: answer_text = str(result) if self.app: self.app.logger.info(f"LLM returned string directly, length: {len(answer_text)}") else: answer_text = None if answer_text and answer_text.strip(): if self.app: self.app.logger.info(f"Processing LLM response, raw length: {len(answer_text)}") answer_markdown = self._clean_markdown_output(answer_text.strip()) if self.app: self.app.logger.info(f"LLM synthesis successful, cleaned length: {len(answer_markdown)}") else: if self.app: self.app.logger.warning(f"LLM synthesis returned empty or invalid response: {repr(answer_text)}") answer_markdown = self._fallback_synthesis(question, sql_result, semantic_result) else: if self.app: self.app.logger.info(f"Vectorize schema not available, using fallback synthesis") answer_markdown = self._fallback_synthesis(question, sql_result, semantic_result) except Exception as e: if self.app: self.app.logger.warning(f"Synthesis failed with error: {e}") answer_markdown = self._fallback_synthesis(question, sql_result, semantic_result) # Calculate confidence confidence = self._calculate_confidence(has_sql_data, has_semantic_data, classification) # Build response response = { 'success': True, 'answer_markdown': answer_markdown, 'used': { 'sql': { 'ran': sql_executed is not None, 'row_count': sql_result.get('row_count', 0) if sql_result else 0, 'sql': sql_executed }, 'semantic': { 'docs_considered': len(semantic_result) } }, 'confidence': confidence, 'notes': self._generate_processing_notes(sql_executed, sql_result, semantic_result), 'question': question, 'classification': classification } # Ensure we always have a valid answer_markdown if not response['answer_markdown'] or not response['answer_markdown'].strip(): response['answer_markdown'] = "## Unable to Generate Response\n\nI encountered an issue processing your request, but the data was retrieved successfully. Please try rephrasing your question." response['confidence'] = 0.3 if self.app: self.app.logger.warning(f"Empty answer_markdown detected, using fallback message") else: if self.app: self.app.logger.info(f"Successfully generated response with {len(response['answer_markdown'])} characters") # Add comprehensive debugging for frontend issues if self.app: self.app.logger.info(f"FINAL RESPONSE DEBUG - Success: {response['success']}, Answer length: {len(response.get('answer_markdown', ''))}") self.app.logger.info(f"FINAL RESPONSE KEYS: {list(response.keys())}") self.app.logger.info(f"ANSWER PREVIEW: {response.get('answer_markdown', '')[:100]}...") return response def _build_synthesis_prompt(self, question: str, sql_executed: Optional[str], sql_result: Optional[Dict], semantic_result: List[Dict], classification: Dict) -> str: """Build comprehensive prompt for synthesis""" prompt_parts = [ "You are a thoughtful search agent with a warm, confident tone.", "Write valid GitHub-flavored Markdown (GFM). Do not use HTML.", "", "Personality & style:", "- Plainspoken, concise, helpful.", "- Emojis are welcome ✅ — use them sparingly to highlight sections (1-3 per section).", "- Prefer short paragraphs and concrete phrasing.", "", "Formatting rules (must follow):", "- Headings: start with `##`, then `###` subsections.", "- Bullets: `- ` per line. Ordered lists: `1. ` style.", "- Tables: standard GFM (header row + separator row).", "- Code/SQL: fenced blocks ```language.", "- Line discipline: add a blank line before/after headings, tables, and code.", "- No inline HTML, no raw <br> tags.", "", "Structure:", "1) `## Title` (one line that answers the question)", "2) 1-2 sentence summary combining all available data", "3) If SQL data exists, show a clean table", "4) `### Key Insights` (3-5 bullets from both SQL and semantic data)", "5) `### Why This Matters` (1 short paragraph about significance)", "6) If you used multiple sources, briefly mention what you consulted", "", f"USER QUESTION: {question}", "" ] # Add SQL section if available if sql_executed and sql_result and sql_result.get('success'): data = sql_result.get('data', []) prompt_parts.extend([ "SQL QUERY EXECUTED:", f"```sql", f"{sql_executed}", f"```", "", "SQL RESULTS:", f"{json.dumps(data, default=str, indent=2)}", f"({len(data)} rows returned)", "" ]) # Add semantic section if available if semantic_result: prompt_parts.extend([ "SEMANTIC/DOCUMENT CONTEXT:", f"{json.dumps(semantic_result[:5], default=str, indent=2)}", f"({len(semantic_result)} documents considered)", "" ]) prompt_parts.extend([ "CRITICAL FORMATTING RULES:", "- Use simple ASCII hyphens (-) for date ranges: 'April 1-4, 2025'", "- Write bullet points as: '- Point text' (each on new line)", "- Use regular apostrophes (') NOT curly quotes", "- Use standard ASCII characters ONLY (no Unicode)", "- NO HTML tags or entities", "- Make tables clean and readable", "- Add blank lines before/after tables and sections", "", "Please provide a comprehensive answer that synthesizes both the structured data (SQL) and contextual information (documents). Focus on being helpful and actionable." ]) return "\n".join(prompt_parts) def _fallback_synthesis(self, question: str, sql_result: Optional[Dict], semantic_result: List[Dict]) -> str: """Fallback synthesis when LLM is not available""" parts = ["## Team Trip Breakdown 📊" if "trip" in question.lower() else "## Comprehensive Answer"] has_sql_data = sql_result and sql_result.get('success') and sql_result.get('data') has_semantic_data = bool(semantic_result) if has_sql_data and has_semantic_data: parts.append("Found both structured data and contextual information to answer your question.") elif has_sql_data: parts.append("Found structured trip data to provide a comprehensive breakdown.") elif has_semantic_data: parts.append("Found contextual information to answer your question.") else: parts.append("No specific data found, but here's what I can tell you.") parts.append("") # Add SQL results table if has_sql_data: data = sql_result.get('data', []) if data and len(data) <= 15: # Increase limit for trip data # Build table - handle trip-specific data better if isinstance(data[0], dict): headers = list(data[0].keys()) if len(headers) <= 8: # Allow more columns for trip data # Format headers nicely formatted_headers = [] for h in headers: if h == 'trip_id': formatted_headers.append('Trip ID') elif h == 'start_date': formatted_headers.append('Start Date') elif h == 'end_date': formatted_headers.append('End Date') elif h == 'crew_count': formatted_headers.append('Crew Size') else: formatted_headers.append(h.replace('_', ' ').title()) parts.append("| " + " | ".join(formatted_headers) + " |") parts.append("|" + "|".join([" --- " for _ in formatted_headers]) + "|") for row in data: values = [] for key in headers: value = row.get(key, '') if value is None: value = '' # Format dates nicely if 'date' in key and str(value): try: from datetime import datetime if isinstance(value, str) and 'T' in value: dt = datetime.fromisoformat(value.replace('Z', '+00:00')) value = dt.strftime('%Y-%m-%d') except: pass values.append(str(value)) parts.append("| " + " | ".join(values) + " |") parts.append("") # Add key insights parts.append("### Key Findings 🔍") if has_sql_data: data = sql_result.get('data', []) if len(data) == 1 and len(data[0]) == 1: key, value = list(data[0].items())[0] parts.append(f"- The answer is **{value}**") else: parts.append(f"- Found **{len(data)}** trip records in the database") if data: # Calculate some basic stats if 'crew_count' in data[0]: crew_sizes = [row.get('crew_count', 0) for row in data] avg_crew = sum(crew_sizes) / len(crew_sizes) if crew_sizes else 0 parts.append(f"- Average crew size: **{avg_crew:.1f}** personnel per trip") parts.append(f"- Crew size range: **{min(crew_sizes)}** to **{max(crew_sizes)}** personnel") # Check date range dates = [] for row in data: if 'start_date' in row and row['start_date']: dates.append(row['start_date']) if dates: parts.append(f"- Trip timeline: **{min(dates)}** to **{max(dates)}**") if has_semantic_data: parts.append(f"- Retrieved **{len(semantic_result)}** relevant documents") if not has_sql_data and not has_semantic_data: parts.append("- No specific data found for this query") # Add a "Why This Matters" section for trip data if has_sql_data and "trip" in question.lower(): parts.append("") parts.append("### Why This Matters 🚀") parts.append("Understanding trip patterns helps with resource planning, crew scheduling, and mission optimization. This breakdown provides insights into operational tempo and team deployment strategies.") return "\n".join(parts) def _calculate_confidence(self, has_sql_data: bool, has_semantic_data: bool, classification: Dict) -> float: """Calculate confidence score based on data availability and classification""" base_confidence = classification.get('confidence', 0.5) # Boost confidence if we have the expected data type if classification.get('needs_sql') and has_sql_data: base_confidence += 0.2 if classification.get('needs_semantic') and has_semantic_data: base_confidence += 0.2 # Perfect match: both data types available when both needed if (classification.get('needs_sql') and has_sql_data and classification.get('needs_semantic') and has_semantic_data): base_confidence += 0.1 # Penalty for missing expected data if classification.get('needs_sql') and not has_sql_data: base_confidence -= 0.3 if classification.get('needs_semantic') and not has_semantic_data: base_confidence -= 0.2 return max(0.0, min(1.0, base_confidence)) # Clamp to [0, 1] def _generate_processing_notes(self, sql_executed: Optional[str], sql_result: Optional[Dict], semantic_result: List[Dict]) -> List[str]: """Generate processing notes for transparency""" notes = [] if sql_executed: if 'LIMIT' in sql_executed.upper(): notes.append("Added LIMIT for safety") if sql_result and sql_result.get('success'): row_count = sql_result.get('row_count', 0) if row_count == 0: notes.append("SQL query returned no rows") elif row_count >= 50: notes.append("Large result set - showing subset") else: notes.append("SQL execution failed") if semantic_result: if len(semantic_result) == 0: notes.append("No relevant documents found") elif len(semantic_result) >= 10: notes.append("Many documents found - showing most relevant") if not sql_executed and not semantic_result: notes.append("No data sources available") return notes def _is_data_query(self, question_lower: str) -> bool: """Check if this needs SQL for counting, listing, or specific data retrieval""" sql_keywords = [ 'how many', 'count of', 'total number', 'number of', 'list all', 'show all', 'find all', 'get all', 'which personnel', 'which trips', 'which destinations', 'select', 'display all', # Travel/destination queries 'where has', 'where did', 'what destinations', 'what locations', 'travelled to', 'traveled to', 'visited', 'been to', 'trips to', 'destinations for', 'locations for', # Popularity/ranking queries 'most visited', 'most popular', 'top destination', 'which destination', 'most frequent', 'highest', 'top', 'popular' ] return any(keyword in question_lower for keyword in sql_keywords) def _handle_sql_query(self, question: str) -> Dict[str, Any]: """Handle questions that need SQL generation""" try: # Get relevant schema elements schema_hits = self.search_catalog(question, k=12) # Generate SQL using improved method sql = self.generate_sql(question, {'tables': schema_hits}) # Execute the SQL safely result = self.safe_run_sql(sql) if result.get('success'): # Get the raw data data = result.get('data', []) # Process through LLM for rich response llm_response = self._process_sql_results_through_llm(data, question, sql) return { 'type': 'sql', 'sql': sql, 'raw_data': data, 'row_count': result.get('row_count', 0), 'answer': llm_response, 'question': question, 'schema_elements_used': len(schema_hits) } else: return { 'type': 'error', 'error': result.get('error', 'SQL execution failed'), 'sql_attempted': sql, 'question': question } except Exception as e: if self.app: self.app.logger.error(f"Error in SQL query handling for {self.db_key}: {e}") return { 'type': 'error', 'error': str(e), 'question': question } def _process_sql_results_through_llm(self, data: List[Dict[str, Any]], question: str, sql: str) -> str: """Process SQL results through LLM for rich, natural language responses""" if not data: return "No results were found for your query." # Prepare the data for LLM processing data_text = json.dumps(data, default=str, indent=2) prompt = f"""You are a thoughtful search agent with a warm, confident tone. Write valid GitHub-flavored Markdown (GFM). Do not use HTML. Personality & style: - Plainspoken, concise, helpful. - Emojis are welcome ✅ — use them sparingly to highlight sections (1-3 per section). - Prefer short paragraphs and concrete phrasing. Formatting rules (must follow): - Headings: start with `##`, then `###` subsections. - Bullets: `- ` per line. Ordered lists: `1. ` style. - Tables: standard GFM (header row + separator row). - Code/SQL: fenced blocks ```language. - Line discipline: add a blank line before/after headings, tables, and code. - No inline HTML, no raw <br> tags. Structure: 1) `## Title` (one line that answers the question) 2) 1-2 sentence summary of what was found 3) If useful, a well-formatted table showing the key data 4) `### Key Insights` (3-5 bullets highlighting important findings) 5) `### Why This Matters` (1 short paragraph about significance) USER QUESTION: {question} SQL EXECUTED: ```sql {sql} ``` QUERY RESULTS: {data_text} CRITICAL FORMATTING RULES: - Use simple ASCII hyphens (-) for date ranges: "April 1-4, 2025" - Write bullet points as: "- Point text" (each on new line) - Use regular apostrophes (') NOT curly quotes - Use standard ASCII characters ONLY (no Unicode) - NO HTML tags or entities - Make tables clean and readable - Add blank lines before/after tables and sections Please analyze these results and provide a comprehensive, well-formatted response that directly answers the user's question. Focus on making the data meaningful and actionable.""" try: # Check if vectorize schema exists in this database first with self.engine.connect() as connection: vectorize_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.schemata WHERE schema_name = 'vectorize' ); """)).scalar() if not vectorize_exists: if self.app: self.app.logger.info(f"Vectorize schema not available in {self.db_key}, using fallback for SQL results") return self._format_sql_result_fallback(data, question) # Use the existing LLM infrastructure from AI.ai import query_vectorize_generate result = query_vectorize_generate(self.db_key, prompt, 'ollama/gpt-oss:20b') if result and isinstance(result, dict): # Extract the generated text from the result llm_response = result.get('text', '') or result.get('response', '') or str(result) elif result: llm_response = str(result) else: llm_response = None if llm_response and llm_response.strip(): # Clean the response to ensure ASCII-only output cleaned_response = self._clean_markdown_output(llm_response.strip()) return cleaned_response else: # Fallback to simple formatting if LLM fails return self._format_sql_result_fallback(data, question) except Exception as e: if self.app: self.app.logger.warning(f"LLM processing of SQL results failed for {self.db_key}: {e}") # Fallback to simple formatting return self._format_sql_result_fallback(data, question) def _format_sql_result_fallback(self, data: List[Dict[str, Any]], question: str) -> str: """Fallback formatting for SQL results when LLM is not available""" if not data: return "## No Results Found\n\nYour query returned no matching records." # Create a markdown-formatted response summary_parts = [f"## Query Results"] # Add basic summary summary_parts.append(f"Found **{len(data)}** result(s) for your query.\n") # Create table if data is structured well if len(data) <= 10 and all(isinstance(row, dict) for row in data): # Get all unique keys all_keys = set() for row in data: all_keys.update(row.keys()) if len(all_keys) <= 6: # Only create table if manageable number of columns sorted_keys = sorted(all_keys) # Table header header = "| " + " | ".join(sorted_keys) + " |" separator = "|" + "|".join([" --- " for _ in sorted_keys]) + "|" summary_parts.append(header) summary_parts.append(separator) # Table rows for row in data[:5]: # Limit to first 5 for readability values = [str(row.get(key, '')) for key in sorted_keys] summary_parts.append("| " + " | ".join(values) + " |") if len(data) > 5: summary_parts.append(f"*... and {len(data) - 5} more records*") else: summary_parts.append(f"Found {len(data)} records with detailed information.") else: summary_parts.append(f"Query returned {len(data)} records.") # Add key findings summary_parts.append("### Key Findings") if len(data) == 1 and len(data[0]) == 1: # Single value result key, value = list(data[0].items())[0] summary_parts.append(f"- The answer is **{value}**") else: # Multiple results summary_parts.append(f"- Retrieved {len(data)} records") if data and isinstance(data[0], dict): summary_parts.append(f"- Data includes {len(data[0])} fields per record") return "\n".join(summary_parts) def _handle_semantic_query(self, question: str) -> Dict[str, Any]: """Handle questions using semantic search of document embeddings""" try: # Use enhanced semantic search results = self.semantic_rows(question, k=10) if results: # Use LLM to create a comprehensive summary summary = self.summarize(results, f"Answer the question: '{question}' using the following information. Provide a clear, detailed response.") return { 'type': 'semantic', 'answer': summary, 'sources': len(results), 'question': question, 'content_found': True } else: return { 'type': 'semantic', 'answer': 'No relevant content found in the document embeddings.', 'sources': 0, 'question': question, 'content_found': False } except Exception as e: if self.app: self.app.logger.error(f"Error in semantic query handling for {self.db_key}: {e}") return { 'type': 'error', 'error': str(e), 'question': question } def _format_sql_result(self, data: List[Dict[str, Any]], question: str) -> str: """Format SQL results into a natural language response""" if not data: return "No results found." # For simple counting queries if len(data) == 1 and len(data[0]) == 1: key, value = list(data[0].items())[0] if 'count' in key.lower() or 'total' in key.lower(): return f"The answer is {value}." # For list queries if 'list' in question.lower() or 'show' in question.lower(): items = [] for row in data[:5]: # Limit to first 5 for readability if 'name' in row: items.append(row['name']) else: items.append(str(list(row.values())[0])) result = ", ".join(items) if len(data) > 5: result += f" (and {len(data) - 5} more)" return result # Default formatting try: formatted_items = [] for row in data[:3]: # Show first 3 rows row_parts = [] for key, value in row.items(): if value is not None: row_parts.append(f"{key}: {value}") formatted_items.append("- " + ", ".join(row_parts)) result = "\n".join(formatted_items) if len(data) > 3: result += f"\n(and {len(data) - 3} more records)" return result except: return f"Found {len(data)} result(s)." def rebuild_catalog(self): """Rebuild the entire catalog from current schema""" try: if self.app: self.app.logger.info(f"Rebuilding catalog for database: {self.db_key}") # Ensure tables exist first self.setup_catalog_tables() # Get current schema schema_info = self.get_schema_info() # Build descriptions descriptions = self.build_catalog_descriptions(schema_info) # Clear existing catalog with self.engine.begin() as connection: # Check if table exists before deleting table_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'catalog_embeddings' ); """)).scalar() if table_exists: connection.execute(text("DELETE FROM catalog_embeddings")) # Insert new catalog entries for desc in descriptions: # In production, you'd generate actual embeddings here # For now, we'll store with a placeholder embedding connection.execute(text(""" INSERT INTO catalog_embeddings (kind, schema_name, table_name, column_name, payload) VALUES (:kind, :schema_name, :table_name, :column_name, :payload) """), { 'kind': desc['kind'], 'schema_name': desc['schema_name'], 'table_name': desc['table_name'], 'column_name': desc['column_name'], 'payload': json.dumps(desc['payload']) }) if self.app: self.app.logger.info(f"Catalog rebuilt with {len(descriptions)} entries for database: {self.db_key}") return {'success': True, 'entries_created': len(descriptions)} except Exception as e: error_msg = str(e) if self.app: self.app.logger.error(f"Error rebuilding catalog in {self.db_key}: {error_msg}") return {'error': error_msg} def populate_sample_docs(self): """Populate document embeddings with sample data from existing tables""" try: if self.app: self.app.logger.info(f"Populating sample documents for database: {self.db_key}") # Check if vector extension is available has_vector_extension = self._check_vector_extension() # Ensure tables exist first - use a fresh check with self.engine.connect() as connection: doc_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'doc_embeddings' ); """)).scalar() if not doc_exists: if self.app: self.app.logger.warning(f"doc_embeddings table does not exist in {self.db_key}, attempting to create it") self.setup_catalog_tables() # Check again after setup doc_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'doc_embeddings' ); """)).scalar() if not doc_exists: return {'success': False, 'error': 'Could not create doc_embeddings table'} doc_count = 0 # Clear existing doc embeddings (separate transaction) try: with self.engine.begin() as connection: connection.execute(text("DELETE FROM doc_embeddings WHERE table_name LIKE '%trip%' OR table_name LIKE '%personnel%'")) except Exception as clear_error: if self.app: self.app.logger.warning(f"Could not clear existing doc embeddings in {self.db_key}: {clear_error}") # Populate from descriptions table (if it exists) - separate transaction try: with self.engine.begin() as connection: descriptions_result = connection.execute(text(""" SELECT d.description_id, d.trip_id, d.note, d.created_at, t.start_date, t.end_date, p.name as author_name FROM descriptions d LEFT JOIN trips t ON t.trip_id = d.trip_id LEFT JOIN personnel p ON p.personnel_id = d.personnel_id ORDER BY d.created_at DESC LIMIT 100 """)) for row in descriptions_result: snippet = f"Trip Note — {row.author_name or 'Unknown'} — {row.start_date} to {row.end_date} — {row.note[:400]}" connection.execute(text(""" INSERT INTO doc_embeddings (table_name, pk_json, snippet, created_at) VALUES ('descriptions', :pk_json, :snippet, :created_at) """), { 'pk_json': json.dumps({'description_id': str(row.description_id), 'trip_id': str(row.trip_id)}), 'snippet': snippet, 'created_at': row.created_at }) doc_count += 1 except Exception as e: if self.app: self.app.logger.warning(f"Could not populate from descriptions table in {self.db_key}: {e}") # Populate from personnel table (if it exists) - separate transaction try: with self.engine.connect() as connection: # Check what columns exist in personnel table personnel_columns = connection.execute(text(""" SELECT column_name FROM information_schema.columns WHERE table_name = 'personnel' AND table_schema = 'public' ORDER BY ordinal_position """)).fetchall() if personnel_columns: column_names = [col[0] for col in personnel_columns] # Build dynamic query based on available columns select_parts = ['personnel_id'] # Add optional columns if they exist if 'name' in column_names: select_parts.append('name') if 'position' in column_names: select_parts.append('position') if 'age' in column_names: select_parts.append('age') if 'rank' in column_names: select_parts.append('rank') if 'department' in column_names: select_parts.append('department') personnel_query = f"SELECT {', '.join(select_parts)} FROM personnel" personnel_result = connection.execute(text(personnel_query)) # Insert in separate transaction with self.engine.begin() as insert_conn: for row in personnel_result: # Build snippet from available data snippet_parts = ["Personnel"] if hasattr(row, 'name') and row.name: snippet_parts.append(row.name) details = [] if hasattr(row, 'position') and row.position: details.append(f"Position: {row.position}") if hasattr(row, 'rank') and row.rank: details.append(f"Rank: {row.rank}") if hasattr(row, 'department') and row.department: details.append(f"Department: {row.department}") if hasattr(row, 'age') and row.age: details.append(f"Age: {row.age}") if details: snippet_parts.append(" — ".join(details)) else: snippet_parts.append("No additional details") snippet = " — ".join(snippet_parts) insert_conn.execute(text(""" INSERT INTO doc_embeddings (table_name, pk_json, snippet) VALUES ('personnel', :pk_json, :snippet) """), { 'pk_json': json.dumps({'personnel_id': str(row.personnel_id)}), 'snippet': snippet }) doc_count += 1 except Exception as e: if self.app: self.app.logger.warning(f"Could not populate from personnel table in {self.db_key}: {e}") # Populate from trips table (if it exists) - separate transaction # Populate from trips table (if it exists) - separate transaction try: with self.engine.connect() as connection: # First check what columns exist in trips table trips_columns = connection.execute(text(""" SELECT column_name FROM information_schema.columns WHERE table_name = 'trips' AND table_schema = 'public' ORDER BY ordinal_position """)).fetchall() if trips_columns: column_names = [col[0] for col in trips_columns] # Build dynamic query based on available columns select_parts = ['trip_id', 'start_date', 'end_date'] # Add optional columns if they exist if 'description' in column_names: select_parts.append('description') if 'notes' in column_names: select_parts.append('notes') if 'purpose' in column_names: select_parts.append('purpose') trips_query = f""" SELECT {', '.join(select_parts)} FROM trips ORDER BY start_date DESC LIMIT 50 """ trips_result = connection.execute(text(trips_query)) # Insert in separate transaction with self.engine.begin() as insert_conn: for row in trips_result: # Build snippet from available data snippet_parts = [f"Trip — {row.start_date} to {row.end_date}"] # Add description/notes if available if hasattr(row, 'description') and row.description: snippet_parts.append(f"Description: {row.description}") elif hasattr(row, 'notes') and row.notes: snippet_parts.append(f"Notes: {row.notes}") elif hasattr(row, 'purpose') and row.purpose: snippet_parts.append(f"Purpose: {row.purpose}") else: snippet_parts.append("No additional details") snippet = " — ".join(snippet_parts) insert_conn.execute(text(""" INSERT INTO doc_embeddings (table_name, pk_json, snippet) VALUES ('trips', :pk_json, :snippet) """), { 'pk_json': json.dumps({'trip_id': str(row.trip_id)}), 'snippet': snippet }) doc_count += 1 except Exception as e: if self.app: self.app.logger.warning(f"Could not populate from trips table in {self.db_key}: {e}") if self.app: self.app.logger.info(f"Sample documents populated with {doc_count} entries for database: {self.db_key}") return {'success': True, 'message': f'Populated {doc_count} sample documents'} except Exception as e: error_msg = str(e) if self.app: self.app.logger.error(f"Error populating sample documents in {self.db_key}: {error_msg}") return {'success': False, 'error': error_msg} def debug_database_state(self) -> Dict[str, Any]: """Debug method to check current database state and schema""" debug_info = { 'database_key': self.db_key, 'catalog_table_exists': False, 'doc_table_exists': False, 'public_tables': [], 'all_schemas': [], 'vector_extension': False } try: with self.engine.connect() as connection: # Check all schemas schemas_result = connection.execute(text(""" SELECT schema_name FROM information_schema.schemata ORDER BY schema_name """)) debug_info['all_schemas'] = [row[0] for row in schemas_result] # Check all tables in public schema tables_result = connection.execute(text(""" SELECT table_name, table_type FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name """)) debug_info['public_tables'] = [{'name': row[0], 'type': row[1]} for row in tables_result] # Check if our LLM tables exist debug_info['catalog_table_exists'] = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'catalog_embeddings' ); """)).scalar() debug_info['doc_table_exists'] = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'doc_embeddings' ); """)).scalar() # Check vector extension try: debug_info['vector_extension'] = connection.execute(text(""" SELECT EXISTS ( SELECT 1 FROM pg_type WHERE typname = 'vector' ); """)).scalar() except: debug_info['vector_extension'] = False # Get row counts for existing tables for table_info in debug_info['public_tables']: table_name = table_info['name'] try: count_result = connection.execute(text(f"SELECT COUNT(*) FROM {table_name}")) table_info['row_count'] = count_result.scalar() except Exception as e: table_info['row_count'] = f"Error: {str(e)}" except Exception as e: debug_info['error'] = str(e) if self.app: self.app.logger.error(f"Error in debug_database_state for {self.db_key}: {e}") return debug_info def force_rebuild_everything(self) -> Dict[str, Any]: """Force a complete rebuild of catalog and sample data""" results = { 'database_key': self.db_key, 'steps': [] } try: # Step 1: Get current state current_state = self.debug_database_state() results['steps'].append({ 'step': 'initial_state', 'status': 'completed', 'data': current_state }) # Step 2: Force recreate LLM tables if self.app: self.app.logger.info(f"Force rebuilding LLM infrastructure for {self.db_key}") # Drop existing LLM tables if they exist try: with self.engine.begin() as connection: connection.execute(text("DROP TABLE IF EXISTS catalog_embeddings CASCADE")) connection.execute(text("DROP TABLE IF EXISTS doc_embeddings CASCADE")) results['steps'].append({ 'step': 'drop_tables', 'status': 'completed' }) except Exception as e: results['steps'].append({ 'step': 'drop_tables', 'status': 'error', 'error': str(e) }) # Step 3: Recreate tables try: self.setup_catalog_tables() results['steps'].append({ 'step': 'recreate_tables', 'status': 'completed' }) except Exception as e: results['steps'].append({ 'step': 'recreate_tables', 'status': 'error', 'error': str(e) }) # Step 4: Rebuild catalog try: catalog_result = self.rebuild_catalog() results['steps'].append({ 'step': 'rebuild_catalog', 'status': 'completed', 'data': catalog_result }) except Exception as e: results['steps'].append({ 'step': 'rebuild_catalog', 'status': 'error', 'error': str(e) }) # Step 5: Populate sample docs try: docs_result = self.populate_sample_docs() results['steps'].append({ 'step': 'populate_docs', 'status': 'completed', 'data': docs_result }) except Exception as e: results['steps'].append({ 'step': 'populate_docs', 'status': 'error', 'error': str(e) }) # Step 6: Final state final_state = self.debug_database_state() results['steps'].append({ 'step': 'final_state', 'status': 'completed', 'data': final_state }) results['success'] = True except Exception as e: results['success'] = False results['error'] = str(e) if self.app: self.app.logger.error(f"Error in force_rebuild_everything for {self.db_key}: {e}") return results def ensure_tables_exist(self): """Ensure the LLM tables exist and are ready for operations""" try: with self.engine.connect() as connection: # Check if both tables exist catalog_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'catalog_embeddings' ); """)).scalar() doc_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'doc_embeddings' ); """)).scalar() if not catalog_exists or not doc_exists: if self.app: self.app.logger.info(f"Creating missing LLM tables for {self.db_key}") self.setup_catalog_tables() return True except Exception as e: if self.app: self.app.logger.error(f"Error ensuring tables exist in {self.db_key}: {e}") return False def _get_suggested_queries(self, question: str, schema_info: Dict[str, Any]) -> List[Dict[str, Any]]: """Ask LLM what SQL queries would be helpful for answering the question""" try: # Build a comprehensive schema description for the LLM schema_description = self._build_schema_description(schema_info) prompt = f"""You are a PostgreSQL expert. Given this database schema and user question, suggest SQL queries that would help answer the question. DATABASE SCHEMA: {schema_description} USER QUESTION: {question} TASK: Suggest 1-3 SQL queries that would provide relevant data to answer this question. For each query, explain its purpose. RULES: 1. Only use tables and columns that exist in the schema above 2. Use proper PostgreSQL syntax with comprehensive JOINs 3. Add LIMIT clauses (50 or less) for safety 4. ALWAYS include related data through foreign key relationships 5. For trip queries, include destinations, personnel names, and team details 6. Use LEFT JOINs to avoid losing records when related data might be missing 7. Return valid JSON only CRITICAL JOIN PATTERNS: - trips -> teams -> personnel (to get team member names) - trips -> trip_destinations -> destinations (to get destination names/locations) - trips -> descriptions (to get trip notes) - Always include destination names when asking about trips - Always include personnel names when asking about team composition IMPORTANT SQL RULES: - NEVER use reserved keywords as table aliases (desc, order, group, select, etc.) - Use safe aliases like: d (destinations), p (personnel), tm (teams), td (trip_destinations), dn (descriptions) - Example safe query pattern: SELECT t.trip_id, t.start_date, t.end_date, array_agg(DISTINCT d.location_name) AS destinations, array_agg(DISTINCT p.name) AS team_members FROM trips t LEFT JOIN trip_destinations td ON t.trip_id = td.trip_id LEFT JOIN destinations d ON td.destination_id = d.destination_id LEFT JOIN teams tm ON t.trip_id = tm.trip_id LEFT JOIN personnel p ON tm.personnel_id = p.personnel_id RESPONSE FORMAT: ```json [ {{ "purpose": "Brief description of what this query finds", "sql": "SELECT ... FROM ... WHERE ... LIMIT 50", "rationale": "Why this query helps answer the question" }} ] ``` Return ONLY the JSON array, no other text.""" # Check if vectorize schema exists with self.engine.connect() as connection: vectorize_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.schemata WHERE schema_name = 'vectorize' ); """)).scalar() if vectorize_exists: from AI.ai import query_vectorize_generate if self.app: self.app.logger.info(f"Asking LLM for query suggestions for: {question}") result = query_vectorize_generate(self.db_key, prompt, 'ollama/gpt-oss:20b') if result and isinstance(result, dict): response_text = result.get('text', '') or result.get('response', '') or str(result) elif result: response_text = str(result) else: response_text = None if response_text: # Try to parse JSON response try: # Clean the response to extract JSON json_start = response_text.find('[') json_end = response_text.rfind(']') + 1 if json_start >= 0 and json_end > json_start: json_str = response_text[json_start:json_end] queries = json.loads(json_str) if self.app: self.app.logger.info(f"LLM suggested {len(queries)} queries") return queries if isinstance(queries, list) else [] except json.JSONDecodeError as e: if self.app: self.app.logger.warning(f"Failed to parse query suggestions JSON: {e}") else: if self.app: self.app.logger.info(f"Vectorize schema not available, using fallback query suggestions") except Exception as e: if self.app: self.app.logger.warning(f"Query suggestion failed: {e}") # Fallback: return empty list if LLM fails return [] def _build_schema_description(self, schema_info: Dict[str, Any]) -> str: """Build a comprehensive but concise schema description for the LLM""" description_parts = [] # Group data for easier processing tables_by_name = {t['table']: t for t in schema_info['tables']} columns_by_table = {} pks_by_table = {} fks_by_table = {} row_counts = {rc['table']: rc['approx_rows'] for rc in schema_info['row_counts']} # Group columns by table for col in schema_info['columns']: table_name = col['table_name'] if table_name not in columns_by_table: columns_by_table[table_name] = [] columns_by_table[table_name].append(col) # Group primary keys by table for pk in schema_info['primary_keys']: table_name = pk['table_name'] if table_name not in pks_by_table: pks_by_table[table_name] = [] pks_by_table[table_name].append(pk['column_name']) # Group foreign keys by table for fk in schema_info['foreign_keys']: table_name = fk['table_name'] if table_name not in fks_by_table: fks_by_table[table_name] = [] fks_by_table[table_name].append(f"{fk['column_name']} -> {fk['fk_table']}.{fk['fk_column']}") # Build table descriptions for table_name in sorted(tables_by_name.keys()): table_info = tables_by_name[table_name] columns = columns_by_table.get(table_name, []) pks = pks_by_table.get(table_name, []) fks = fks_by_table.get(table_name, []) row_count = row_counts.get(table_name, 0) # Build column descriptions col_descriptions = [] for col in columns: col_desc = f"{col['column_name']} ({col['data_type']})" if col['column_name'] in pks: col_desc += " [PK]" col_descriptions.append(col_desc) # Build table section table_desc = f"TABLE {table_name}:" table_desc += f"\n Columns: {', '.join(col_descriptions)}" if fks: table_desc += f"\n Foreign Keys: {', '.join(fks)}" if row_count > 0: table_desc += f"\n Rows: ~{row_count:,}" if table_info.get('table_comment'): table_desc += f"\n Comment: {table_info['table_comment']}" description_parts.append(table_desc) return "\n\n".join(description_parts) def _generic_synthesis(self, question: str, sql_results: List[Dict], semantic_result: List[Dict], schema_info: Dict[str, Any]) -> Dict[str, Any]: """Generic synthesis that works with any data""" try: # Build synthesis prompt synthesis_prompt = self._build_generic_synthesis_prompt( question, sql_results, semantic_result, schema_info ) # Generate comprehensive answer answer_markdown = None # Check if vectorize schema exists with self.engine.connect() as connection: vectorize_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.schemata WHERE schema_name = 'vectorize' ); """)).scalar() if vectorize_exists: from AI.ai import query_vectorize_generate if self.app: self.app.logger.info(f"Performing final synthesis for: {question}") result = query_vectorize_generate(self.db_key, synthesis_prompt, 'ollama/gpt-oss:20b') if result and isinstance(result, dict): answer_text = result.get('text', '') or result.get('response', '') or str(result) elif result: answer_text = str(result) else: answer_text = None if answer_text and answer_text.strip(): answer_markdown = self._clean_markdown_output(answer_text.strip()) if not answer_markdown: # Fallback synthesis answer_markdown = self._generic_fallback_synthesis(question, sql_results, semantic_result) # Calculate confidence has_sql_data = bool(sql_results) has_semantic_data = bool(semantic_result) confidence = 0.7 if has_sql_data and has_semantic_data else 0.5 if has_sql_data or has_semantic_data else 0.3 # Build response total_sql_rows = sum(result['result'].get('row_count', 0) for result in sql_results) response = { 'success': True, 'answer_markdown': answer_markdown, 'used': { 'sql': { 'ran': len(sql_results) > 0, 'queries_executed': len(sql_results), 'total_rows': total_sql_rows, 'queries': [{'purpose': r['purpose'], 'sql': r['sql']} for r in sql_results] }, 'semantic': { 'docs_considered': len(semantic_result) } }, 'confidence': confidence, 'notes': self._generate_generic_notes(sql_results, semantic_result), 'question': question, 'approach': 'generic_analysis' } # Ensure we always have a valid answer_markdown if not response['answer_markdown'] or not response['answer_markdown'].strip(): response['answer_markdown'] = "## Unable to Generate Response\n\nI encountered an issue processing your request. Please try rephrasing your question." response['confidence'] = 0.1 if self.app: self.app.logger.warning(f"Empty answer_markdown detected in generic synthesis") # Debug logging if self.app: self.app.logger.info(f"Generic synthesis complete - SQL queries: {len(sql_results)}, Semantic docs: {len(semantic_result)}") self.app.logger.info(f"FINAL RESPONSE DEBUG - Success: {response['success']}, Answer length: {len(response.get('answer_markdown', ''))}") return response except Exception as e: if self.app: self.app.logger.error(f"Error in generic synthesis: {e}") return { 'success': False, 'error': str(e), 'question': question, 'answer_markdown': f"## Error in Analysis\n\nSorry, I encountered an error during analysis: {str(e)}", 'used': {'sql': {'ran': False}, 'semantic': {'docs_considered': 0}}, 'confidence': 0.0 } def _build_generic_synthesis_prompt(self, question: str, sql_results: List[Dict], semantic_result: List[Dict], schema_info: Dict[str, Any]) -> str: """Build synthesis prompt that works with any data""" prompt_parts = [ "You are a data analyst with excellent communication skills.", "Analyze the provided data sources and answer the user's question comprehensively.", "Write valid GitHub-flavored Markdown (GFM). Use ASCII characters only.", "", "FORMATTING RULES:", "- Start with `## [Title that answers the question]`", "- Use `### Subsection` headings for organization", "- Create tables with `| Header | Header |` format when showing data", "- Use `- Point` for bullet lists", "- Use simple ASCII hyphens (-) for date ranges", "- NO Unicode characters, NO HTML tags", "", f"USER QUESTION: {question}", "" ] # Add SQL results section if sql_results: prompt_parts.extend([ "SQL QUERY RESULTS:", "" ]) for i, sql_result in enumerate(sql_results, 1): purpose = sql_result.get('purpose', 'Data query') sql = sql_result.get('sql', '') data = sql_result['result'].get('data', []) row_count = sql_result['result'].get('row_count', 0) prompt_parts.extend([ f"Query {i}: {purpose}", f"```sql", f"{sql}", f"```", f"Results ({row_count} rows):", f"```json", f"{json.dumps(data, default=str, indent=2)}", f"```", "" ]) # Add semantic results section if semantic_result: prompt_parts.extend([ "DOCUMENT/CONTEXT DATA:", f"```json", f"{json.dumps(semantic_result[:5], default=str, indent=2)}", f"```", f"({len(semantic_result)} documents total)", "" ]) prompt_parts.extend([ "INSTRUCTIONS:", "1. Synthesize ALL provided data to answer the question", "2. If SQL data shows specific records, include key details in tables", "3. If documents provide context, incorporate relevant insights", "4. Structure your response logically with clear headings", "5. Be specific and data-driven in your analysis", "6. If data is insufficient, explain what's missing", "", "Provide a comprehensive analysis that directly answers the user's question." ]) return "\n".join(prompt_parts) def _generic_fallback_synthesis(self, question: str, sql_results: List[Dict], semantic_result: List[Dict]) -> str: """Fallback synthesis when LLM is not available""" parts = ["## Analysis Results"] if sql_results or semantic_result: parts.append("Based on the available data sources, here's what I found:") else: parts.append("No relevant data was found to answer your question.") parts.append("") # Add SQL results if sql_results: parts.append("### Database Query Results") parts.append("") for i, sql_result in enumerate(sql_results, 1): purpose = sql_result.get('purpose', 'Data query') data = sql_result['result'].get('data', []) row_count = len(data) parts.append(f"**{purpose}** ({row_count} records)") # Create simple table if data is manageable if data and len(data) <= 10 and isinstance(data[0], dict): headers = list(data[0].keys()) if len(headers) <= 6: # Table header parts.append("| " + " | ".join(headers) + " |") parts.append("|" + "|".join([" --- " for _ in headers]) + "|") # Table rows for row in data[:5]: # Limit to first 5 rows values = [str(row.get(key, '')) for key in headers] parts.append("| " + " | ".join(values) + " |") if len(data) > 5: parts.append(f"*... and {len(data) - 5} more records*") else: parts.append(f"Found {row_count} records with detailed information.") else: parts.append(f"Query returned {row_count} records.") parts.append("") # Add semantic results if semantic_result: parts.append("### Related Documents") parts.append("") for i, doc in enumerate(semantic_result[:3], 1): snippet = doc.get('snippet', 'No content available')[:200] if len(doc.get('snippet', '')) > 200: snippet += "..." parts.append(f"**Document {i}**: {snippet}") parts.append("") if len(semantic_result) > 3: parts.append(f"*... and {len(semantic_result) - 3} more documents*") parts.append("") # Add summary if sql_results or semantic_result: parts.append("### Summary") parts.append(f"- Executed {len(sql_results)} database queries") parts.append(f"- Reviewed {len(semantic_result)} relevant documents") parts.append("- Combined data sources to provide comprehensive analysis") return "\n".join(parts) def _generate_generic_notes(self, sql_results: List[Dict], semantic_result: List[Dict]) -> List[str]: """Generate processing notes for generic analysis""" notes = [] if sql_results: notes.append(f"Executed {len(sql_results)} SQL queries") total_rows = sum(result['result'].get('row_count', 0) for result in sql_results) if total_rows > 100: notes.append("Large dataset processed - showing key insights") else: notes.append("No SQL queries were suggested or executed") if semantic_result: if len(semantic_result) >= 10: notes.append("Many documents found - showing most relevant") else: notes.append("No relevant documents found") if not sql_results and not semantic_result: notes.append("No data sources were available for this question") return notes def rebuild_catalog(self): """Rebuild the entire catalog from current schema""" try: if self.app: self.app.logger.info(f"Rebuilding catalog for database: {self.db_key}") # Ensure tables exist first self.setup_catalog_tables() # Get current schema schema_info = self.get_schema_info() # Build descriptions descriptions = self.build_catalog_descriptions(schema_info) # Clear existing catalog with self.engine.begin() as connection: # Check if table exists before deleting table_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'catalog_embeddings' ); """)).scalar() if table_exists: connection.execute(text("DELETE FROM catalog_embeddings")) # Insert new catalog entries for desc in descriptions: # In production, you'd generate actual embeddings here # For now, we'll store with a placeholder embedding connection.execute(text(""" INSERT INTO catalog_embeddings (kind, schema_name, table_name, column_name, payload) VALUES (:kind, :schema_name, :table_name, :column_name, :payload) """), { 'kind': desc['kind'], 'schema_name': desc['schema_name'], 'table_name': desc['table_name'], 'column_name': desc['column_name'], 'payload': json.dumps(desc['payload']) }) if self.app: self.app.logger.info(f"Catalog rebuilt with {len(descriptions)} entries for database: {self.db_key}") return {'success': True, 'entries_created': len(descriptions)} except Exception as e: error_msg = str(e) if self.app: self.app.logger.error(f"Error rebuilding catalog in {self.db_key}: {error_msg}") return {'error': error_msg} def populate_sample_docs(self): """Populate document embeddings with sample data from existing tables""" try: if self.app: self.app.logger.info(f"Populating sample documents for database: {self.db_key}") # Check if vector extension is available has_vector_extension = self._check_vector_extension() # Ensure tables exist first - use a fresh check with self.engine.connect() as connection: doc_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'doc_embeddings' ); """)).scalar() if not doc_exists: if self.app: self.app.logger.warning(f"doc_embeddings table does not exist in {self.db_key}, attempting to create it") self.setup_catalog_tables() # Check again after setup doc_exists = connection.execute(text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'doc_embeddings' ); """)).scalar() if not doc_exists: return {'success': False, 'error': 'Could not create doc_embeddings table'} doc_count = 0 # Clear existing doc embeddings (separate transaction) try: with self.engine.begin() as connection: connection.execute(text("DELETE FROM doc_embeddings WHERE table_name LIKE '%trip%' OR table_name LIKE '%personnel%'")) except Exception as clear_error: if self.app: self.app.logger.warning(f"Could not clear existing doc embeddings in {self.db_key}: {clear_error}") # Populate from descriptions table (if it exists) - separate transaction try: with self.engine.begin() as connection: descriptions_result = connection.execute(text(""" SELECT d.description_id, d.trip_id, d.note, d.created_at, t.start_date, t.end_date, p.name as author_name FROM descriptions d LEFT JOIN trips t ON t.trip_id = d.trip_id LEFT JOIN personnel p ON p.personnel_id = d.personnel_id ORDER BY d.created_at DESC LIMIT 100 """)) for row in descriptions_result: snippet = f"Trip Note — {row.author_name or 'Unknown'} — {row.start_date} to {row.end_date} — {row.note[:400]}" connection.execute(text(""" INSERT INTO doc_embeddings (table_name, pk_json, snippet, created_at) VALUES ('descriptions', :pk_json, :snippet, :created_at) """), { 'pk_json': json.dumps({'description_id': str(row.description_id), 'trip_id': str(row.trip_id)}), 'snippet': snippet, 'created_at': row.created_at }) doc_count += 1 except Exception as e: if self.app: self.app.logger.warning(f"Could not populate from descriptions table in {self.db_key}: {e}") # Populate from personnel table (if it exists) - separate transaction try: with self.engine.connect() as connection: # Check what columns exist in personnel table personnel_columns = connection.execute(text(""" SELECT column_name FROM information_schema.columns WHERE table_name = 'personnel' AND table_schema = 'public' ORDER BY ordinal_position """)).fetchall() if personnel_columns: column_names = [col[0] for col in personnel_columns] # Build dynamic query based on available columns select_parts = ['personnel_id'] # Add optional columns if they exist if 'name' in column_names: select_parts.append('name') if 'position' in column_names: select_parts.append('position') if 'age' in column_names: select_parts.append('age') if 'rank' in column_names: select_parts.append('rank') if 'department' in column_names: select_parts.append('department') personnel_query = f"SELECT {', '.join(select_parts)} FROM personnel" personnel_result = connection.execute(text(personnel_query)) # Insert in separate transaction with self.engine.begin() as insert_conn: for row in personnel_result: # Build snippet from available data snippet_parts = ["Personnel"] if hasattr(row, 'name') and row.name: snippet_parts.append(row.name) details = [] if hasattr(row, 'position') and row.position: details.append(f"Position: {row.position}") if hasattr(row, 'rank') and row.rank: details.append(f"Rank: {row.rank}") if hasattr(row, 'department') and row.department: details.append(f"Department: {row.department}") if hasattr(row, 'age') and row.age: details.append(f"Age: {row.age}") if details: snippet_parts.append(" — ".join(details)) else: snippet_parts.append("No additional details") snippet = " — ".join(snippet_parts) insert_conn.execute(text(""" INSERT INTO doc_embeddings (table_name, pk_json, snippet) VALUES ('personnel', :pk_json, :snippet) """), { 'pk_json': json.dumps({'personnel_id': str(row.personnel_id)}), 'snippet': snippet }) doc_count += 1 except Exception as e: if self.app: self.app.logger.warning(f"Could not populate from personnel table in {self.db_key}: {e}") # Populate from trips table (if it exists) - separate transaction try: with self.engine.connect() as connection: # First check what columns exist in trips table trips_columns = connection.execute(text(""" SELECT column_name FROM information_schema.columns WHERE table_name = 'trips' AND table_schema = 'public' ORDER BY ordinal_position """)).fetchall() if trips_columns: column_names = [col[0] for col in trips_columns] # Build dynamic query based on available columns select_parts = ['trip_id', 'start_date', 'end_date'] # Add optional columns if they exist if 'description' in column_names: select_parts.append('description') if 'notes' in column_names: select_parts.append('notes') if 'purpose' in column_names: select_parts.append('purpose') trips_query = f""" SELECT {', '.join(select_parts)} FROM trips ORDER BY start_date DESC LIMIT 50 """ trips_result = connection.execute(text(trips_query)) # Insert in separate transaction with self.engine.begin() as insert_conn: for row in trips_result: # Build snippet from available data snippet_parts = [f"Trip — {row.start_date} to {row.end_date}"] # Add description/notes if available if hasattr(row, 'description') and row.description: snippet_parts.append(f"Description: {row.description}") elif hasattr(row, 'notes') and row.notes: snippet_parts.append(f"Notes: {row.notes}") elif hasattr(row, 'purpose') and row.purpose: snippet_parts.append(f"Purpose: {row.purpose}") else: snippet_parts.append("No additional details") snippet = " — ".join(snippet_parts) insert_conn.execute(text(""" INSERT INTO doc_embeddings (table_name, pk_json, snippet) VALUES ('trips', :pk_json, :snippet) """), { 'pk_json': json.dumps({'trip_id': str(row.trip_id)}), 'snippet': snippet }) doc_count += 1 except Exception as e: if self.app: self.app.logger.warning(f"Could not populate from trips table in {self.db_key}: {e}") if self.app: self.app.logger.info(f"Sample documents populated with {doc_count} entries for database: {self.db_key}") return {'success': True, 'message': f'Populated {doc_count} sample documents'} except Exception as e: error_msg = str(e) if self.app: self.app.logger.error(f"Error populating sample documents in {self.db_key}: {error_msg}") return {'success': False, 'error': error_msg}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MelaLitho/MCPServer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server