Skip to main content
Glama
main.py1.64 kB
import lunar_interceptor import asyncio import aiohttp from aioconsole import ainput SLEEP_INTERVAL_IN_SEC = 2 URL = "https://catfact.ninja/fact" X_LUNAR_CONSUMER_TAG = "lunar-example-app" async def get_cat_fact(session, stop_event): while not stop_event.is_set(): try: headers = { "x-lunar-consumer-tag": X_LUNAR_CONSUMER_TAG } async with session.get(URL, headers=headers) as response: if response.status == 200: fact = (await response.json()).get("fact") print(f"Cat Fact: {fact}") else: print( f"Failed to retrieve cat fact. Status code {response.status_code}" ) except Exception as e: print(f"An error occurred: {e}") await asyncio.sleep(SLEEP_INTERVAL_IN_SEC) async def wait_for_input(stop_event): await ainput("Press Enter to stop...\n") stop_event.set() async def main(): # Event to signal when to stop the asyncio loop stop_event = asyncio.Event() await ainput("Press Enter to get a cat fact...") print(f"Sending a request to {URL} every {SLEEP_INTERVAL_IN_SEC} seconds") async with aiohttp.ClientSession() as session: fact_task = asyncio.create_task(get_cat_fact(session, stop_event)) input_task = asyncio.create_task(wait_for_input(stop_event)) # Wait for the fact_task and input_task to complete await asyncio.wait([fact_task, input_task], return_when=asyncio.ALL_COMPLETED) if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server