"""
AI Hallucination Detector
Main orchestrator for detecting AI coding assistant hallucinations in Python scripts.
Combines AST analysis, knowledge graph validation, and comprehensive reporting.
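Typical CLI usage (illustrative; see ``main`` below for the full set of flags):
    python ai_hallucination_detector.py script.py --output-dir reports/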
"""
import argparse
import asyncio
import logging
import os
import sys
from pathlib import Path
from typing import Any
from dotenv import load_dotenv
from src.core.exceptions import AnalysisError, ParsingError, QueryError
from src.knowledge_graph.ai_script_analyzer import AIScriptAnalyzer
from src.knowledge_graph.hallucination_reporter import HallucinationReporter
from src.knowledge_graph.knowledge_graph_validator import KnowledgeGraphValidator
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
class AIHallucinationDetector:
"""Main detector class that orchestrates the entire process"""
def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str):
self.validator = KnowledgeGraphValidator(neo4j_uri, neo4j_user, neo4j_password)
self.reporter = HallucinationReporter()
self.analyzer = AIScriptAnalyzer()
async def initialize(self) -> None:
"""Initialize connections and components"""
await self.validator.initialize()
logger.info("AI Hallucination Detector initialized successfully")
async def close(self) -> None:
"""Close connections"""
await self.validator.close()
async def detect_hallucinations(
self,
script_path: str,
output_dir: str | None = None,
*,
save_json: bool = True,
save_markdown: bool = True,
print_summary: bool = True,
) -> dict[str, Any]:
"""
Main detection function that analyzes a script and generates reports
Args:
script_path: Path to the AI-generated Python script
output_dir: Directory to save reports (defaults to script directory)
save_json: Whether to save JSON report
save_markdown: Whether to save Markdown report
print_summary: Whether to print summary to console
Returns:
Complete validation report as dictionary
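        Example (illustrative; assumes an initialized detector and a populated
        Neo4j knowledge graph):
            >>> report = await detector.detect_hallucinations("generated_script.py")
            >>> report["validation_summary"]["overall_confidence"]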
"""
logger.info("Starting hallucination detection for: %s", script_path)
# Validate input
if not Path(script_path).exists():
msg = f"Script not found: {script_path}"
raise FileNotFoundError(msg)
if not script_path.endswith(".py"):
msg = "Only Python (.py) files are supported"
raise ValueError(msg)
# Set output directory
if output_dir is None:
output_dir = str(Path(script_path).parent)
Path(output_dir).mkdir(parents=True, exist_ok=True)
try:
# Step 1: Analyze the script using AST
logger.info("Step 1: Analyzing script structure...")
analysis_result = self.analyzer.analyze_script(script_path)
if analysis_result.errors:
logger.warning("Analysis warnings: %s", analysis_result.errors)
logger.info(
"Found: %s imports, %s class instantiations, %s method calls, "
"%s function calls, %s attribute accesses",
len(analysis_result.imports),
len(analysis_result.class_instantiations),
len(analysis_result.method_calls),
len(analysis_result.function_calls),
len(analysis_result.attribute_accesses),
)
# Step 2: Validate against knowledge graph
logger.info("Step 2: Validating against knowledge graph...")
validation_result = await self.validator.validate_script(analysis_result)
logger.info(
"Validation complete. Overall confidence: %.1f%%",
validation_result.overall_confidence * 100,
)
# Step 3: Generate comprehensive report
logger.info("Step 3: Generating reports...")
report = self.reporter.generate_comprehensive_report(validation_result)
# Step 4: Save reports
script_name = Path(script_path).stem
if save_json:
json_path = str(
Path(output_dir) / f"{script_name}_hallucination_report.json",
)
self.reporter.save_json_report(report, json_path)
if save_markdown:
md_path = str(
Path(output_dir) / f"{script_name}_hallucination_report.md",
)
self.reporter.save_markdown_report(report, md_path)
# Step 5: Print summary
if print_summary:
self.reporter.print_summary(report)
logger.info("Hallucination detection completed successfully")
except (ParsingError, AnalysisError, QueryError):
logger.exception("Analysis/Query error during hallucination detection")
raise
except Exception:
logger.exception("Unexpected error during hallucination detection")
raise
else:
return report
    async def batch_detect(
        self,
        script_paths: list[str],
        output_dir: str | None = None,
    ) -> list[dict[str, Any]]:
"""
Detect hallucinations in multiple scripts
Args:
script_paths: List of paths to Python scripts
output_dir: Directory to save all reports
Returns:
List of validation reports
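        Example (illustrative; scripts that fail analysis are skipped, so the
        returned list may be shorter than ``script_paths``):
            >>> reports = await detector.batch_detect(
            ...     ["a.py", "b.py"], output_dir="reports/",
            ... )
            >>> len(reports)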
"""
logger.info("Starting batch detection for %s scripts", len(script_paths))
results = []
for i, script_path in enumerate(script_paths, 1):
logger.info(
"Processing script %s/%s: %s",
i,
len(script_paths),
script_path,
)
try:
result = await self.detect_hallucinations(
script_path=script_path,
output_dir=output_dir,
print_summary=False,
)
results.append(result)
except (ParsingError, AnalysisError, QueryError):
logger.exception("Analysis/Query error processing %s", script_path)
# Continue with other scripts
continue
except Exception:
logger.exception("Unexpected error processing %s", script_path)
# Continue with other scripts
continue
# Print batch summary
self._print_batch_summary(results)
return results
def _print_batch_summary(self, results: list[dict[str, Any]]) -> None:
"""Print summary of batch processing results"""
if not results:
print("No scripts were successfully processed.")
return
print("\n" + "="*80)
print("š BATCH HALLUCINATION DETECTION SUMMARY")
print("="*80)
total_scripts = len(results)
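        # Aggregate the per-report counters produced by HallucinationReporter
        # under each report's "validation_summary" section.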
total_validations = sum(
r["validation_summary"]["total_validations"] for r in results
)
total_valid = sum(
r["validation_summary"]["valid_count"] for r in results
)
total_invalid = sum(
r["validation_summary"]["invalid_count"] for r in results
)
total_not_found = sum(
r["validation_summary"]["not_found_count"] for r in results
)
total_hallucinations = sum(
len(r["hallucinations_detected"]) for r in results
)
avg_confidence = (
sum(r["validation_summary"]["overall_confidence"] for r in results)
/ total_scripts
)
print(f"Scripts Processed: {total_scripts}")
print(f"Total Validations: {total_validations}")
print(f"Average Confidence: {avg_confidence:.1%}")
print(f"Total Hallucinations: {total_hallucinations}")
print("\nAggregated Results:")
        print(
            f" ✅ Valid: {total_valid} ({total_valid/total_validations:.1%})",
        )
print(
f" ā Invalid: {total_invalid} ({total_invalid/total_validations:.1%})",
)
print(
f" š Not Found: {total_not_found} "
f"({total_not_found/total_validations:.1%})",
)
# Show worst performing scripts
print("\nšØ Scripts with Most Hallucinations:")
sorted_results = sorted(
results,
key=lambda x: len(x["hallucinations_detected"]),
reverse=True,
)
for result in sorted_results[:5]:
script_name = Path(result["analysis_metadata"]["script_path"]).name
hall_count = len(result["hallucinations_detected"])
confidence = result["validation_summary"]["overall_confidence"]
print(
f" - {script_name}: {hall_count} hallucinations "
f"({confidence:.1%} confidence)",
)
print("="*80)
async def main() -> None:
"""Command-line interface for the AI Hallucination Detector"""
parser = argparse.ArgumentParser(
description="Detect AI coding assistant hallucinations in Python scripts",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Analyze single script
python ai_hallucination_detector.py script.py
# Analyze multiple scripts
python ai_hallucination_detector.py script1.py script2.py script3.py
# Specify output directory
python ai_hallucination_detector.py script.py --output-dir reports/
# Skip markdown report
python ai_hallucination_detector.py script.py --no-markdown
""",
)
parser.add_argument(
"scripts",
nargs="+",
help="Python script(s) to analyze for hallucinations",
)
parser.add_argument(
"--output-dir",
help="Directory to save reports (defaults to script directory)",
)
parser.add_argument(
"--no-json",
action="store_true",
help="Skip JSON report generation",
)
parser.add_argument(
"--no-markdown",
action="store_true",
help="Skip Markdown report generation",
)
parser.add_argument(
"--no-summary",
action="store_true",
help="Skip printing summary to console",
)
parser.add_argument(
"--neo4j-uri",
default=None,
help="Neo4j URI (default: from environment NEO4J_URI)",
)
parser.add_argument(
"--neo4j-user",
default=None,
help="Neo4j username (default: from environment NEO4J_USERNAME)",
)
parser.add_argument(
"--neo4j-password",
default=None,
help="Neo4j password (default: from environment NEO4J_PASSWORD)",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable verbose logging",
)
args = parser.parse_args()
if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
        # Only enable debug for our modules; keep the neo4j driver at WARNING
logging.getLogger("neo4j").setLevel(logging.WARNING)
logging.getLogger("neo4j.pool").setLevel(logging.WARNING)
logging.getLogger("neo4j.io").setLevel(logging.WARNING)
# Load environment variables
load_dotenv()
# Get Neo4j credentials
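    # CLI flags take precedence over environment variables, which in turn
    # override the hard-coded defaults below.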
default_neo4j_uri = "bolt://localhost:7687"
neo4j_uri = args.neo4j_uri or os.environ.get("NEO4J_URI", default_neo4j_uri)
neo4j_user = args.neo4j_user or os.environ.get("NEO4J_USERNAME", "neo4j")
default_password = "password" # noqa: S105
neo4j_password = (
args.neo4j_password
or os.environ.get("NEO4J_PASSWORD", default_password)
)
if not neo4j_password or neo4j_password == default_password:
logger.error(
"Please set NEO4J_PASSWORD environment variable "
"or use --neo4j-password",
)
sys.exit(1)
# Initialize detector
detector = AIHallucinationDetector(neo4j_uri, neo4j_user, neo4j_password)
try:
await detector.initialize()
# Process scripts
if len(args.scripts) == 1:
# Single script mode
await detector.detect_hallucinations(
script_path=args.scripts[0],
output_dir=args.output_dir,
save_json=not args.no_json,
save_markdown=not args.no_markdown,
print_summary=not args.no_summary,
)
else:
# Batch mode
await detector.batch_detect(
script_paths=args.scripts,
output_dir=args.output_dir,
)
except KeyboardInterrupt:
logger.info("Detection interrupted by user")
sys.exit(1)
except (ParsingError, AnalysisError, QueryError):
logger.exception("Analysis/Query error - detection failed")
sys.exit(1)
except Exception:
logger.exception("Unexpected error - detection failed")
sys.exit(1)
finally:
await detector.close()
if __name__ == "__main__":
asyncio.run(main())