"""Demo script for session-aware LLM scanning."""
import asyncio
import sys
from collections import Counter
from pathlib import Path

from ..credentials import get_credential_manager
from ..logger import get_logger
from ..scanner.session_aware_llm_scanner import SessionAwareLLMScanner
# Module-level logger shared by both demos; every result line below goes
# through it rather than print().
logger = get_logger("session_demo")
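async def demo_session_scanning():
    """Demonstrate session-aware LLM scanning capabilities.

    Builds a ``SessionAwareLLMScanner`` from the configured credential
    manager, runs ``analyze_project_with_session`` over ``./examples`` (or the
    current working directory when no ``examples`` directory exists), logs
    every returned threat match and a per-severity summary, and finally
    cleans up expired scanner sessions.

    Returns:
        None. Failures are logged rather than raised so the demo exits
        cleanly; an unavailable or broken scanner ends the demo early.
    """
    logger.info("Session-Aware LLM Security Analysis Demo")
    logger.info("=" * 50)

    # Initialize scanner. Kept in its own try/except so that a failure here
    # returns before the ``finally`` below, which needs ``scanner`` bound.
    try:
        credential_manager = get_credential_manager()
        scanner = SessionAwareLLMScanner(credential_manager)
        if not scanner.is_available():
            logger.error("LLM scanner not available. Please configure your API keys.")
            return
        logger.info("LLM scanner initialized successfully")
    except Exception as e:
        # NOTE(review): the exception text is logged verbatim. If the
        # credential manager ever embeds key material in its error messages
        # it would land in the log -- confirm against the provider code.
        logger.error(f"Failed to initialize scanner: {e}")
        return

    # Get project root (current directory or examples)
    project_root = Path.cwd()
    examples_dir = project_root / "examples"
    if examples_dir.exists():
        project_root = examples_dir
        logger.info(f"Using examples directory: {project_root}")
    else:
        logger.info(f"Using current directory: {project_root}")

    try:
        logger.info("🧠 Starting session-aware analysis...")
        logger.info("This will:")
        logger.info(" 1. Load the entire project context into the LLM")
        logger.info(" 2. Perform comprehensive security analysis")
        logger.info(" 3. Look for cross-file vulnerabilities")
        logger.info(" 4. Provide architectural security insights")

        # Run session-aware analysis. The whole tree under ``project_root`` is
        # handed to the scanner (and, per the messages above, to the LLM), so
        # do not point this demo at a directory that holds real secrets.
        threat_matches = await scanner.analyze_project_with_session(
            project_root=project_root,
            analysis_focus="comprehensive security analysis with architectural review",
        )

        logger.info(f"Analysis Results: {len(threat_matches)} findings")
        logger.info("-" * 30)

        if not threat_matches:
            logger.info("No security vulnerabilities detected!")
        else:
            for i, threat in enumerate(threat_matches, 1):
                logger.info(f"{i}. {threat.rule_name}")
                logger.info(f" Severity: {threat.severity.value.upper()}")
                logger.info(f" File: {threat.file_path}")
                # A line_number of 1 is treated as "no specific line" here;
                # presumably the scanner's default when none is known -- TODO confirm.
                if threat.line_number > 1:
                    logger.info(f" Line: {threat.line_number}")
                logger.info(f" Description: {threat.description}")
                if threat.code_snippet:
                    # Truncated: snippets are source text and can be long.
                    logger.info(f" Code: {threat.code_snippet[:100]}...")
                logger.info(f" Confidence: {threat.confidence:.1%}")

                # Show session-specific context if available. ``metadata`` is
                # optional on a threat match, hence the getattr.
                metadata = getattr(threat, "metadata", None)
                if metadata:
                    session_context = metadata.get("session_context", {})
                    if "architectural_context" in session_context:
                        logger.info(
                            f" Architectural Context: {session_context['architectural_context'][:100]}..."
                        )

        logger.info("Analysis Summary:")
        logger.info(f" • Total findings: {len(threat_matches)}")

        # Count by severity (insertion order follows first occurrence, as the
        # hand-rolled dict did).
        severity_counts = Counter(t.severity.value for t in threat_matches)
        for severity, count in severity_counts.items():
            logger.info(f" • {severity.title()}: {count}")

        logger.info("Session-aware analysis complete!")
        logger.info("Key advantages of session-aware analysis:")
        logger.info(" [+] Full project context understanding")
        logger.info(" [+] Cross-file vulnerability detection")
        logger.info(" [+] Architectural security insights")
        logger.info(" [+] Reduced false positives through context")
        logger.info(" [+] More intelligent threat analysis")
    except Exception as e:
        # One error record with the traceback attached, instead of logging the
        # same failure twice.
        logger.error(f"Analysis failed: {e}", exc_info=True)
    finally:
        # Cleanup any sessions
        scanner.cleanup_expired_sessions()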
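async def demo_file_analysis():
    """Demonstrate file analysis with project context.

    Picks the first ``*.py`` file found under ``./examples`` (falling back to
    ``./src``), runs ``analyze_file_with_context`` on it with an
    injection-focused hint, and logs each returned threat match. Expired
    scanner sessions are cleaned up on exit, mirroring
    ``demo_session_scanning``.

    Returns:
        None. Failures are logged rather than raised.
    """
    logger.info("=" * 50)
    logger.info("File Analysis with Project Context Demo")
    logger.info("=" * 50)

    scanner = None
    try:
        credential_manager = get_credential_manager()
        scanner = SessionAwareLLMScanner(credential_manager)
        if not scanner.is_available():
            logger.error("LLM scanner not available")
            return

        # Find a Python file to analyze: examples/ first, then src/.
        # ``next(..., None)`` stops at the first hit instead of walking the
        # whole tree into a list only to take element 0. Which file comes
        # first is filesystem-order dependent, as before.
        project_root = Path.cwd()
        examples_dir = project_root / "examples"
        target_file = None
        if examples_dir.exists():
            target_file = next(examples_dir.glob("**/*.py"), None)
        if target_file is None:
            target_file = next(project_root.glob("src/**/*.py"), None)
        if target_file is None:
            logger.error("No Python files found to analyze")
            return

        logger.info(f"Analyzing file with context: {target_file}")
        threat_matches = await scanner.analyze_file_with_context(
            file_path=target_file,
            context_hint="Focus on input validation and injection vulnerabilities",
        )

        logger.info(f"File Analysis Results: {len(threat_matches)} findings")
        for i, threat in enumerate(threat_matches, 1):
            logger.info(f"{i}. {threat.rule_name}")
            logger.info(f" Severity: {threat.severity.value.upper()}")
            logger.info(f" Line: {threat.line_number}")
            logger.info(f" Description: {threat.description}")
            logger.info(f" Confidence: {threat.confidence:.1%}")
        if not threat_matches:
            logger.info("No vulnerabilities found in this file!")
    except Exception as e:
        # Traceback attached so a failed demo run is diagnosable from the log.
        logger.error(f"File analysis failed: {e}", exc_info=True)
    finally:
        # Guarded: the scanner may never have been constructed if the
        # credential manager raised.
        if scanner is not None:
            scanner.cleanup_expired_sessions()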
def main():
    """Main demo function.

    Runs the file-analysis demo when the first command-line argument is
    ``"file"``; any other argument, or none, runs the full session-scanning
    demo.
    """
    mode = sys.argv[1] if len(sys.argv) > 1 else None
    demo = demo_file_analysis if mode == "file" else demo_session_scanning
    asyncio.run(demo())
# Script entry point; argument handling lives in main().
if __name__ == "__main__":
    main()