Skip to main content
Glama
common.py9.99 kB
# type: ignore # This is since `behave` is really mistyped and all dynamic. # Might be handled later. from behave import given, when, then, register_type from behave.api.async_step import async_run_until_complete import dateutil.parser from utils.docker import start_service, stop_service, rm_service from typing import Literal, Any from utils.httpbin import HTTPBinHelper from utils.consts import * from utils.docker import build_service from utils.helpers import healthcheck from toolkit_testing.integration_tests.mox import MoxHelper, MoxEndpointRequest from toolkit_testing.integration_tests.client import ClientResponse import json import time import dateutil from aiohttp import ClientSession from toolkit_testing.integration_tests.docker import EnvVar _mox_helper = MoxHelper(host="http://localhost", port=8888) _httpbin_helper = HTTPBinHelper(host="http://localhost", port=8080) def parse_int(text: str) -> int: return int(text) def parse_list_of_int(text: str) -> list[int]: return [int(x) for x in text.split(",")] def parse_float(text: str) -> float: return float(text) def validate_path(path: str) -> str: if not path.startswith("/"): raise Exception(f"path must start with `/`, but received {path}") return path def parse_index(text: str) -> int: return int(text) - 1 # Indexes in tests are 1-based but in Python they are 0-based def parse_enabled(text: str) -> bool: text = text.lower() if text == "enabled": return True elif text == "disabled": return False raise ValueError(f"Invalid bool: {text}") def parse_same(text: str) -> bool: text = text.lower() if text == "same": return True elif text == "different": return False raise ValueError(f"Invalid bool: {text}") register_type(Status=parse_int) register_type(Port=parse_int) register_type(Int=parse_int) register_type(ListOfInt=parse_list_of_int) register_type(Float=parse_float) register_type(Path=validate_path) register_type(Index=parse_index) register_type(Enabled=parse_enabled) register_type(Same=parse_same) ############################# ## Environment setup steps ## ############################# @given("API Provider is {up_or_down}") @async_run_until_complete async def step_impl(_, up_or_down: Literal["up"] | Literal["down"]): if up_or_down == "up": await start_service(HTTPBIN_SERVICE_NAME, []) await start_service(MOX_SERVICE_NAME, []) assert await _mox_helper.healthcheck(retries=HEALTHCHECK_RETRIES, sleep_s=1) assert await _httpbin_helper.healthcheck(retries=HEALTHCHECK_RETRIES, sleep_s=1) assert await _mox_helper.set_mox_proxy_path(MOX_GET_UUID_ENDPOINT_REQUEST) assert await _mox_helper.set_mox_proxy_path(MOX_GET_UNMATCHED_ENDPOINT_REQUEST) else: await stop_service(HTTPBIN_SERVICE_NAME) await stop_service(MOX_SERVICE_NAME) @given("Lunar Proxy env var `{env_var}` set to `{value}`") def step_impl(context: Any, env_var: str, value: str): context.lunar_proxy_env_vars.append(EnvVar(env_var, value)) @given("Lunar Proxy is up") @async_run_until_complete async def step_impl(context: Any): await start_proxy(context) @given("Lunar Proxy is down") @then("Lunar Proxy is down") @async_run_until_complete async def step_impl(_: Any): await stop_proxy() ####################### ## General mox steps ## ####################### @given("mox is set to respond to {method} {path:Path} with status {code:Status}") @async_run_until_complete async def step_impl(context: Any, method: str, path: str, code: int): endpoint_id = await _mox_helper.set_mox_proxy_path( MoxEndpointRequest(method, path, "{}", code, {}) ) context.created_mox_endpoint_ids.append(endpoint_id) ##################################### ## General local-HTTP client steps ## ##################################### @when( "A local {method} request (id {request_id}) is made to port {port:Port} at path {path:Path}" ) @async_run_until_complete async def step_impl(context: Any, method: str, request_id: str, port: int, path: str): start_time = time.time() url = f"http://localhost:{port}{path}" async with ClientSession() as session: try: async with session.request(method=method, url=url) as resp: status = resp.status resp_body = await resp.text() response_time = time.time() runtime_s = response_time - start_time context.local_responses[request_id] = ClientResponse( resp_body, status, response_time=response_time, runtime_s=runtime_s ) print(f"response: {context.local_responses[request_id]}") return except Exception as ex: print(f"failed calling {url}: {ex}") return @then("Response (id {request_id}) status code should be {status:Status}") def step_impl(context: Any, request_id: str, status: int): print(f"context: {context.local_responses}") assert context.local_responses[request_id].status == status ############################################ ## General proxified responses assertions ## ############################################ @then("Response has status {status:Int}") @async_run_until_complete async def step_impl(context: Any, status: int): print(f"context.proxified_response: {context.proxified_response}") assert context.proxified_response.status == status @then("Response has {header_name} header") @async_run_until_complete async def step_impl(context: Any, header_name: str): print(f"Response headers: {context.proxified_response.headers}") assert context.proxified_response.headers.get(header_name, None) is not None @then("Response does not have {header_name} header") @async_run_until_complete async def step_impl(context: Any, header_name: str): print(f"Response headers: {context.proxified_response.headers}") assert not header_name in context.proxified_response.headers @then("Responses does not have {header_name} header") @async_run_until_complete async def step_impl(context: Any, header_name: str): for response in context.responses: print(f"Response headers: {response.headers}") assert header_name not in response.headers @then("Response has {header_name} header with value {header_value}") @async_run_until_complete async def step_impl(context: Any, header_name: str, header_value: str): print(f"Response headers: {context.proxified_response.headers}") assert header_name in context.proxified_response.headers print( f"Found header {context.proxified_response.headers[header_name]} but expected {header_value}" ) assert context.proxified_response.headers[header_name] == header_value ###################################### ## JSON Traversal & Assertion Steps ## ###################################### @then( "item {marked_object} is an array with item that matches {raw_json} (marked as {marker})" ) @async_run_until_complete async def step_impl(context: Any, marked_object: str, raw_json: str, marker: str): print(f"context.marked_objects: {context.marked_objects}") array = context.marked_objects[marked_object] assert array found_item = next( item for item in array if json.loads(raw_json).items() <= item.items() ) assert found_item context.marked_objects[marker] = found_item @then("item {marker} {field} json is {value}") @async_run_until_complete async def step_impl(context: Any, marker: str, field: str, value: str): print(f"context.marked_objects: {context.marked_objects}") assert context.marked_objects[marker] assert context.marked_objects[marker][field] == json.loads(value) @then("item {marker} {field} is {value}") @async_run_until_complete async def step_impl(context: Any, marker: str, field: str, value: str): try: print(f"context.marked_objects: {context.marked_objects}") assert context.marked_objects[marker] assert str(context.marked_objects[marker][field]) == value except Exception: print( f"expected value: {value}, actual value: {context.marked_objects[marker][field]}" ) @then("item {marked_object} has field {field} (marked {marker})") @async_run_until_complete async def step_impl(context: Any, marked_object: str, field: str, marker: str): print(f"context.marked_objects: {context.marked_objects}") item = context.marked_objects[marked_object] assert item found_field = item[field] assert ( found_field is not None or field in item ), f"Field '{field}' is missing or None" context.marked_objects[marker] = found_field @then("item {marked_object} is a valid date") def step_impl(context: Any, marked_object: any): item = context.marked_objects[marked_object] assert item print(f"marked_object: {item}") as_date = dateutil.parser.isoparse(item) assert as_date async def stop_proxy(proxy_service: str = LUNAR_PROXY_SERVICE_NAME): try: await stop_service(proxy_service) await rm_service(proxy_service) except Exception as exc: print(exc) async def start_proxy( context: Any, proxy_service: str = LUNAR_PROXY_SERVICE_NAME, healthcheck_port: int = 8040, handshake_port: int = 8081, ): await build_service(proxy_service, [], []) await start_service(proxy_service, context.lunar_proxy_env_vars) # Waits for the gateway to start await healthcheck( method="GET", url=f"http://localhost:{healthcheck_port}/healthcheck", status_predicate=lambda status: status == 200, attempts=20, sleep_s=0.5, ) # Waits for the engine to start await healthcheck( method="GET", url=f"http://localhost:{handshake_port}/handshake", status_predicate=lambda status: status == 200, attempts=20, sleep_s=0.5, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server