pytest_diagnostic.py•7.38 kB
"""Pytest plugin for enhanced diagnostic testing.
This plugin extends pytest with capabilities for detailed diagnostic reporting
while maintaining standard test pass/fail behavior.
"""
import json
import time
import traceback
from json import JSONEncoder
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional
import pytest
# Custom JSON Encoder that can handle binary data
class DiagnosticJSONEncoder(JSONEncoder):
    """Custom JSON encoder that can handle bytes and other non-serializable types."""
    def default(self, obj: Any) -> Any:
        """Convert bytes and other types to JSON-serializable objects."""
        if isinstance(obj, bytes):
            # Convert bytes to base64 string for JSON serialization
            import base64
            return {"__bytes__": True, "value": base64.b64encode(obj).decode("ascii")}
        # Handle Path objects
        if isinstance(obj, Path):
            return str(obj)
        # Handle tree-sitter specific types
        if hasattr(obj, "start_point") and hasattr(obj, "end_point") and hasattr(obj, "type"):
            # Probably a tree-sitter Node
            return {
                "type": obj.type,
                "start_point": obj.start_point,
                "end_point": obj.end_point,
                "_tsnode": True,
            }
        # Handle types with custom __dict__ but no standard serialization
        if hasattr(obj, "__dict__"):
            try:
                return obj.__dict__
            except (TypeError, AttributeError):
                pass
        # Let the base class handle any other types
        return super().default(obj)
# Global storage for test context and diagnostic results
_DIAGNOSTICS: Dict[str, "DiagnosticData"] = {}
_CURRENT_TEST: Dict[str, Any] = {}
class DiagnosticData:
    """Container for diagnostic information."""
    def __init__(self, test_id: str):
        """Initialize with test ID."""
        self.test_id = test_id
        self.start_time = time.time()
        self.end_time: Optional[float] = None
        self.status = "pending"
        self.details: Dict[str, Any] = {}
        self.errors: List[Dict[str, Any]] = []
        self.artifacts: Dict[str, Any] = {}
    def add_error(self, error_type: str, message: str, tb: Optional[str] = None) -> None:
        """Add an error to the diagnostic data."""
        error_info = {
            "type": error_type,
            "message": message,
        }
        if tb:
            error_info["traceback"] = tb
        self.errors.append(error_info)
        self.status = "error"
    def add_detail(self, key: str, value: Any) -> None:
        """Add a detail to the diagnostic data."""
        self.details[key] = value
    def add_artifact(self, name: str, content: Any) -> None:
        """Add an artifact to the diagnostic data."""
        self.artifacts[name] = content
    def finalize(self, status: str = "completed") -> None:
        """Mark the diagnostic as complete."""
        self.end_time = time.time()
        if not self.errors:
            self.status = status
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            "test_id": self.test_id,
            "status": self.status,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self.end_time - self.start_time if self.end_time else None,
            "details": self.details,
            "errors": self.errors,
            "artifacts": self.artifacts,
        }
@pytest.fixture
def diagnostic(request: Any) -> Generator[DiagnosticData, None, None]:
    """Fixture to provide diagnostic functionality to tests."""
    # Get the current test ID
    test_id = f"{request.path}::{request.node.name}"
    # Create a diagnostic data instance
    diag = DiagnosticData(test_id)
    _DIAGNOSTICS[test_id] = diag
    yield diag
    # Finalize the diagnostic when the test is done
    diag.finalize()
def pytest_configure(config: Any) -> None:
    """Set up the plugin when pytest starts."""
    # Register additional markers
    config.addinivalue_line("markers", "diagnostic: mark test as producing diagnostic information")
def pytest_runtest_protocol(item: Any, nextitem: Any) -> Optional[bool]:
    """Custom test protocol that captures detailed diagnostics."""
    # Use the standard protocol
    return None
def pytest_runtest_setup(item: Any) -> None:
    """Set up the test environment."""
    # This is no longer needed as we use the request fixture
    pass
def pytest_runtest_teardown(item: Any) -> None:
    """Clean up after a test."""
    # This is no longer needed as we use the request fixture
    pass
def pytest_terminal_summary(terminalreporter: Any, exitstatus: Any, config: Any) -> None:
    """Add diagnostic summary to the terminal output."""
    if _DIAGNOSTICS:
        terminalreporter.write_sep("=", "Diagnostic Summary")
        error_count = sum(1 for d in _DIAGNOSTICS.values() if d.status == "error")
        terminalreporter.write_line(f"Collected {len(_DIAGNOSTICS)} diagnostics, {error_count} with errors")
        # If there are errors, show details
        if error_count:
            terminalreporter.write_sep("-", "Error Details")
            for test_id, diag in _DIAGNOSTICS.items():
                if diag.status == "error":
                    terminalreporter.write_line(f"- {test_id}")
                    for i, error in enumerate(diag.errors):
                        terminalreporter.write_line(f"  Error {i + 1}: {error['type']}: {error['message']}")
def pytest_sessionfinish(session: Any, exitstatus: Any) -> None:
    """Generate JSON reports at the end of the test session."""
    output_dir = Path("diagnostic_results")
    output_dir.mkdir(exist_ok=True)
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    output_file = output_dir / f"diagnostic_results_{timestamp}.json"
    # Convert diagnostics to JSON-serializable dict
    diagnostics_dict = {k: v.to_dict() for k, v in _DIAGNOSTICS.items()}
    # Write the results to a file
    with open(output_file, "w") as f:
        json.dump(
            {
                "timestamp": timestamp,
                "diagnostics": diagnostics_dict,
                "summary": {
                    "total": len(diagnostics_dict),
                    "errors": sum(1 for d in diagnostics_dict.values() if d["status"] == "error"),
                    "completed": sum(1 for d in diagnostics_dict.values() if d["status"] == "completed"),
                },
            },
            f,
            indent=2,
            cls=DiagnosticJSONEncoder,
        )
    print(f"\nDiagnostic results saved to {output_file}")
@pytest.hookimpl(tryfirst=True)
def pytest_exception_interact(node: Any, call: Any, report: Any) -> None:
    """Capture exception details for diagnostics."""
    if call.excinfo:
        try:
            test_id = f"{node.path}::{node.name}"
            if test_id in _DIAGNOSTICS:
                diag = _DIAGNOSTICS[test_id]
                exc_type = call.excinfo.type.__name__
                exc_value = str(call.excinfo.value)
                tb_str = "\n".join(traceback.format_tb(call.excinfo.tb))
                diag.add_error(exc_type, exc_value, tb_str)
        except Exception as e:
            print(f"Error recording diagnostic info: {e}")