"""Apify API client."""
import httpx
from typing import Any
class ApifyClient:
"""Client for interacting with the Apify API."""
BASE_URL = "https://api.apify.com/v2"
def __init__(self, token: str):
self.token = token
self._client = httpx.AsyncClient(
base_url=self.BASE_URL,
headers={"Authorization": f"Bearer {token}"},
timeout=30.0,
)
async def close(self):
"""Close the HTTP client."""
await self._client.aclose()
async def _request(
self,
method: str,
path: str,
params: dict[str, Any] | None = None,
json: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Make an API request."""
response = await self._client.request(method, path, params=params, json=json)
response.raise_for_status()
return response.json()
# ==================== ACTORS ====================
async def list_actors(
self,
offset: int = 0,
limit: int = 20,
my: bool = True,
) -> dict[str, Any]:
"""Get list of actors.
Args:
offset: Number of records to skip.
limit: Maximum number of records to return.
my: If True, only return actors created by the user.
"""
params = {"offset": offset, "limit": limit, "my": my}
return await self._request("GET", "/acts", params=params)
async def get_actor(self, actor_id: str) -> dict[str, Any]:
"""Get details of a specific actor.
Args:
actor_id: Actor ID or username~actor-name.
"""
return await self._request("GET", f"/acts/{actor_id}")
# ==================== ACTOR RUNS ====================
async def list_actor_runs(
self,
actor_id: str,
offset: int = 0,
limit: int = 20,
status: str | None = None,
) -> dict[str, Any]:
"""Get list of runs for an actor.
Args:
actor_id: Actor ID or username~actor-name.
offset: Number of records to skip.
limit: Maximum number of records to return.
status: Filter by status (READY, RUNNING, SUCCEEDED, FAILED, TIMING-OUT, TIMED-OUT, ABORTING, ABORTED).
"""
params = {"offset": offset, "limit": limit}
if status:
params["status"] = status
return await self._request("GET", f"/acts/{actor_id}/runs", params=params)
async def get_run(self, run_id: str, wait_for_finish: int | None = None) -> dict[str, Any]:
"""Get details of a specific run.
Args:
run_id: The run ID.
wait_for_finish: If provided, wait up to this many seconds for the run to finish.
"""
params = {}
if wait_for_finish is not None:
params["waitForFinish"] = wait_for_finish
return await self._request("GET", f"/actor-runs/{run_id}", params=params or None)
async def get_actor_run(
self, actor_id: str, run_id: str, wait_for_finish: int | None = None
) -> dict[str, Any]:
"""Get details of a specific actor run.
Args:
actor_id: Actor ID or username~actor-name.
run_id: The run ID.
wait_for_finish: If provided, wait up to this many seconds for the run to finish.
"""
params = {}
if wait_for_finish is not None:
params["waitForFinish"] = wait_for_finish
return await self._request(
"GET", f"/acts/{actor_id}/runs/{run_id}", params=params or None
)
async def get_last_run(self, actor_id: str, status: str | None = None) -> dict[str, Any]:
"""Get the last run of an actor.
Args:
actor_id: Actor ID or username~actor-name.
status: Filter by status.
"""
params = {}
if status:
params["status"] = status
return await self._request("GET", f"/acts/{actor_id}/runs/last", params=params or None)
async def run_actor(
self,
actor_id: str,
input_data: dict[str, Any] | None = None,
memory_mbytes: int | None = None,
timeout_secs: int | None = None,
build: str | None = None,
wait_for_finish: int | None = None,
) -> dict[str, Any]:
"""Run an actor.
Args:
actor_id: Actor ID or username~actor-name.
input_data: Input data for the actor.
memory_mbytes: Memory limit in MB.
timeout_secs: Timeout in seconds.
build: Build tag or number to run.
wait_for_finish: If provided, wait up to this many seconds for the run to finish.
"""
params = {}
if memory_mbytes:
params["memory"] = memory_mbytes
if timeout_secs:
params["timeout"] = timeout_secs
if build:
params["build"] = build
if wait_for_finish is not None:
params["waitForFinish"] = wait_for_finish
return await self._request(
"POST", f"/acts/{actor_id}/runs", params=params or None, json=input_data
)
async def abort_run(self, run_id: str, gracefully: bool = False) -> dict[str, Any]:
"""Abort a running actor.
Args:
run_id: The run ID.
gracefully: If True, the actor will have some time to finish gracefully.
"""
params = {"gracefully": gracefully} if gracefully else None
return await self._request("POST", f"/actor-runs/{run_id}/abort", params=params)
async def resurrect_run(self, run_id: str) -> dict[str, Any]:
"""Resurrect a finished run.
Args:
run_id: The run ID.
"""
return await self._request("POST", f"/actor-runs/{run_id}/resurrect")
async def get_run_log(self, run_id: str) -> str:
"""Get the log of a run.
Args:
run_id: The run ID.
"""
response = await self._client.get(f"/actor-runs/{run_id}/log")
response.raise_for_status()
return response.text
# ==================== USER RUNS ====================
async def list_user_runs(
self,
offset: int = 0,
limit: int = 20,
status: str | None = None,
desc: bool = True,
) -> dict[str, Any]:
"""Get list of all runs for the current user.
Args:
offset: Number of records to skip.
limit: Maximum number of records to return.
status: Filter by status.
desc: Sort in descending order (newest first).
"""
params = {"offset": offset, "limit": limit, "desc": desc}
if status:
params["status"] = status
return await self._request("GET", "/actor-runs", params=params)
# ==================== TASKS ====================
async def list_tasks(
self,
offset: int = 0,
limit: int = 20,
) -> dict[str, Any]:
"""Get list of tasks.
Args:
offset: Number of records to skip.
limit: Maximum number of records to return.
"""
params = {"offset": offset, "limit": limit}
return await self._request("GET", "/actor-tasks", params=params)
async def get_task(self, task_id: str) -> dict[str, Any]:
"""Get details of a specific task.
Args:
task_id: Task ID or username~task-name.
"""
return await self._request("GET", f"/actor-tasks/{task_id}")
async def run_task(
self,
task_id: str,
input_data: dict[str, Any] | None = None,
wait_for_finish: int | None = None,
) -> dict[str, Any]:
"""Run a task.
Args:
task_id: Task ID or username~task-name.
input_data: Input data override for the task.
wait_for_finish: If provided, wait up to this many seconds for the run to finish.
"""
params = {}
if wait_for_finish is not None:
params["waitForFinish"] = wait_for_finish
return await self._request(
"POST", f"/actor-tasks/{task_id}/runs", params=params or None, json=input_data
)
async def list_task_runs(
self,
task_id: str,
offset: int = 0,
limit: int = 20,
status: str | None = None,
) -> dict[str, Any]:
"""Get list of runs for a task.
Args:
task_id: Task ID or username~task-name.
offset: Number of records to skip.
limit: Maximum number of records to return.
status: Filter by status.
"""
params = {"offset": offset, "limit": limit}
if status:
params["status"] = status
return await self._request("GET", f"/actor-tasks/{task_id}/runs", params=params)
async def get_task_last_run(self, task_id: str, status: str | None = None) -> dict[str, Any]:
"""Get the last run of a task.
Args:
task_id: Task ID or username~task-name.
status: Filter by status.
"""
params = {}
if status:
params["status"] = status
return await self._request(
"GET", f"/actor-tasks/{task_id}/runs/last", params=params or None
)
# ==================== DATASETS ====================
async def list_datasets(
self,
offset: int = 0,
limit: int = 20,
unnamed: bool = False,
) -> dict[str, Any]:
"""Get list of datasets.
Args:
offset: Number of records to skip.
limit: Maximum number of records to return.
unnamed: Include unnamed datasets.
"""
params = {"offset": offset, "limit": limit, "unnamed": unnamed}
return await self._request("GET", "/datasets", params=params)
async def get_dataset(self, dataset_id: str) -> dict[str, Any]:
"""Get details of a specific dataset.
Args:
dataset_id: Dataset ID or username~dataset-name.
"""
return await self._request("GET", f"/datasets/{dataset_id}")
async def get_dataset_items(
self,
dataset_id: str,
offset: int = 0,
limit: int = 100,
clean: bool = False,
fields: list[str] | None = None,
) -> list[dict[str, Any]]:
"""Get items from a dataset.
Args:
dataset_id: Dataset ID or username~dataset-name.
offset: Number of records to skip.
limit: Maximum number of records to return.
clean: If True, remove empty items and hidden fields.
fields: List of fields to include (comma-separated in API).
"""
params = {"offset": offset, "limit": limit, "clean": clean}
if fields:
params["fields"] = ",".join(fields)
return await self._request("GET", f"/datasets/{dataset_id}/items", params=params)
async def get_run_dataset_items(
self,
run_id: str,
offset: int = 0,
limit: int = 100,
clean: bool = False,
fields: list[str] | None = None,
) -> list[dict[str, Any]]:
"""Get items from a run's default dataset.
Args:
run_id: The run ID.
offset: Number of records to skip.
limit: Maximum number of records to return.
clean: If True, remove empty items and hidden fields.
fields: List of fields to include.
"""
params = {"offset": offset, "limit": limit, "clean": clean}
if fields:
params["fields"] = ",".join(fields)
return await self._request("GET", f"/actor-runs/{run_id}/dataset/items", params=params)
# ==================== KEY-VALUE STORES ====================
async def list_key_value_stores(
self,
offset: int = 0,
limit: int = 20,
unnamed: bool = False,
) -> dict[str, Any]:
"""Get list of key-value stores.
Args:
offset: Number of records to skip.
limit: Maximum number of records to return.
unnamed: Include unnamed stores.
"""
params = {"offset": offset, "limit": limit, "unnamed": unnamed}
return await self._request("GET", "/key-value-stores", params=params)
async def get_key_value_store(self, store_id: str) -> dict[str, Any]:
"""Get details of a specific key-value store.
Args:
store_id: Store ID or username~store-name.
"""
return await self._request("GET", f"/key-value-stores/{store_id}")
async def list_keys(
self,
store_id: str,
limit: int = 100,
exclusive_start_key: str | None = None,
) -> dict[str, Any]:
"""List keys in a key-value store.
Args:
store_id: Store ID or username~store-name.
limit: Maximum number of keys to return.
exclusive_start_key: Key to start after.
"""
params = {"limit": limit}
if exclusive_start_key:
params["exclusiveStartKey"] = exclusive_start_key
return await self._request("GET", f"/key-value-stores/{store_id}/keys", params=params)
async def get_record(self, store_id: str, key: str) -> Any:
"""Get a record from a key-value store.
Args:
store_id: Store ID or username~store-name.
key: The key of the record.
"""
response = await self._client.get(f"/key-value-stores/{store_id}/records/{key}")
response.raise_for_status()
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
return response.json()
return response.text
async def get_run_key_value_store_record(self, run_id: str, key: str) -> Any:
"""Get a record from a run's default key-value store.
Args:
run_id: The run ID.
key: The key of the record (e.g., 'OUTPUT', 'INPUT').
"""
response = await self._client.get(f"/actor-runs/{run_id}/key-value-store/records/{key}")
response.raise_for_status()
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
return response.json()
return response.text
# ==================== SCHEDULES ====================
async def list_schedules(
self,
offset: int = 0,
limit: int = 20,
) -> dict[str, Any]:
"""Get list of schedules.
Args:
offset: Number of records to skip.
limit: Maximum number of records to return.
"""
params = {"offset": offset, "limit": limit}
return await self._request("GET", "/schedules", params=params)
async def get_schedule(self, schedule_id: str) -> dict[str, Any]:
"""Get details of a specific schedule.
Args:
schedule_id: The schedule ID.
"""
return await self._request("GET", f"/schedules/{schedule_id}")
async def get_schedule_log(self, schedule_id: str) -> dict[str, Any]:
"""Get the log of a schedule.
Args:
schedule_id: The schedule ID.
"""
return await self._request("GET", f"/schedules/{schedule_id}/log")
# ==================== USER INFO ====================
async def get_user_info(self) -> dict[str, Any]:
"""Get information about the current user."""
return await self._request("GET", "/users/me")