trends_digest
Collect and summarize information from academic publications, trending code repositories, and medical device regulatory sources into a single digest for any topic.
Instructions
One-shot multi-source digest for a topic. Calls sources in parallel; partial failures don't break the report.
PRESENTATION: source content (paper abstracts, descriptions, recall reasons) is in its original language (mostly English). When showing this digest to the user, translate the content into the user's current conversation language. Keep proper nouns, IDs (PMID, k_number, arXiv IDs), URLs, and metric values (stars, downloads) untouched.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| topic | Yes | Search topic/keyword applied to every selected source. | — |
| sources | No | Subset of source ids to query: `arxiv`, `github`, `huggingface`, `paperswithcode`, `pubmed`, `fda_510k`, `fda_recalls`. | all enabled sources |
| per_source_limit | No | Maximum items per source (clamped to 1–15). | 5 |
| days | No | Recency window in days (clamped to 1–90). | 14 |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | Markdown digest report (or an error string when input validation fails). | — |
Implementation Reference
- trends_mcp.py:1605-1657 (handler) — The `trends_digest` async function that implements the tool logic. It accepts a topic, an optional sources list, per_source_limit, and days; validates inputs; calls _digest_call for each source in parallel; and assembles a markdown digest report.
async def trends_digest( topic: str, sources: list[str] | None = None, per_source_limit: int = 5, days: int = 14, ) -> str: if not topic or not topic.strip(): return "Error: `topic` is required." topic = topic.strip() sources = sources or _DEFAULT_SOURCES if not sources: return ( "Error: no sources enabled. Set TRENDS_ENABLED_SOURCES env var " f"to a subset of {sorted(ALL_SOURCES)}." ) valid = {s.value for s in DigestSource} bad = [s for s in sources if s not in valid] if bad: return f"Error: unknown sources {bad}. Valid: {sorted(valid)}" disabled = [s for s in sources if s not in ENABLED_SOURCES] if disabled: return ( f"Error: sources {disabled} are disabled by TRENDS_ENABLED_SOURCES env var. " f"Currently enabled: {sorted(ENABLED_SOURCES)}." ) per_source_limit = max(1, min(per_source_limit, 15)) days = max(1, min(days, 90)) tasks = [_digest_call(s, topic, per_source_limit, days) for s in sources] results = await asyncio.gather(*tasks, return_exceptions=False) lines = [ f"# 트렌드 다이제스트: `{topic}`", f"_최근 {days}일 · 소스 {len(sources)}개_", "", ] for source, err, items in results: title, render = _DIGEST_SECTION[source] lines.append(f"## {title}") if err: lines.append(f"_{err}_\n") continue if not items: lines.append("_결과 없음_\n") continue lines.extend(render(items)) lines.append("") lines.append( "<!-- Presentation hint: source text is in its original language. " "Translate the content into the user's current conversation language " "for the user-facing response. Preserve URLs, IDs, and metric values. -->" ) return "\n".join(lines) - trends_mcp.py:1317-1325 (schema)The DigestSource enum defining valid source identifiers for the digest (arxiv, github, huggingface, paperswithcode, pubmed, fda_510k, fda_recalls).
class DigestSource(str, Enum): ARXIV = "arxiv" GITHUB = "github" HUGGINGFACE = "huggingface" PAPERSWITHCODE = "paperswithcode" PUBMED = "pubmed" FDA_510K = "fda_510k" FDA_RECALLS = "fda_recalls" - trends_mcp.py:1587-1604 (registration)The MCP tool registration for 'trends_digest' using @mcp.tool() decorator with name, description, and annotations.
@mcp.tool( name="trends_digest", description=( "One-shot multi-source digest for a topic. Calls sources in parallel; " "partial failures don't break the report.\n\n" "PRESENTATION: source content (paper abstracts, descriptions, recall " "reasons) is in its original language (mostly English). When showing " "this digest to the user, translate the content into the user's " "current conversation language. Keep proper nouns, IDs (PMID, k_number, " "arXiv IDs), URLs, and metric values (stars, downloads) untouched." ), annotations={ "readOnlyHint": True, "destructiveHint": False, "openWorldHint": True, "idempotentHint": False, }, ) - trends_mcp.py:1408-1573 (helper)The _digest_call helper function that fetches data from a single source (arxiv, github, huggingface, paperswithcode, pubmed, fda_510k, fda_recalls) for the digest.
async def _digest_call(
    source: str, topic: str, per_source_limit: int, days: int
) -> tuple[str, str | None, list[dict[str, Any]]]:
    """Fetch digest items from a single source.

    Args:
        source: one of the DigestSource enum values.
        topic: non-empty search topic (already stripped by the caller).
        per_source_limit: max items to return (caller clamps to 1..15).
        days: recency window in days (caller clamps to 1..90).

    Returns:
        (source, error_or_none, items): every failure is folded into the
        error slot so one bad source never breaks the whole digest.
    """
    try:
        if source == DigestSource.ARXIV.value:
            params = {
                "search_query": f"all:{topic}",
                "sortBy": "submittedDate",
                "sortOrder": "descending",
                "max_results": per_source_limit,
            }
            text = await _http_get_text(ARXIV_API, params=params, ttl=TTL_DEFAULT)
            return source, None, _parse_arxiv_atom(text)[:per_source_limit]

        if source == DigestSource.GITHUB.value:
            # Restrict to repositories created inside the recency window.
            since_date = (_utc_now() - timedelta(days=days)).strftime("%Y-%m-%d")
            params = {
                "q": f"{topic} created:>{since_date}",
                "per_page": per_source_limit,
                "sort": "stars",
                "order": "desc",
            }
            headers = {
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
            }
            tok = os.environ.get("GITHUB_TOKEN")
            if tok:
                headers["Authorization"] = f"Bearer {tok}"
            data = await _http_get_json(
                GITHUB_API, params=params, headers=headers, ttl=TTL_TRENDING
            )
            items = [
                {
                    "full_name": x.get("full_name", ""),
                    "url": x.get("html_url", ""),
                    "description": x.get("description") or "",
                    "language": x.get("language"),
                    "stars": x.get("stargazers_count", 0),
                }
                for x in (data.get("items") or [])[:per_source_limit]
            ]
            return source, None, items

        if source == DigestSource.HUGGINGFACE.value:
            params = {
                "sort": "trendingScore",
                "direction": -1,
                "limit": per_source_limit,
                "search": topic,
            }
            headers: dict[str, str] = {}
            tok = os.environ.get("HF_TOKEN")
            if tok:
                headers["Authorization"] = f"Bearer {tok}"
            raw = await _http_get_json(
                f"{HF_API}/models", params=params, headers=headers or None, ttl=TTL_TRENDING
            )
            if not isinstance(raw, list):
                raw = []
            items = [
                {
                    "id": x.get("id", ""),
                    "url": f"https://huggingface.co/{x.get('id', '')}",
                    "downloads": x.get("downloads"),
                    "likes": x.get("likes", 0),
                }
                for x in raw[:per_source_limit]
            ]
            return source, None, items

        if source == DigestSource.PAPERSWITHCODE.value:
            # NOTE(review): this source is served from the HF daily-papers
            # feed, not paperswithcode.com — presumably an intentional
            # substitute; confirm with the endpoint constant's definition.
            raw = await _http_get_json(HF_DAILY_PAPERS_API, ttl=TTL_TRENDING)
            if not isinstance(raw, list):
                raw = []
            q_lower = topic.lower()
            items = []
            for entry in raw:
                paper = entry.get("paper") or {}
                title = entry.get("title") or paper.get("title") or ""
                summary = entry.get("summary") or paper.get("summary") or ""
                # Client-side substring filter: the feed itself is not searchable.
                if q_lower and q_lower not in title.lower() and q_lower not in summary.lower():
                    continue
                arxiv_id = paper.get("id") or ""
                items.append({
                    "title": title.strip(),
                    "url_abs": f"https://huggingface.co/papers/{arxiv_id}" if arxiv_id else "",
                    "published": entry.get("publishedAt") or paper.get("publishedAt") or "",
                })
                if len(items) >= per_source_limit:
                    break
            return source, None, items

        if source == DigestSource.PUBMED.value:
            # Relative-date filter in PubMed query syntax.
            term = f"({topic}) AND (\"last {days} days\"[PDat])"
            common = {"db": "pubmed", "retmode": "json"}
            api_key = os.environ.get("NCBI_API_KEY")
            if api_key:
                common["api_key"] = api_key
            r1 = await _http_get_json(
                f"{PUBMED_BASE}/esearch.fcgi",
                params={**common, "term": term, "retmax": per_source_limit, "sort": "pub_date"},
                ttl=TTL_STATIC,
            )
            pmids = r1.get("esearchresult", {}).get("idlist", [])
            if not pmids:
                return source, None, []
            r2 = await _http_get_json(
                f"{PUBMED_BASE}/esummary.fcgi",
                params={**common, "id": ",".join(pmids)},
                ttl=TTL_STATIC,
            )
            result = r2.get("result", {})
            uids = result.get("uids", pmids)
            abstracts = await _pubmed_fetch_abstracts(list(uids))
            items = []
            for uid in uids:
                rec = result.get(uid)
                if not rec:
                    continue
                items.append(
                    {
                        "pmid": uid,
                        "url": f"https://pubmed.ncbi.nlm.nih.gov/{uid}/",
                        "title": rec.get("title", "").rstrip("."),
                        "journal": rec.get("fulljournalname") or rec.get("source") or "",
                        "pubdate": rec.get("pubdate") or "",
                        "abstract": abstracts.get(uid, ""),
                    }
                )
            return source, None, items

        if source == DigestSource.FDA_510K.value:
            # 510(k) decisions are sparse: never query a window under 30 days.
            items = await _openfda_digest(
                OPENFDA_510K,
                date_field="decision_date",
                text_field="device_name",
                topic=topic,
                limit=per_source_limit,
                window_days=max(days, 30),
            )
            return source, None, items

        if source == DigestSource.FDA_RECALLS.value:
            # Recalls are sparser still: never query a window under 90 days.
            items = await _openfda_digest(
                OPENFDA_RECALL,
                date_field="event_date_initiated",
                text_field="product_description",
                topic=topic,
                limit=per_source_limit,
                window_days=max(days, 90),
            )
            return source, None, items

        return source, f"Unknown source: {source}", []
    except Exception as e:
        return source, _handle_error(e, f"digest:{source}"), []


async def _openfda_digest(
    endpoint: str,
    *,
    date_field: str,
    text_field: str,
    topic: str,
    limit: int,
    window_days: int,
) -> list[dict[str, Any]]:
    """Fetch recent openFDA records matching *topic* (shared 510k/recall path).

    Returns [] when openFDA reports no matches — that API signals an empty
    result set with HTTP 404 rather than an empty `results` array.
    """
    from urllib.parse import quote_plus  # stdlib; local so top-level imports stay untouched

    end = _utc_now().strftime("%Y%m%d")
    start = (_utc_now() - timedelta(days=window_days)).strftime("%Y%m%d")
    # The search expression uses literal '+' separators, so the URL is built
    # by hand instead of via params=. BUG FIX: the raw topic used to be
    # interpolated directly, so a topic with spaces or reserved characters
    # produced a malformed query; it is now percent-encoded.
    search = f"{date_field}:[{start}+TO+{end}]+AND+{text_field}:{quote_plus(topic)}"
    url = f"{endpoint}?search={search}&limit={limit}&sort={date_field}:desc"
    api_key = os.environ.get("OPENFDA_API_KEY")
    if api_key:
        url += f"&api_key={api_key}"
    try:
        data = await _http_get_json(url, ttl=TTL_STATIC)
        return (data.get("results") or [])[:limit]
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            return []
        raise

# - trends_mcp.py:1576-1584 (helper)The _DIGEST_SECTION mapping from source
#   names to (section title, render function) pairs for formatting digest output.
_DIGEST_SECTION = { DigestSource.ARXIV.value: ("📌 arXiv", _digest_arxiv_md), DigestSource.GITHUB.value: ("📌 GitHub", _digest_github_md), DigestSource.HUGGINGFACE.value: ("📌 Hugging Face (models)", _digest_hf_md), DigestSource.PAPERSWITHCODE.value: ("📌 Papers with Code", _digest_pwc_md), DigestSource.PUBMED.value: ("📌 PubMed", _digest_pubmed_md), DigestSource.FDA_510K.value: ("📌 FDA 510(k)", _digest_510k_md), DigestSource.FDA_RECALLS.value: ("📌 FDA Recalls", _digest_recall_md), }