Skip to main content
Glama
by cycodehq
scan_client.py • 6.06 kB
"""Helpers that build mocked ``responses`` objects for ScanClient endpoints used in tests."""

import json
from pathlib import Path
from typing import Optional
from uuid import UUID, uuid4

import responses

from cycode.cyclient.scan_client import ScanClient
from tests.conftest import MOCKED_RESPONSES_PATH


def get_zipped_file_scan_async_url(scan_type: str, scan_client: ScanClient) -> str:
    """Return the full URL of the async zipped-file scan endpoint for ``scan_type``."""
    base = scan_client.scan_cycode_client.api_url
    return f'{base}/{scan_client.get_zipped_file_scan_async_url_path(scan_type)}'


def get_zipped_file_scan_async_response(url: str, scan_id: Optional[UUID] = None) -> responses.Response:
    """Build a mocked 200 POST response carrying ``scan_id`` (a fresh UUID when none is given)."""
    scan_id = scan_id or uuid4()
    payload = {
        'scan_id': str(scan_id),  # not always as expected due to _get_scan_id and passing scan_id to cxt of CLI
    }
    return responses.Response(method=responses.POST, url=url, json=payload, status=200)


def get_scan_details_url(scan_type: str, scan_id: Optional[UUID], scan_client: ScanClient) -> str:
    """Return the full URL of the scan details endpoint for the given scan."""
    base = scan_client.scan_cycode_client.api_url
    return f'{base}/{scan_client.get_scan_details_path(scan_type, str(scan_id))}'


def get_scan_aggregation_report_url(aggregation_id: Optional[UUID], scan_client: ScanClient, scan_type: str) -> str:
    """Return the full URL of the scan aggregation report endpoint."""
    base = scan_client.scan_cycode_client.api_url
    return f'{base}/{scan_client.get_scan_aggregation_report_url_path(str(aggregation_id), scan_type)}'


def get_scan_aggregation_report_url_response(url: str, aggregation_id: Optional[UUID] = None) -> responses.Response:
    """Build a mocked 200 GET response whose ``report_url`` embeds the aggregation id."""
    aggregation_id = aggregation_id or uuid4()
    payload = {'report_url': f'https://app.domain/cli-logs-aggregation/{aggregation_id}'}
    return responses.Response(method=responses.GET, url=url, json=payload, status=200)


def get_scan_details_response(url: str, scan_id: Optional[UUID] = None) -> responses.Response:
    """Build a mocked 200 GET response with fixed scan details for ``scan_id``.

    A fresh UUID is generated when ``scan_id`` is not provided.
    """
    scan_id = scan_id or uuid4()

    payload = {
        'id': str(scan_id),
        'scan_type': 'Secrets',
        'metadata': f'Path: {scan_id}, Folder: scans, Size: 465',
        'entity_type': 'ZippedFile',
        'entity_id': 'Repository',
        'parent_entity_id': 'Organization',
        'organization_id': 'Organization',
        'scm_provider': 'CycodeCli',
        'started_at': '2023-10-25T10:02:23.048282+00:00',
        'finished_at': '2023-10-25T10:02:26.867082+00:00',
        'scan_status': 'Completed',  # mark as completed to avoid mocking repeated requests
        'message': None,
        'results_count': 1,
        'scan_update_at': '2023-10-25T10:02:26.867216+00:00',
        'duration': {'days': 0, 'hours': 0, 'minutes': 0, 'seconds': 3, 'milliseconds': 818},
        'is_hidden': False,
        'is_initial_scan': False,
        'detection_messages': [],
    }

    return responses.Response(method=responses.GET, url=url, json=payload, status=200)


def get_scan_detections_url(scan_client: ScanClient) -> str:
    """Return the full URL of the scan detections list endpoint."""
    base = scan_client.scan_cycode_client.api_url
    return f'{base}/{scan_client.get_scan_detections_list_path()}'


def get_scan_detections_response(url: str, scan_id: UUID, zip_content_path: Path) -> responses.Response:
    """Build a mocked 200 GET detections response from the ``detections.json`` template.

    The ``%FILEPATH%`` and ``%SCAN_ID%`` placeholders in the template are replaced with
    the absolute POSIX form of ``zip_content_path`` and with ``scan_id`` respectively.
    """
    with open(MOCKED_RESPONSES_PATH.joinpath('detections.json'), encoding='UTF-8') as template_file:
        template = template_file.read()

    # Substitute placeholders in the same order as before: file path first, then scan id.
    rendered = template.replace('%FILEPATH%', str(zip_content_path.absolute().as_posix())).replace(
        '%SCAN_ID%', str(scan_id)
    )

    return responses.Response(method=responses.GET, url=url, json=json.loads(rendered), status=200)


def get_report_scan_status_url(scan_type: str, scan_id: UUID, scan_client: ScanClient) -> str:
    """Return the full URL of the report-scan-status endpoint for the given scan."""
    base = scan_client.scan_cycode_client.api_url
    return f'{base}/{scan_client.get_report_scan_status_path(scan_type, str(scan_id))}'


def get_report_scan_status_response(url: str) -> responses.Response:
    """Build a mocked, body-less 200 POST response for the report-scan-status endpoint."""
    return responses.Response(method=responses.POST, url=url, status=200)


def get_detection_rules_url(scan_client: ScanClient) -> str:
    """Return the full URL of the detection rules endpoint."""
    base = scan_client.scan_cycode_client.api_url
    return f'{base}/{scan_client.get_detection_rules_path()}'


def get_detection_rules_response(url: str) -> responses.Response:
    """Build a mocked 200 GET response whose JSON body is loaded from ``detection_rules.json``."""
    with open(MOCKED_RESPONSES_PATH.joinpath('detection_rules.json'), encoding='UTF-8') as rules_file:
        rules = json.load(rules_file)

    return responses.Response(method=responses.GET, url=url, json=rules, status=200)


def get_scan_configuration_url(scan_type: str, scan_client: ScanClient) -> str:
    """Return the full URL of the scan configuration endpoint for ``scan_type``."""
    base = scan_client.scan_cycode_client.api_url
    return f'{base}/{scan_client.get_scan_configuration_path(scan_type)}'


def get_scan_configuration_response(url: str) -> responses.Response:
    """Build a mocked 200 GET scan-configuration response with ``scannable_extensions`` set to null."""
    payload = {
        'scannable_extensions': None,
    }
    return responses.Response(method=responses.GET, url=url, json=payload, status=200)


def mock_remote_config_responses(responses_module: responses, scan_type: str, scan_client: ScanClient) -> None:
    """Register the mocked scan-configuration response on ``responses_module``."""
    config_url = get_scan_configuration_url(scan_type, scan_client)
    responses_module.add(get_scan_configuration_response(config_url))


def mock_scan_async_responses(
    responses_module: responses, scan_type: str, scan_client: ScanClient, scan_id: UUID, zip_content_path: Path
) -> None:
    """Register every mocked response involved in an async scan on ``responses_module``.

    Registration order: remote config, zipped-file scan submission, scan details,
    detection rules, scan detections, report scan status.
    """
    mock_remote_config_responses(responses_module, scan_type, scan_client)

    scan_async_url = get_zipped_file_scan_async_url(scan_type, scan_client)
    responses_module.add(get_zipped_file_scan_async_response(scan_async_url, scan_id))

    details_url = get_scan_details_url(scan_type, scan_id, scan_client)
    responses_module.add(get_scan_details_response(details_url, scan_id))

    rules_url = get_detection_rules_url(scan_client)
    responses_module.add(get_detection_rules_response(rules_url))

    detections_url = get_scan_detections_url(scan_client)
    responses_module.add(get_scan_detections_response(detections_url, scan_id, zip_content_path))

    status_url = get_report_scan_status_url(scan_type, scan_id, scan_client)
    responses_module.add(get_report_scan_status_response(status_url))

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cycodehq/cycode-cli'

If you have feedback or need assistance with the MCP directory API, please join our Discord server