Skip to main content
Glama

DP-MCP Server

by devraj21
cli.py14.5 kB
#!/usr/bin/env python3 """ DP-MCP CLI Tool A command-line interface for testing and using DP-MCP tools directly without needing an MCP client. Usage: python cli.py <command> [options] Examples: python cli.py list-tables python cli.py describe-table users python cli.py query "SELECT * FROM users LIMIT 5" python cli.py export-csv users --limit 100 python cli.py list-buckets python cli.py upload test-bucket hello.txt "Hello World" """ import argparse import asyncio import sys from typing import Optional from dotenv import load_dotenv # Load environment variables load_dotenv() # Import tool functions from src.dp_mcp.tools.postgres_tools import ( describe_table, list_tables, execute_query, export_table_to_csv ) from src.dp_mcp.tools.minio_tools import ( list_buckets, list_objects, upload_object, download_object, create_bucket, delete_object ) class Colors: """ANSI color codes for pretty output.""" HEADER = '\033[95m' BLUE = '\033[94m' CYAN = '\033[96m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BOLD = '\033[1m' UNDERLINE = '\033[4m' END = '\033[0m' def print_header(text: str): """Print a colored header.""" print(f"\n{Colors.HEADER}{Colors.BOLD}🚀 {text}{Colors.END}") print(f"{Colors.HEADER}{'=' * (len(text) + 4)}{Colors.END}") def print_success(text: str): """Print success message.""" print(f"{Colors.GREEN}✅ {text}{Colors.END}") def print_error(text: str): """Print error message.""" print(f"{Colors.RED}❌ {text}{Colors.END}") def print_info(text: str): """Print info message.""" print(f"{Colors.CYAN}ℹ️ {text}{Colors.END}") # PostgreSQL Commands async def cmd_list_tables(args): """List all database tables.""" print_header("Listing Database Tables") try: result = await list_tables(args.schema) print(result) print_success("Tables listed successfully") except Exception as e: print_error(f"Failed to list tables: {e}") async def cmd_describe_table(args): """Describe a database table.""" print_header(f"Describing Table: {args.table}") try: result = await describe_table(args.table, args.schema) print(result) print_success("Table described successfully") except Exception as e: print_error(f"Failed to describe table: {e}") async def cmd_query(args): """Execute a SQL query.""" print_header("Executing SQL Query") print_info(f"Query: {args.sql}") print_info(f"Limit: {args.limit}") try: result = await execute_query(args.sql, args.limit) print(f"\n{result}") print_success("Query executed successfully") except Exception as e: print_error(f"Query failed: {e}") async def cmd_export_csv(args): """Export table data to CSV.""" print_header(f"Exporting Table: {args.table}") print_info(f"Limit: {args.limit}") if args.where: print_info(f"Filter: {args.where}") try: result = await export_table_to_csv(args.table, args.limit, args.where) if args.output: with open(args.output, 'w') as f: f.write(result) print_success(f"CSV exported to: {args.output}") else: print(f"\n{result}") print_success("CSV data displayed above") except Exception as e: print_error(f"Export failed: {e}") # MinIO Commands async def cmd_list_buckets(args): """List all MinIO buckets.""" print_header("Listing MinIO Buckets") try: result = await list_buckets() print(result) print_success("Buckets listed successfully") except Exception as e: print_error(f"Failed to list buckets: {e}") async def cmd_list_objects(args): """List objects in a bucket.""" print_header(f"Listing Objects in Bucket: {args.bucket}") if args.prefix: print_info(f"Prefix filter: {args.prefix}") print_info(f"Max objects: {args.max_keys}") try: result = await list_objects(args.bucket, args.prefix, args.max_keys) print(result) print_success("Objects listed successfully") except Exception as e: print_error(f"Failed to list objects: {e}") async def cmd_upload(args): """Upload data to MinIO.""" print_header("Uploading to MinIO") print_info(f"Bucket: {args.bucket}") print_info(f"Object: {args.object_name}") print_info(f"Content Type: {args.content_type}") # Read data from file or use provided text if args.file: try: with open(args.file, 'r') as f: data = f.read() print_info(f"Reading from file: {args.file}") except Exception as e: print_error(f"Failed to read file: {e}") return else: data = args.data try: result = await upload_object(args.bucket, args.object_name, data, args.content_type) print(result) print_success("Upload completed successfully") except Exception as e: print_error(f"Upload failed: {e}") async def cmd_download(args): """Download object from MinIO.""" print_header("Downloading from MinIO") print_info(f"Bucket: {args.bucket}") print_info(f"Object: {args.object_name}") try: result = await download_object(args.bucket, args.object_name) if args.output: with open(args.output, 'w') as f: f.write(result) print_success(f"Downloaded to: {args.output}") else: print(f"\n{Colors.YELLOW}--- Object Content ---{Colors.END}") print(result) print(f"{Colors.YELLOW}--- End of Content ---{Colors.END}") print_success("Object downloaded and displayed") except Exception as e: print_error(f"Download failed: {e}") async def cmd_create_bucket(args): """Create a new MinIO bucket.""" print_header(f"Creating Bucket: {args.bucket}") if args.region: print_info(f"Region: {args.region}") try: result = await create_bucket(args.bucket, args.region) print(result) print_success("Bucket creation completed") except Exception as e: print_error(f"Bucket creation failed: {e}") async def cmd_delete_object(args): """Delete object from MinIO.""" print_header("Deleting Object from MinIO") print_info(f"Bucket: {args.bucket}") print_info(f"Object: {args.object_name}") if not args.yes: confirm = input(f"{Colors.YELLOW}⚠️ Are you sure you want to delete '{args.object_name}' from '{args.bucket}'? (y/N): {Colors.END}") if confirm.lower() not in ['y', 'yes']: print_info("Operation cancelled") return try: result = await delete_object(args.bucket, args.object_name) print(result) print_success("Object deleted successfully") except Exception as e: print_error(f"Delete failed: {e}") # Combined Commands async def cmd_backup_table(args): """Backup a table to MinIO.""" print_header(f"Backing up Table: {args.table}") print_info(f"Destination bucket: {args.bucket}") print_info(f"Schema: {args.schema}") print_info(f"Limit: {args.limit}") if args.where: print_info(f"Filter: {args.where}") try: # Export table to CSV print_info("Exporting table data...") csv_data = await export_table_to_csv(args.table, args.limit, args.where) # Generate object name with timestamp from datetime import datetime timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") object_name = f"{args.schema}_{args.table}_{timestamp}.csv" # Upload to MinIO print_info(f"Uploading to {args.bucket}/{object_name}...") upload_result = await upload_object(args.bucket, object_name, csv_data, "text/csv") print(f"\n{upload_result}") print_success(f"Table backed up successfully to {args.bucket}/{object_name}") except Exception as e: print_error(f"Backup failed: {e}") def create_parser(): """Create the argument parser.""" parser = argparse.ArgumentParser( description="DP-MCP CLI Tool - Direct access to PostgreSQL and MinIO operations", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # PostgreSQL Operations python cli.py list-tables python cli.py describe-table users python cli.py query "SELECT * FROM users LIMIT 5" python cli.py export-csv users --limit 100 --output users.csv # MinIO Operations python cli.py list-buckets python cli.py list-objects default-bucket python cli.py upload default-bucket hello.txt "Hello World" python cli.py upload default-bucket data.json --file data.json python cli.py download default-bucket hello.txt --output downloaded.txt python cli.py create-bucket my-new-bucket # Combined Operations python cli.py backup-table users --bucket backups --limit 1000 """ ) subparsers = parser.add_subparsers(dest='command', help='Available commands') # PostgreSQL commands # List tables list_tables_parser = subparsers.add_parser('list-tables', help='List all database tables') list_tables_parser.add_argument('--schema', default='public', help='Database schema (default: public)') # Describe table describe_parser = subparsers.add_parser('describe-table', help='Describe a database table') describe_parser.add_argument('table', help='Table name to describe') describe_parser.add_argument('--schema', default='public', help='Database schema (default: public)') # Execute query query_parser = subparsers.add_parser('query', help='Execute a SQL query') query_parser.add_argument('sql', help='SQL query to execute') query_parser.add_argument('--limit', type=int, default=1000, help='Maximum rows to return (default: 1000)') # Export CSV export_parser = subparsers.add_parser('export-csv', help='Export table data to CSV') export_parser.add_argument('table', help='Table name to export') export_parser.add_argument('--limit', type=int, default=10000, help='Maximum rows to export (default: 10000)') export_parser.add_argument('--where', help='WHERE clause for filtering') export_parser.add_argument('--output', help='Output file path (default: display to console)') # MinIO commands # List buckets subparsers.add_parser('list-buckets', help='List all MinIO buckets') # List objects list_objects_parser = subparsers.add_parser('list-objects', help='List objects in a bucket') list_objects_parser.add_argument('bucket', help='Bucket name') list_objects_parser.add_argument('--prefix', help='Object prefix to filter by') list_objects_parser.add_argument('--max-keys', type=int, default=1000, help='Maximum objects to return (default: 1000)') # Upload upload_parser = subparsers.add_parser('upload', help='Upload data to MinIO') upload_parser.add_argument('bucket', help='Bucket name') upload_parser.add_argument('object_name', help='Object name/path') upload_parser.add_argument('data', nargs='?', help='Data to upload (or use --file)') upload_parser.add_argument('--file', help='Upload from file instead of command line data') upload_parser.add_argument('--content-type', default='text/plain', help='Content type (default: text/plain)') # Download download_parser = subparsers.add_parser('download', help='Download object from MinIO') download_parser.add_argument('bucket', help='Bucket name') download_parser.add_argument('object_name', help='Object name/path') download_parser.add_argument('--output', help='Output file path (default: display to console)') # Create bucket create_bucket_parser = subparsers.add_parser('create-bucket', help='Create a new MinIO bucket') create_bucket_parser.add_argument('bucket', help='Bucket name') create_bucket_parser.add_argument('--region', help='AWS region specification') # Delete object delete_parser = subparsers.add_parser('delete-object', help='Delete object from MinIO') delete_parser.add_argument('bucket', help='Bucket name') delete_parser.add_argument('object_name', help='Object name/path') delete_parser.add_argument('--yes', action='store_true', help='Skip confirmation prompt') # Combined commands # Backup table backup_parser = subparsers.add_parser('backup-table', help='Backup PostgreSQL table to MinIO') backup_parser.add_argument('table', help='Table name to backup') backup_parser.add_argument('--bucket', default='backups', help='Destination bucket (default: backups)') backup_parser.add_argument('--schema', default='public', help='Database schema (default: public)') backup_parser.add_argument('--limit', type=int, default=10000, help='Maximum rows to backup (default: 10000)') backup_parser.add_argument('--where', help='WHERE clause for filtering') return parser async def main(): """Main CLI function.""" parser = create_parser() args = parser.parse_args() if not args.command: print_header("DP-MCP CLI Tool") print("A command-line interface for PostgreSQL and MinIO operations.") print("\nUse --help to see available commands:") print(f"{Colors.CYAN}python cli.py --help{Colors.END}") return # Command mapping commands = { 'list-tables': cmd_list_tables, 'describe-table': cmd_describe_table, 'query': cmd_query, 'export-csv': cmd_export_csv, 'list-buckets': cmd_list_buckets, 'list-objects': cmd_list_objects, 'upload': cmd_upload, 'download': cmd_download, 'create-bucket': cmd_create_bucket, 'delete-object': cmd_delete_object, 'backup-table': cmd_backup_table, } if args.command in commands: try: await commands[args.command](args) except KeyboardInterrupt: print_info("\nOperation cancelled by user") except Exception as e: print_error(f"Unexpected error: {e}") sys.exit(1) else: print_error(f"Unknown command: {args.command}") sys.exit(1) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/devraj21/dp-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server