import argparse
import logging
import json
import sys
from typing import Optional
from .client import SparkHistoryClient
from .llm_client import LLMClient
from .optimizer.engine import OptimizationEngine
def setup_logging(verbose: bool) -> None:
    """Configure root logging for the CLI.

    Args:
        verbose: When true, log at DEBUG level; otherwise log at INFO level.
    """
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(level=level, format='%(asctime)s - %(levelname)s - %(message)s')
    # basicConfig() does nothing when the root logger already has handlers
    # (e.g. when this runs inside a host that configured logging first), so
    # set the level explicitly to make sure the requested verbosity applies.
    logging.getLogger().setLevel(level)
def run_cli(argv: Optional[list] = None) -> None:
    """Parse command-line options, analyze a Spark application, and emit the report.

    The report returned by the optimization engine is serialized as indented
    JSON and either written to the ``--output`` file or printed to stdout
    between banner lines.

    Args:
        argv: Argument strings to parse instead of ``sys.argv[1:]``. ``None``
            (the default) reads the process command line, exactly as before.

    Raises:
        SystemExit: With status 2 when argparse rejects the arguments (for
            example a missing ``--appId``), or with status 1 when the analysis
            or the report output raises an exception.
    """
    parser = argparse.ArgumentParser(description="Agentic Spark Job Optimization System")
    parser.add_argument("--appId", required=True, help="Spark Application ID to analyze")
    parser.add_argument("--jobCode", help="Path to the Spark job source code file")
    parser.add_argument("--historyUrl", default="http://localhost:18080", help="Spark History Server URL")
    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
    parser.add_argument("--output", help="Output file for the report (JSON)")
    args = parser.parse_args(argv)

    setup_logging(args.verbose)

    # Initialize components
    spark_client = SparkHistoryClient(base_url=args.historyUrl)
    llm_client = LLMClient()  # Requires GEMINI_API_KEY environment variable
    engine = OptimizationEngine(spark_client, llm_client)

    print(f"🤖 Starting Analysis for App: {args.appId}...")
    try:
        report = engine.analyze_application(args.appId, code_path=args.jobCode)
        json_output = json.dumps(report.to_dict(), indent=2)

        # Output handling
        if args.output:
            # Explicit encoding so the file does not depend on the platform default.
            with open(args.output, 'w', encoding="utf-8") as f:
                f.write(json_output)
            print(f"✅ Report saved to {args.output}")
        else:
            banner = "=" * 50
            print("\n" + banner)
            print("OPTIMIZATION REPORT")
            print(banner)
            print(json_output)
            print(banner)
    except Exception as e:
        # Top-level boundary: log the failure with its traceback and signal
        # the error to the calling shell through a non-zero exit status.
        logging.exception("Analysis Failed: %s", e)
        sys.exit(1)
# Script entry point: run the CLI only when executed directly, not on import.
# This module uses package-relative imports, so it has to be launched as a
# module of its package (``python -m <package>.<module>``) rather than by path.
if __name__ == "__main__":
    run_cli()