Skip to main content
Glama
by cycodehq
scan_result.py8.5 kB
import os from typing import TYPE_CHECKING, Optional import typer from cycode.cli import consts from cycode.cli.apps.scan.aggregation_report import try_get_aggregation_report_url_if_needed from cycode.cli.apps.scan.detection_excluder import exclude_irrelevant_document_detections from cycode.cli.models import Document, DocumentDetections, LocalScanResult from cycode.cli.utils.path_utils import get_path_by_os, normalize_file_path from cycode.cyclient.models import ( Detection, DetectionSchema, DetectionsPerFile, ScanResultsSyncFlow, ZippedFileScanResult, ) from cycode.logger import get_logger if TYPE_CHECKING: from cycode.cli.models import CliError from cycode.cyclient.models import ScanDetailsResponse from cycode.cyclient.scan_client import ScanClient logger = get_logger('Scan Results') def _get_document_by_file_name( documents: list[Document], file_name: str, unique_id: Optional[str] = None ) -> Optional[Document]: for document in documents: if normalize_file_path(document.path) == normalize_file_path(file_name) and document.unique_id == unique_id: return document return None def _get_document_detections( scan_result: ZippedFileScanResult, documents_to_scan: list[Document] ) -> list[DocumentDetections]: logger.debug('Getting document detections') document_detections = [] for detections_per_file in scan_result.detections_per_file: file_name = get_path_by_os(detections_per_file.file_name) commit_id = detections_per_file.commit_id logger.debug( 'Going to find the document of the violated file, %s', {'file_name': file_name, 'commit_id': commit_id} ) document = _get_document_by_file_name(documents_to_scan, file_name, commit_id) document_detections.append(DocumentDetections(document=document, detections=detections_per_file.detections)) return document_detections def create_local_scan_result( scan_result: ZippedFileScanResult, documents_to_scan: list[Document], command_scan_type: str, scan_type: str, severity_threshold: str, ) -> LocalScanResult: document_detections = _get_document_detections(scan_result, documents_to_scan) relevant_document_detections_list = exclude_irrelevant_document_detections( document_detections, scan_type, command_scan_type, severity_threshold ) detections_count = sum([len(document_detection.detections) for document_detection in document_detections]) relevant_detections_count = sum( [len(document_detections.detections) for document_detections in relevant_document_detections_list] ) return LocalScanResult( scan_id=scan_result.scan_id, report_url=scan_result.report_url, document_detections=relevant_document_detections_list, issue_detected=len(relevant_document_detections_list) > 0, detections_count=detections_count, relevant_detections_count=relevant_detections_count, ) def _get_file_name_from_detection(scan_type: str, raw_detection: dict) -> str: if scan_type == consts.SAST_SCAN_TYPE: return raw_detection['detection_details']['file_path'] if scan_type == consts.SECRET_SCAN_TYPE: return _get_secret_file_name_from_detection(raw_detection) return raw_detection['detection_details']['file_name'] def _get_secret_file_name_from_detection(raw_detection: dict) -> str: file_path: str = raw_detection['detection_details']['file_path'] file_name: str = raw_detection['detection_details']['file_name'] return os.path.join(file_path, file_name) def _map_detections_per_file_and_commit_id(scan_type: str, raw_detections: list[dict]) -> list[DetectionsPerFile]: """Convert a list of detections (async flow) to list of DetectionsPerFile objects (sync flow). Args: scan_type: Type of the scan. raw_detections: List of detections as is returned from the server. Note: This method fakes server response structure to be able to use the same logic for both async and sync scans. Note: Aggregation is performed by file name and commit ID (if available) """ detections_per_files = {} for raw_detection in raw_detections: try: # FIXME(MarshalX): investigate this field mapping raw_detection['message'] = raw_detection['correlation_message'] file_name = _get_file_name_from_detection(scan_type, raw_detection) detection: Detection = DetectionSchema().load(raw_detection) commit_id: Optional[str] = detection.detection_details.get('commit_id') # could be None group_by_key = (file_name, commit_id) if group_by_key in detections_per_files: detections_per_files[group_by_key].append(detection) else: detections_per_files[group_by_key] = [detection] except Exception as e: logger.debug('Failed to parse detection', exc_info=e) continue return [ DetectionsPerFile(file_name=file_name, detections=file_detections, commit_id=commit_id) for (file_name, commit_id), file_detections in detections_per_files.items() ] def init_default_scan_result(scan_id: str) -> ZippedFileScanResult: return ZippedFileScanResult( did_detect=False, detections_per_file=[], scan_id=scan_id, ) def get_scan_result( cycode_client: 'ScanClient', scan_type: str, scan_id: str, scan_details: 'ScanDetailsResponse', scan_parameters: dict, ) -> ZippedFileScanResult: if not scan_details.detections_count: return init_default_scan_result(scan_id) scan_raw_detections = cycode_client.get_scan_raw_detections(scan_id) return ZippedFileScanResult( did_detect=True, detections_per_file=_map_detections_per_file_and_commit_id(scan_type, scan_raw_detections), scan_id=scan_id, report_url=try_get_aggregation_report_url_if_needed(scan_parameters, cycode_client, scan_type), ) def get_sync_scan_result(scan_type: str, scan_results: 'ScanResultsSyncFlow') -> ZippedFileScanResult: return ZippedFileScanResult( did_detect=True, detections_per_file=_map_detections_per_file_and_commit_id(scan_type, scan_results.detection_messages), scan_id=scan_results.id, ) def print_local_scan_results( ctx: typer.Context, local_scan_results: list[LocalScanResult], errors: Optional[dict[str, 'CliError']] = None ) -> None: printer = ctx.obj.get('console_printer') printer.update_ctx(ctx) printer.print_scan_results(local_scan_results, errors) def enrich_scan_result_with_data_from_detection_rules( cycode_client: 'ScanClient', scan_result: ZippedFileScanResult ) -> None: detection_rule_ids = set() for detections_per_file in scan_result.detections_per_file: for detection in detections_per_file.detections: detection_rule_ids.add(detection.detection_rule_id) detection_rules = cycode_client.get_detection_rules(detection_rule_ids) detection_rules_by_id = {detection_rule.detection_rule_id: detection_rule for detection_rule in detection_rules} for detections_per_file in scan_result.detections_per_file: for detection in detections_per_file.detections: detection_rule = detection_rules_by_id.get(detection.detection_rule_id) if not detection_rule: # we want to make sure that BE returned it. better to not map data instead of failed scan continue if not detection.severity and detection_rule.classification_data: # it's fine to take the first one, because: # - for "secrets" and "iac" there is only one classification rule per-detection rule # - for "sca" and "sast" we get severity from detection service detection.severity = detection_rule.classification_data[0].severity # detection_details never was typed properly. so not a problem for now detection.detection_details['custom_remediation_guidelines'] = detection_rule.custom_remediation_guidelines detection.detection_details['remediation_guidelines'] = detection_rule.remediation_guidelines detection.detection_details['description'] = detection_rule.description detection.detection_details['policy_display_name'] = detection_rule.display_name

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cycodehq/cycode-cli'

If you have feedback or need assistance with the MCP directory API, please join our Discord server