hz_fetch_items
Fetch and deduplicate content from specified sources, then write results to the raw processing stage for analysis.
Instructions
抓取并去重内容,写入 run 的 raw 阶段。
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
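| hours | No | Look-back window in hours; must be greater than 0. | 24 |
| run_id | No | Run identifier passed to the run store when the run is created. | null |
| horizon_path | No | Horizon path used to build the execution context. | null |
| config_path | No | Configuration file path used to build the execution context. | null |
| sources | No | List of source names to select. | null |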
Implementation Reference
- horizon_mcp/service.py:199-247 (handler) The `fetch_items` method in `HorizonPipelineService` handles the logic for fetching items from sources, merging them, and saving the raw data.
async def fetch_items(
    self,
    hours: int = 24,
    run_id: str | None = None,
    horizon_path: str | None = None,
    config_path: str | None = None,
    sources: list[str] | None = None,
) -> dict[str, Any]:
    """Fetch items, merge cross-source duplicates, and save them as the raw stage.

    Args:
        hours: Size of the look-back window in hours; must be greater than 0.
        run_id: Forwarded to ``run_store.create_run``; the value it returns
            is the run id used for everything that follows.
        horizon_path: Forwarded to ``_build_context``.
        config_path: Forwarded to ``_build_context``.
        sources: Forwarded to ``_build_context``, which reports both the
            selected and the unknown sources.

    Returns:
        A dict with the run id, the item counts after and before merging,
        the per-source counts, the resolved path of ``raw_items.json`` in
        the run directory, and the meta returned by the run store.

    Raises:
        HorizonMcpError: With code ``HZ_INVALID_INPUT`` when ``hours <= 0``.
    """
    # Reject an empty or negative window before any context is built.
    if hours <= 0:
        raise HorizonMcpError(code="HZ_INVALID_INPUT", message="hours 必须大于 0。")

    ctx, selected_sources, unknown_sources = self._build_context(
        horizon_path=horizon_path,
        config_path=config_path,
        sources=sources,
    )
    item_storage = make_storage(ctx.runtime, ctx.config_path)
    pipeline = make_orchestrator(ctx.runtime, ctx.config, item_storage)

    active_run_id = self.run_store.create_run(run_id)

    # Timezone-aware UTC cutoff: "now" minus the requested number of hours.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)

    fetched = await pipeline._fetch_all_sources(cutoff)
    deduped = pipeline._merge_cross_source_duplicates(fetched)
    self.run_store.save_items(active_run_id, "raw", items_to_dicts(deduped))

    # Record how this run was produced next to the saved items.
    meta_patch = {
        "horizon_path": str(ctx.horizon_path),
        "config_path": str(ctx.config_path),
        "hours": hours,
        "since": cutoff.isoformat(),
        "source_selection": selected_sources,
        "unknown_sources": unknown_sources,
        "raw_count_before_merge": len(fetched),
        "raw_count": len(deduped),
    }
    meta = self.run_store.update_meta(active_run_id, meta_patch)

    result = {
        "run_id": active_run_id,
        "fetched": len(deduped),
        "raw_before_merge": len(fetched),
        "source_counts": get_source_counts(deduped),
        "artifact": str(
            (self.run_store.run_dir(active_run_id) / "raw_items.json").resolve()
        ),
        "meta": meta,
    }
    return result
# - horizon_mcp/server.py:151-170 (registration) The `hz_fetch_items` MCP tool is registered here and calls `service.fetch_items`.
@mcp.tool()
async def hz_fetch_items(
    hours: int = 24,
    run_id: str | None = None,
    horizon_path: str | None = None,
    config_path: str | None = None,
    sources: list[str] | None = None,
) -> dict[str, Any]:
    """抓取并去重内容,写入 run 的 raw 阶段。"""
    # NOTE(review): the docstring is kept verbatim because it is presumably
    # published as the tool description by `@mcp.tool()` - confirm. In English:
    # "Fetch and deduplicate content, then write it to the run's raw stage."

    def _call_service():
        # Deferred call: `_run_tool` decides when the service is invoked.
        return service.fetch_items(
            hours=hours,
            run_id=run_id,
            horizon_path=horizon_path,
            config_path=config_path,
            sources=sources,
        )

    return await _run_tool("hz_fetch_items", _call_service)