Skip to main content
Glama
lunar_hub.py3.36 kB
# type: ignore import asyncio from json import loads from typing import Any from behave import then, given from behave.api.async_step import async_run_until_complete from prometheus_client.parser import text_string_to_metric_families from utils.consts import * from utils.hub_client import HubClient from utils.docker import start_service, stop_service _hub_client = HubClient() @then("Discovery event is sent to Lunar Hub") @async_run_until_complete async def step_impl(_): assert await _hub_client.healthcheck(retries=10, sleep_s=1) for _ in range(4): try: discovery_response = await _hub_client.get_discovery() discovery_data = loads(discovery_response.body) print("****- Discovery Event -****") print(discovery_data) print("********") payload = discovery_data.get("data", {}) if len(payload.get("endpoints", {})) > 0: return except Exception as e: pass await asyncio.sleep(3) assert False @then("Metrics event is sent to Lunar Hub") @async_run_until_complete async def step_impl(_): assert await _hub_client.healthcheck(retries=10, sleep_s=1) for _ in range(6): try: metrics_response = await _hub_client.get_metrics() metrics_data = loads(metrics_response.body) print("****- Metrics Event -****") print(metrics_data) print("********") payload = metrics_data.get("data", {}) if len(payload) > 0: metrics_counter = count_metrics(metrics_data.get("data", {})) if metrics_counter: return except Exception as e: pass await asyncio.sleep(3) assert False @then("Configuration Load event is sent to Lunar Hub") @async_run_until_complete async def step_impl(_): assert await _hub_client.healthcheck(retries=10, sleep_s=1) for _ in range(4): try: configuration_load_response = await _hub_client.get_configuration_load() configuration_load_data = loads(configuration_load_response.body) print("****- Configuration Load -****") print(configuration_load_data) print("********") payload = configuration_load_data.get("data", {}) if ( len(payload.get("data", {})) > 0 ): # schema has `data` key, an array, in top level `data` field, return except Exception as e: pass await asyncio.sleep(3) assert False @given("Lunar Hub Mock is down") @async_run_until_complete async def step_impl(_: Any): await stop_service(LUNAR_HUB_MOCK_SERVICE_NAME) @given("Lunar Hub Mock is up") @async_run_until_complete async def step_impl(_: Any): await stop_service(LUNAR_HUB_MOCK_SERVICE_NAME) await start_service(LUNAR_HUB_MOCK_SERVICE_NAME) def count_metrics(metrics_text, prefix=None): metric_counts = {} # Parse the metrics text for family in text_string_to_metric_families(metrics_text): metric_name = family.name if prefix and not metric_name.startswith(prefix): continue # Skip metrics that don't match the prefix metric_counts[metric_name] = len(family.samples) return metric_counts

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server