"""MCP server for Apify API."""
import json
import os
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
from .client import ApifyClient
# MCP server instance registered under the name "mcp-apify". The tool listing
# and tool-call handlers in this module attach to it through its decorators.
server = Server("mcp-apify")
# Module-wide cache for the Apify client. It stays None at import time and is
# filled in by get_client() the first time a client is requested.
_client: ApifyClient | None = None
def get_client() -> ApifyClient:
    """Return the shared Apify client, creating it on the first call.

    The client is built from the ``APIFY_API_TOKEN`` environment variable and
    cached in the module-level ``_client`` so later calls reuse it.

    Returns:
        The cached ``ApifyClient`` instance.

    Raises:
        ValueError: If ``APIFY_API_TOKEN`` is unset or empty.
    """
    global _client

    # Fast path: a client was already built by an earlier call.
    if _client is not None:
        return _client

    api_token = os.environ.get("APIFY_API_TOKEN")
    if api_token:
        _client = ApifyClient(api_token)
        return _client

    raise ValueError(
        "APIFY_API_TOKEN environment variable is required. "
        "Get your token from https://console.apify.com/settings/integrations"
    )
def format_result(data: Any) -> str:
    """Serialize an API result to pretty-printed JSON text.

    Args:
        data: Any value returned by the API client.

    Returns:
        JSON text indented by two spaces. Values the JSON encoder cannot
        handle natively are converted with ``str``.
    """
    dump_options = {"indent": 2, "default": str}
    return json.dumps(data, **dump_options)
# ==================== TOOL DEFINITIONS ====================
# Run status values accepted by every "status" filter exposed below.
_RUN_STATUS_VALUES = (
    "READY",
    "RUNNING",
    "SUCCEEDED",
    "FAILED",
    "TIMING-OUT",
    "TIMED-OUT",
    "ABORTING",
    "ABORTED",
)

# Description texts shared by several tool schemas.
_STATUS_FILTER_ALL = (
    "Filter by status: READY, RUNNING, SUCCEEDED, FAILED, "
    "TIMING-OUT, TIMED-OUT, ABORTING, ABORTED"
)
_STATUS_FILTER_LAST = "Filter by status to get the last run with this status"
_ACTOR_ID_TEXT = "Actor ID or username~actor-name"
_TASK_ID_TEXT = "Task ID or username~task-name"
_DATASET_ID_TEXT = "Dataset ID or username~dataset-name"
_STORE_ID_TEXT = "Store ID or username~store-name"
_CLEAN_TEXT = "Remove empty items and hidden fields (default: false)"


def _prop(json_type: str, description: str, **extras: Any) -> dict[str, Any]:
    """Build one schema property: ``type``, ``description``, then extra keys."""
    return {"type": json_type, "description": description, **extras}


def _offset_prop() -> dict[str, Any]:
    """Build the common ``offset`` pagination property."""
    return _prop("integer", "Number of records to skip (default: 0)", default=0)


def _limit_prop(default: int, noun: str = "records") -> dict[str, Any]:
    """Build a ``limit`` property with the given default and item noun."""
    return _prop(
        "integer",
        f"Maximum number of {noun} to return (default: {default})",
        default=default,
    )


def _status_prop(description: str) -> dict[str, Any]:
    """Build a run-status filter property restricted to the known statuses."""
    return _prop("string", description, enum=list(_RUN_STATUS_VALUES))


def _wait_prop() -> dict[str, Any]:
    """Build the common ``wait_for_finish`` property."""
    return _prop(
        "integer",
        "Wait up to this many seconds for the run to finish (max 300)",
    )


def _fields_prop() -> dict[str, Any]:
    """Build the ``fields`` property used by the dataset item tools."""
    return {
        "type": "array",
        "items": {"type": "string"},
        "description": "List of fields to include in the response",
    }


def _tool(
    name: str,
    description: str,
    properties: dict[str, Any] | None = None,
    required: tuple[str, ...] = (),
) -> Tool:
    """Wrap a name, description and property map into a ``Tool`` definition."""
    return Tool(
        name=name,
        description=description,
        inputSchema={
            "type": "object",
            "properties": dict(properties or {}),
            "required": list(required),
        },
    )


@server.list_tools()
async def list_tools() -> list[Tool]:
    """Return the definitions of every tool this server exposes.

    Tools are grouped as user, actors, runs, tasks, datasets, key-value
    stores and schedules. Every call builds fresh ``Tool`` objects.
    """
    user_tools = [
        _tool(
            "get_user_info",
            "Get information about the current authenticated user",
        ),
    ]

    actor_tools = [
        _tool(
            "list_actors",
            "Get list of actors. Returns actors created or used by the user.",
            {
                "offset": _offset_prop(),
                "limit": _limit_prop(20),
                "my": _prop(
                    "boolean",
                    "If true, only return actors created by the user (default: true)",
                    default=True,
                ),
            },
        ),
        _tool(
            "get_actor",
            "Get details of a specific actor including versions, builds, and runs info.",
            {
                "actor_id": _prop(
                    "string",
                    "Actor ID or username~actor-name (e.g., 'apify~web-scraper')",
                ),
            },
            ("actor_id",),
        ),
    ]

    run_tools = [
        _tool(
            "list_actor_runs",
            "Get list of runs for a specific actor.",
            {
                "actor_id": _prop("string", _ACTOR_ID_TEXT),
                "offset": _offset_prop(),
                "limit": _limit_prop(20),
                "status": _status_prop(_STATUS_FILTER_ALL),
            },
            ("actor_id",),
        ),
        _tool(
            "list_user_runs",
            "Get list of all runs for the current user across all actors.",
            {
                "offset": _offset_prop(),
                "limit": _limit_prop(20),
                "status": _status_prop(_STATUS_FILTER_ALL),
                "desc": _prop(
                    "boolean",
                    "Sort in descending order, newest first (default: true)",
                    default=True,
                ),
            },
        ),
        _tool(
            "get_run",
            "Get details of a specific run including status, stats, and storage IDs.",
            {
                "run_id": _prop("string", "The run ID"),
                "wait_for_finish": _wait_prop(),
            },
            ("run_id",),
        ),
        _tool(
            "get_last_run",
            "Get the last run of an actor.",
            {
                "actor_id": _prop("string", _ACTOR_ID_TEXT),
                "status": _status_prop(_STATUS_FILTER_LAST),
            },
            ("actor_id",),
        ),
        _tool(
            "run_actor",
            "Start a new run of an actor with optional input and configuration.",
            {
                "actor_id": _prop("string", _ACTOR_ID_TEXT),
                "input_data": _prop(
                    "object", "Input data for the actor (JSON object)"
                ),
                "memory_mbytes": _prop(
                    "integer",
                    "Memory limit in MB (128, 256, 512, 1024, 2048, 4096, etc.)",
                ),
                "timeout_secs": _prop("integer", "Timeout in seconds"),
                "build": _prop(
                    "string", "Build tag or number to run (default: latest)"
                ),
                "wait_for_finish": _wait_prop(),
            },
            ("actor_id",),
        ),
        _tool(
            "abort_run",
            "Abort a running actor execution.",
            {
                "run_id": _prop("string", "The run ID to abort"),
                "gracefully": _prop(
                    "boolean",
                    "If true, the actor will have some time to finish gracefully (default: false)",
                    default=False,
                ),
            },
            ("run_id",),
        ),
        _tool(
            "resurrect_run",
            "Resurrect a finished run to continue processing.",
            {"run_id": _prop("string", "The run ID to resurrect")},
            ("run_id",),
        ),
        _tool(
            "get_run_log",
            "Get the log output of a run.",
            {"run_id": _prop("string", "The run ID")},
            ("run_id",),
        ),
    ]

    task_tools = [
        _tool(
            "list_tasks",
            "Get list of actor tasks (saved actor configurations).",
            {
                "offset": _offset_prop(),
                "limit": _limit_prop(20),
            },
        ),
        _tool(
            "get_task",
            "Get details of a specific task including its input configuration.",
            {"task_id": _prop("string", _TASK_ID_TEXT)},
            ("task_id",),
        ),
        _tool(
            "run_task",
            "Run a task with optional input override.",
            {
                "task_id": _prop("string", _TASK_ID_TEXT),
                "input_data": _prop(
                    "object",
                    "Input data override (merged with task's saved input)",
                ),
                "wait_for_finish": _wait_prop(),
            },
            ("task_id",),
        ),
        _tool(
            "list_task_runs",
            "Get list of runs for a specific task.",
            {
                "task_id": _prop("string", _TASK_ID_TEXT),
                "offset": _offset_prop(),
                "limit": _limit_prop(20),
                "status": _status_prop("Filter by status"),
            },
            ("task_id",),
        ),
        _tool(
            "get_task_last_run",
            "Get the last run of a task.",
            {
                "task_id": _prop("string", _TASK_ID_TEXT),
                "status": _status_prop(_STATUS_FILTER_LAST),
            },
            ("task_id",),
        ),
    ]

    dataset_tools = [
        _tool(
            "list_datasets",
            "Get list of datasets.",
            {
                "offset": _offset_prop(),
                "limit": _limit_prop(20),
                "unnamed": _prop(
                    "boolean",
                    "Include unnamed datasets (default: false)",
                    default=False,
                ),
            },
        ),
        _tool(
            "get_dataset",
            "Get details of a specific dataset.",
            {"dataset_id": _prop("string", _DATASET_ID_TEXT)},
            ("dataset_id",),
        ),
        _tool(
            "get_dataset_items",
            "Get items from a dataset.",
            {
                "dataset_id": _prop("string", _DATASET_ID_TEXT),
                "offset": _offset_prop(),
                "limit": _limit_prop(100),
                "clean": _prop("boolean", _CLEAN_TEXT, default=False),
                "fields": _fields_prop(),
            },
            ("dataset_id",),
        ),
        _tool(
            "get_run_dataset_items",
            "Get items from a run's default dataset.",
            {
                "run_id": _prop("string", "The run ID"),
                "offset": _offset_prop(),
                "limit": _limit_prop(100),
                "clean": _prop("boolean", _CLEAN_TEXT, default=False),
                "fields": _fields_prop(),
            },
            ("run_id",),
        ),
    ]

    store_tools = [
        _tool(
            "list_key_value_stores",
            "Get list of key-value stores.",
            {
                "offset": _offset_prop(),
                "limit": _limit_prop(20),
                "unnamed": _prop(
                    "boolean",
                    "Include unnamed stores (default: false)",
                    default=False,
                ),
            },
        ),
        _tool(
            "get_key_value_store",
            "Get details of a specific key-value store.",
            {"store_id": _prop("string", _STORE_ID_TEXT)},
            ("store_id",),
        ),
        _tool(
            "list_keys",
            "List keys in a key-value store.",
            {
                "store_id": _prop("string", _STORE_ID_TEXT),
                "limit": _limit_prop(100, noun="keys"),
                "exclusive_start_key": _prop(
                    "string", "Key to start after (for pagination)"
                ),
            },
            ("store_id",),
        ),
        _tool(
            "get_record",
            "Get a record from a key-value store.",
            {
                "store_id": _prop("string", _STORE_ID_TEXT),
                "key": _prop("string", "The key of the record"),
            },
            ("store_id", "key"),
        ),
        _tool(
            "get_run_output",
            "Get the OUTPUT record from a run's default key-value store.",
            {"run_id": _prop("string", "The run ID")},
            ("run_id",),
        ),
    ]

    schedule_tools = [
        _tool(
            "list_schedules",
            "Get list of schedules for automatic actor/task execution.",
            {
                "offset": _offset_prop(),
                "limit": _limit_prop(20),
            },
        ),
        _tool(
            "get_schedule",
            "Get details of a specific schedule.",
            {"schedule_id": _prop("string", "The schedule ID")},
            ("schedule_id",),
        ),
        _tool(
            "get_schedule_log",
            "Get the execution log of a schedule.",
            {"schedule_id": _prop("string", "The schedule ID")},
            ("schedule_id",),
        ),
    ]

    # Concatenate in the original advertisement order.
    return [
        *user_tools,
        *actor_tools,
        *run_tools,
        *task_tools,
        *dataset_tools,
        *store_tools,
        *schedule_tools,
    ]
# ==================== TOOL HANDLERS ====================
def _require_arg(arguments: dict[str, Any], key: str) -> Any:
    """Return ``arguments[key]``, raising a readable error when it is absent.

    A bare ``KeyError`` stringifies to just the quoted key (``'actor_id'``),
    which is a poor message to hand back to a tool caller.
    """
    try:
        return arguments[key]
    except KeyError:
        raise ValueError(f"Missing required argument: {key}") from None


@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    """Dispatch a tool call to the matching Apify client method.

    Args:
        name: Name of the tool to run, as advertised by ``list_tools``.
        arguments: Tool arguments keyed by parameter name. ``None`` is
            treated as an empty mapping.

    Returns:
        A single-element list holding a text item. On success it contains the
        client result formatted by ``format_result`` (``get_run_log`` returns
        the log text unformatted). An unrecognised tool yields
        ``"Unknown tool: <name>"``. Any exception raised while handling the
        call yields ``"Error: <message>"``.

    Raises:
        ValueError: Propagated from ``get_client`` when the API token is not
            configured; the client is obtained before error capture starts.
    """
    client = get_client()
    args: dict[str, Any] = arguments or {}

    def need(key: str) -> Any:
        return _require_arg(args, key)

    opt = args.get

    # Each entry builds the awaitable lazily, so argument lookups happen
    # inside the try block below, and only for the selected tool.
    handlers = {
        # User
        "get_user_info": lambda: client.get_user_info(),
        # Actors
        "list_actors": lambda: client.list_actors(
            offset=opt("offset", 0),
            limit=opt("limit", 20),
            my=opt("my", True),
        ),
        "get_actor": lambda: client.get_actor(need("actor_id")),
        # Runs
        "list_actor_runs": lambda: client.list_actor_runs(
            actor_id=need("actor_id"),
            offset=opt("offset", 0),
            limit=opt("limit", 20),
            status=opt("status"),
        ),
        "list_user_runs": lambda: client.list_user_runs(
            offset=opt("offset", 0),
            limit=opt("limit", 20),
            status=opt("status"),
            desc=opt("desc", True),
        ),
        "get_run": lambda: client.get_run(
            run_id=need("run_id"),
            wait_for_finish=opt("wait_for_finish"),
        ),
        "get_last_run": lambda: client.get_last_run(
            actor_id=need("actor_id"),
            status=opt("status"),
        ),
        "run_actor": lambda: client.run_actor(
            actor_id=need("actor_id"),
            input_data=opt("input_data"),
            memory_mbytes=opt("memory_mbytes"),
            timeout_secs=opt("timeout_secs"),
            build=opt("build"),
            wait_for_finish=opt("wait_for_finish"),
        ),
        "abort_run": lambda: client.abort_run(
            run_id=need("run_id"),
            gracefully=opt("gracefully", False),
        ),
        "resurrect_run": lambda: client.resurrect_run(run_id=need("run_id")),
        "get_run_log": lambda: client.get_run_log(run_id=need("run_id")),
        # Tasks
        "list_tasks": lambda: client.list_tasks(
            offset=opt("offset", 0),
            limit=opt("limit", 20),
        ),
        "get_task": lambda: client.get_task(task_id=need("task_id")),
        "run_task": lambda: client.run_task(
            task_id=need("task_id"),
            input_data=opt("input_data"),
            wait_for_finish=opt("wait_for_finish"),
        ),
        "list_task_runs": lambda: client.list_task_runs(
            task_id=need("task_id"),
            offset=opt("offset", 0),
            limit=opt("limit", 20),
            status=opt("status"),
        ),
        "get_task_last_run": lambda: client.get_task_last_run(
            task_id=need("task_id"),
            status=opt("status"),
        ),
        # Datasets
        "list_datasets": lambda: client.list_datasets(
            offset=opt("offset", 0),
            limit=opt("limit", 20),
            unnamed=opt("unnamed", False),
        ),
        "get_dataset": lambda: client.get_dataset(dataset_id=need("dataset_id")),
        "get_dataset_items": lambda: client.get_dataset_items(
            dataset_id=need("dataset_id"),
            offset=opt("offset", 0),
            limit=opt("limit", 100),
            clean=opt("clean", False),
            fields=opt("fields"),
        ),
        "get_run_dataset_items": lambda: client.get_run_dataset_items(
            run_id=need("run_id"),
            offset=opt("offset", 0),
            limit=opt("limit", 100),
            clean=opt("clean", False),
            fields=opt("fields"),
        ),
        # Key-Value Stores
        "list_key_value_stores": lambda: client.list_key_value_stores(
            offset=opt("offset", 0),
            limit=opt("limit", 20),
            unnamed=opt("unnamed", False),
        ),
        "get_key_value_store": lambda: client.get_key_value_store(
            store_id=need("store_id")
        ),
        "list_keys": lambda: client.list_keys(
            store_id=need("store_id"),
            limit=opt("limit", 100),
            exclusive_start_key=opt("exclusive_start_key"),
        ),
        "get_record": lambda: client.get_record(
            store_id=need("store_id"),
            key=need("key"),
        ),
        "get_run_output": lambda: client.get_run_key_value_store_record(
            run_id=need("run_id"),
            key="OUTPUT",
        ),
        # Schedules
        "list_schedules": lambda: client.list_schedules(
            offset=opt("offset", 0),
            limit=opt("limit", 20),
        ),
        "get_schedule": lambda: client.get_schedule(
            schedule_id=need("schedule_id")
        ),
        "get_schedule_log": lambda: client.get_schedule_log(
            schedule_id=need("schedule_id")
        ),
    }

    try:
        handler = handlers.get(name)
        if handler is None:
            return [TextContent(type="text", text=f"Unknown tool: {name}")]

        result: Any = await handler()

        # The run log is returned verbatim rather than JSON-formatted.
        if name == "get_run_log":
            return [TextContent(type="text", text=result)]

        return [TextContent(type="text", text=format_result(result))]
    except Exception as e:
        # Failures are reported to the caller as text instead of propagating.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
def main():
    """Start the MCP server on the stdio transport and block until it exits."""
    import asyncio

    async def _serve_stdio():
        # Open the stdio transport and hand both streams to the server.
        async with stdio_server() as streams:
            reader, writer = streams
            init_options = server.create_initialization_options()
            await server.run(reader, writer, init_options)

    asyncio.run(_serve_stdio())


# Allow running this module directly as a script.
if __name__ == "__main__":
    main()