# cli.py (14.5 kB)
#!/usr/bin/env python3
"""
DP-MCP CLI Tool
A command-line interface for testing and using DP-MCP tools directly
without needing an MCP client.
Usage:
python cli.py <command> [options]
Examples:
python cli.py list-tables
python cli.py describe-table users
python cli.py query "SELECT * FROM users LIMIT 5"
python cli.py export-csv users --limit 100
python cli.py list-buckets
python cli.py upload test-bucket hello.txt "Hello World"
"""
import argparse
import asyncio
import sys
from typing import Optional
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Import tool functions
from src.dp_mcp.tools.postgres_tools import (
describe_table,
list_tables,
execute_query,
export_table_to_csv
)
from src.dp_mcp.tools.minio_tools import (
list_buckets,
list_objects,
upload_object,
download_object,
create_bucket,
delete_object
)
class Colors:
    """ANSI escape sequences used to colorize and style terminal output."""

    # Foreground colors.
    HEADER = "\x1b[95m"
    BLUE = "\x1b[94m"
    CYAN = "\x1b[96m"
    GREEN = "\x1b[92m"
    YELLOW = "\x1b[93m"
    RED = "\x1b[91m"

    # Text attributes.
    BOLD = "\x1b[1m"
    UNDERLINE = "\x1b[4m"

    # Resets every color/attribute set by the codes above.
    END = "\x1b[0m"
def print_header(text: str):
    """Print ``text`` as a bold, colored title followed by an ``=`` rule.

    The rule is four characters longer than ``text``. A blank line is
    emitted before the title.
    """
    accent = Colors.HEADER
    rule = '=' * (len(text) + 4)
    print(f"\n{accent}{Colors.BOLD}🚀 {text}{Colors.END}")
    print(f"{accent}{rule}{Colors.END}")
def print_success(text: str):
    """Print ``text`` in green, prefixed with a check-mark emoji."""
    line = f"{Colors.GREEN}✅ {text}{Colors.END}"
    print(line)
def print_error(text: str):
    """Print ``text`` in red, prefixed with a cross-mark emoji."""
    line = f"{Colors.RED}❌ {text}{Colors.END}"
    print(line)
def print_info(text: str):
    """Print ``text`` in cyan, prefixed with an information emoji."""
    line = f"{Colors.CYAN}ℹ️ {text}{Colors.END}"
    print(line)
# PostgreSQL Commands
async def cmd_list_tables(args):
    """Print the output of ``list_tables`` for the requested schema.

    Args:
        args: Parsed CLI namespace; only ``schema`` is read.

    Any exception raised while listing or printing is reported through
    ``print_error`` instead of propagating.
    """
    print_header("Listing Database Tables")
    try:
        table_listing = await list_tables(args.schema)
        print(table_listing)
        print_success("Tables listed successfully")
    except Exception as exc:
        failure = f"Failed to list tables: {exc}"
        print_error(failure)
async def cmd_describe_table(args):
    """Print the output of ``describe_table`` for one table.

    Args:
        args: Parsed CLI namespace providing ``table`` and ``schema``.

    Any exception raised while describing or printing is reported through
    ``print_error`` instead of propagating.
    """
    table_name = args.table
    print_header(f"Describing Table: {table_name}")
    try:
        description = await describe_table(table_name, args.schema)
        print(description)
        print_success("Table described successfully")
    except Exception as exc:
        failure = f"Failed to describe table: {exc}"
        print_error(failure)
async def cmd_query(args):
    """Run a SQL statement through ``execute_query`` and print its output.

    Args:
        args: Parsed CLI namespace providing ``sql`` and ``limit``.

    The query text and limit are echoed before execution. Any exception is
    reported through ``print_error`` instead of propagating.
    """
    statement, row_limit = args.sql, args.limit
    print_header("Executing SQL Query")
    print_info(f"Query: {statement}")
    print_info(f"Limit: {row_limit}")
    try:
        output = await execute_query(statement, row_limit)
        print(f"\n{output}")
        print_success("Query executed successfully")
    except Exception as exc:
        failure = f"Query failed: {exc}"
        print_error(failure)
async def cmd_export_csv(args):
    """Export table data as CSV to a file or to the console.

    Args:
        args: Parsed CLI namespace providing ``table``, ``limit``, ``where``
            and ``output``. When ``output`` is set the CSV text is written
            to that path; otherwise it is printed.

    Any exception raised during export or writing is reported through
    ``print_error`` instead of propagating.
    """
    print_header(f"Exporting Table: {args.table}")
    print_info(f"Limit: {args.limit}")
    if args.where:
        print_info(f"Filter: {args.where}")
    try:
        result = await export_table_to_csv(args.table, args.limit, args.where)
        if args.output:
            # Explicit UTF-8 keeps the export independent of the platform's
            # locale encoding (which may be unable to encode table data).
            # newline='' disables newline translation so line endings already
            # present in the CSV text are not doubled on Windows.
            with open(args.output, 'w', encoding='utf-8', newline='') as f:
                f.write(result)
            print_success(f"CSV exported to: {args.output}")
        else:
            print(f"\n{result}")
            print_success("CSV data displayed above")
    except Exception as e:
        print_error(f"Export failed: {e}")
# MinIO Commands
async def cmd_list_buckets(args):
    """Print the output of ``list_buckets``.

    Args:
        args: Parsed CLI namespace; not read by this command.

    Any exception raised while listing or printing is reported through
    ``print_error`` instead of propagating.
    """
    print_header("Listing MinIO Buckets")
    try:
        bucket_listing = await list_buckets()
        print(bucket_listing)
        print_success("Buckets listed successfully")
    except Exception as exc:
        failure = f"Failed to list buckets: {exc}"
        print_error(failure)
async def cmd_list_objects(args):
    """Print the output of ``list_objects`` for one bucket.

    Args:
        args: Parsed CLI namespace providing ``bucket``, ``prefix`` and
            ``max_keys``. The prefix is echoed only when it is truthy.

    Any exception raised while listing or printing is reported through
    ``print_error`` instead of propagating.
    """
    bucket_name, key_prefix = args.bucket, args.prefix
    print_header(f"Listing Objects in Bucket: {bucket_name}")
    if key_prefix:
        print_info(f"Prefix filter: {key_prefix}")
    print_info(f"Max objects: {args.max_keys}")
    try:
        object_listing = await list_objects(bucket_name, key_prefix, args.max_keys)
        print(object_listing)
        print_success("Objects listed successfully")
    except Exception as exc:
        failure = f"Failed to list objects: {exc}"
        print_error(failure)
async def cmd_upload(args):
    """Upload inline text or a file's contents to a MinIO object.

    The payload is read from ``args.file`` when that is set, otherwise it is
    the positional ``args.data``. If neither supplies a payload the command
    reports an error and returns without calling ``upload_object``.

    Args:
        args: Parsed CLI namespace providing ``bucket``, ``object_name``,
            ``data``, ``file`` and ``content_type``.

    Failures while reading the file or uploading are reported through
    ``print_error`` instead of propagating.
    """
    print_header("Uploading to MinIO")
    print_info(f"Bucket: {args.bucket}")
    print_info(f"Object: {args.object_name}")
    print_info(f"Content Type: {args.content_type}")
    # Read data from file or use provided text
    if args.file:
        try:
            with open(args.file, 'r') as f:
                data = f.read()
            print_info(f"Reading from file: {args.file}")
        except Exception as e:
            print_error(f"Failed to read file: {e}")
            return
    else:
        data = args.data
    if data is None:
        # The positional ``data`` argument is optional, so it is None when
        # the user gave neither inline data nor --file. An empty string is
        # still allowed through (it is an explicit, empty payload).
        print_error("No data to upload: provide inline data or use --file")
        return
    try:
        result = await upload_object(args.bucket, args.object_name, data, args.content_type)
        print(result)
        print_success("Upload completed successfully")
    except Exception as e:
        print_error(f"Upload failed: {e}")
async def cmd_download(args):
    """Download an object from MinIO to a file or to the console.

    Args:
        args: Parsed CLI namespace providing ``bucket``, ``object_name`` and
            ``output``. When ``output`` is set the content is written to
            that path; otherwise it is printed between two marker lines.

    Any exception raised during download or writing is reported through
    ``print_error`` instead of propagating.
    """
    print_header("Downloading from MinIO")
    print_info(f"Bucket: {args.bucket}")
    print_info(f"Object: {args.object_name}")
    try:
        result = await download_object(args.bucket, args.object_name)
        if args.output:
            # Explicit UTF-8 avoids depending on the platform's locale
            # encoding; newline='' writes the content's line endings as-is
            # rather than translating them, so the saved file matches the
            # downloaded text.
            with open(args.output, 'w', encoding='utf-8', newline='') as f:
                f.write(result)
            print_success(f"Downloaded to: {args.output}")
        else:
            print(f"\n{Colors.YELLOW}--- Object Content ---{Colors.END}")
            print(result)
            print(f"{Colors.YELLOW}--- End of Content ---{Colors.END}")
            print_success("Object downloaded and displayed")
    except Exception as e:
        print_error(f"Download failed: {e}")
async def cmd_create_bucket(args):
    """Create a MinIO bucket via ``create_bucket`` and print its output.

    Args:
        args: Parsed CLI namespace providing ``bucket`` and ``region``. The
            region is echoed only when it is truthy.

    Any exception raised while creating or printing is reported through
    ``print_error`` instead of propagating.
    """
    bucket_name, region = args.bucket, args.region
    print_header(f"Creating Bucket: {bucket_name}")
    if region:
        print_info(f"Region: {region}")
    try:
        outcome = await create_bucket(bucket_name, region)
        print(outcome)
        print_success("Bucket creation completed")
    except Exception as exc:
        failure = f"Bucket creation failed: {exc}"
        print_error(failure)
async def cmd_delete_object(args):
    """Delete an object from MinIO, asking for confirmation unless ``--yes``.

    Args:
        args: Parsed CLI namespace providing ``bucket``, ``object_name`` and
            ``yes``. When ``yes`` is false the user is prompted, and only
            the answers ``y`` / ``yes`` (case-insensitive) proceed.

    Any exception raised by the delete call or while printing is reported
    through ``print_error`` instead of propagating.
    """
    print_header("Deleting Object from MinIO")
    print_info(f"Bucket: {args.bucket}")
    print_info(f"Object: {args.object_name}")

    needs_confirmation = not args.yes
    if needs_confirmation:
        prompt = f"{Colors.YELLOW}⚠️ Are you sure you want to delete '{args.object_name}' from '{args.bucket}'? (y/N): {Colors.END}"
        answer = input(prompt)
        if answer.lower() not in ['y', 'yes']:
            print_info("Operation cancelled")
            return

    try:
        outcome = await delete_object(args.bucket, args.object_name)
        print(outcome)
        print_success("Object deleted successfully")
    except Exception as exc:
        failure = f"Delete failed: {exc}"
        print_error(failure)
# Combined Commands
async def cmd_backup_table(args):
    """Export a table to CSV and upload it to MinIO as a timestamped object.

    The object is named ``<schema>_<table>_<YYYYmmdd_HHMMSS>.csv`` using the
    local time from ``datetime.now()`` and is uploaded with content type
    ``text/csv``.

    Args:
        args: Parsed CLI namespace providing ``table``, ``bucket``,
            ``schema``, ``limit`` and ``where``. Note that ``schema`` is only
            used in the object name; it is not passed to the export call.

    Any exception raised during export or upload is reported through
    ``print_error`` instead of propagating.
    """
    table_name, bucket_name = args.table, args.bucket
    print_header(f"Backing up Table: {table_name}")
    print_info(f"Destination bucket: {bucket_name}")
    print_info(f"Schema: {args.schema}")
    print_info(f"Limit: {args.limit}")
    if args.where:
        print_info(f"Filter: {args.where}")
    try:
        # Step 1: pull the table contents as CSV text.
        print_info("Exporting table data...")
        csv_data = await export_table_to_csv(table_name, args.limit, args.where)

        # Step 2: derive a timestamped object name for this backup.
        from datetime import datetime
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        object_name = f"{args.schema}_{table_name}_{stamp}.csv"
        destination = f"{bucket_name}/{object_name}"

        # Step 3: push the CSV to MinIO.
        print_info(f"Uploading to {destination}...")
        upload_result = await upload_object(bucket_name, object_name, csv_data, "text/csv")
        print(f"\n{upload_result}")
        print_success(f"Table backed up successfully to {destination}")
    except Exception as exc:
        failure = f"Backup failed: {exc}"
        print_error(failure)
def create_parser():
    """Build the top-level argument parser with one sub-command per tool.

    Returns:
        argparse.ArgumentParser: Parser whose selected sub-command name is
        stored in the ``command`` attribute of the parsed namespace
        (``None`` when no sub-command is given).
    """
    # Shown verbatim after the option help (RawDescriptionHelpFormatter).
    usage_examples = """
Examples:
# PostgreSQL Operations
python cli.py list-tables
python cli.py describe-table users
python cli.py query "SELECT * FROM users LIMIT 5"
python cli.py export-csv users --limit 100 --output users.csv
# MinIO Operations
python cli.py list-buckets
python cli.py list-objects default-bucket
python cli.py upload default-bucket hello.txt "Hello World"
python cli.py upload default-bucket data.json --file data.json
python cli.py download default-bucket hello.txt --output downloaded.txt
python cli.py create-bucket my-new-bucket
# Combined Operations
python cli.py backup-table users --bucket backups --limit 1000
"""
    root = argparse.ArgumentParser(
        description="DP-MCP CLI Tool - Direct access to PostgreSQL and MinIO operations",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    commands = root.add_subparsers(dest='command', help='Available commands')

    def new_command(name, summary):
        """Register a sub-command and return its parser."""
        return commands.add_parser(name, help=summary)

    def add_schema_option(sub):
        """Attach the shared ``--schema`` option to a sub-command."""
        sub.add_argument('--schema', default='public', help='Database schema (default: public)')

    # ---- PostgreSQL commands ----
    list_tables_cmd = new_command('list-tables', 'List all database tables')
    add_schema_option(list_tables_cmd)

    describe_cmd = new_command('describe-table', 'Describe a database table')
    describe_cmd.add_argument('table', help='Table name to describe')
    add_schema_option(describe_cmd)

    query_cmd = new_command('query', 'Execute a SQL query')
    query_cmd.add_argument('sql', help='SQL query to execute')
    query_cmd.add_argument('--limit', type=int, default=1000, help='Maximum rows to return (default: 1000)')

    export_cmd = new_command('export-csv', 'Export table data to CSV')
    export_cmd.add_argument('table', help='Table name to export')
    export_cmd.add_argument('--limit', type=int, default=10000, help='Maximum rows to export (default: 10000)')
    export_cmd.add_argument('--where', help='WHERE clause for filtering')
    export_cmd.add_argument('--output', help='Output file path (default: display to console)')

    # ---- MinIO commands ----
    new_command('list-buckets', 'List all MinIO buckets')

    list_objects_cmd = new_command('list-objects', 'List objects in a bucket')
    list_objects_cmd.add_argument('bucket', help='Bucket name')
    list_objects_cmd.add_argument('--prefix', help='Object prefix to filter by')
    list_objects_cmd.add_argument('--max-keys', type=int, default=1000, help='Maximum objects to return (default: 1000)')

    upload_cmd = new_command('upload', 'Upload data to MinIO')
    upload_cmd.add_argument('bucket', help='Bucket name')
    upload_cmd.add_argument('object_name', help='Object name/path')
    upload_cmd.add_argument('data', nargs='?', help='Data to upload (or use --file)')
    upload_cmd.add_argument('--file', help='Upload from file instead of command line data')
    upload_cmd.add_argument('--content-type', default='text/plain', help='Content type (default: text/plain)')

    download_cmd = new_command('download', 'Download object from MinIO')
    download_cmd.add_argument('bucket', help='Bucket name')
    download_cmd.add_argument('object_name', help='Object name/path')
    download_cmd.add_argument('--output', help='Output file path (default: display to console)')

    create_bucket_cmd = new_command('create-bucket', 'Create a new MinIO bucket')
    create_bucket_cmd.add_argument('bucket', help='Bucket name')
    create_bucket_cmd.add_argument('--region', help='AWS region specification')

    delete_cmd = new_command('delete-object', 'Delete object from MinIO')
    delete_cmd.add_argument('bucket', help='Bucket name')
    delete_cmd.add_argument('object_name', help='Object name/path')
    delete_cmd.add_argument('--yes', action='store_true', help='Skip confirmation prompt')

    # ---- Combined commands ----
    backup_cmd = new_command('backup-table', 'Backup PostgreSQL table to MinIO')
    backup_cmd.add_argument('table', help='Table name to backup')
    backup_cmd.add_argument('--bucket', default='backups', help='Destination bucket (default: backups)')
    add_schema_option(backup_cmd)
    backup_cmd.add_argument('--limit', type=int, default=10000, help='Maximum rows to backup (default: 10000)')
    backup_cmd.add_argument('--where', help='WHERE clause for filtering')

    return root
async def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    With no sub-command a short banner is printed and the function returns.
    An unknown command, or an exception escaping a handler, exits the
    process with status 1. A ``KeyboardInterrupt`` raised inside a handler
    is reported as a user cancellation.
    """
    args = create_parser().parse_args()
    selected = args.command

    if not selected:
        print_header("DP-MCP CLI Tool")
        print("A command-line interface for PostgreSQL and MinIO operations.")
        print("\nUse --help to see available commands:")
        print(f"{Colors.CYAN}python cli.py --help{Colors.END}")
        return

    # Sub-command name -> coroutine function implementing it.
    handlers = {
        'list-tables': cmd_list_tables,
        'describe-table': cmd_describe_table,
        'query': cmd_query,
        'export-csv': cmd_export_csv,
        'list-buckets': cmd_list_buckets,
        'list-objects': cmd_list_objects,
        'upload': cmd_upload,
        'download': cmd_download,
        'create-bucket': cmd_create_bucket,
        'delete-object': cmd_delete_object,
        'backup-table': cmd_backup_table,
    }

    if selected not in handlers:
        print_error(f"Unknown command: {selected}")
        sys.exit(1)

    handler = handlers[selected]
    try:
        await handler(args)
    except KeyboardInterrupt:
        print_info("\nOperation cancelled by user")
    except Exception as exc:
        print_error(f"Unexpected error: {exc}")
        sys.exit(1)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C while the event loop is waiting surfaces here, out of
        # asyncio.run(), rather than inside main(); catch it so the user
        # sees a short message instead of a traceback.
        print_info("\nOperation cancelled by user")
        # 130 = 128 + SIGINT, the conventional shell status for Ctrl-C.
        sys.exit(130)