#!/usr/bin/python
import sys
# coding: utf-8
import os
import argparse
import logging
import uvicorn
import httpx
from contextlib import asynccontextmanager
from typing import Optional, Any
import json
from pydantic_ai import Agent, ModelSettings
from pydantic_ai.mcp import load_mcp_servers, MCPServerStreamableHTTP, MCPServerSSE
from pydantic_ai_skills import SkillsToolset
from fasta2a import Skill
from archivebox_api.utils import (
to_boolean,
to_integer,
to_float,
to_list,
to_dict,
get_skills_path,
get_mcp_config_path,
load_skills_from_directory,
create_model,
prune_large_messages,
)
from fastapi import FastAPI, Request
from starlette.responses import Response, StreamingResponse
from pydantic import ValidationError
from pydantic_ai.ui import SSE_CONTENT_TYPE
from pydantic_ai.ui.ag_ui import AGUIAdapter
__version__ = "0.1.14"
# Root logger: timestamped INFO-level output to stderr. Reconfigured to DEBUG
# at runtime by agent_server() when --debug is passed.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()],
)
# Pin key third-party loggers to INFO so their output is visible by default.
logging.getLogger("pydantic_ai").setLevel(logging.INFO)
logging.getLogger("fastmcp").setLevel(logging.INFO)
logging.getLogger("httpx").setLevel(logging.INFO)
logger = logging.getLogger(__name__)
# --- Server defaults (all overridable via environment variables) ---
DEFAULT_HOST = os.getenv("HOST", "0.0.0.0")
DEFAULT_PORT = to_integer(os.getenv("PORT", "9000"))
DEFAULT_DEBUG = to_boolean(os.getenv("DEBUG", "False"))
# --- LLM provider / model defaults ---
DEFAULT_PROVIDER = os.getenv("PROVIDER", "openai")
DEFAULT_MODEL_ID = os.getenv("MODEL_ID", "qwen/qwen3-coder-next")
# Default base URL targets a local OpenAI-compatible server reachable from a container.
DEFAULT_LLM_BASE_URL = os.getenv("LLM_BASE_URL", "http://host.docker.internal:1234/v1")
DEFAULT_LLM_API_KEY = os.getenv("LLM_API_KEY", "ollama")
# --- MCP / skills configuration ---
# MCP_URL (single server) takes precedence over MCP_CONFIG (JSON config file)
# in create_agent() below.
DEFAULT_MCP_URL = os.getenv("MCP_URL", None)
DEFAULT_MCP_CONFIG = os.getenv("MCP_CONFIG", get_mcp_config_path())
DEFAULT_CUSTOM_SKILLS_DIRECTORY = os.getenv("CUSTOM_SKILLS_DIRECTORY", None)
DEFAULT_ENABLE_WEB_UI = to_boolean(os.getenv("ENABLE_WEB_UI", "False"))
DEFAULT_SSL_VERIFY = to_boolean(os.getenv("SSL_VERIFY", "True"))
# --- Model sampling / request settings (fed into ModelSettings) ---
DEFAULT_MAX_TOKENS = to_integer(os.getenv("MAX_TOKENS", "16384"))
DEFAULT_TEMPERATURE = to_float(os.getenv("TEMPERATURE", "0.7"))
DEFAULT_TOP_P = to_float(os.getenv("TOP_P", "1.0"))
# 32400 s = 9 h — very long-running archive jobs are presumably expected.
DEFAULT_TIMEOUT = to_float(os.getenv("TIMEOUT", "32400.0"))
DEFAULT_TOOL_TIMEOUT = to_float(os.getenv("TOOL_TIMEOUT", "32400.0"))
DEFAULT_PARALLEL_TOOL_CALLS = to_boolean(os.getenv("PARALLEL_TOOL_CALLS", "True"))
DEFAULT_SEED = to_integer(os.getenv("SEED", None))
DEFAULT_PRESENCE_PENALTY = to_float(os.getenv("PRESENCE_PENALTY", "0.0"))
DEFAULT_FREQUENCY_PENALTY = to_float(os.getenv("FREQUENCY_PENALTY", "0.0"))
DEFAULT_LOGIT_BIAS = to_dict(os.getenv("LOGIT_BIAS", None))
DEFAULT_STOP_SEQUENCES = to_list(os.getenv("STOP_SEQUENCES", None))
DEFAULT_EXTRA_HEADERS = to_dict(os.getenv("EXTRA_HEADERS", None))
DEFAULT_EXTRA_BODY = to_dict(os.getenv("EXTRA_BODY", None))
# --- Agent identity and system prompt ---
AGENT_NAME = "ArchiveBox Agent"
AGENT_DESCRIPTION = (
    "An intelligent agent for managing ArchiveBox snapshots and archives."
)
AGENT_SYSTEM_PROMPT = (
    "You are an ArchiveBox Expert Agent.\n"
    "You are an expert in web archiving and managing ArchiveBox instances.\n"
    "You have access to tools for adding URLs, updating archives, listing snapshots, and managing the archive collection.\n"
    "Your responsibilities:\n"
    "1. Analyze the user's request regarding ArchiveBox.\n"
    "2. Use the available skills and tools to interact with ArchiveBox.\n"
    "3. If a task requires multiple steps (e.g., adding a URL then verifying it), orchestrate them sequentially.\n"
    "4. Always be warm, professional, and helpful.\n"
    "5. Explain your plan in detail before executing.\n"
)
def create_agent(
    provider: str = DEFAULT_PROVIDER,
    model_id: str = DEFAULT_MODEL_ID,
    base_url: Optional[str] = DEFAULT_LLM_BASE_URL,
    api_key: Optional[str] = DEFAULT_LLM_API_KEY,
    mcp_url: Optional[str] = DEFAULT_MCP_URL,
    mcp_config: Optional[str] = DEFAULT_MCP_CONFIG,
    custom_skills_directory: Optional[str] = DEFAULT_CUSTOM_SKILLS_DIRECTORY,
    ssl_verify: bool = DEFAULT_SSL_VERIFY,
) -> Agent:
    """Build the ArchiveBox Agent with MCP toolsets, skills, and model settings.

    Args:
        provider: LLM provider name (e.g. "openai").
        model_id: Model identifier passed to the provider.
        base_url: Base URL for OpenAI-compatible providers.
        api_key: API key for the LLM provider.
        mcp_url: Single MCP server URL; takes precedence over ``mcp_config``.
        mcp_config: Path to an MCP servers JSON config, used only when
            ``mcp_url`` is not set.
        custom_skills_directory: Optional extra skills directory, loaded in
            addition to the bundled default skills.
        ssl_verify: Whether to verify TLS certificates on HTTP clients.

    Returns:
        A configured pydantic_ai ``Agent``.
    """
    agent_toolsets = []

    def _http_client() -> httpx.AsyncClient:
        # Single factory so all MCP transports share identical TLS/timeout config.
        return httpx.AsyncClient(verify=ssl_verify, timeout=DEFAULT_TIMEOUT)

    if mcp_url:
        # NOTE(review): transport selection is heuristic — any URL containing
        # "sse" (even in a path segment) selects the SSE transport. Confirm
        # this matches the deployed MCP endpoints.
        if "sse" in mcp_url.lower():
            server = MCPServerSSE(mcp_url, http_client=_http_client())
        else:
            server = MCPServerStreamableHTTP(mcp_url, http_client=_http_client())
        agent_toolsets.append(server)
        logger.info(f"Connected to MCP Server: {mcp_url}")
    elif mcp_config:
        mcp_toolset = load_mcp_servers(mcp_config)
        # Re-point each server at a client honoring our TLS/timeout settings.
        for server in mcp_toolset:
            if hasattr(server, "http_client"):
                server.http_client = _http_client()
        agent_toolsets.extend(mcp_toolset)
        logger.info(f"Connected to MCP Config JSON: {mcp_toolset}")
    # Always load default skills
    skill_dirs = [get_skills_path()]
    # Load custom skills if provided
    if custom_skills_directory and os.path.exists(custom_skills_directory):
        logger.debug(f"Loading custom skills {custom_skills_directory}")
        skill_dirs.append(str(custom_skills_directory))
        logger.info(f"Loaded Custom Skills at {custom_skills_directory}")
    skills = SkillsToolset(directories=skill_dirs)
    agent_toolsets.append(skills)
    logger.info(f"Loaded Default Skills at {get_skills_path()}")
    model = create_model(
        provider=provider,
        model_id=model_id,
        base_url=base_url,
        api_key=api_key,
        ssl_verify=ssl_verify,
        timeout=DEFAULT_TIMEOUT,
    )
    logger.info("Initializing Agent...")
    # All sampling/request knobs come from module-level env-driven defaults.
    settings = ModelSettings(
        max_tokens=DEFAULT_MAX_TOKENS,
        temperature=DEFAULT_TEMPERATURE,
        top_p=DEFAULT_TOP_P,
        timeout=DEFAULT_TIMEOUT,
        parallel_tool_calls=DEFAULT_PARALLEL_TOOL_CALLS,
        seed=DEFAULT_SEED,
        presence_penalty=DEFAULT_PRESENCE_PENALTY,
        frequency_penalty=DEFAULT_FREQUENCY_PENALTY,
        logit_bias=DEFAULT_LOGIT_BIAS,
        stop_sequences=DEFAULT_STOP_SEQUENCES,
        extra_headers=DEFAULT_EXTRA_HEADERS,
        extra_body=DEFAULT_EXTRA_BODY,
    )
    return Agent(
        name=AGENT_NAME,
        system_prompt=AGENT_SYSTEM_PROMPT,
        model=model,
        model_settings=settings,
        toolsets=agent_toolsets,
        tool_timeout=DEFAULT_TOOL_TIMEOUT,
        deps_type=Any,
    )
def create_agent_server(
    provider: str = DEFAULT_PROVIDER,
    model_id: str = DEFAULT_MODEL_ID,
    base_url: Optional[str] = DEFAULT_LLM_BASE_URL,
    api_key: Optional[str] = DEFAULT_LLM_API_KEY,
    mcp_url: Optional[str] = DEFAULT_MCP_URL,
    mcp_config: Optional[str] = DEFAULT_MCP_CONFIG,
    custom_skills_directory: Optional[str] = DEFAULT_CUSTOM_SKILLS_DIRECTORY,
    debug: Optional[bool] = DEFAULT_DEBUG,
    host: Optional[str] = DEFAULT_HOST,
    port: Optional[int] = DEFAULT_PORT,
    enable_web_ui: bool = DEFAULT_ENABLE_WEB_UI,
    ssl_verify: bool = DEFAULT_SSL_VERIFY,
):
    """Build the FastAPI app (A2A at /a2a, AG-UI at /ag-ui) and run it with uvicorn.

    Blocks until the server is stopped. Args mirror :func:`create_agent`, plus
    ``debug``/``host``/``port`` for the HTTP server and ``enable_web_ui`` to
    mount the Pydantic AI web chat UI at "/".
    """
    print(
        f"Starting {AGENT_NAME}:"
        f"\tprovider={provider}"
        f"\tmodel={model_id}"
        f"\tbase_url={base_url}"
        f"\tmcp={mcp_url} | {mcp_config}"
        f"\tssl_verify={ssl_verify}"
    )
    agent = create_agent(
        provider=provider,
        model_id=model_id,
        base_url=base_url,
        api_key=api_key,
        mcp_url=mcp_url,
        mcp_config=mcp_config,
        custom_skills_directory=custom_skills_directory,
        ssl_verify=ssl_verify,
    )
    # Always load default skills
    skills = load_skills_from_directory(get_skills_path())
    logger.info(f"Loaded {len(skills)} default skills from {get_skills_path()}")
    # Load custom skills if provided
    if custom_skills_directory and os.path.exists(custom_skills_directory):
        custom_skills = load_skills_from_directory(custom_skills_directory)
        skills.extend(custom_skills)
        logger.info(
            f"Loaded {len(custom_skills)} custom skills from {custom_skills_directory}"
        )
    # The A2A agent card requires at least one skill; fall back to a generic one.
    if not skills:
        skills = [
            Skill(
                id="archivebox_agent",
                name="ArchiveBox Agent",
                description="General access to ArchiveBox tools",
                tags=["archivebox"],
                input_modes=["text"],
                output_modes=["text"],
            )
        ]
    a2a_app = agent.to_a2a(
        name=AGENT_NAME,
        description=AGENT_DESCRIPTION,
        version=__version__,
        skills=skills,
        debug=debug,
    )

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Mounted sub-apps don't get their lifespan run by the parent FastAPI
        # app, so forward startup/shutdown to the A2A app explicitly.
        if hasattr(a2a_app, "router") and hasattr(a2a_app.router, "lifespan_context"):
            async with a2a_app.router.lifespan_context(a2a_app):
                yield
        else:
            yield

    app = FastAPI(
        title=f"{AGENT_NAME} - A2A + AG-UI Server",
        description=AGENT_DESCRIPTION,
        debug=debug,
        lifespan=lifespan,
    )

    @app.get("/health")
    async def health_check():
        """Liveness probe endpoint."""
        return {"status": "OK"}

    app.mount("/a2a", a2a_app)

    @app.post("/ag-ui")
    async def ag_ui_endpoint(request: Request) -> Response:
        """Stream an agent run over the AG-UI protocol (SSE by default)."""
        accept = request.headers.get("accept", SSE_CONTENT_TYPE)
        try:
            run_input = AGUIAdapter.build_run_input(await request.body())
        except ValidationError as e:
            # ValidationError.json() already returns a JSON string; wrapping it
            # in json.dumps() would double-encode it into a quoted string body.
            return Response(
                content=e.json(),
                media_type="application/json",
                status_code=422,
            )
        # Trim oversized history before handing it to the model.
        if hasattr(run_input, "messages"):
            run_input.messages = prune_large_messages(run_input.messages)
        adapter = AGUIAdapter(agent=agent, run_input=run_input, accept=accept)
        event_stream = adapter.run_stream()
        sse_stream = adapter.encode_stream(event_stream)
        return StreamingResponse(
            sse_stream,
            media_type=accept,
        )

    if enable_web_ui:
        web_ui = agent.to_web(instructions=AGENT_SYSTEM_PROMPT)
        app.mount("/", web_ui)
    logger.info(
        "Starting server on %s:%s (A2A at /a2a, AG-UI at /ag-ui, Web UI: %s)",
        host,
        port,
        "Enabled at /" if enable_web_ui else "Disabled",
    )
    # Long keep-alive to match the very long default request timeouts above.
    uvicorn.run(
        app,
        host=host,
        port=port,
        timeout_keep_alive=1800,
        timeout_graceful_shutdown=60,
        log_level="debug" if debug else "info",
    )
def agent_server():
    """CLI entry point: parse arguments and launch the agent server."""
    print(f"archivebox_agent v{__version__}")
    parser = argparse.ArgumentParser(
        add_help=False, description=f"Run the {AGENT_NAME} A2A + AG-UI Server"
    )
    parser.add_argument(
        "--host", default=DEFAULT_HOST, help="Host to bind the server to"
    )
    parser.add_argument(
        "--port", type=int, default=DEFAULT_PORT, help="Port to bind the server to"
    )
    # type=bool would treat ANY non-empty string (including "False") as True;
    # to_boolean parses the value properly.
    parser.add_argument(
        "--debug", type=to_boolean, default=DEFAULT_DEBUG, help="Debug mode"
    )
    parser.add_argument("--reload", action="store_true", help="Enable auto-reload")
    parser.add_argument(
        "--provider",
        default=DEFAULT_PROVIDER,
        choices=["openai", "anthropic", "google", "huggingface"],
        help="LLM Provider",
    )
    parser.add_argument("--model-id", default=DEFAULT_MODEL_ID, help="LLM Model ID")
    parser.add_argument(
        "--base-url",
        default=DEFAULT_LLM_BASE_URL,
        help="LLM Base URL (for OpenAI compatible providers)",
    )
    parser.add_argument("--api-key", default=DEFAULT_LLM_API_KEY, help="LLM API Key")
    parser.add_argument("--mcp-url", default=DEFAULT_MCP_URL, help="MCP Server URL")
    parser.add_argument(
        "--mcp-config", default=DEFAULT_MCP_CONFIG, help="MCP Server Config"
    )
    parser.add_argument(
        "--custom-skills-directory",
        default=DEFAULT_CUSTOM_SKILLS_DIRECTORY,
        help="Directory containing additional custom agent skills",
    )
    parser.add_argument(
        "--web",
        action="store_true",
        default=DEFAULT_ENABLE_WEB_UI,
        help="Enable Pydantic AI Web UI",
    )
    parser.add_argument(
        "--insecure",
        action="store_true",
        help="Disable SSL verification for LLM requests (Use with caution)",
    )
    # add_help=False above, so provide a manual --help flag.
    parser.add_argument("--help", action="store_true", help="Show usage")
    args = parser.parse_args()
    if args.help:
        parser.print_help()
        sys.exit(0)
    if args.debug:
        # Rebuild root logging at DEBUG level, replacing the module-level INFO setup.
        for handler in logging.root.handlers[:]:
            logging.root.removeHandler(handler)
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            handlers=[logging.StreamHandler()],
            force=True,
        )
        logging.getLogger("pydantic_ai").setLevel(logging.DEBUG)
        logging.getLogger("fastmcp").setLevel(logging.DEBUG)
        logging.getLogger("httpcore").setLevel(logging.DEBUG)
        logging.getLogger("httpx").setLevel(logging.DEBUG)
        logger.setLevel(logging.DEBUG)
        logger.debug("Debug mode enabled")
    create_agent_server(
        provider=args.provider,
        model_id=args.model_id,
        base_url=args.base_url,
        api_key=args.api_key,
        mcp_url=args.mcp_url,
        mcp_config=args.mcp_config,
        custom_skills_directory=args.custom_skills_directory,
        debug=args.debug,
        host=args.host,
        port=args.port,
        enable_web_ui=args.web,
        ssl_verify=not args.insecure,
    )
# Script entry point: run the CLI when executed directly.
if __name__ == "__main__":
    agent_server()