Skip to main content
Glama
streaming_demo_tool.py1.54 kB
from __future__ import annotations from typing import Dict, Any from streaming.streaming_adapter import MoonshotStreamingAdapter, ZaiStreamingAdapter async def async_run_stream(prompt: str, provider: str = "moonshot", stream: bool = True) -> Dict[str, Any]: """Async-friendly streaming demo. - If stream=False, return deterministic fallback (no network) - If stream=True, yield first chunk via provider adapter without managing event loops here """ if not stream: return {"provider": provider, "stream": False, "text": prompt.upper()[:50]} adapter = MoonshotStreamingAdapter() if provider == "moonshot" else ZaiStreamingAdapter() first = "" async for chunk in adapter.iter_stream({"prompt": prompt}): first = chunk break return {"provider": provider, "stream": True, "text": first} def run_stream(prompt: str, provider: str = "moonshot", stream: bool = True) -> Dict[str, Any]: """Synchronous wrapper retained for legacy callers. NOTE: Prefer using async_run_stream in async contexts. This wrapper will only run when no event loop is active. """ import asyncio try: loop = asyncio.get_running_loop() except RuntimeError: # No running loop; safe to use asyncio.run return asyncio.run(async_run_stream(prompt=prompt, provider=provider, stream=stream)) # A loop is already running; do not try to nest loops in sync context raise RuntimeError("run_stream called inside a running event loop; use async_run_stream instead")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Zazzles2908/EX_AI-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server