hz_fetch_items

Fetch and deduplicate content from specified sources, then write results to the raw processing stage for analysis.

Instructions

Fetch and deduplicate content, then write it to the run's raw stage.

Input Schema

Name           Required   Description                                                  Default
hours          No         Lookback window in hours; must be greater than 0.            24
run_id         No         Existing run id to reuse; a new run is created if omitted.
horizon_path   No         Path to the Horizon workspace.
config_path    No         Path to the configuration file.
sources        No         Subset of source names to fetch; all sources if omitted.
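
Since every parameter is optional, a minimal invocation only needs to override what differs from the defaults. A hypothetical arguments payload (the source names here are made up for illustration) might look like:

```python
# Hypothetical hz_fetch_items arguments; all fields are optional.
args = {
    "hours": 48,                 # lookback window, must be > 0 (default: 24)
    "sources": ["hn", "arxiv"],  # hypothetical source names; omit to fetch all
}
```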

Implementation Reference

  • The `fetch_items` method in `HorizonPipelineService` fetches items from the configured sources, merges cross-source duplicates, and saves the raw results.
    async def fetch_items(
        self,
        hours: int = 24,
        run_id: str | None = None,
        horizon_path: str | None = None,
        config_path: str | None = None,
        sources: list[str] | None = None,
    ) -> dict[str, Any]:
        if hours <= 0:
            raise HorizonMcpError(code="HZ_INVALID_INPUT", message="hours must be greater than 0.")
    
        ctx, selected_sources, unknown_sources = self._build_context(
            horizon_path=horizon_path,
            config_path=config_path,
            sources=sources,
        )
    
        storage = make_storage(ctx.runtime, ctx.config_path)
        orchestrator = make_orchestrator(ctx.runtime, ctx.config, storage)
    
        run_id = self.run_store.create_run(run_id)
        since = datetime.now(timezone.utc) - timedelta(hours=hours)
    
        raw_items = await orchestrator._fetch_all_sources(since)
        merged_items = orchestrator._merge_cross_source_duplicates(raw_items)
    
        self.run_store.save_items(run_id, "raw", items_to_dicts(merged_items))
        meta = self.run_store.update_meta(
            run_id,
            {
                "horizon_path": str(ctx.horizon_path),
                "config_path": str(ctx.config_path),
                "hours": hours,
                "since": since.isoformat(),
                "source_selection": selected_sources,
                "unknown_sources": unknown_sources,
                "raw_count_before_merge": len(raw_items),
                "raw_count": len(merged_items),
            },
        )
    
        return {
            "run_id": run_id,
            "fetched": len(merged_items),
            "raw_before_merge": len(raw_items),
            "source_counts": get_source_counts(merged_items),
            "artifact": str((self.run_store.run_dir(run_id) / "raw_items.json").resolve()),
            "meta": meta,
        }
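  The `_merge_cross_source_duplicates` implementation is not shown on this page. A minimal sketch of URL-keyed deduplication (the item fields, the normalization rule, and the accumulated `sources` list are all assumptions, not the actual Horizon logic) could look like:

    ```python
    from urllib.parse import urlsplit

    def merge_cross_source_duplicates(items: list[dict]) -> list[dict]:
        """Keep the first item seen per normalized URL, accumulating source names.

        Assumes each item is a dict with "url" and "source" keys; the real
        implementation may key on other fields entirely.
        """
        merged: dict[tuple[str, str], dict] = {}
        for item in items:
            parts = urlsplit(item["url"])
            # Normalize: lowercase host, strip trailing slash, drop query/fragment.
            key = (parts.netloc.lower(), parts.path.rstrip("/"))
            if key in merged:
                merged[key]["sources"].append(item["source"])
            else:
                merged[key] = {**item, "sources": [item["source"]]}
        return list(merged.values())
    ```

  This is why the result distinguishes `raw_before_merge` from `fetched`: the difference is the number of items collapsed as duplicates.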
  • The `hz_fetch_items` MCP tool is registered here and calls `service.fetch_items`.
    @mcp.tool()
    async def hz_fetch_items(
        hours: int = 24,
        run_id: str | None = None,
        horizon_path: str | None = None,
        config_path: str | None = None,
        sources: list[str] | None = None,
    ) -> dict[str, Any]:
        """抓取并去重内容,写入 run 的 raw 阶段。"""
    
        return await _run_tool(
            "hz_fetch_items",
            lambda: service.fetch_items(
                hours=hours,
                run_id=run_id,
                horizon_path=horizon_path,
                config_path=config_path,
                sources=sources,
            ),
        )
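  The `_run_tool` wrapper itself is not shown on this page. A plausible sketch (the error-envelope shape and the `HorizonMcpError` fields are assumptions) would await the tool body and translate `HorizonMcpError` into a structured error result rather than letting the exception escape:

    ```python
    from typing import Any, Awaitable, Callable

    class HorizonMcpError(Exception):
        """Assumed shape: a machine-readable code plus a human-readable message."""
        def __init__(self, code: str, message: str) -> None:
            super().__init__(message)
            self.code = code
            self.message = message

    async def _run_tool(
        name: str, body: Callable[[], Awaitable[dict[str, Any]]]
    ) -> dict[str, Any]:
        # Run the tool body; normalize known errors into a structured payload.
        try:
            return await body()
        except HorizonMcpError as exc:
            return {"tool": name, "error": {"code": exc.code, "message": exc.message}}
    ```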
