We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/marc-shade/threat-intel-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""Async HTTP fetcher with timeout, retry, rate limiting, and circuit breaker integration.
All external HTTP calls in world-intel-mcp go through this module.
Stale-data fallback: when an API call fails, the last-known-good cached
response is returned (marked with _stale=True) so dashboards never go blank.
"""
import asyncio
import logging
import time
from typing import Any
import httpx
from .cache import Cache
from .circuit_breaker import CircuitBreaker
logger = logging.getLogger("world-intel-mcp.fetcher")

# ---------------------------------------------------------------------------
# Yahoo Finance: every call is serialized through a single lock, with a
# minimum gap enforced between consecutive calls (600ms).
# ---------------------------------------------------------------------------
_YAHOO_MIN_INTERVAL = 0.6  # seconds
_yahoo_lock = asyncio.Lock()
_yahoo_last_call: float = 0.0  # time of the latest Yahoo call (set by the throttle)

# ---------------------------------------------------------------------------
# Per-source throttling table: minimum seconds between two calls to the same
# source. Any source missing from this table is not throttled at all.
# ---------------------------------------------------------------------------
_SOURCE_RATE_LIMITS: dict[str, float] = {
    "yahoo-finance": _YAHOO_MIN_INTERVAL,  # unofficial — ~100 req/min safe
    "opensky": 6.0,                        # free tier: 10 req/min
    "coingecko": 2.0,                      # free tier: 30 calls/min
    "cloudflare-radar": 3.0,               # 20 req/min
    "reddit": 1.5,                         # ~60 req/min (be conservative)
    "nasa-firms": 2.0,                     # API key: ~1000 req/day
    "adsblol": 5.0,                        # community API — be very polite
    "polymarket": 1.0,                     # be polite
    "faa": 1.0,                            # govt API
    "usgs": 1.0,                           # generous but be polite
    "acled": 2.0,                          # API key based
    "nga": 2.0,                            # govt API
}

# Filled lazily, one entry per throttled source: the lock that serializes its
# calls and the time of its most recent call.
_source_locks: dict[str, asyncio.Lock] = {}
_source_last_call: dict[str, float] = {}
class Fetcher:
    """Centralized HTTP fetcher with caching, retries, and circuit breaking.

    Every public ``get_*`` method runs the same pipeline (see ``_fetch``):

    1. Return a live cache hit if one exists.
    2. If the circuit breaker for ``source`` is open, skip the network and
       return the stale (expired) cache entry, or ``None`` if there is none.
    3. Otherwise make up to ``max_retries + 1`` attempts with linear backoff
       (1s, 2s, ...), throttling before *every* attempt so retries also obey
       the per-source rate limit. HTTP 4xx responses other than 408/429 are
       not retried.
    4. On success: record success with the breaker and cache the result.
       On final failure: record a single breaker failure, log a warning, and
       fall back to the stale cache entry (or ``None``).
    """

    def __init__(
        self,
        cache: Cache,
        breaker: CircuitBreaker,
        default_timeout: float = 15.0,
        max_retries: int = 2,
        client: httpx.AsyncClient | None = None,
    ):
        """Create a fetcher.

        Args:
            cache: Cache used for live lookups (``get``/``set``) and for the
                stale fallback (``get_stale``).
            breaker: Circuit breaker consulted and updated per ``source``.
            default_timeout: Request timeout in seconds when a call gives no
                override; also used for a lazily created client.
            max_retries: Number of retries after the first attempt.
            client: Optional pre-built ``httpx.AsyncClient``. If omitted (or
                later closed), one is created lazily by ``_get_client``.
        """
        self.cache = cache
        self.breaker = breaker
        self.default_timeout = default_timeout
        self.max_retries = max_retries
        self._client: httpx.AsyncClient | None = client

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the shared client, creating a new one if missing or closed."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                timeout=httpx.Timeout(self.default_timeout),
                follow_redirects=True,
                limits=httpx.Limits(max_connections=50, max_keepalive_connections=20),
                headers={"User-Agent": "PhoenixAGI-WorldIntel/0.1"},
                # NOTE(review): ``proxy=None`` is httpx's default and does NOT
                # stop environment proxies (HTTP_PROXY/HTTPS_PROXY/ALL_PROXY)
                # from being used while ``trust_env`` is True. If the intent is
                # to never inherit a system (SOCKS) proxy, pass
                # ``trust_env=False`` — note that also ignores SSL_CERT_FILE,
                # SSL_CERT_DIR and .netrc.
                proxy=None,
            )
        return self._client

    async def close(self) -> None:
        """Close the current client (including one passed to ``__init__``) and drop it."""
        if self._client is not None and not self._client.is_closed:
            await self._client.aclose()
        self._client = None

    async def get_json(
        self,
        url: str,
        source: str,
        cache_key: str | None = None,
        cache_ttl: int = 300,
        headers: dict[str, str] | None = None,
        params: dict[str, Any] | None = None,
        timeout: float | None = None,
        yahoo_rate_limit: bool = False,
    ) -> dict | list | None:
        """Fetch JSON with caching, circuit breaking, and retries.

        Args:
            url: Target URL.
            source: Source name for circuit breaker and rate-limit tracking.
            cache_key: Cache key. If None, uses ``"{source}:{url}:{params}"``.
            cache_ttl: Cache TTL in seconds.
            headers: Extra HTTP headers.
            params: Query parameters.
            timeout: Per-request timeout override (seconds).
            yahoo_rate_limit: If True, also enforce the Yahoo Finance 600ms
                serialization before each attempt.

        Returns:
            Parsed JSON, the stale cached value on failure, or None.
        """
        effective_key = cache_key or f"{source}:{url}:{params}"
        return await self._fetch(
            url,
            source,
            effective_key,
            cache_ttl=cache_ttl,
            headers=headers,
            params=params,
            timeout=timeout,
            as_json=True,
            yahoo_rate_limit=yahoo_rate_limit,
            label="Fetch",
        )

    async def get_text(
        self,
        url: str,
        source: str,
        cache_key: str | None = None,
        cache_ttl: int = 300,
        headers: dict[str, str] | None = None,
        params: dict[str, Any] | None = None,
        timeout: float | None = None,
    ) -> str | None:
        """Fetch the raw response body as text, with caching and circuit breaking.

        Same arguments as ``get_json`` (minus ``yahoo_rate_limit``). The
        default cache key is ``"{source}:text:{url}:{params}"`` so it never
        collides with a JSON entry for the same URL.

        Returns:
            Response text, the stale cached value on failure, or None.
        """
        effective_key = cache_key or f"{source}:text:{url}:{params}"
        return await self._fetch(
            url,
            source,
            effective_key,
            cache_ttl=cache_ttl,
            headers=headers,
            params=params,
            timeout=timeout,
            as_json=False,
            label="Text fetch",
        )

    async def get_xml(
        self,
        url: str,
        source: str,
        cache_key: str | None = None,
        cache_ttl: int = 300,
        timeout: float | None = None,
        *,
        headers: dict[str, str] | None = None,
        params: dict[str, Any] | None = None,
    ) -> str | None:
        """Fetch XML content (raw text for feedparser/ET parsing).

        Thin wrapper over ``get_text``; ``headers``/``params`` are optional
        keyword-only extras forwarded unchanged.
        """
        return await self.get_text(
            url,
            source,
            cache_key,
            cache_ttl,
            headers=headers,
            params=params,
            timeout=timeout,
        )

    async def _fetch(
        self,
        url: str,
        source: str,
        effective_key: str,
        *,
        cache_ttl: int,
        headers: dict[str, str] | None,
        params: dict[str, Any] | None,
        timeout: float | None,
        as_json: bool,
        label: str,
        yahoo_rate_limit: bool = False,
    ) -> Any | None:
        """Shared cache -> breaker -> throttled-retry pipeline for all getters.

        ``as_json`` selects ``resp.json()`` vs ``resp.text``; ``label`` only
        prefixes the final failure log line.
        """
        # A live cache hit short-circuits everything, including throttling.
        cached = self.cache.get(effective_key)
        if cached is not None:
            return cached

        # Circuit open: don't touch the network, serve last-known-good if any.
        if not self.breaker.is_available(source):
            logger.debug("Circuit open for %s, trying stale cache", source)
            return self._stale_fallback(effective_key, source)

        client = await self._get_client()
        last_error: Exception | None = None
        for attempt in range(self.max_retries + 1):
            # Throttle before every attempt (not just the first) so that
            # retries also respect the per-source minimum interval and update
            # the shared last-call timestamp.
            if yahoo_rate_limit:
                await self._yahoo_throttle()
            await self._source_throttle(source)
            try:
                resp = await client.get(
                    url,
                    headers=headers,
                    params=params,
                    timeout=timeout or self.default_timeout,
                )
                resp.raise_for_status()
                data = resp.json() if as_json else resp.text
                self.breaker.record_success(source)
                self.cache.set(effective_key, data, cache_ttl)
                return data
            except Exception as exc:  # deliberate best-effort: any failure -> retry/stale
                last_error = exc
                if not self._is_retryable(exc):
                    logger.debug("Not retrying %s after non-retryable error (%s)", source, exc)
                    break
                if attempt < self.max_retries:
                    wait = 1.0 * (attempt + 1)
                    logger.debug("Retry %d/%d for %s (%s), waiting %.1fs",
                                 attempt + 1, self.max_retries, source, exc, wait)
                    await asyncio.sleep(wait)

        # All attempts failed — count one breaker failure, then try stale cache.
        self.breaker.record_failure(source)
        logger.warning("%s failed for %s: %s (url=%s)", label, source, last_error, url)
        return self._stale_fallback(effective_key, source)

    @staticmethod
    def _is_retryable(exc: Exception) -> bool:
        """Return False for HTTP 4xx responses that an immediate retry cannot fix.

        408 (Request Timeout) and 429 (Too Many Requests) are treated as
        transient; every other exception type is considered retryable.
        """
        if isinstance(exc, httpx.HTTPStatusError):
            status = exc.response.status_code
            if 400 <= status < 500:
                return status in (408, 429)
        return True

    def _stale_fallback(self, cache_key: str, source: str) -> Any | None:
        """Return stale (expired) cached data as a last-known-good fallback, or None.

        NOTE(review): the module docstring says stale results are marked with
        ``_stale=True``; this method returns ``Cache.get_stale``'s value
        unchanged, so confirm the marking happens inside ``get_stale``.
        """
        stale = self.cache.get_stale(cache_key)
        if stale is not None:
            logger.info("Serving stale cache for %s (key=%s)", source, cache_key)
        return stale

    async def _source_throttle(self, source: str) -> None:
        """Enforce the per-source minimum interval from ``_SOURCE_RATE_LIMITS``.

        Sources without an entry return immediately. Otherwise calls for the
        same source are serialized by a per-source lock, and the lock is held
        while sleeping so queued callers are spaced out one by one.
        """
        min_interval = _SOURCE_RATE_LIMITS.get(source)
        if min_interval is None:
            return
        lock = _source_locks.get(source)
        if lock is None:
            lock = _source_locks[source] = asyncio.Lock()
        async with lock:
            last = _source_last_call.get(source)
            if last is not None:
                # Monotonic clock: immune to wall-clock jumps, which with
                # time.time() could turn into an arbitrarily long sleep.
                remaining = min_interval - (time.monotonic() - last)
                if remaining > 0:
                    await asyncio.sleep(remaining)
            _source_last_call[source] = time.monotonic()

    async def _yahoo_throttle(self) -> None:
        """Enforce the Yahoo Finance gap (``_YAHOO_MIN_INTERVAL``) between calls."""
        global _yahoo_last_call
        async with _yahoo_lock:
            # Monotonic clock for the same reason as _source_throttle.
            remaining = _YAHOO_MIN_INTERVAL - (time.monotonic() - _yahoo_last_call)
            if remaining > 0:
                await asyncio.sleep(remaining)
            _yahoo_last_call = time.monotonic()