Skip to main content
Glama
diagnosis_har_exporter.py11.2 kB
# type: ignore import json from datetime import datetime, timedelta from enum import Enum import json from typing import Any, Optional from dataclasses import dataclass from typing import Any from behave import then, when, register_type from behave.api.async_step import async_run_until_complete from utils.consts import * from utils.policies import EndpointPolicy, PoliciesRequests from toolkit_testing.integration_tests.s3 import S3ClientHelper, AWSAccess from toolkit_testing.integration_tests.routing import Routing from utils.client import make_request _DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" _minio_client_helper = S3ClientHelper( AWSAccess(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY), Routing("127.0.0.1", 9000), ) _HTTPBIN_ANYTHING_JSON_FIELD = "json" _CONTENT_TYPE = "content-type" def parse_with_obfuscation_enabled(text: str) -> bool: if text == "with obfuscation enabled": return True if text == "with obfuscation disabled": return False raise Exception(f"{text} is not recognized") def parse_with_exclusions(text: str) -> bool: if text == "with exclusions": return True if text == "without exclusions": return False raise Exception(f"{text} is not recognized") def parse_obfuscated(text: str) -> bool: if text == "obfuscated": return True if text == "not obfuscated": return False raise Exception(f"{text} is not recognized") def parse_status_code(text: str) -> int: return int(text) register_type(WithObfuscation=parse_with_obfuscation_enabled) register_type(WithExclusions=parse_with_exclusions) register_type(Obfuscated=parse_obfuscated) register_type(StatusCode=parse_status_code) @dataclass class ExpectedValues: method: str url: str status_code: int # Inspecting headers and body values is only needed in obfuscation tests content_type_header: Optional[str] = None response_body_field_name: Optional[str] = None response_body_field_value: Optional[str] = None class _ObfuscationMode(Enum): Disabled = "disabled" Enabled = "enabled" EnabledWithExclusions = "enabled with exclusions" @when( "Request to an endpoint which is configured with an HAR export diagnosis policy with obfuscation {obfuscation_mode}" ) @async_run_until_complete async def step_impl(context: Any, obfuscation_mode: _ObfuscationMode): host = "httpbinmock" if obfuscation_mode == _ObfuscationMode.Enabled.value: path = "/anything/obfuscated_har/sensitiveData" elif obfuscation_mode == _ObfuscationMode.Disabled.value: path = "/anything/non_obfuscated_har/sensitiveData" elif obfuscation_mode == _ObfuscationMode.EnabledWithExclusions.value: path = "/anything/obfuscated_har_with_exclusions/22/placeholder" context.request_time = datetime.utcnow() response = await make_request(host, path, is_proxified=True) context.response = response @when( "Request to an endpoint which is not configured with an HAR export diagnosis policy" ) @async_run_until_complete async def step_impl(context: Any): host = "httpbinmock" path = "/anything/undiagnosed" expected_har_url = f"http://httpbinmock/anything/undiagnosed" context.expected_values = ExpectedValues( method="GET", url=expected_har_url, status_code=200, ) context.request_time = datetime.utcnow() response = await make_request(host, path, is_proxified=True) context.response = response @when( "Request to an endpoint which is configured with a fixed response remedy and an HAR export diagnosis policy" ) @async_run_until_complete async def step_impl(context: Any): host = "httpbinmock" path = "/anything/remediated_and_diagnosed" expected_har_url = "http://httpbinmock/anything/remediated_and_diagnosed" context.expected_values = ExpectedValues( method="GET", url=expected_har_url, status_code=418, ) context.request_time = datetime.utcnow() response = await make_request( host, path, is_proxified=True, header_key="Early-Response", header_value="true", ) context.response = response @when( "policies.yaml includes a har_exporter diagnosis for {method} {host} {path:Path} requests {with_obfuscation:WithObfuscation} and {with_exclusions:WithExclusions}" ) def step_impl( context: Any, method: str, host: str, path: str, with_obfuscation: bool, with_exclusions: bool, ): policies_requests: PoliciesRequests = context.policies_requests diagnosis = { "name": "har_exporter for {method} {host}/{path}", "enabled": True, "export": "s3_minio", "config": { "har_exporter": { "transaction_max_size": 25000, "obfuscate": {"enabled": with_obfuscation}, } }, } if with_exclusions: diagnosis["config"]["har_exporter"]["obfuscate"]["exclusions"] = { "path_params": ["id"], "response_headers": [_CONTENT_TYPE], "response_body_paths": [".json"], } policies_requests.endpoints.append( EndpointPolicy(method, f"{host}{path}", diagnosis=[diagnosis]) ) @when( "policies.yaml includes a {exporter_type} exporter with {key1}: {value1} and {key2}: {value2}" ) @async_run_until_complete async def step_impl( context: Any, exporter_type: str, key1: str, value1: str, key2: str, value2: str ): policies_requests: PoliciesRequests = context.policies_requests for exporter, config in policies_requests.exporters.items(): print(f"Exporter: {exporter}") print(f"Config: {config}") if exporter == exporter_type: print( f"Updating exporter {exporter_type} with {key1}: {value1} and {key2}: {value2}" ) config[key1] = value1 config[key2] = value2 return print(f"Adding exporter {exporter_type} with {key1}: {value1} and {key2}: {value2}") policies_requests.exporters[exporter_type] = {key1: value1, key2: value2} @then("Transaction data is written") @async_run_until_complete async def step_impl(context: Any): now = datetime.utcnow() content = _minio_client_helper.get_object_content_of_last_modified_object( LUNAR_BUCKET_NAME, retries=20 ) har, start_time = _read_last_har(content) assert content is not None assert len(har) > 0 assert start_time < now + timedelta(seconds=10) assert start_time > now - timedelta(seconds=10) request = har["log"]["entries"][0]["request"] response = har["log"]["entries"][0]["response"] assert request is not None assert response is not None context.request = request context.response = response @then("Transaction data is written to bucket {expected_bucket_name}") @async_run_until_complete async def step_impl(context: Any, expected_bucket_name: str): now = datetime.utcnow() content = _minio_client_helper.get_object_content_of_last_modified_object( expected_bucket_name, retries=20 ) har, start_time = _read_last_har(content) assert content is not None assert len(har) > 0 assert start_time < now + timedelta(seconds=10) assert start_time > now - timedelta(seconds=10) request = har["log"]["entries"][0]["request"] response = har["log"]["entries"][0]["response"] assert request is not None assert response is not None context.request = request context.response = response @then("Entry Status should be {status_code:StatusCode}") @async_run_until_complete async def step_impl(context: Any, status_code: int): assert context.response["status"] == status_code @then("Entry Method should be {method}") @async_run_until_complete async def step_impl(context: Any, method: str): assert context.request["method"] == method @then("Entry URL should be {url}") @async_run_until_complete async def step_impl(context: Any, url: str): assert context.request["url"] == url @then("Entry URL should start with {partial_url}") @async_run_until_complete async def step_impl(context: Any, partial_url: str): assert context.request["url"].startswith(partial_url) @then("Entry URL should end with {partial_url}") @async_run_until_complete async def step_impl(context: Any, partial_url: str): assert context.request["url"].endswith(partial_url) @then("Entry Body `json` field should be obfuscated") @async_run_until_complete async def step_impl(context: Any): response_body = json.loads(context.response["content"]) # literally, the string `null` (JSON's null) hashed with MD5: json_null_md5 = "37a6259cc0c1dae299a7866489dff0bd" assert response_body[_HTTPBIN_ANYTHING_JSON_FIELD] == json_null_md5 @then("Entry Body `json` field should not be obfuscated") @async_run_until_complete async def step_impl(context: Any): response_body = json.loads(context.response["content"]) # The field is `null`, so Python's `None` actually means it was not obfuscated assert response_body[_HTTPBIN_ANYTHING_JSON_FIELD] == None @then("Entry Body field compressed with `{compression}` should be obfuscated") @async_run_until_complete async def step_impl(context: Any, compression: str): response_body = json.loads(context.response["content"]) # originally JSON's `true`, here in md5 https://md5.gromweb.com/?string=true assert response_body[compression] == "b326b5062b2f0e69046810717534cb09" @then("Entry Content-Type header should be obfuscated") @async_run_until_complete async def step_impl(context: Any): har_content_type = get_header(context.response["headers"], _CONTENT_TYPE) assert har_content_type == "d1f5a9d446c6cec2cf63545e8163e585" @then("Entry Content-Type header should not be obfuscated") @async_run_until_complete async def step_impl(context: Any): print(context.response["headers"]) assert context.response["headers"] har_content_type = get_header(context.response["headers"], _CONTENT_TYPE) assert har_content_type == "application/json" def get_header(headers: dict[str, str], header_name: str) -> Optional[str]: return [header["value"] for header in headers if header["name"] == header_name][0] @then("Transaction data is not written") @async_run_until_complete async def step_impl(context: Any): content = _minio_client_helper.get_object_content_of_last_modified_object( LUNAR_BUCKET_NAME, retries=20 ) har, start_time = _read_last_har(content) assert ( content is None # No file or len(har) == 0 # Empty file or start_time < context.request_time # Old HAR data ) def _read_last_har(content: str | None) -> tuple[dict[str, Any], datetime]: if content is None: return {}, datetime.utcnow() har_data = content.splitlines() assert len(har_data) > 0 raw_har = har_data[-1] har = json.loads(raw_har) start_time = _parse_start_time(har) return har, start_time def _parse_start_time(har: dict[str, Any]): start_time_str = har["log"]["entries"][0]["startedDateTime"][:-4] + "Z" return datetime.strptime(start_time_str, _DATE_FORMAT)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server