#!/usr/bin/python
# coding: utf-8
import os
import argparse
import sys
import logging
from typing import Optional, List, Dict, Union
import requests
from fastmcp.exceptions import ResourceError
from pydantic import Field
from eunomia_mcp.middleware import EunomiaMcpMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse
from fastmcp import FastMCP, Context
from fastmcp.server.auth.oidc_proxy import OIDCProxy
from fastmcp.server.auth import OAuthProxy, RemoteAuthProvider
from fastmcp.server.auth.providers.jwt import JWTVerifier, StaticTokenVerifier
from fastmcp.server.middleware.logging import LoggingMiddleware
from fastmcp.server.middleware.timing import TimingMiddleware
from fastmcp.server.middleware.rate_limiting import RateLimitingMiddleware
from fastmcp.server.middleware.error_handling import ErrorHandlingMiddleware
from fastmcp.utilities.logging import get_logger
from archivebox_api.archivebox_api import Api
from archivebox_api.utils import to_boolean, to_integer
from archivebox_api.middlewares import UserTokenMiddleware, JWTClaimsLoggingMiddleware
# Package version; printed by the CLI entry point on start-up.
__version__ = "0.0.20"

# Module logger, forced to DEBUG verbosity.
logger = get_logger(name="TokenMiddleware")
logger.setLevel(logging.DEBUG)

# Authentication / delegation settings, all sourced from the environment.
# Unset variables resolve to None unless a fallback is given explicitly.
config = dict(
    enable_delegation=to_boolean(os.getenv("ENABLE_DELEGATION", "False")),
    audience=os.getenv("AUDIENCE"),
    delegated_scopes=os.getenv("DELEGATED_SCOPES", "api"),
    token_endpoint=None,  # Will be fetched dynamically from OIDC config
    oidc_client_id=os.getenv("OIDC_CLIENT_ID"),
    oidc_client_secret=os.getenv("OIDC_CLIENT_SECRET"),
    oidc_config_url=os.getenv("OIDC_CONFIG_URL"),
    jwt_jwks_uri=os.getenv("FASTMCP_SERVER_AUTH_JWT_JWKS_URI"),
    jwt_issuer=os.getenv("FASTMCP_SERVER_AUTH_JWT_ISSUER"),
    jwt_audience=os.getenv("FASTMCP_SERVER_AUTH_JWT_AUDIENCE"),
    jwt_algorithm=os.getenv("FASTMCP_SERVER_AUTH_JWT_ALGORITHM"),
    # NOTE: the "secret" entry is read from the *_PUBLIC_KEY variable.
    jwt_secret=os.getenv("FASTMCP_SERVER_AUTH_JWT_PUBLIC_KEY"),
    jwt_required_scopes=os.getenv("FASTMCP_SERVER_AUTH_JWT_REQUIRED_SCOPES"),
)

# Defaults for the command-line options (transport, bind host, bind port).
DEFAULT_TRANSPORT = os.getenv("TRANSPORT", "stdio")
DEFAULT_HOST = os.getenv("HOST", "0.0.0.0")
DEFAULT_PORT = to_integer(string=os.getenv("PORT", "8000"))
def register_tools(mcp: FastMCP):
    """Attach the health-check route and the ArchiveBox tools to ``mcp``."""

    @mcp.custom_route("/health", methods=["GET"])
    async def health_check(request: Request) -> JSONResponse:
        """Answer GET /health with a fixed ``{"status": "OK"}`` JSON body."""
        payload = {"status": "OK"}
        return JSONResponse(payload)
# Authentication Tools
@mcp.tool(
    # Only the connection settings that come from the environment are hidden
    # from the tool schema. ``username`` and ``password`` are the actual
    # inputs of this tool and have no default value, so they must remain
    # visible; excluding them made the tool impossible to call.
    exclude_args=[
        "archivebox_url",
        "token",
        "api_key",
        "verify",
    ],
    tags={"authentication"},
)
async def get_api_token(
    username: Optional[str] = Field(
        description="The username for authentication",
    ),
    password: Optional[str] = Field(
        description="The password for authentication",
    ),
    archivebox_url: str = Field(
        default=os.environ.get("ARCHIVEBOX_URL", None),
        description="The URL of the ArchiveBox instance (e.g., https://yourinstance.archivebox.com)",
    ),
    token: Optional[str] = Field(
        default=os.environ.get("ARCHIVEBOX_TOKEN", None),
        description="Bearer token for authentication",
    ),
    api_key: Optional[str] = Field(
        default=os.environ.get("ARCHIVEBOX_API_KEY", None),
        description="API key for authentication",
    ),
    verify: Optional[bool] = Field(
        default=to_boolean(os.environ.get("ARCHIVEBOX_VERIFY", "True")),
        description="Whether to verify SSL certificates",
    ),
) -> dict:
    """
    Generate an API token for a given username & password.
    """
    # Parameters:
    #   username / password: credentials supplied by the caller; they are used
    #       both to build the client and as the arguments of the token request.
    #   archivebox_url / token / api_key / verify: connection settings whose
    #       defaults are read from the ARCHIVEBOX_* environment variables.
    # Returns the decoded JSON body of the API response.
    client = Api(
        url=archivebox_url,
        username=username,
        password=password,
        token=token,
        api_key=api_key,
        verify=verify,
    )
    response = client.get_api_token(username=username, password=password)
    return response.json()
@mcp.tool(
    # ``token`` is the required value to validate and must stay in the tool
    # schema. The credential used to authenticate the client itself is
    # ``token_param`` (environment default), so that is the one hidden here,
    # alongside the other environment-backed connection settings.
    exclude_args=[
        "archivebox_url",
        "username",
        "password",
        "token_param",
        "api_key",
        "verify",
    ],
    tags={"authentication"},
)
async def check_api_token(
    token: str = Field(
        description="The API token to validate",
    ),
    archivebox_url: str = Field(
        default=os.environ.get("ARCHIVEBOX_URL", None),
        description="The URL of the ArchiveBox instance (e.g., https://yourinstance.archivebox.com)",
    ),
    username: Optional[str] = Field(
        default=os.environ.get("ARCHIVEBOX_USERNAME", None),
        description="Username for authentication",
    ),
    password: Optional[str] = Field(
        default=os.environ.get("ARCHIVEBOX_PASSWORD", None),
        description="Password for authentication",
    ),
    token_param: Optional[str] = Field(
        default=os.environ.get("ARCHIVEBOX_TOKEN", None),
        description="Bearer token for authentication",
    ),
    api_key: Optional[str] = Field(
        default=os.environ.get("ARCHIVEBOX_API_KEY", None),
        description="API key for authentication",
    ),
    verify: Optional[bool] = Field(
        default=to_boolean(os.environ.get("ARCHIVEBOX_VERIFY", "True")),
        description="Whether to verify SSL certificates",
    ),
) -> dict:
    """
    Validate an API token to make sure it's valid and non-expired.
    """
    # Parameters:
    #   token: the token to validate, supplied by the caller.
    #   token_param: bearer token the client authenticates with; distinct from
    #       ``token`` and defaulting to ARCHIVEBOX_TOKEN.
    #   archivebox_url / username / password / api_key / verify: connection
    #       settings whose defaults come from ARCHIVEBOX_* environment variables.
    # Returns the decoded JSON body of the API response.
    client = Api(
        url=archivebox_url,
        username=username,
        password=password,
        token=token_param,
        api_key=api_key,
        verify=verify,
    )
    response = client.check_api_token(token=token)
    return response.json()
# Core Model Tools
@mcp.tool(
    tags={"core"},
    exclude_args=[
        "archivebox_url", "username", "password",
        "token", "api_key", "verify",
    ],
)
async def get_snapshots(
    id: Optional[str] = Field(default=None, description="Filter by snapshot ID"),
    abid: Optional[str] = Field(default=None, description="Filter by snapshot abid"),
    created_by_id: Optional[str] = Field(
        default=None, description="Filter by creator ID"
    ),
    created_by_username: Optional[str] = Field(
        default=None, description="Filter by creator username"
    ),
    created_at__gte: Optional[str] = Field(
        default=None, description="Filter by creation date >= (ISO 8601)"
    ),
    created_at__lt: Optional[str] = Field(
        default=None, description="Filter by creation date < (ISO 8601)"
    ),
    created_at: Optional[str] = Field(
        default=None, description="Filter by exact creation date (ISO 8601)"
    ),
    modified_at: Optional[str] = Field(
        default=None, description="Filter by exact modification date (ISO 8601)"
    ),
    modified_at__gte: Optional[str] = Field(
        default=None, description="Filter by modification date >= (ISO 8601)"
    ),
    modified_at__lt: Optional[str] = Field(
        default=None, description="Filter by modification date < (ISO 8601)"
    ),
    search: Optional[str] = Field(
        default=None,
        description="Search across url, title, tags, id, abid, timestamp",
    ),
    url: Optional[str] = Field(default=None, description="Filter by URL (exact)"),
    tag: Optional[str] = Field(
        default=None, description="Filter by tag name (exact)"
    ),
    title: Optional[str] = Field(
        default=None, description="Filter by title (icontains)"
    ),
    timestamp: Optional[str] = Field(
        default=None, description="Filter by timestamp (startswith)"
    ),
    bookmarked_at__gte: Optional[str] = Field(
        default=None, description="Filter by bookmark date >= (ISO 8601)"
    ),
    bookmarked_at__lt: Optional[str] = Field(
        default=None, description="Filter by bookmark date < (ISO 8601)"
    ),
    with_archiveresults: bool = Field(
        default=False, description="Include archiveresults in response"
    ),
    limit: int = Field(default=10, description="Number of results to return"),
    offset: int = Field(default=0, description="Offset for pagination"),
    page: int = Field(default=0, description="Page number for pagination"),
    api_key_param: Optional[str] = Field(
        default=None, description="API key for QueryParamTokenAuth"
    ),
    archivebox_url: str = Field(
        default=os.getenv("ARCHIVEBOX_URL"),
        description="The URL of the ArchiveBox instance",
    ),
    username: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_USERNAME"),
        description="Username for authentication",
    ),
    password: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_PASSWORD"),
        description="Password for authentication",
    ),
    token: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_TOKEN"),
        description="Bearer token for authentication",
    ),
    api_key: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_API_KEY"),
        description="API key for authentication",
    ),
    verify: Optional[bool] = Field(
        default=to_boolean(os.getenv("ARCHIVEBOX_VERIFY", "True")),
        description="Whether to verify SSL certificates",
    ),
) -> dict:
    """Retrieve list of snapshots."""
    # Client built from the (environment-defaulted) connection settings.
    connection = {
        "url": archivebox_url,
        "username": username,
        "password": password,
        "token": token,
        "api_key": api_key,
        "verify": verify,
    }
    api = Api(**connection)

    # Every filter and pagination value is forwarded unchanged; note that
    # ``api_key_param`` is sent as the request-level ``api_key`` argument.
    query = {
        "id": id,
        "abid": abid,
        "created_by_id": created_by_id,
        "created_by_username": created_by_username,
        "created_at__gte": created_at__gte,
        "created_at__lt": created_at__lt,
        "created_at": created_at,
        "modified_at": modified_at,
        "modified_at__gte": modified_at__gte,
        "modified_at__lt": modified_at__lt,
        "search": search,
        "url": url,
        "tag": tag,
        "title": title,
        "timestamp": timestamp,
        "bookmarked_at__gte": bookmarked_at__gte,
        "bookmarked_at__lt": bookmarked_at__lt,
        "with_archiveresults": with_archiveresults,
        "limit": limit,
        "offset": offset,
        "page": page,
        "api_key": api_key_param,
    }
    # The decoded JSON body of the response is the tool result.
    return api.get_snapshots(**query).json()
@mcp.tool(
    tags={"core"},
    exclude_args=[
        "archivebox_url", "username", "password",
        "token", "api_key", "verify",
    ],
)
async def get_snapshot(
    snapshot_id: str = Field(description="The ID or abid of the snapshot"),
    with_archiveresults: bool = Field(
        default=True, description="Whether to include archiveresults"
    ),
    archivebox_url: str = Field(
        default=os.getenv("ARCHIVEBOX_URL"),
        description="The URL of the ArchiveBox instance",
    ),
    username: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_USERNAME"),
        description="Username for authentication",
    ),
    password: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_PASSWORD"),
        description="Password for authentication",
    ),
    token: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_TOKEN"),
        description="Bearer token for authentication",
    ),
    api_key: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_API_KEY"),
        description="API key for authentication",
    ),
    verify: Optional[bool] = Field(
        default=to_boolean(os.getenv("ARCHIVEBOX_VERIFY", "True")),
        description="Whether to verify SSL certificates",
    ),
) -> dict:
    """Get a specific Snapshot by abid or id."""
    # Client built from the (environment-defaulted) connection settings.
    connection = {
        "url": archivebox_url,
        "username": username,
        "password": password,
        "token": token,
        "api_key": api_key,
        "verify": verify,
    }
    api = Api(**connection)
    # Look the snapshot up and hand back the decoded JSON body.
    reply = api.get_snapshot(
        snapshot_id=snapshot_id, with_archiveresults=with_archiveresults
    )
    return reply.json()
@mcp.tool(
    tags={"core"},
    exclude_args=[
        "archivebox_url", "username", "password",
        "token", "api_key", "verify",
    ],
)
async def get_archiveresults(
    id: Optional[str] = Field(default=None, description="Filter by ID"),
    search: Optional[str] = Field(
        default=None,
        description="Search across snapshot url, title, tags, extractor, output, id",
    ),
    snapshot_id: Optional[str] = Field(
        default=None, description="Filter by snapshot ID"
    ),
    snapshot_url: Optional[str] = Field(
        default=None, description="Filter by snapshot URL"
    ),
    snapshot_tag: Optional[str] = Field(
        default=None, description="Filter by snapshot tag"
    ),
    status: Optional[str] = Field(default=None, description="Filter by status"),
    output: Optional[str] = Field(default=None, description="Filter by output"),
    extractor: Optional[str] = Field(
        default=None, description="Filter by extractor"
    ),
    cmd: Optional[str] = Field(default=None, description="Filter by command"),
    pwd: Optional[str] = Field(
        default=None, description="Filter by working directory"
    ),
    cmd_version: Optional[str] = Field(
        default=None, description="Filter by command version"
    ),
    created_at: Optional[str] = Field(
        default=None, description="Filter by exact creation date (ISO 8601)"
    ),
    created_at__gte: Optional[str] = Field(
        default=None, description="Filter by creation date >= (ISO 8601)"
    ),
    created_at__lt: Optional[str] = Field(
        default=None, description="Filter by creation date < (ISO 8601)"
    ),
    limit: int = Field(default=10, description="Number of results to return"),
    offset: int = Field(default=0, description="Offset for pagination"),
    page: int = Field(default=0, description="Page number for pagination"),
    api_key_param: Optional[str] = Field(
        default=None, description="API key for QueryParamTokenAuth"
    ),
    archivebox_url: str = Field(
        default=os.getenv("ARCHIVEBOX_URL"),
        description="The URL of the ArchiveBox instance",
    ),
    username: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_USERNAME"),
        description="Username for authentication",
    ),
    password: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_PASSWORD"),
        description="Password for authentication",
    ),
    token: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_TOKEN"),
        description="Bearer token for authentication",
    ),
    api_key: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_API_KEY"),
        description="API key for authentication",
    ),
    verify: Optional[bool] = Field(
        default=to_boolean(os.getenv("ARCHIVEBOX_VERIFY", "True")),
        description="Whether to verify SSL certificates",
    ),
) -> dict:
    """List all ArchiveResult entries matching these filters."""
    # Client built from the (environment-defaulted) connection settings.
    connection = {
        "url": archivebox_url,
        "username": username,
        "password": password,
        "token": token,
        "api_key": api_key,
        "verify": verify,
    }
    api = Api(**connection)

    # Every filter and pagination value is forwarded unchanged; note that
    # ``api_key_param`` is sent as the request-level ``api_key`` argument.
    query = {
        "id": id,
        "search": search,
        "snapshot_id": snapshot_id,
        "snapshot_url": snapshot_url,
        "snapshot_tag": snapshot_tag,
        "status": status,
        "output": output,
        "extractor": extractor,
        "cmd": cmd,
        "pwd": pwd,
        "cmd_version": cmd_version,
        "created_at": created_at,
        "created_at__gte": created_at__gte,
        "created_at__lt": created_at__lt,
        "limit": limit,
        "offset": offset,
        "page": page,
        "api_key": api_key_param,
    }
    # The decoded JSON body of the response is the tool result.
    return api.get_archiveresults(**query).json()
@mcp.tool(
    tags={"core"},
    exclude_args=[
        "archivebox_url", "username", "password",
        "token", "api_key", "verify",
    ],
)
async def get_tag(
    tag_id: str = Field(description="The ID or abid of the tag"),
    with_snapshots: bool = Field(
        default=True, description="Whether to include snapshots"
    ),
    archivebox_url: str = Field(
        default=os.getenv("ARCHIVEBOX_URL"),
        description="The URL of the ArchiveBox instance",
    ),
    username: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_USERNAME"),
        description="Username for authentication",
    ),
    password: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_PASSWORD"),
        description="Password for authentication",
    ),
    token: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_TOKEN"),
        description="Bearer token for authentication",
    ),
    api_key: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_API_KEY"),
        description="API key for authentication",
    ),
    verify: Optional[bool] = Field(
        default=to_boolean(os.getenv("ARCHIVEBOX_VERIFY", "True")),
        description="Whether to verify SSL certificates",
    ),
) -> dict:
    """Get a specific Tag by id or abid."""
    # Client built from the (environment-defaulted) connection settings.
    connection = {
        "url": archivebox_url,
        "username": username,
        "password": password,
        "token": token,
        "api_key": api_key,
        "verify": verify,
    }
    api = Api(**connection)
    # Look the tag up and hand back the decoded JSON body.
    reply = api.get_tag(tag_id=tag_id, with_snapshots=with_snapshots)
    return reply.json()
@mcp.tool(
    tags={"core"},
    exclude_args=[
        "archivebox_url", "username", "password",
        "token", "api_key", "verify",
    ],
)
async def get_any(
    abid: str = Field(
        description="The abid of the Snapshot, ArchiveResult, or Tag"
    ),
    archivebox_url: str = Field(
        default=os.getenv("ARCHIVEBOX_URL"),
        description="The URL of the ArchiveBox instance",
    ),
    username: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_USERNAME"),
        description="Username for authentication",
    ),
    password: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_PASSWORD"),
        description="Password for authentication",
    ),
    token: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_TOKEN"),
        description="Bearer token for authentication",
    ),
    api_key: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_API_KEY"),
        description="API key for authentication",
    ),
    verify: Optional[bool] = Field(
        default=to_boolean(os.getenv("ARCHIVEBOX_VERIFY", "True")),
        description="Whether to verify SSL certificates",
    ),
) -> dict:
    """Get a specific Snapshot, ArchiveResult, or Tag by abid."""
    # Client built from the (environment-defaulted) connection settings.
    connection = {
        "url": archivebox_url,
        "username": username,
        "password": password,
        "token": token,
        "api_key": api_key,
        "verify": verify,
    }
    api = Api(**connection)
    # Resolve the abid and hand back the decoded JSON body.
    return api.get_any(abid=abid).json()
# CLI Tools
@mcp.tool(
    tags={"cli"},
    exclude_args=[
        "archivebox_url", "username", "password",
        "token", "api_key", "verify",
    ],
)
async def cli_add(
    urls: List[str] = Field(description="List of URLs to archive"),
    tag: str = Field(default="", description="Comma-separated tags"),
    depth: int = Field(default=0, description="Crawl depth"),
    update: bool = Field(default=False, description="Update existing snapshots"),
    update_all: bool = Field(default=False, description="Update all snapshots"),
    index_only: bool = Field(default=False, description="Index without archiving"),
    overwrite: bool = Field(default=False, description="Overwrite existing files"),
    init: bool = Field(
        default=False, description="Initialize collection if needed"
    ),
    extractors: str = Field(
        default="", description="Comma-separated list of extractors to use"
    ),
    parser: str = Field(default="auto", description="Parser type"),
    extra_data: Optional[Dict] = Field(
        default=None, description="Additional parameters as a dictionary"
    ),
    archivebox_url: str = Field(
        default=os.getenv("ARCHIVEBOX_URL"),
        description="The URL of the ArchiveBox instance",
    ),
    username: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_USERNAME"),
        description="Username for authentication",
    ),
    password: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_PASSWORD"),
        description="Password for authentication",
    ),
    token: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_TOKEN"),
        description="Bearer token for authentication",
    ),
    api_key: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_API_KEY"),
        description="API key for authentication",
    ),
    verify: Optional[bool] = Field(
        default=to_boolean(os.getenv("ARCHIVEBOX_VERIFY", "True")),
        description="Whether to verify SSL certificates",
    ),
    ctx: Context = None,
) -> dict:
    """Execute archivebox add command."""
    # When a context is available, ask for explicit confirmation first and
    # stop with a "cancelled" result unless the answer is an accepted, truthy
    # value.
    if ctx:
        answer = await ctx.elicit(
            f"Are you sure you want to ADD {len(urls)} URLs to ArchiveBox?",
            response_type=bool,
        )
        if not (answer.action == "accept" and answer.data):
            return {
                "status": "cancelled",
                "message": "Operation cancelled by user.",
            }

    # Client built from the (environment-defaulted) connection settings.
    connection = {
        "url": archivebox_url,
        "username": username,
        "password": password,
        "token": token,
        "api_key": api_key,
        "verify": verify,
    }
    api = Api(**connection)

    # All command options are forwarded unchanged.
    options = {
        "urls": urls,
        "tag": tag,
        "depth": depth,
        "update": update,
        "update_all": update_all,
        "index_only": index_only,
        "overwrite": overwrite,
        "init": init,
        "extractors": extractors,
        "parser": parser,
        "extra_data": extra_data,
    }
    return api.cli_add(**options).json()
@mcp.tool(
    tags={"cli"},
    exclude_args=[
        "archivebox_url", "username", "password",
        "token", "api_key", "verify",
    ],
)
async def cli_update(
    resume: Optional[float] = Field(default=0, description="Resume from timestamp"),
    only_new: bool = Field(default=True, description="Update only new snapshots"),
    index_only: bool = Field(default=False, description="Index without archiving"),
    overwrite: bool = Field(default=False, description="Overwrite existing files"),
    after: Optional[float] = Field(
        default=0, description="Filter snapshots after timestamp"
    ),
    before: Optional[float] = Field(
        default=999999999999999, description="Filter snapshots before timestamp"
    ),
    status: Optional[str] = Field(
        default="unarchived", description="Filter by status"
    ),
    filter_type: Optional[str] = Field(
        default="substring", description="Filter type"
    ),
    filter_patterns: Optional[List[str]] = Field(
        default=None, description="List of filter patterns"
    ),
    extractors: Optional[str] = Field(
        default="", description="Comma-separated list of extractors"
    ),
    extra_data: Optional[Dict] = Field(
        default=None, description="Additional parameters as a dictionary"
    ),
    archivebox_url: str = Field(
        default=os.getenv("ARCHIVEBOX_URL"),
        description="The URL of the ArchiveBox instance",
    ),
    username: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_USERNAME"),
        description="Username for authentication",
    ),
    password: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_PASSWORD"),
        description="Password for authentication",
    ),
    token: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_TOKEN"),
        description="Bearer token for authentication",
    ),
    api_key: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_API_KEY"),
        description="API key for authentication",
    ),
    verify: Optional[bool] = Field(
        default=to_boolean(os.getenv("ARCHIVEBOX_VERIFY", "True")),
        description="Whether to verify SSL certificates",
    ),
    ctx: Context = None,
) -> dict:
    """Execute archivebox update command."""
    # When a context is available, ask for explicit confirmation first and
    # stop with a "cancelled" result unless the answer is an accepted, truthy
    # value.
    if ctx:
        answer = await ctx.elicit(
            "Are you sure you want to UPDATE snapshots?", response_type=bool
        )
        if not (answer.action == "accept" and answer.data):
            return {
                "status": "cancelled",
                "message": "Operation cancelled by user.",
            }

    # Client built from the (environment-defaulted) connection settings.
    connection = {
        "url": archivebox_url,
        "username": username,
        "password": password,
        "token": token,
        "api_key": api_key,
        "verify": verify,
    }
    api = Api(**connection)

    # All command options are forwarded unchanged.
    options = {
        "resume": resume,
        "only_new": only_new,
        "index_only": index_only,
        "overwrite": overwrite,
        "after": after,
        "before": before,
        "status": status,
        "filter_type": filter_type,
        "filter_patterns": filter_patterns,
        "extractors": extractors,
        "extra_data": extra_data,
    }
    return api.cli_update(**options).json()
@mcp.tool(
    tags={"cli"},
    exclude_args=[
        "archivebox_url", "username", "password",
        "token", "api_key", "verify",
    ],
)
async def cli_schedule(
    import_path: Optional[str] = Field(
        default=None, description="Path to import file"
    ),
    add: bool = Field(default=False, description="Enable adding new URLs"),
    every: Optional[str] = Field(
        default=None, description="Schedule frequency (e.g., 'daily')"
    ),
    tag: str = Field(default="", description="Comma-separated tags"),
    depth: int = Field(default=0, description="Crawl depth"),
    overwrite: bool = Field(default=False, description="Overwrite existing files"),
    update: bool = Field(default=False, description="Update existing snapshots"),
    clear: bool = Field(default=False, description="Clear existing schedules"),
    extra_data: Optional[Dict] = Field(
        default=None, description="Additional parameters as a dictionary"
    ),
    archivebox_url: str = Field(
        default=os.getenv("ARCHIVEBOX_URL"),
        description="The URL of the ArchiveBox instance",
    ),
    username: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_USERNAME"),
        description="Username for authentication",
    ),
    password: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_PASSWORD"),
        description="Password for authentication",
    ),
    token: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_TOKEN"),
        description="Bearer token for authentication",
    ),
    api_key: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_API_KEY"),
        description="API key for authentication",
    ),
    verify: Optional[bool] = Field(
        default=to_boolean(os.getenv("ARCHIVEBOX_VERIFY", "True")),
        description="Whether to verify SSL certificates",
    ),
) -> dict:
    """Execute archivebox schedule command."""
    # Client built from the (environment-defaulted) connection settings.
    connection = {
        "url": archivebox_url,
        "username": username,
        "password": password,
        "token": token,
        "api_key": api_key,
        "verify": verify,
    }
    api = Api(**connection)

    # All command options are forwarded unchanged; no confirmation step is
    # performed for this command.
    options = {
        "import_path": import_path,
        "add": add,
        "every": every,
        "tag": tag,
        "depth": depth,
        "overwrite": overwrite,
        "update": update,
        "clear": clear,
        "extra_data": extra_data,
    }
    return api.cli_schedule(**options).json()
@mcp.tool(
    tags={"cli"},
    exclude_args=[
        "archivebox_url", "username", "password",
        "token", "api_key", "verify",
    ],
)
async def cli_list(
    filter_patterns: Optional[List[str]] = Field(
        default=None, description="List of filter patterns"
    ),
    filter_type: str = Field(default="substring", description="Filter type"),
    status: Optional[str] = Field(default="indexed", description="Filter by status"),
    after: Optional[float] = Field(
        default=0, description="Filter snapshots after timestamp"
    ),
    before: Optional[float] = Field(
        default=999999999999999, description="Filter snapshots before timestamp"
    ),
    sort: str = Field(default="bookmarked_at", description="Sort field"),
    as_json: bool = Field(default=True, description="Output as JSON"),
    as_html: bool = Field(default=False, description="Output as HTML"),
    as_csv: Union[str, bool] = Field(
        default="timestamp,url", description="Output as CSV or fields to include"
    ),
    with_headers: bool = Field(
        default=False, description="Include headers in output"
    ),
    extra_data: Optional[Dict] = Field(
        default=None, description="Additional parameters as a dictionary"
    ),
    archivebox_url: str = Field(
        default=os.getenv("ARCHIVEBOX_URL"),
        description="The URL of the ArchiveBox instance",
    ),
    username: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_USERNAME"),
        description="Username for authentication",
    ),
    password: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_PASSWORD"),
        description="Password for authentication",
    ),
    token: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_TOKEN"),
        description="Bearer token for authentication",
    ),
    api_key: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_API_KEY"),
        description="API key for authentication",
    ),
    verify: Optional[bool] = Field(
        default=to_boolean(os.getenv("ARCHIVEBOX_VERIFY", "True")),
        description="Whether to verify SSL certificates",
    ),
) -> dict:
    """Execute archivebox list command."""
    # Client built from the (environment-defaulted) connection settings.
    connection = {
        "url": archivebox_url,
        "username": username,
        "password": password,
        "token": token,
        "api_key": api_key,
        "verify": verify,
    }
    api = Api(**connection)

    # All command options are forwarded unchanged.
    options = {
        "filter_patterns": filter_patterns,
        "filter_type": filter_type,
        "status": status,
        "after": after,
        "before": before,
        "sort": sort,
        "as_json": as_json,
        "as_html": as_html,
        "as_csv": as_csv,
        "with_headers": with_headers,
        "extra_data": extra_data,
    }
    return api.cli_list(**options).json()
@mcp.tool(
    tags={"cli"},
    exclude_args=[
        "archivebox_url", "username", "password",
        "token", "api_key", "verify",
    ],
)
async def cli_remove(
    delete: bool = Field(default=True, description="Delete matching snapshots"),
    after: Optional[float] = Field(
        default=0, description="Filter snapshots after timestamp"
    ),
    before: Optional[float] = Field(
        default=999999999999999, description="Filter snapshots before timestamp"
    ),
    filter_type: str = Field(default="exact", description="Filter type"),
    filter_patterns: Optional[List[str]] = Field(
        default=None, description="List of filter patterns"
    ),
    extra_data: Optional[Dict] = Field(
        default=None, description="Additional parameters as a dictionary"
    ),
    archivebox_url: str = Field(
        default=os.getenv("ARCHIVEBOX_URL"),
        description="The URL of the ArchiveBox instance",
    ),
    username: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_USERNAME"),
        description="Username for authentication",
    ),
    password: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_PASSWORD"),
        description="Password for authentication",
    ),
    token: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_TOKEN"),
        description="Bearer token for authentication",
    ),
    api_key: Optional[str] = Field(
        default=os.getenv("ARCHIVEBOX_API_KEY"),
        description="API key for authentication",
    ),
    verify: Optional[bool] = Field(
        default=to_boolean(os.getenv("ARCHIVEBOX_VERIFY", "True")),
        description="Whether to verify SSL certificates",
    ),
    ctx: Context = None,
) -> dict:
    """Execute archivebox remove command."""
    # When a context is available, ask for explicit confirmation first and
    # stop with a "cancelled" result unless the answer is an accepted, truthy
    # value.
    if ctx:
        answer = await ctx.elicit(
            "Are you sure you want to REMOVE matching snapshots?",
            response_type=bool,
        )
        if not (answer.action == "accept" and answer.data):
            return {
                "status": "cancelled",
                "message": "Operation cancelled by user.",
            }

    # Client built from the (environment-defaulted) connection settings.
    connection = {
        "url": archivebox_url,
        "username": username,
        "password": password,
        "token": token,
        "api_key": api_key,
        "verify": verify,
    }
    api = Api(**connection)

    # All command options are forwarded unchanged.
    options = {
        "delete": delete,
        "after": after,
        "before": before,
        "filter_type": filter_type,
        "filter_patterns": filter_patterns,
        "extra_data": extra_data,
    }
    return api.cli_remove(**options).json()
def register_resources(mcp: FastMCP):
    """Attach the ``data://instance_config`` resource to ``mcp``."""

    @mcp.resource("data://instance_config")
    async def get_instance_config() -> dict:
        """Provides the current ArchiveBox instance configuration."""
        # Both values are read from the environment at request time;
        # certificate verification defaults to "True" when unset.
        instance_url = os.getenv("ARCHIVEBOX_URL")
        verify_tls = to_boolean(os.getenv("ARCHIVEBOX_VERIFY", "True"))
        return {"url": instance_url, "verify": verify_tls}
def register_prompts(mcp: FastMCP):
    """Attach the ArchiveBox prompt templates to ``mcp``.

    Each prompt renders its arguments into a one-line instruction that names
    the tool to use.
    """

    @mcp.prompt
    def add_url_prompt(
        url: str,
        tag: str = "",
        depth: int = 0,
        extractors: str = "",
    ) -> str:
        """Generates a prompt for adding a new URL to ArchiveBox."""
        return (
            f"Add the URL '{url}' to ArchiveBox. "
            f"Tags: '{tag}', Depth: {depth}, Extractors: '{extractors}'. "
            "Use the `cli_add` tool."
        )

    @mcp.prompt
    def search_snapshots_prompt(
        query: str,
        limit: int = 10,
        tag: str = "",
    ) -> str:
        """Generates a prompt for searching snapshots in ArchiveBox."""
        return (
            f"Search for snapshots matching '{query}' in ArchiveBox. "
            f"Limit: {limit}, Tag: '{tag}'. "
            "Use the `get_snapshots` tool with the `search` parameter."
        )

    @mcp.prompt
    def get_snapshot_details_prompt(
        id: str,
    ) -> str:
        """Generates a prompt for retrieving details of a specific snapshot."""
        return (
            f"Get details for the snapshot with ID '{id}'. "
            "Use the `get_snapshot` tool (or `get_any` if unsure of the ID type)."
        )

    @mcp.prompt
    def list_recent_snapshots_prompt(
        limit: int = 10,
    ) -> str:
        """Generates a prompt for listing the most recent snapshots."""
        return (
            f"List the {limit} most recently created snapshots. "
            "Use the `get_snapshots` tool sorted by creation date."
        )

    @mcp.prompt
    def update_snapshots_prompt(
        filter_patterns: str = "",
        only_new: bool = True,
    ) -> str:
        """Generates a prompt for updating existing snapshots."""
        return (
            "Update existing snapshots. "
            f"Filter patterns: '{filter_patterns}', Only new: {only_new}. "
            "Use the `cli_update` tool."
        )

    @mcp.prompt
    def schedule_archiving_prompt(
        url: str = "",
        tag: str = "",
        every: str = "day",
    ) -> str:
        """Generates a prompt for scheduling a recurring archiving job."""
        return (
            f"Schedule a recurring archiving job every '{every}'. "
            f"URL: '{url}', Tag: '{tag}'. "
            "Use the `cli_schedule` tool."
        )

    @mcp.prompt
    def remove_snapshots_prompt(
        filter_patterns: str,
        before: float = 0,
    ) -> str:
        """Generates a prompt for removing snapshots."""
        return (
            f"Remove snapshots matching patterns '{filter_patterns}'. "
            f"Before timestamp: {before}. "
            "Use the `cli_remove` tool."
        )

    @mcp.prompt
    def list_archiveresults_prompt(
        snapshot_id: str = "",
        status: str = "",
        extractor: str = "",
    ) -> str:
        """Generates a prompt for listing specific archive results."""
        return (
            "List archive results. "
            f"Snapshot ID: '{snapshot_id}', Status: '{status}', Extractor: '{extractor}'. "
            "Use the `get_archiveresults` tool."
        )

    @mcp.prompt
    def get_tag_details_prompt(
        tag_id: str,
    ) -> str:
        """Generates a prompt for getting details about a tag."""
        return (
            f"Get details for the tag with ID '{tag_id}'. "
            "Use the `get_tag` tool."
        )
def get_archivebox_client() -> Api:
    """Build an ArchiveBox ``Api`` client from ``ARCHIVEBOX_*`` environment variables.

    Returns:
        An ``Api`` instance configured with the URL, username, password,
        token, API key and certificate-verification flag found in the
        environment.

    Raises:
        ResourceError: if ``ARCHIVEBOX_URL`` is unset or empty.
    """
    env = os.environ
    base_url = env.get("ARCHIVEBOX_URL")
    # The verify flag is parsed before the URL check; it defaults to "True".
    verify_tls = to_boolean(env.get("ARCHIVEBOX_VERIFY", "True"))

    if not base_url:
        raise ResourceError("ArchiveBox URL not configured")

    return Api(
        url=base_url,
        username=env.get("ARCHIVEBOX_USERNAME"),
        password=env.get("ARCHIVEBOX_PASSWORD"),
        token=env.get("ARCHIVEBOX_TOKEN"),
        api_key=env.get("ARCHIVEBOX_API_KEY"),
        verify=verify_tls,
    )
# Upper bound (seconds) for the OIDC discovery request so that an unreachable
# identity provider cannot block server startup forever.
_OIDC_DISCOVERY_TIMEOUT_SECONDS = 10


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the ArchiveBox MCP runner."""
    parser = argparse.ArgumentParser(description="ArchiveBox MCP Runner")
    parser.add_argument(
        "-t",
        "--transport",
        default=DEFAULT_TRANSPORT,
        choices=["stdio", "streamable-http", "sse"],
        help="Transport method: 'stdio', 'streamable-http', or 'sse' [legacy] (default: stdio)",
    )
    parser.add_argument(
        "-s",
        "--host",
        default=DEFAULT_HOST,
        help="Host address for HTTP transport (default: 0.0.0.0)",
    )
    parser.add_argument(
        "-p",
        "--port",
        type=int,
        default=DEFAULT_PORT,
        help="Port number for HTTP transport (default: 8000)",
    )
    parser.add_argument(
        "--auth-type",
        default="none",
        choices=["none", "static", "jwt", "oauth-proxy", "oidc-proxy", "remote-oauth"],
        help="Authentication type for MCP server: 'none' (disabled), 'static' (internal), 'jwt' (external token verification), 'oauth-proxy', 'oidc-proxy', 'remote-oauth' (external) (default: none)",
    )
    # JWT/Token params
    parser.add_argument(
        "--token-jwks-uri", default=None, help="JWKS URI for JWT verification"
    )
    parser.add_argument(
        "--token-issuer", default=None, help="Issuer for JWT verification"
    )
    parser.add_argument(
        "--token-audience", default=None, help="Audience for JWT verification"
    )
    parser.add_argument(
        "--token-algorithm",
        default=os.getenv("FASTMCP_SERVER_AUTH_JWT_ALGORITHM"),
        choices=[
            "HS256",
            "HS384",
            "HS512",
            "RS256",
            "RS384",
            "RS512",
            "ES256",
            "ES384",
            "ES512",
        ],
        help="JWT signing algorithm (required for HMAC or static key). Auto-detected for JWKS.",
    )
    parser.add_argument(
        "--token-secret",
        default=os.getenv("FASTMCP_SERVER_AUTH_JWT_PUBLIC_KEY"),
        help="Shared secret for HMAC (HS*) or PEM public key for static asymmetric verification.",
    )
    parser.add_argument(
        "--token-public-key",
        default=os.getenv("FASTMCP_SERVER_AUTH_JWT_PUBLIC_KEY"),
        help="Path to PEM public key file or inline PEM string (for static asymmetric keys).",
    )
    parser.add_argument(
        "--required-scopes",
        default=os.getenv("FASTMCP_SERVER_AUTH_JWT_REQUIRED_SCOPES"),
        help="Comma-separated list of required scopes (e.g., archivebox.read,archivebox.write).",
    )
    # OAuth Proxy params
    parser.add_argument(
        "--oauth-upstream-auth-endpoint",
        default=None,
        help="Upstream authorization endpoint for OAuth Proxy",
    )
    parser.add_argument(
        "--oauth-upstream-token-endpoint",
        default=None,
        help="Upstream token endpoint for OAuth Proxy",
    )
    parser.add_argument(
        "--oauth-upstream-client-id",
        default=None,
        help="Upstream client ID for OAuth Proxy",
    )
    parser.add_argument(
        "--oauth-upstream-client-secret",
        default=None,
        help="Upstream client secret for OAuth Proxy",
    )
    parser.add_argument(
        "--oauth-base-url", default=None, help="Base URL for OAuth Proxy"
    )
    # OIDC Proxy params
    parser.add_argument(
        "--oidc-config-url", default=None, help="OIDC configuration URL"
    )
    parser.add_argument("--oidc-client-id", default=None, help="OIDC client ID")
    parser.add_argument("--oidc-client-secret", default=None, help="OIDC client secret")
    parser.add_argument("--oidc-base-url", default=None, help="Base URL for OIDC Proxy")
    # Remote OAuth params
    parser.add_argument(
        "--remote-auth-servers",
        default=None,
        help="Comma-separated list of authorization servers for Remote OAuth",
    )
    parser.add_argument(
        "--remote-base-url", default=None, help="Base URL for Remote OAuth"
    )
    # Common
    parser.add_argument(
        "--allowed-client-redirect-uris",
        default=None,
        help="Comma-separated list of allowed client redirect URIs",
    )
    # Eunomia params
    parser.add_argument(
        "--eunomia-type",
        default="none",
        choices=["none", "embedded", "remote"],
        help="Eunomia authorization type: 'none' (disabled), 'embedded' (built-in), 'remote' (external) (default: none)",
    )
    parser.add_argument(
        "--eunomia-policy-file",
        default="mcp_policies.json",
        help="Policy file for embedded Eunomia (default: mcp_policies.json)",
    )
    parser.add_argument(
        "--eunomia-remote-url", default=None, help="URL for remote Eunomia server"
    )
    # Delegation params
    parser.add_argument(
        "--enable-delegation",
        action="store_true",
        default=to_boolean(os.environ.get("ENABLE_DELEGATION", "False")),
        help="Enable OIDC token delegation",
    )
    parser.add_argument(
        "--audience",
        default=os.environ.get("AUDIENCE", None),
        help="Audience for the delegated token",
    )
    parser.add_argument(
        "--delegated-scopes",
        default=os.environ.get("DELEGATED_SCOPES", "api"),
        help="Scopes for the delegated token (space-separated)",
    )
    parser.add_argument(
        "--openapi-file",
        default=None,
        help="Path to the OpenAPI JSON file to import additional tools from",
    )
    parser.add_argument(
        "--openapi-base-url",
        default=None,
        help="Base URL for the OpenAPI client (overrides instance URL)",
    )
    parser.add_argument(
        "--openapi-use-token",
        action="store_true",
        help="Use the incoming Bearer token (from MCP request) to authenticate OpenAPI import",
    )
    parser.add_argument(
        "--openapi-username",
        default=os.getenv("OPENAPI_USERNAME"),
        help="Username for basic auth during OpenAPI import",
    )
    parser.add_argument(
        "--openapi-password",
        default=os.getenv("OPENAPI_PASSWORD"),
        help="Password for basic auth during OpenAPI import",
    )
    parser.add_argument(
        "--openapi-client-id",
        default=os.getenv("OPENAPI_CLIENT_ID"),
        help="OAuth client ID for OpenAPI import",
    )
    parser.add_argument(
        "--openapi-client-secret",
        default=os.getenv("OPENAPI_CLIENT_SECRET"),
        help="OAuth client secret for OpenAPI import",
    )
    return parser


def _apply_delegation_config(args: argparse.Namespace) -> None:
    """Merge CLI values into ``config`` and, when delegation is on, validate it and resolve the token endpoint."""
    # CLI values win; otherwise keep whatever the module-level config holds.
    config["enable_delegation"] = args.enable_delegation
    config["audience"] = args.audience or config["audience"]
    config["delegated_scopes"] = args.delegated_scopes or config["delegated_scopes"]
    config["oidc_config_url"] = args.oidc_config_url or config["oidc_config_url"]
    config["oidc_client_id"] = args.oidc_client_id or config["oidc_client_id"]
    config["oidc_client_secret"] = (
        args.oidc_client_secret or config["oidc_client_secret"]
    )
    if not config["enable_delegation"]:
        return
    if args.auth_type != "oidc-proxy":
        logger.error("Token delegation requires auth-type=oidc-proxy")
        sys.exit(1)
    if not config["audience"]:
        logger.error("audience is required for delegation")
        sys.exit(1)
    if not all(
        [
            config["oidc_config_url"],
            config["oidc_client_id"],
            config["oidc_client_secret"],
        ]
    ):
        logger.error(
            "Delegation requires complete OIDC configuration (oidc-config-url, oidc-client-id, oidc-client-secret)"
        )
        sys.exit(1)
    # Fetch OIDC configuration to get token_endpoint
    try:
        logger.info(
            "Fetching OIDC configuration",
            extra={"oidc_config_url": config["oidc_config_url"]},
        )
        # A timeout is mandatory here: without one, requests waits
        # indefinitely on an unresponsive identity provider.
        oidc_config_resp = requests.get(
            config["oidc_config_url"], timeout=_OIDC_DISCOVERY_TIMEOUT_SECONDS
        )
        oidc_config_resp.raise_for_status()
        oidc_config = oidc_config_resp.json()
        config["token_endpoint"] = oidc_config.get("token_endpoint")
        if not config["token_endpoint"]:
            logger.error("No token_endpoint found in OIDC configuration")
            raise ValueError("No token_endpoint found in OIDC configuration")
        logger.info(
            "OIDC configuration fetched successfully",
            extra={"token_endpoint": config["token_endpoint"]},
        )
    except Exception as e:
        print(f"Failed to fetch OIDC configuration: {e}", file=sys.stderr)
        logger.error(
            "Failed to fetch OIDC configuration",
            extra={"error_type": type(e).__name__, "error_message": str(e)},
        )
        sys.exit(1)


def _build_jwt_verifier(args: argparse.Namespace) -> JWTVerifier:
    """Build the ``JWTVerifier`` for ``--auth-type jwt`` (JWKS, HMAC or static key mode)."""
    # Fallback to env vars if not provided via CLI
    jwks_uri = args.token_jwks_uri or os.getenv("FASTMCP_SERVER_AUTH_JWT_JWKS_URI")
    issuer = args.token_issuer or os.getenv("FASTMCP_SERVER_AUTH_JWT_ISSUER")
    audience = args.token_audience or os.getenv("FASTMCP_SERVER_AUTH_JWT_AUDIENCE")
    algorithm = args.token_algorithm
    secret_or_key = args.token_secret or args.token_public_key
    public_key_pem = None
    if not (jwks_uri or secret_or_key):
        logger.error(
            "JWT auth requires either --token-jwks-uri or --token-secret/--token-public-key"
        )
        sys.exit(1)
    if not (issuer and audience):
        logger.error("JWT requires --token-issuer and --token-audience")
        sys.exit(1)
    # Load static public key from file if path is given
    if args.token_public_key and os.path.isfile(args.token_public_key):
        try:
            with open(args.token_public_key, "r") as f:
                public_key_pem = f.read()
            logger.info(f"Loaded static public key from {args.token_public_key}")
        except Exception as e:
            print(f"Failed to read public key file: {e}", file=sys.stderr)
            logger.error(f"Failed to read public key file: {e}")
            sys.exit(1)
    elif args.token_public_key:
        public_key_pem = args.token_public_key  # Inline PEM
    # Validation: Conflicting options
    if jwks_uri and (algorithm or secret_or_key):
        logger.warning(
            "JWKS mode ignores --token-algorithm and --token-secret/--token-public-key"
        )
    uses_hmac = bool(algorithm and algorithm.startswith("HS"))
    if uses_hmac:
        if not secret_or_key:
            logger.error(f"HMAC algorithm {algorithm} requires --token-secret")
            sys.exit(1)
        if jwks_uri:
            logger.error("Cannot use --token-jwks-uri with HMAC")
            sys.exit(1)
        public_key = secret_or_key
        verifier_algorithm = algorithm
    elif jwks_uri:
        # Actually ignore the static key/algorithm, as the warning above
        # promises; handing both a key and a JWKS URI to the verifier is
        # presumably rejected as ambiguous.
        public_key = None
        verifier_algorithm = None
    else:
        # Static asymmetric key: per the CLI help the PEM may arrive via
        # --token-public-key or --token-secret, and the chosen algorithm
        # must be honoured instead of silently falling back to the default.
        public_key = public_key_pem or secret_or_key
        verifier_algorithm = algorithm
    # Required scopes
    required_scopes = None
    if args.required_scopes:
        required_scopes = [
            s.strip() for s in args.required_scopes.split(",") if s.strip()
        ]
    if jwks_uri:
        mode = "JWKS"
    elif uses_hmac:
        mode = "HMAC"
    else:
        mode = "Static Key"
    try:
        verifier = JWTVerifier(
            jwks_uri=jwks_uri,
            public_key=public_key,
            issuer=issuer,
            audience=audience,
            algorithm=verifier_algorithm,
            required_scopes=required_scopes,
        )
        logger.info(
            "JWTVerifier configured",
            extra={
                "mode": mode,
                "algorithm": algorithm,
                "required_scopes": required_scopes,
            },
        )
    except Exception as e:
        print(f"Failed to initialize JWTVerifier: {e}", file=sys.stderr)
        logger.error(f"Failed to initialize JWTVerifier: {e}")
        sys.exit(1)
    return verifier


def _build_auth_provider(args: argparse.Namespace):
    """Return the auth provider selected by ``--auth-type`` (``None`` when auth is disabled)."""
    allowed_uris = (
        args.allowed_client_redirect_uris.split(",")
        if args.allowed_client_redirect_uris
        else None
    )
    if args.auth_type == "static":
        # NOTE(review): these tokens are hard-coded, well-known values;
        # this mode looks intended for local testing only.
        return StaticTokenVerifier(
            tokens={
                "test-token": {"client_id": "test-user", "scopes": ["read", "write"]},
                "admin-token": {"client_id": "admin", "scopes": ["admin"]},
            }
        )
    if args.auth_type == "jwt":
        return _build_jwt_verifier(args)
    if args.auth_type == "oauth-proxy":
        if not (
            args.oauth_upstream_auth_endpoint
            and args.oauth_upstream_token_endpoint
            and args.oauth_upstream_client_id
            and args.oauth_upstream_client_secret
            and args.oauth_base_url
            and args.token_jwks_uri
            and args.token_issuer
            and args.token_audience
        ):
            missing_msg = (
                "oauth-proxy requires oauth-upstream-auth-endpoint, oauth-upstream-token-endpoint, "
                "oauth-upstream-client-id, oauth-upstream-client-secret, oauth-base-url, token-jwks-uri, "
                "token-issuer, token-audience"
            )
            print(missing_msg, file=sys.stderr)
            logger.error(
                missing_msg,
                extra={
                    "auth_endpoint": args.oauth_upstream_auth_endpoint,
                    "token_endpoint": args.oauth_upstream_token_endpoint,
                    "client_id": args.oauth_upstream_client_id,
                    "base_url": args.oauth_base_url,
                    "jwks_uri": args.token_jwks_uri,
                    "issuer": args.token_issuer,
                    "audience": args.token_audience,
                },
            )
            sys.exit(1)
        token_verifier = JWTVerifier(
            jwks_uri=args.token_jwks_uri,
            issuer=args.token_issuer,
            audience=args.token_audience,
        )
        return OAuthProxy(
            upstream_authorization_endpoint=args.oauth_upstream_auth_endpoint,
            upstream_token_endpoint=args.oauth_upstream_token_endpoint,
            upstream_client_id=args.oauth_upstream_client_id,
            upstream_client_secret=args.oauth_upstream_client_secret,
            token_verifier=token_verifier,
            base_url=args.oauth_base_url,
            allowed_client_redirect_uris=allowed_uris,
        )
    if args.auth_type == "oidc-proxy":
        # Resolve the same way delegation does (CLI first, then the
        # env-backed config) so a setup that passes delegation validation
        # is not rejected here.
        oidc_config_url = args.oidc_config_url or config["oidc_config_url"]
        oidc_client_id = args.oidc_client_id or config["oidc_client_id"]
        oidc_client_secret = args.oidc_client_secret or config["oidc_client_secret"]
        if not (
            oidc_config_url
            and oidc_client_id
            and oidc_client_secret
            and args.oidc_base_url
        ):
            logger.error(
                "oidc-proxy requires oidc-config-url, oidc-client-id, oidc-client-secret, oidc-base-url",
                extra={
                    "config_url": oidc_config_url,
                    "client_id": oidc_client_id,
                    "base_url": args.oidc_base_url,
                },
            )
            sys.exit(1)
        return OIDCProxy(
            config_url=oidc_config_url,
            client_id=oidc_client_id,
            client_secret=oidc_client_secret,
            base_url=args.oidc_base_url,
            allowed_client_redirect_uris=allowed_uris,
        )
    if args.auth_type == "remote-oauth":
        if not (
            args.remote_auth_servers
            and args.remote_base_url
            and args.token_jwks_uri
            and args.token_issuer
            and args.token_audience
        ):
            logger.error(
                "remote-oauth requires remote-auth-servers, remote-base-url, token-jwks-uri, token-issuer, token-audience",
                extra={
                    "auth_servers": args.remote_auth_servers,
                    "base_url": args.remote_base_url,
                    "jwks_uri": args.token_jwks_uri,
                    "issuer": args.token_issuer,
                    "audience": args.token_audience,
                },
            )
            sys.exit(1)
        auth_servers = [url.strip() for url in args.remote_auth_servers.split(",")]
        token_verifier = JWTVerifier(
            jwks_uri=args.token_jwks_uri,
            issuer=args.token_issuer,
            audience=args.token_audience,
        )
        return RemoteAuthProvider(
            token_verifier=token_verifier,
            authorization_servers=auth_servers,
            base_url=args.remote_base_url,
        )
    # "none" (or anything unrecognised) leaves the server unauthenticated.
    return None


def _build_middlewares(args: argparse.Namespace) -> list:
    """Assemble the ordered middleware list, including optional token and Eunomia middleware."""
    middlewares: List[
        Union[
            UserTokenMiddleware,
            ErrorHandlingMiddleware,
            RateLimitingMiddleware,
            TimingMiddleware,
            LoggingMiddleware,
            JWTClaimsLoggingMiddleware,
            EunomiaMcpMiddleware,
        ]
    ] = [
        ErrorHandlingMiddleware(include_traceback=True, transform_errors=True),
        RateLimitingMiddleware(max_requests_per_second=10.0, burst_capacity=20),
        TimingMiddleware(),
        LoggingMiddleware(),
        JWTClaimsLoggingMiddleware(),
    ]
    if config["enable_delegation"] or args.auth_type == "jwt":
        middlewares.insert(0, UserTokenMiddleware(config=config))  # Must be first
    if args.eunomia_type in ["embedded", "remote"]:
        try:
            from eunomia_mcp import create_eunomia_middleware

            policy_file = args.eunomia_policy_file or "mcp_policies.json"
            eunomia_endpoint = (
                args.eunomia_remote_url if args.eunomia_type == "remote" else None
            )
            eunomia_mw = create_eunomia_middleware(
                policy_file=policy_file, eunomia_endpoint=eunomia_endpoint
            )
            middlewares.append(eunomia_mw)
            logger.info(f"Eunomia middleware enabled ({args.eunomia_type})")
        except Exception as e:
            print(f"Failed to load Eunomia middleware: {e}", file=sys.stderr)
            logger.error("Failed to load Eunomia middleware", extra={"error": str(e)})
            sys.exit(1)
    return middlewares


def archivebox_mcp():
    """
    Command-line entry point: parse arguments, configure auth and middleware,
    then run the ArchiveBox MCP server on the selected transport.

    All human-readable banners and diagnostics are written to stderr, because
    with the ``stdio`` transport stdout carries the protocol stream and must
    not contain anything else.

    Exits the process with status 1 on an out-of-range port, an incomplete or
    invalid auth/delegation configuration, a failed OIDC discovery request, a
    failure to load the Eunomia middleware, or an unknown transport.
    """
    print(f"archivebox_mcp v{__version__}", file=sys.stderr)
    args = _build_arg_parser().parse_args()
    if args.port < 0 or args.port > 65535:
        print(
            f"Error: Port {args.port} is out of valid range (0-65535).",
            file=sys.stderr,
        )
        sys.exit(1)
    # Update config with CLI arguments and validate delegation if enabled
    _apply_delegation_config(args)
    # Set auth based on type
    auth = _build_auth_provider(args)
    # === 2. Build Middleware List ===
    middlewares = _build_middlewares(args)
    mcp = FastMCP("ArchiveBox", auth=auth)
    register_tools(mcp)
    register_prompts(mcp)
    register_resources(mcp)
    for mw in middlewares:
        mcp.add_middleware(mw)
    print(f"ArchiveBox MCP v{__version__}", file=sys.stderr)
    print("\nStarting ArchiveBox MCP Server", file=sys.stderr)
    print(f" Transport: {args.transport.upper()}", file=sys.stderr)
    print(f" Auth: {args.auth_type}", file=sys.stderr)
    print(
        f" Delegation: {'ON' if config['enable_delegation'] else 'OFF'}",
        file=sys.stderr,
    )
    print(f" Eunomia: {args.eunomia_type}", file=sys.stderr)
    if args.transport == "stdio":
        mcp.run(transport="stdio")
    elif args.transport == "streamable-http":
        mcp.run(transport="streamable-http", host=args.host, port=args.port)
    elif args.transport == "sse":
        mcp.run(transport="sse", host=args.host, port=args.port)
    else:
        # Reachable when DEFAULT_TRANSPORT holds a value outside the CLI
        # choices: argparse does not check defaults against ``choices``.
        logger.error("Invalid transport", extra={"transport": args.transport})
        sys.exit(1)
# Start the server only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    archivebox_mcp()