获取股票/加密货币相关新闻
stock_newsFetch recent news for stocks or cryptocurrencies using their symbol. Get relevant financial updates to inform trading decisions.
Instructions
根据股票代码或加密货币符号获取近期相关新闻
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| symbol | Yes | 股票代码/加密货币符号 | |
| limit | No | 返回数量(int) |
Implementation Reference
- mcp_aktools/__init__.py:130-133 (registration)Tool registration via @mcp.tool decorator with title='获取股票/加密货币相关新闻' and description
@mcp.tool( title="获取股票/加密货币相关新闻", description="根据股票代码或加密货币符号获取近期相关新闻", ) - mcp_aktools/__init__.py:134-145 (handler)Main handler function that fetches stock news by symbol, deduplicates, and returns joined news strings up to limit
def stock_news( symbol: str = Field(description="股票代码/加密货币符号"), limit: int = Field(15, description="返回数量(int)", strict=False), ): news = list(dict.fromkeys([ v["新闻内容"] for v in ak_cache(stock_news_em, symbol=symbol, ttl=3600).to_dict(orient="records") if isinstance(v, dict) ])) if news: return "\n".join(news[0:limit]) return f"Not Found for {symbol}" - mcp_aktools/__init__.py:147-170 (helper)Helper function that queries eastmoney.com search API for news articles about the given symbol, returns a DataFrame with sorted news content
def stock_news_em(symbol, limit=20): cbk = "jQuery351013927587392975826_1763361926020" resp = requests.get( "http://search-api-web.eastmoney.com/search/jsonp", headers={ "User-Agent": USER_AGENT, "Referer": f"https://so.eastmoney.com/news/s?keyword={symbol}", }, params={ "cb": cbk, "param": '{"uid":"",' f'"keyword":"{symbol}",' '"type":["cmsArticleWebOld"],"client":"web","clientType":"web","clientVersion":"curr",' '"param":{"cmsArticleWebOld":{"searchScope":"default","sort":"default","pageIndex":1,"pageSize":10,' '"preTag":"<em>","postTag":"</em>"}}}', }, ) text = resp.text.replace(cbk, "").strip().strip("()") data = json.loads(text) or {} dfs = pd.DataFrame(data.get("result", {}).get("cmsArticleWebOld") or []) dfs.sort_values("date", ascending=False, inplace=True) dfs = dfs.head(limit) dfs["新闻内容"] = dfs["content"].str.replace(r"</?em>", "", regex=True) return dfs - mcp_aktools/__init__.py:564-579 (helper)Caching utility ak_cache that wraps function calls with TTL-based disk and memory caching, used to cache stock_news_em results with ttl=3600
def ak_cache(fun, *args, **kwargs) -> pd.DataFrame | None: key = kwargs.pop("key", None) if not key: key = f"{fun.__name__}-{args}-{kwargs}" ttl1 = kwargs.pop("ttl", 86400) ttl2 = kwargs.pop("ttl2", None) cache = CacheKey.init(key, ttl1, ttl2) all = cache.get() if all is None: try: _LOGGER.info("Request akshare: %s", [key, args, kwargs]) all = fun(*args, **kwargs) cache.set(all) except Exception as exc: _LOGGER.exception(str(exc)) return all - mcp_aktools/__init__.py:135-137 (schema)Input schema: symbol (str, stock/crypto code) and limit (int, default 15) via pydantic Field
symbol: str = Field(description="股票代码/加密货币符号"), limit: int = Field(15, description="返回数量(int)", strict=False), ):