main.py•13.8 kB
#!/usr/bin/env python3
"""
Main entry point for MySQL MCP system
Provides interactive CLI for managing multiple MySQL databases with natural language queries
"""
import sys
import os
import json
# Add the current directory to Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from db_manager import MetadataDBManager, TargetDBManager
from nlp_processor import NLPProcessor
from database_models import TableMetadata, ColumnMetadata
def display_menu():
"""Display the main menu options"""
print("\n=== MySQL MCP System ===")
print("1. List all databases")
print("2. Add a new database")
print("3. Query a database (natural language)")
print("4. Execute SQL query")
print("5. Show database tables")
print("6. Collect and store table metadata")
print("0. Exit")
print("=" * 25)
def list_databases(manager: MetadataDBManager):
"""List all registered databases"""
try:
databases = manager.get_all_databases()
if not databases:
print("No databases found.")
return
print("\nRegistered Databases:")
print("-" * 80)
print(f"{'ID':<4} {'Name':<20} {'Host':<15} {'Port':<6} {'Username':<15} {'Database':<15}")
print("-" * 80)
for db in databases:
print(f"{db.id:<4} {db.name:<20} {db.host:<15} {db.port:<6} {db.username:<15} {db.database_name:<15}")
return databases
except Exception as e:
print(f"Error listing databases: {e}")
def add_database(manager: MetadataDBManager):
"""Add a new database connection"""
try:
print("\nAdding a new database connection:")
name = input("Name: ")
host = input("Host: ")
port = int(input("Port (default 3306): ") or "3306")
username = input("Username: ")
password = input("Password: ")
database_name = input("Database Name: ")
description = input("Description (optional): ")
from database_models import DatabaseConnection
new_db = DatabaseConnection(
id=0, # Will be assigned by database
name=name,
host=host,
port=port,
username=username,
password=password,
database_name=database_name,
description=description or None
)
if manager.add_database(new_db):
print("Database added successfully!")
else:
print("Failed to add database.")
except Exception as e:
print(f"Error adding database: {e}")
def query_database_nlp(metadata_manager: MetadataDBManager):
"""Query a database using natural language"""
try:
# First, list databases so user can choose
databases = list_databases(metadata_manager)
if not databases:
return
db_id = int(input("\nEnter database ID to query: "))
natural_query = input("Enter your natural language query (e.g., 'show me all records from users'): ")
# Get database info
db_info = metadata_manager.get_database_by_id(db_id)
if not db_info:
print("Database not found.")
return
# Connect to target database
target_manager = TargetDBManager(db_info)
if not target_manager.connect():
print("Failed to connect to target database.")
return
try:
# Process natural language query
nlp = NLPProcessor(metadata_manager)
sql_query = nlp.process_query(natural_query, db_id)
print(f"Generated SQL: {sql_query}")
# Execute query
results = target_manager.execute_query(sql_query)
if results:
print(f"\nQuery Results ({len(results)} rows):")
# Print header
if results:
headers = list(results[0].keys())
print("-" * (len(headers) * 20))
print("".join(f"{h:<20}" for h in headers))
print("-" * (len(headers) * 20))
# Print rows
for row in results:
print("".join(f"{str(v):<20}" for v in row.values()))
else:
print("Query executed successfully. No results to display.")
finally:
target_manager.disconnect()
except Exception as e:
print(f"Error querying database: {e}")
def execute_sql_query(metadata_manager: MetadataDBManager):
"""Execute a raw SQL query on a database"""
try:
# First, list databases so user can choose
databases = list_databases(metadata_manager)
if not databases:
return
db_id = int(input("\nEnter database ID to query: "))
sql_query = input("Enter your SQL query: ")
# Get database info
db_info = metadata_manager.get_database_by_id(db_id)
if not db_info:
print("Database not found.")
return
# Connect to target database
target_manager = TargetDBManager(db_info)
if not target_manager.connect():
print("Failed to connect to target database.")
return
try:
# Execute query
results = target_manager.execute_query(sql_query)
if results:
print(f"\nQuery Results ({len(results)} rows):")
# Print header
if results:
headers = list(results[0].keys())
print("-" * (len(headers) * 20))
print("".join(f"{h:<20}" for h in headers))
print("-" * (len(headers) * 20))
# Print rows
for row in results:
print("".join(f"{str(v):<20}" for v in row.values()))
else:
print("Query executed successfully. No results to display.")
finally:
target_manager.disconnect()
except Exception as e:
print(f"Error executing SQL query: {e}")
def show_database_tables(metadata_manager: MetadataDBManager):
"""Show all tables in a database"""
try:
# First, list databases so user can choose
databases = list_databases(metadata_manager)
if not databases:
return
db_id = int(input("\nEnter database ID to show tables: "))
# Get database info
db_info = metadata_manager.get_database_by_id(db_id)
if not db_info:
print("Database not found.")
return
# Connect to target database
target_manager = TargetDBManager(db_info)
if not target_manager.connect():
print("Failed to connect to target database.")
return
try:
# Get tables
tables = target_manager.get_tables()
if tables:
print(f"\nTables in database '{db_info.name}':")
print("-" * 30)
for table in tables:
print(table)
else:
print("No tables found in this database.")
finally:
target_manager.disconnect()
except Exception as e:
print(f"Error showing database tables: {e}")
def collect_and_store_table_metadata(metadata_manager: MetadataDBManager):
"""Collect and store metadata for tables in a database"""
try:
# First, list databases so user can choose
databases = list_databases(metadata_manager)
if not databases:
return
db_id = int(input("\nEnter database ID to collect metadata: "))
# Get database info
db_info = metadata_manager.get_database_by_id(db_id)
if not db_info:
print("Database not found.")
return
# Connect to target database
target_manager = TargetDBManager(db_info)
if not target_manager.connect():
print("Failed to connect to target database.")
return
try:
# Get tables
tables = target_manager.get_tables()
if not tables:
print("No tables found in this database.")
return
print(f"\nCollecting metadata for {len(tables)} tables...")
for table_name in tables:
try:
# Get table schema
schema_info = target_manager.get_table_schema(table_name)
# Count rows in the table
count_results = target_manager.execute_query(f"SELECT COUNT(*) as count FROM `{table_name}`")
row_count = count_results[0]['count'] if count_results else 0
# Create TableMetadata object (without the full schema JSON to avoid JSON errors)
table_meta = TableMetadata(
id=0, # Will be assigned by database
database_id=db_id,
table_name=table_name,
table_schema="", # Keep empty to avoid JSON issues
row_count=row_count,
description=f"Metadata for table {table_name}"
)
# Use upsert to avoid duplicate records
if metadata_manager.upsert_table_metadata(table_meta):
# Get the table_id of the inserted/updated table metadata
updated_table_meta = metadata_manager.get_table_metadata(db_id, table_name)
table_id = updated_table_meta.id
# Clear existing column metadata for this table
metadata_manager.clear_column_metadata_by_table(table_id)
# Add column metadata
columns = schema_info.get('columns', [])
for col in columns:
# Handle possible None values and ensure proper data types
field_name = col.get('Field', '') or ''
data_type = col.get('Type', '') or ''
null_value = col.get('Null', 'YES') or 'YES'
is_nullable = (null_value == 'YES')
column_key = col.get('Key', '') or ''
# Handle comment properly, as it might not exist
column_comment = col.get('Comment', '') if col.get('Comment') is not None else None
# Create ColumnMetadata object
column_meta = ColumnMetadata(
id=0,
table_id=table_id,
column_name=field_name,
data_type=data_type,
is_nullable=is_nullable,
column_key=column_key,
column_comment=column_comment
)
# Insert column metadata
if not metadata_manager.add_column_metadata(column_meta):
print(f" ⚠ Warning: Failed to store metadata for column '{field_name}' in table '{table_name}'")
print(f" ✓ Metadata collected for table '{table_name}' ({row_count} rows)")
else:
print(f" ✗ Failed to store metadata for table '{table_name}'")
except Exception as e:
print(f" ✗ Error collecting metadata for table '{table_name}': {e}")
print("\nMetadata collection completed!")
finally:
target_manager.disconnect()
except Exception as e:
print(f"Error collecting table metadata: {e}")
def main():
"""Main application loop"""
# Initialize metadata database manager
metadata_manager = MetadataDBManager()
# Try to connect to metadata database
if not metadata_manager.connect():
print("Failed to connect to metadata database. Please check your configuration.")
return
try:
while True:
display_menu()
choice = input("Enter your choice: ").strip()
if choice == '0':
print("Goodbye!")
break
elif choice == '1':
list_databases(metadata_manager)
elif choice == '2':
add_database(metadata_manager)
elif choice == '3':
query_database_nlp(metadata_manager)
elif choice == '4':
execute_sql_query(metadata_manager)
elif choice == '5':
show_database_tables(metadata_manager)
elif choice == '6':
collect_and_store_table_metadata(metadata_manager)
else:
print("Invalid choice. Please try again.")
finally:
metadata_manager.disconnect()
if __name__ == "__main__":
main()