#!/usr/bin/python
# coding: utf-8
import os
import argparse
import logging
import uvicorn
from contextlib import asynccontextmanager
from typing import Optional, Any
import json
from fastmcp import Client
from pydantic_ai import Agent, ModelSettings
from pydantic_ai.mcp import load_mcp_servers
from pydantic_ai.toolsets.fastmcp import FastMCPToolset
from pydantic_ai_skills import SkillsToolset
from fasta2a import Skill
from archivebox_api.utils import (
to_boolean,
to_integer,
to_float,
to_list,
to_dict,
get_skills_path,
get_mcp_config_path,
load_skills_from_directory,
create_model,
prune_large_messages,
)
from fastapi import FastAPI, Request
from starlette.responses import Response, StreamingResponse
from pydantic import ValidationError
from pydantic_ai.ui import SSE_CONTENT_TYPE
from pydantic_ai.ui.ag_ui import AGUIAdapter
__version__ = "0.0.20"

# Logging setup: configure the root logger with a single stream handler at
# INFO, then explicitly set the third-party loggers this module cares about
# to the same level.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
for _library in ("pydantic_ai", "fastmcp", "httpx"):
    logging.getLogger(_library).setLevel(logging.INFO)

# Module-level logger used by everything below.
logger = logging.getLogger(__name__)
# Default configuration.
# Every value is read from the process environment once, at import time, and
# falls back to the literal shown when the variable is unset.
DEFAULT_HOST = os.environ.get("HOST", "0.0.0.0")
DEFAULT_PORT = to_integer(os.environ.get("PORT", "9000"))
DEFAULT_DEBUG = to_boolean(os.environ.get("DEBUG", "False"))
DEFAULT_PROVIDER = os.environ.get("PROVIDER", "openai")
DEFAULT_MODEL_ID = os.environ.get("MODEL_ID", "qwen/qwen3-4b-2507")
DEFAULT_OPENAI_BASE_URL = os.environ.get(
    "OPENAI_BASE_URL",
    "http://host.docker.internal:1234/v1",
)
DEFAULT_OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "ollama")
DEFAULT_MCP_URL = os.environ.get("MCP_URL")
DEFAULT_MCP_CONFIG = os.environ.get("MCP_CONFIG", get_mcp_config_path())
# The skills directory falls back to whatever get_skills_path() reports.
DEFAULT_SKILLS_DIRECTORY = os.environ.get("SKILLS_DIRECTORY", get_skills_path())
DEFAULT_ENABLE_WEB_UI = to_boolean(os.environ.get("ENABLE_WEB_UI", "False"))

# Model settings.
# Variables without a fallback are passed to the converter as None when unset.
DEFAULT_MAX_TOKENS = to_integer(os.environ.get("MAX_TOKENS", "8192"))
DEFAULT_TEMPERATURE = to_float(os.environ.get("TEMPERATURE", "0.7"))
DEFAULT_TOP_P = to_float(os.environ.get("TOP_P", "1.0"))
DEFAULT_TIMEOUT = to_float(os.environ.get("TIMEOUT", "32400.0"))
DEFAULT_TOOL_TIMEOUT = to_float(os.environ.get("TOOL_TIMEOUT", "32400.0"))
DEFAULT_PARALLEL_TOOL_CALLS = to_boolean(
    os.environ.get("PARALLEL_TOOL_CALLS", "True")
)
DEFAULT_SEED = to_integer(os.environ.get("SEED"))
DEFAULT_PRESENCE_PENALTY = to_float(os.environ.get("PRESENCE_PENALTY", "0.0"))
DEFAULT_FREQUENCY_PENALTY = to_float(os.environ.get("FREQUENCY_PENALTY", "0.0"))
DEFAULT_LOGIT_BIAS = to_dict(os.environ.get("LOGIT_BIAS"))
DEFAULT_STOP_SEQUENCES = to_list(os.environ.get("STOP_SEQUENCES"))
DEFAULT_EXTRA_HEADERS = to_dict(os.environ.get("EXTRA_HEADERS"))
DEFAULT_EXTRA_BODY = to_dict(os.environ.get("EXTRA_BODY"))

# Agent identity: the name and description are used for the A2A app and the
# FastAPI app metadata; the system prompt is handed to the Agent itself.
AGENT_NAME = "ArchiveBox Agent"
AGENT_DESCRIPTION = (
    "An intelligent agent for managing ArchiveBox snapshots and archives."
)
AGENT_SYSTEM_PROMPT = (
    "You are an ArchiveBox Expert Agent.\n"
    "You are an expert in web archiving and managing ArchiveBox instances.\n"
    "You have access to tools for adding URLs, updating archives, listing snapshots, and managing the archive collection.\n"
    "Your responsibilities:\n"
    "1. Analyze the user's request regarding ArchiveBox.\n"
    "2. Use the available skills and tools to interact with ArchiveBox.\n"
    "3. If a task requires multiple steps (e.g., adding a URL then verifying it), orchestrate them sequentially.\n"
    "4. Always be warm, professional, and helpful.\n"
    "5. Explain your plan in detail before executing.\n"
)
def create_agent(
    provider: str = DEFAULT_PROVIDER,
    model_id: str = DEFAULT_MODEL_ID,
    base_url: Optional[str] = None,
    api_key: Optional[str] = None,
    mcp_url: str = DEFAULT_MCP_URL,
    mcp_config: str = DEFAULT_MCP_CONFIG,
    skills_directory: Optional[str] = DEFAULT_SKILLS_DIRECTORY,
) -> Agent:
    """Assemble the ArchiveBox ``Agent`` with its model, settings and toolsets.

    Args:
        provider: Provider name forwarded to ``create_model``.
        model_id: Model identifier forwarded to ``create_model``.
        base_url: Optional base URL forwarded to ``create_model``.
        api_key: Optional API key forwarded to ``create_model``.
        mcp_url: URL of a single MCP server; only consulted when
            ``mcp_config`` is empty or does not exist on disk.
        mcp_config: Path to an MCP servers config file; used when it exists.
        skills_directory: Directory handed to ``SkillsToolset`` when it exists.

    Returns:
        An ``Agent`` configured with the module-level ``DEFAULT_*`` model
        settings and every toolset that could be attached.
    """
    toolsets = []

    # An existing MCP config file takes precedence over a bare server URL.
    if not (mcp_config and os.path.exists(mcp_config)):
        if mcp_url:
            toolsets.append(FastMCPToolset(Client[Any](mcp_url, timeout=3600)))
            logger.info(f"Connected to MCP Server: {mcp_url}")
    else:
        mcp_toolset = load_mcp_servers(mcp_config)
        toolsets += mcp_toolset
        logger.info(f"Connected to MCP Config JSON: {mcp_toolset}")

    # Skills are optional: attach them only when the directory is present.
    skills_available = bool(skills_directory) and os.path.exists(skills_directory)
    if skills_available:
        logger.debug(f"Loading skills {skills_directory}")
        toolsets.append(SkillsToolset(directories=[str(skills_directory)]))
        logger.info(f"Loaded Skills at {skills_directory}")

    # Create the model before announcing agent initialisation.
    llm = create_model(provider, model_id, base_url, api_key)
    logger.info("Initializing Agent...")

    return Agent(
        name=AGENT_NAME,
        system_prompt=AGENT_SYSTEM_PROMPT,
        model=llm,
        model_settings=ModelSettings(
            max_tokens=DEFAULT_MAX_TOKENS,
            temperature=DEFAULT_TEMPERATURE,
            top_p=DEFAULT_TOP_P,
            timeout=DEFAULT_TIMEOUT,
            parallel_tool_calls=DEFAULT_PARALLEL_TOOL_CALLS,
            seed=DEFAULT_SEED,
            presence_penalty=DEFAULT_PRESENCE_PENALTY,
            frequency_penalty=DEFAULT_FREQUENCY_PENALTY,
            logit_bias=DEFAULT_LOGIT_BIAS,
            stop_sequences=DEFAULT_STOP_SEQUENCES,
            extra_headers=DEFAULT_EXTRA_HEADERS,
            extra_body=DEFAULT_EXTRA_BODY,
        ),
        toolsets=toolsets,
        tool_timeout=DEFAULT_TOOL_TIMEOUT,
        deps_type=Any,
    )
def create_agent_server(
    provider: str = DEFAULT_PROVIDER,
    model_id: str = DEFAULT_MODEL_ID,
    base_url: Optional[str] = None,
    api_key: Optional[str] = None,
    mcp_url: Optional[str] = DEFAULT_MCP_URL,
    mcp_config: str = DEFAULT_MCP_CONFIG,
    skills_directory: Optional[str] = DEFAULT_SKILLS_DIRECTORY,
    debug: Optional[bool] = DEFAULT_DEBUG,
    host: Optional[str] = DEFAULT_HOST,
    port: Optional[int] = DEFAULT_PORT,
    enable_web_ui: bool = DEFAULT_ENABLE_WEB_UI,
) -> None:
    """Build the agent, wrap it in a FastAPI app and serve it with uvicorn.

    The resulting app exposes:

    * ``GET /health``  - static health check.
    * ``/a2a``         - the agent's A2A application, mounted as a sub-app.
    * ``POST /ag-ui``  - AG-UI endpoint that streams the agent run.
    * ``/``            - the agent's web UI, only when ``enable_web_ui`` is set.

    Args:
        provider: Provider name forwarded to ``create_agent``.
        model_id: Model identifier forwarded to ``create_agent``.
        base_url: Optional base URL forwarded to ``create_agent``.
        api_key: Optional API key forwarded to ``create_agent``.
        mcp_url: MCP server URL forwarded to ``create_agent``.
        mcp_config: MCP config path forwarded to ``create_agent``.
        skills_directory: Directory used both for the agent's skills toolset
            and for the skills advertised by the A2A app.
        debug: Enables debug mode on the A2A and FastAPI apps and selects
            uvicorn's ``debug`` log level.
        host: Interface uvicorn binds to.
        port: Port uvicorn binds to.
        enable_web_ui: Mount the agent's web UI at ``/`` when true.

    Returns:
        None. The call hands control to ``uvicorn.run``.
    """
    print(
        f"Starting {AGENT_NAME} with provider={provider}, model={model_id}, mcp={mcp_url} | {mcp_config}"
    )
    agent = create_agent(
        provider=provider,
        model_id=model_id,
        base_url=base_url,
        api_key=api_key,
        mcp_url=mcp_url,
        mcp_config=mcp_config,
        skills_directory=skills_directory,
    )

    # Define the skills passed to the A2A app.
    if skills_directory and os.path.exists(skills_directory):
        skills = load_skills_from_directory(skills_directory)
        logger.info(f"Loaded {len(skills)} skills from {skills_directory}")
    else:
        # Fallback if no skills directory: advertise one generic skill.
        skills = [
            Skill(
                id="archivebox_agent",
                name="ArchiveBox Agent",
                description="General access to ArchiveBox tools",
                tags=["archivebox"],
                input_modes=["text"],
                output_modes=["text"],
            )
        ]

    # Create the A2A app before the main app so the lifespan below can bind
    # to it.
    a2a_app = agent.to_a2a(
        name=AGENT_NAME,
        description=AGENT_DESCRIPTION,
        version=__version__,
        skills=skills,
        debug=debug,
    )

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Run the A2A sub-app's lifespan for as long as the main app runs."""
        # Trigger A2A (sub-app) startup/shutdown events.
        # This is critical for TaskManager initialization in A2A.
        if hasattr(a2a_app, "router"):
            async with a2a_app.router.lifespan_context(a2a_app):
                yield
        else:
            yield

    # Create the main FastAPI app.
    app = FastAPI(
        title=f"{AGENT_NAME} - A2A + AG-UI Server",
        description=AGENT_DESCRIPTION,
        debug=debug,
        lifespan=lifespan,
    )

    @app.get("/health")
    async def health_check():
        """Report that the server process is up."""
        return {"status": "OK"}

    # Mount A2A as sub-app at /a2a.
    app.mount("/a2a", a2a_app)

    # Add the AG-UI endpoint (POST to /ag-ui).
    @app.post("/ag-ui")
    async def ag_ui_endpoint(request: Request) -> Response:
        """Run the agent for one AG-UI request and stream back its events."""
        accept = request.headers.get("accept", SSE_CONTENT_TYPE)
        try:
            # Parse the incoming AG-UI run input from the request body.
            run_input = AGUIAdapter.build_run_input(await request.body())
        except ValidationError as e:
            # e.json() is already a serialized JSON document; passing it
            # through json.dumps again would double-encode it into a quoted
            # string instead of the error list clients expect.
            return Response(
                content=e.json(),
                media_type="application/json",
                status_code=422,
            )

        # Prune large messages from history.
        if hasattr(run_input, "messages"):
            run_input.messages = prune_large_messages(run_input.messages)

        # Create the adapter, run the agent and stream the encoded events.
        adapter = AGUIAdapter(agent=agent, run_input=run_input, accept=accept)
        event_stream = adapter.run_stream()  # Runs agent, yields events
        sse_stream = adapter.encode_stream(event_stream)  # Encodes to SSE
        return StreamingResponse(
            sse_stream,
            media_type=accept,
        )

    # Mount the web UI last, and only if enabled.
    if enable_web_ui:
        web_ui = agent.to_web(instructions=AGENT_SYSTEM_PROMPT)
        app.mount("/", web_ui)

    logger.info(
        "Starting server on %s:%s (A2A at /a2a, AG-UI at /ag-ui, Web UI: %s)",
        host,
        port,
        "Enabled at /" if enable_web_ui else "Disabled",
    )
    uvicorn.run(
        app,
        host=host,
        port=port,
        timeout_keep_alive=1800,  # 30 minute timeout
        timeout_graceful_shutdown=60,
        log_level="debug" if debug else "info",
    )
def _parse_bool_arg(value):
    """Convert a command-line boolean spelling into a real ``bool``.

    ``type=bool`` is unsuitable for argparse because ``bool("False")`` is
    ``True``; this parser interprets the text instead.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is not a recognised spelling.
    """
    if isinstance(value, bool):
        return value
    normalized = str(value).strip().lower()
    if normalized in {"1", "true", "t", "yes", "y", "on"}:
        return True
    if normalized in {"0", "false", "f", "no", "n", "off", ""}:
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def agent_server():
    """Command-line entry point: parse arguments and start the agent server.

    Every option defaults to the matching module-level ``DEFAULT_*`` value.
    When debug mode is requested, logging is reconfigured to DEBUG before the
    parsed arguments are handed to ``create_agent_server``.
    """
    print(f"archivebox_agent v{__version__}")
    parser = argparse.ArgumentParser(
        description=f"Run the {AGENT_NAME} A2A + AG-UI Server"
    )
    parser.add_argument(
        "--host", default=DEFAULT_HOST, help="Host to bind the server to"
    )
    parser.add_argument(
        "--port", type=int, default=DEFAULT_PORT, help="Port to bind the server to"
    )
    # Accepts `--debug`, `--debug true` and `--debug false`; the explicit
    # parser makes `--debug False` actually disable debug mode.
    parser.add_argument(
        "--debug",
        type=_parse_bool_arg,
        nargs="?",
        const=True,
        default=DEFAULT_DEBUG,
        help="Debug mode",
    )
    parser.add_argument("--reload", action="store_true", help="Enable auto-reload")
    parser.add_argument(
        "--provider",
        default=DEFAULT_PROVIDER,
        choices=["openai", "anthropic", "google", "huggingface"],
        help="LLM Provider",
    )
    parser.add_argument("--model-id", default=DEFAULT_MODEL_ID, help="LLM Model ID")
    parser.add_argument(
        "--base-url",
        default=DEFAULT_OPENAI_BASE_URL,
        help="LLM Base URL (for OpenAI compatible providers)",
    )
    parser.add_argument("--api-key", default=DEFAULT_OPENAI_API_KEY, help="LLM API Key")
    parser.add_argument("--mcp-url", default=DEFAULT_MCP_URL, help="MCP Server URL")
    parser.add_argument(
        "--mcp-config", default=DEFAULT_MCP_CONFIG, help="MCP Server Config"
    )
    parser.add_argument(
        "--skills-directory",
        default=DEFAULT_SKILLS_DIRECTORY,
        help="Directory containing agent skills",
    )
    parser.add_argument(
        "--web",
        action="store_true",
        default=DEFAULT_ENABLE_WEB_UI,
        help="Enable Pydantic AI Web UI",
    )
    args = parser.parse_args()

    if args.debug:
        # force=True removes and closes any handlers already attached to the
        # root logger, so no manual clean-up is needed before reconfiguring.
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            handlers=[logging.StreamHandler()],
            force=True,
        )
        logging.getLogger("pydantic_ai").setLevel(logging.DEBUG)
        logging.getLogger("fastmcp").setLevel(logging.DEBUG)
        logging.getLogger("httpcore").setLevel(logging.DEBUG)
        logging.getLogger("httpx").setLevel(logging.DEBUG)
        logger.setLevel(logging.DEBUG)
        logger.debug("Debug mode enabled")

    if args.reload:
        # The flag is parsed for compatibility but nothing reads it, so say
        # so rather than silently ignoring the request.
        logger.warning("--reload was given but is not used; auto-reload is disabled")

    # Start the server with the CLI arguments.
    create_agent_server(
        provider=args.provider,
        model_id=args.model_id,
        base_url=args.base_url,
        api_key=args.api_key,
        mcp_url=args.mcp_url,
        mcp_config=args.mcp_config,
        skills_directory=args.skills_directory,
        debug=args.debug,
        host=args.host,
        port=args.port,
        enable_web_ui=args.web,
    )


if __name__ == "__main__":
    agent_server()