llmDatabaseBlueprint.py•13.5 kB
from flask import Blueprint, request, jsonify, current_app
from sqlalchemy import text
import json
from llmDatabaseRouter import LLMDatabaseRouter
llm_db_bp = Blueprint('llm_db', __name__)
def get_router_for_db(db_key: str) -> LLMDatabaseRouter:
"""Get an LLM database router instance for the specified database"""
try:
engine = current_app.db.get_engine(current_app, bind=db_key)
return LLMDatabaseRouter(engine, db_key, current_app)
except Exception as e:
current_app.logger.error(f"Error creating router for {db_key}: {e}")
raise
@llm_db_bp.route('/rebuild_catalog', methods=['POST'])
def rebuild_catalog():
"""Rebuild the schema catalog for a database"""
data = request.get_json() or {}
db_key = data.get('db', 'db1')
try:
router = get_router_for_db(db_key)
result = router.rebuild_catalog()
if result.get('success'):
return jsonify({
"success": True,
"message": f"Catalog rebuilt for {db_key}",
"entries_created": result.get('entries_created', 0)
}), 200
else:
return jsonify({
"success": False,
"error": result.get('error', 'Unknown error')
}), 500
except Exception as e:
current_app.logger.error(f"Error rebuilding catalog for {db_key}: {e}")
return jsonify({"error": str(e)}), 500
@llm_db_bp.route('/schema_info', methods=['GET'])
def get_schema_info():
"""Get comprehensive schema information for a database"""
db_key = request.args.get('db', 'db1')
try:
router = get_router_for_db(db_key)
schema_info = router.get_schema_info()
return jsonify({
"success": True,
"database": db_key,
"schema_info": schema_info
}), 200
except Exception as e:
current_app.logger.error(f"Error getting schema info for {db_key}: {e}")
return jsonify({"error": str(e)}), 500
@llm_db_bp.route('/search_catalog', methods=['POST'])
def search_catalog():
"""Search the schema catalog for relevant elements"""
data = request.get_json()
if not data or 'query' not in data:
return jsonify({"error": "Missing 'query' in request body"}), 400
db_key = data.get('db', 'db1')
query = data.get('query')
k = data.get('k', 12)
try:
router = get_router_for_db(db_key)
results = router.search_catalog(query, k)
return jsonify({
"success": True,
"query": query,
"results": results,
"count": len(results)
}), 200
except Exception as e:
current_app.logger.error(f"Error searching catalog in {db_key}: {e}")
return jsonify({"error": str(e)}), 500
@llm_db_bp.route('/safe_sql', methods=['POST'])
def execute_safe_sql():
"""Execute SQL with safety guardrails"""
data = request.get_json()
if not data or 'sql' not in data:
return jsonify({"error": "Missing 'sql' in request body"}), 400
db_key = data.get('db', 'db1')
sql = data.get('sql')
limit_safe = data.get('limit_safe', True)
try:
router = get_router_for_db(db_key)
result = router.safe_run_sql(sql, limit_safe)
if result.get('success'):
return jsonify({
"success": True,
"result": result
}), 200
else:
return jsonify({
"success": False,
"error": result.get('error'),
"sql_attempted": result.get('sql_attempted')
}), 400
except Exception as e:
current_app.logger.error(f"Error executing SQL in {db_key}: {e}")
return jsonify({"error": str(e)}), 500
@llm_db_bp.route('/generate_sql', methods=['POST'])
def generate_sql():
"""Generate SQL from natural language question"""
data = request.get_json()
if not data or 'question' not in data:
return jsonify({"error": "Missing 'question' in request body"}), 400
db_key = data.get('db', 'db1')
question = data.get('question')
try:
router = get_router_for_db(db_key)
# Get relevant schema elements
schema_hits = router.search_catalog(question, k=12)
# Generate SQL
sql = router.generate_sql(question, {'tables': schema_hits})
return jsonify({
"success": True,
"question": question,
"generated_sql": sql,
"schema_elements_used": len(schema_hits),
"schema_context": schema_hits
}), 200
except Exception as e:
current_app.logger.error(f"Error generating SQL in {db_key}: {e}")
return jsonify({"error": str(e)}), 500
@llm_db_bp.route('/semantic_search', methods=['POST'])
def semantic_search():
"""Search for semantic content in the database"""
data = request.get_json()
if not data or 'question' not in data:
return jsonify({"error": "Missing 'question' in request body"}), 400
db_key = data.get('db', 'db1')
question = data.get('question')
table_filter = data.get('table_filter')
fk_filter = data.get('fk_filter')
try:
router = get_router_for_db(db_key)
results = router.semantic_rows(question, table_filter, fk_filter)
return jsonify({
"success": True,
"question": question,
"results": results,
"count": len(results)
}), 200
except Exception as e:
current_app.logger.error(f"Error in semantic search for {db_key}: {e}")
return jsonify({"error": str(e)}), 500
@llm_db_bp.route('/answer', methods=['POST'])
def answer_question():
"""Main endpoint to answer natural language questions about the database"""
data = request.get_json()
if not data or 'question' not in data:
return jsonify({"error": "Missing 'question' in request body"}), 400
db_key = data.get('db', 'db1')
question = data.get('question')
try:
router = get_router_for_db(db_key)
result = router.answer(question)
return jsonify({
"success": True,
"database": db_key,
**result
}), 200
except Exception as e:
current_app.logger.error(f"Error answering question in {db_key}: {e}")
return jsonify({"error": str(e)}), 500
@llm_db_bp.route('/populate_sample_docs', methods=['POST'])
def populate_sample_docs():
"""Populate document embeddings with sample data from existing tables"""
data = request.get_json() or {}
db_key = data.get('db', 'db1')
try:
router = get_router_for_db(db_key)
# Sample document population for trip management tables
with router.engine.begin() as connection:
# Clear existing doc embeddings
connection.execute(text("DELETE FROM doc_embeddings WHERE table_name LIKE '%trip%' OR table_name LIKE '%personnel%'"))
# Populate from descriptions table (if it exists)
try:
descriptions_result = connection.execute(text("""
SELECT d.description_id, d.trip_id, d.note, d.created_at,
t.start_date, t.end_date,
p.name as author_name
FROM descriptions d
LEFT JOIN trips t ON t.trip_id = d.trip_id
LEFT JOIN personnel p ON p.personnel_id = d.personnel_id
ORDER BY d.created_at DESC
LIMIT 100
"""))
doc_count = 0
for row in descriptions_result:
snippet = f"Trip Note — {row.author_name or 'Unknown'} — {row.start_date} to {row.end_date} — {row.note[:400]}"
connection.execute(text("""
INSERT INTO doc_embeddings (table_name, pk_json, snippet, created_at)
VALUES ('descriptions', :pk_json, :snippet, :created_at)
"""), {
'pk_json': json.dumps({'description_id': str(row.description_id), 'trip_id': str(row.trip_id)}),
'snippet': snippet,
'created_at': row.created_at
})
doc_count += 1
# Populate from personnel table
personnel_result = connection.execute(text("""
SELECT personnel_id, name, position, age
FROM personnel
"""))
for row in personnel_result:
snippet = f"Personnel — {row.name} — Position: {row.position or 'Unknown'} — Age: {row.age or 'Unknown'}"
connection.execute(text("""
INSERT INTO doc_embeddings (table_name, pk_json, snippet)
VALUES ('personnel', :pk_json, :snippet)
"""), {
'pk_json': json.dumps({'personnel_id': str(row.personnel_id)}),
'snippet': snippet
})
doc_count += 1
return jsonify({
"success": True,
"message": f"Populated {doc_count} document embeddings for {db_key}",
"documents_created": doc_count
}), 200
except Exception as e:
# If trip tables don't exist, just return success with 0 docs
return jsonify({
"success": True,
"message": f"No suitable tables found for document population in {db_key}",
"documents_created": 0
}), 200
except Exception as e:
current_app.logger.error(f"Error populating sample docs for {db_key}: {e}")
return jsonify({"error": str(e)}), 500
# Documentation endpoint
@llm_db_bp.route('/help', methods=['GET'])
def get_help():
"""Get API documentation for the LLM database router"""
documentation = {
"endpoints": {
"/rebuild_catalog": {
"method": "POST",
"description": "Rebuild the schema catalog for a database",
"parameters": {
"db": "Database key (default: 'db1')"
}
},
"/schema_info": {
"method": "GET",
"description": "Get comprehensive schema information",
"parameters": {
"db": "Database key (default: 'db1')"
}
},
"/search_catalog": {
"method": "POST",
"description": "Search schema catalog for relevant elements",
"parameters": {
"db": "Database key (default: 'db1')",
"query": "Search query (required)",
"k": "Number of results to return (default: 12)"
}
},
"/safe_sql": {
"method": "POST",
"description": "Execute SQL with safety guardrails",
"parameters": {
"db": "Database key (default: 'db1')",
"sql": "SQL query to execute (required)",
"limit_safe": "Auto-inject LIMIT clause (default: true)"
}
},
"/generate_sql": {
"method": "POST",
"description": "Generate SQL from natural language",
"parameters": {
"db": "Database key (default: 'db1')",
"question": "Natural language question (required)"
}
},
"/semantic_search": {
"method": "POST",
"description": "Search for semantic content",
"parameters": {
"db": "Database key (default: 'db1')",
"question": "Search question (required)",
"table_filter": "Filter by table name (optional)",
"fk_filter": "Filter by foreign key values (optional)"
}
},
"/answer": {
"method": "POST",
"description": "Answer natural language questions about the database",
"parameters": {
"db": "Database key (default: 'db1')",
"question": "Question to answer (required)"
}
},
"/populate_sample_docs": {
"method": "POST",
"description": "Populate document embeddings with sample data",
"parameters": {
"db": "Database key (default: 'db1')"
}
}
},
"examples": {
"answer_questions": [
"Who was on the most recent trip?",
"What were the key events on that trip?",
"How many trips were there in 2025?",
"Show me all personnel in the Engineering role"
]
}
}
return jsonify(documentation), 200