Skip to main content
Glama
policies_reload.py2.33 kB
# type: ignore # This is since `behave` is really mistyped and all dynamic. # Might be handled later. import os import asyncio from behave import when from behave.api.async_step import async_run_until_complete from typing import Any import yaml from utils.consts import * from utils.policies import ( PoliciesRequests, read_actual_policies_file, write_actual_policies_file, ) from toolkit_testing.integration_tests.fluent_helper import FluentHelper _FLUENT_HEALTH_HOST = "localhost" _FLUENT_HEALTH_PORT = 2020 _FLUENT_HEALTHCHECK_RETRIES = 30 _fluent_helper = FluentHelper(_FLUENT_HEALTH_HOST, _FLUENT_HEALTH_PORT) @when("policies.yaml file is updated") @async_run_until_complete async def step_impl(context: Any): context.policies_requests = PoliciesRequests() @when("policies.yaml file is saved") @async_run_until_complete async def step_impl(context: Any): policies_yaml = context.policies_requests.build_yaml() print(f"policies yaml:\n{yaml.dump(policies_yaml)}") await write_actual_policies_file(policies_yaml=policies_yaml) @when("apply_policies command is run") @async_run_until_complete async def step_impl(_: Any): assert os.system("docker exec lunar-proxy apply_policies") == 0 await _fluent_helper.healthcheck(retries=_FLUENT_HEALTHCHECK_RETRIES, sleep_s=0.1) @when("apply_policies command is run without waiting for Fluent to reload") @async_run_until_complete async def step_impl(_: Any): assert os.system("docker exec lunar-proxy apply_policies") == 0 @when("policies.yaml file is saved on {container_name}") @async_run_until_complete async def step_impl(context: Any, container_name: str): policies_yaml = context.policies_requests.build_yaml() print(f"policies yaml:\n{yaml.dump(policies_yaml)}") await write_actual_policies_file( policies_yaml=policies_yaml, container_name=container_name ) @when( "apply_policies command is run on {container_name} without waiting for Fluent to reload" ) @async_run_until_complete async def step_impl(_: Any, container_name: str): print(f"docker exec {container_name} apply_policies") for _ in range(10): if os.system(f"docker exec {container_name} apply_policies") == 0: return True await asyncio.sleep(1) assert False, "Failed to apply policies"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server