import argparse
import logging
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, AsyncIterator, Dict, List, Optional, Annotated
from dotenv import load_dotenv
from fastmcp import FastMCP, Context
from pydantic import Field
from .config import CloudflareConfig
from .cloudflare_client import CloudflareClient
# Module-level logger used by every tool handler in this file.
logger = logging.getLogger(__name__)
# Runs at import time; basicConfig sets up root logging at INFO level and is a
# no-op when the root logger already has handlers configured.
logging.basicConfig(level=logging.INFO)
def format_zones_response(zones: List[Dict[str, Any]]) -> str:
    """Render zone records as a plain-text bullet list.

    Args:
        zones: Zone dictionaries. Only the ``name``, ``id`` and ``status``
            keys are read; a missing key is rendered as ``Unknown``.

    Returns:
        ``"No zones found."`` when ``zones`` is empty (or otherwise falsy),
        else an ``Available Zones:`` header followed by one
        ``- name (ID: ..., Status: ...)`` line per zone, newline-joined.
    """
    if not zones:
        return "No zones found."

    def _field(record: Dict[str, Any], key: str) -> str:
        # Every value is stringified so non-string payload values still render.
        return str(record.get(key, "Unknown"))

    entries = [
        f"- {_field(record, 'name')} (ID: {_field(record, 'id')}, Status: {_field(record, 'status')})"
        for record in zones
    ]
    return "\n".join(["Available Zones:", *entries])
class AppContext:
    """Lifespan-scoped state handed to the tool handlers.

    The tools in this module read the client back through
    ``ctx.request_context.lifespan_context.client``.
    """

    def __init__(self, client: "CloudflareClient") -> None:
        # The single API client shared by all tool calls during one server run.
        self.client = client
@asynccontextmanager
async def lifespan(app: FastMCP) -> AsyncIterator[AppContext]:
    """Create the Cloudflare client for the server's lifetime and close it on exit.

    Args:
        app: The FastMCP server instance this lifespan is attached to (unused
            in the body).

    Yields:
        An ``AppContext`` wrapping a ``CloudflareClient`` built from a fresh
        ``CloudflareConfig``.
    """
    config = CloudflareConfig()
    client = CloudflareClient(config)
    try:
        yield AppContext(client)
    finally:
        try:
            await client.close()
        except Exception:
            # Shutdown stays best-effort (a failing close must not mask the
            # original exit reason), but the failure is now recorded instead
            # of being silently discarded.
            logger.warning("Failed to close Cloudflare client during shutdown", exc_info=True)
# Server instance on which the decorators below register resources and tools;
# `lifespan` builds the AppContext that the tool handlers read the client from.
mcp = FastMCP(name="Cloudflare", lifespan=lifespan)
def _ruleset_engine_root() -> Path:
    """Return the directory that holds the ruleset-engine markdown docs.

    The ``CLOUDFLARE_DOCS_ROOT`` environment variable overrides the location
    when it is set to a non-empty value; otherwise the ``ruleset-engine``
    directory next to this module is used. The returned path is not checked
    for existence.

    Returns:
        The documentation root as a ``Path``.
    """
    # Allow override via environment variable for portability when packaged/deployed.
    env_root = os.getenv("CLOUDFLARE_DOCS_ROOT")
    if env_root:
        # Building a Path from a str does not raise, so no fallback handler is needed.
        return Path(env_root)
    return Path(__file__).parent / "ruleset-engine"
@mcp.resource("cloudflare://docs/{relative_path}")
async def cloudflare_docs_resource(relative_path: str) -> str:
    """Return full markdown content for a documentation resource under ruleset-engine.

    URI format: cloudflare://docs/{relative_path}
    Example: cloudflare://docs/managed-rulesets/override-managed-ruleset.md

    Args:
        relative_path: Path of the markdown file relative to the docs root.

    Returns:
        The file content decoded as UTF-8.

    Raises:
        ValueError: If the path is absolute, contains ``..``, resolves to a
            location outside the docs root, or does not end in ``.md``.
        FileNotFoundError: If the path does not point at an existing file.
    """
    root = _ruleset_engine_root()
    # Normalize and prevent path traversal
    rel = Path(relative_path)
    if rel.is_absolute() or ".." in rel.parts:
        raise ValueError("Invalid relative_path")
    full_path = root / rel
    # The lexical check above cannot see symlinks or Windows drive-relative
    # paths (e.g. "C:foo", which replaces the root when joined), so also
    # require the resolved target to live under the resolved docs root.
    try:
        full_path.resolve().relative_to(root.resolve())
    except ValueError:
        raise ValueError("Invalid relative_path") from None
    # is_file() is already False for paths that do not exist.
    if not full_path.is_file():
        raise FileNotFoundError(f"Resource not found: {relative_path}")
    if not str(full_path).endswith(".md"):
        raise ValueError("Only markdown files are exposed as resources")
    return full_path.read_text(encoding="utf-8")
def _enumerate_documentation_resources() -> List[Dict[str, Any]]:
    """Enumerate markdown files under the docs root and return resource metadata.

    Returns:
        One dictionary per ``.md`` file with the keys ``uri``, ``name``,
        ``relativePath``, ``title``, ``description`` and ``mimeType``.
        Entries are ordered deterministically (directories and file names are
        visited in sorted order). An empty list is returned when the docs
        root does not exist.
    """
    resources: List[Dict[str, Any]] = []
    root = _ruleset_engine_root()
    if not root.exists():
        return resources
    for dirpath, dirnames, filenames in os.walk(root):
        # os.walk yields entries in arbitrary filesystem order; sorting the
        # directory list in place (which steers the traversal) and the file
        # names keeps the catalog stable between calls and machines.
        dirnames.sort()
        for filename in sorted(filenames):
            if not filename.endswith(".md"):
                continue
            relative_path = (Path(dirpath) / filename).relative_to(root)
            posix_path = relative_path.as_posix()
            resources.append(
                {
                    "uri": f"cloudflare://docs/{posix_path}",
                    "name": filename,
                    "relativePath": posix_path,
                    # Title is derived from the file stem: dashes become spaces, then title-cased.
                    "title": f"Cloudflare {relative_path.stem.replace('-', ' ').title()}",
                    "description": f"Cloudflare documentation: {posix_path}",
                    "mimeType": "text/markdown",
                }
            )
    return resources
@mcp.resource("cloudflare://docs-catalog/{scope}")
async def cloudflare_docs_catalog(scope: str) -> str:
    """Return a JSON catalog of documentation resources.

    Args:
        scope: "all" (or an empty value) for the full list, or a substring
            that resource URIs must contain, compared case-insensitively
            (e.g., "rulesets-api").

    Returns:
        A JSON object string with ``resources`` (the matching entries) and
        ``total`` (their count).
    """
    import json as _json

    catalog = _enumerate_documentation_resources()
    needle = (scope or "").lower()
    if needle not in ("", "all"):
        catalog = [entry for entry in catalog if needle in entry.get("uri", "").lower()]
    payload = {"resources": catalog, "total": len(catalog)}
    return _json.dumps(payload)
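# Note: Documentation files are exposed through a URI template (cloudflare://docs/{relative_path}).
# For resources/list to enumerate every file, each file would have to be registered through FastMCP's static resource registration API.
# Documentation fallback tools (for clients without MCP resource support); the zone management tools follow them.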
@mcp.tool()
async def cloudflare_list_document_names(
    category: Annotated[Optional[str], Field(description="Optional substring to filter by path or name.")] = None,
    ctx: Context = None,
) -> Dict[str, Any]:
    """
    Fallback: List documentation names and relative paths for clients without MCP resources.

    Args:
        category: Optional case-insensitive substring; an entry is kept when
            its relative path or its file name contains it.
        ctx: Injected request context (not used by this tool).

    Returns:
        dict: ``names`` (file names), ``files`` (full metadata entries) and
        ``total`` (count). On failure the error is logged and
        ``{"error": <message>}`` is returned instead.
    """
    try:
        entries = _enumerate_documentation_resources()
        if category:
            needle = category.lower()

            def _matches(entry: Dict[str, Any]) -> bool:
                in_path = needle in entry.get("relativePath", "").lower()
                return in_path or needle in entry.get("name", "").lower()

            entries = list(filter(_matches, entries))
        return {
            "names": [entry.get("name", "") for entry in entries],
            "files": entries,
            "total": len(entries),
        }
    except Exception as exc:
        logger.exception("cloudflare_list_document_names failed")
        return {"error": str(exc)}
@mcp.tool()
async def cloudflare_read_document(
    relative_path: Annotated[str, Field(description="Relative path under ruleset-engine, e.g., 'rulesets-api/add-rule.md'.")],
    ctx: Context = None,
) -> Dict[str, Any]:
    """
    Fallback: Read a specific documentation file by relative path and return its content.

    Args:
        relative_path: Path of the markdown file relative to the docs root.
        ctx: Injected request context (not used by this tool).

    Returns:
        dict: ``uri``, ``relativePath``, ``name``, ``mimeType`` and ``text``
        (the UTF-8 file content). Invalid paths, paths that resolve outside
        the docs root, missing files, non-markdown files and unexpected
        failures are all reported as ``{"error": <message>}``.
    """
    try:
        root = _ruleset_engine_root()
        rel = Path(relative_path)
        if rel.is_absolute() or ".." in rel.parts:
            return {"error": "Invalid relative_path"}
        full_path = root / rel
        # The lexical check above cannot see symlinks or Windows drive-relative
        # paths (e.g. "C:foo", which replaces the root when joined), so also
        # require the resolved target to live under the resolved docs root.
        try:
            full_path.resolve().relative_to(root.resolve())
        except ValueError:
            return {"error": "Invalid relative_path"}
        # is_file() is already False for paths that do not exist.
        if not full_path.is_file():
            return {"error": f"Not found: {relative_path}"}
        if not str(full_path).endswith(".md"):
            return {"error": "Only markdown files are supported"}
        content = full_path.read_text(encoding="utf-8")
        return {
            "uri": f"cloudflare://docs/{rel.as_posix()}",
            "relativePath": rel.as_posix(),
            "name": full_path.name,
            "mimeType": "text/markdown",
            "text": content,
        }
    except Exception as exc:
        logger.exception("cloudflare_read_document failed")
        return {"error": str(exc)}
@mcp.tool()
async def cloudflare_list_zones(
    account_id: Annotated[Optional[str], Field(description="Optional account ID to filter zones by specific account. If not provided, returns all zones the user has access to.")] = None,
    ctx: Context = None,
) -> str:
    """
    List all Cloudflare zones (domains) accessible to the authenticated user.

    Without ``account_id`` the zones of every accessible account are listed;
    with it, only the zones of that account. The zone IDs in the output can be
    passed to the zone-specific tools.

    Args:
        account_id: Optional account ID to restrict the listing to. ``None``,
            a blank string, or ``"0"`` all mean "no filter". Account IDs can
            be obtained using cloudflare_list_accounts().
        ctx: Injected request context (provided by FastMCP).

    Returns:
        str: Plain-text list starting with "Available Zones:" followed by one
        "- {zone_name} (ID: {zone_id}, Status: {status})" line per zone, or
        "No zones found." when the result is empty. If the request fails, the
        failure is logged and "Error: {message}" is returned instead of an
        exception being raised.

    Examples:
        # List all zones across all accounts
        cloudflare_list_zones()
        # List zones for a specific account
        cloudflare_list_zones(account_id="1234567890abcdef")

    Notes:
        - Requires appropriate API token permissions to access zone information
        - Status values typically include: 'active', 'pending', 'initializing', 'moved', 'deleted'
    """
    try:
        client = ctx.request_context.lifespan_context.client  # type: ignore[attr-defined]
        # Upstream tools/LLMs commonly pass blanks or "0" when they mean
        # "no account filter"; collapse those to None.
        account_filter: Optional[str] = None
        if account_id is not None:
            candidate = str(account_id).strip()
            if candidate not in ("", "0"):
                account_filter = candidate
        payload = await client.list_zones(account_id=account_filter)
        return format_zones_response(payload.get("result", []))
    except Exception as exc:
        logger.exception("cloudflare_list_zones failed")
        return f"Error: {exc}"
@mcp.tool()
async def cloudflare_get_zone_details(
    zone_id: Annotated[str, Field(description="Zone ID to retrieve details for. Obtain this from cloudflare_list_zones().")],
    ctx: Context = None,
) -> Dict[str, Any]:
    """
    Get detailed information about a specific Cloudflare zone (domain).

    The zone's details typically cover its name, ID and status, the owning
    account, DNS, security and performance settings, and creation and
    modification timestamps.

    Args:
        zone_id: Zone ID to retrieve details for. This can be obtained from
            cloudflare_list_zones().
        ctx: Injected request context (provided by FastMCP).

    Returns:
        dict: The response of the client's ``get_zone_details`` call, passed
        through unchanged. If the call fails (for example invalid zone ID,
        zone not found, or insufficient permissions), the failure is logged
        and ``{"error": <message>}`` is returned instead of an exception
        being raised.

    Examples:
        # First list zones to get zone IDs
        zones = cloudflare_list_zones()
        # Then get details for a specific zone
        zone_details = cloudflare_get_zone_details(zone_id="abc123def456")

    Notes:
        - Zone ID must be valid and accessible to the authenticated user
        - Requires appropriate API token permissions to access zone details
    """
    try:
        client = ctx.request_context.lifespan_context.client  # type: ignore[attr-defined]
        return await client.get_zone_details(zone_id)
    except Exception as exc:
        logger.exception("cloudflare_get_zone_details failed")
        return {"error": str(exc)}
@mcp.tool()
async def cloudflare_list_accounts(ctx: Context = None) -> Dict[str, Any]:
    """
    List all Cloudflare accounts accessible to the authenticated user.

    Each account represents an organization or billing entity in Cloudflare
    and contains zones (domains) and other resources. Use this tool to see
    which accounts are available and to obtain account IDs, e.g. for filtering
    zones by account or for account-level operations such as rulesets.

    Args:
        ctx: Injected request context (provided by FastMCP).

    Returns:
        dict: The response of the client's ``list_accounts`` call, passed
        through unchanged (accounts with their id, name, settings and other
        metadata). If the call fails, the failure is logged and
        ``{"error": <message>}`` is returned instead of an exception being
        raised.

    Examples:
        # List all accessible accounts
        accounts = cloudflare_list_accounts()
        # Use account ID to filter zones
        zones = cloudflare_list_zones(account_id=accounts["result"][0]["id"])

    Notes:
        - Requires appropriate API token permissions to access account information
        - Account IDs from this response can be used with cloudflare_list_zones(account_id=...)
        - Each account can contain multiple zones (domains)
    """
    try:
        client = ctx.request_context.lifespan_context.client  # type: ignore[attr-defined]
        return await client.list_accounts()
    except Exception as exc:
        logger.exception("cloudflare_list_accounts failed")
        return {"error": str(exc)}
# Rulesets - Add Rule tool
@mcp.tool()
async def cloudflare_add_ruleset_rule(
    scope: Annotated[str, Field(description="Target scope: 'zones' for zone rulesets or 'accounts' for account rulesets.")],
    scope_id: Annotated[str, Field(description="The scope identifier: zone_id when scope='zones', or account_id when scope='accounts'.")],
    ruleset_id: Annotated[str, Field(description="Ruleset ID to add the rule to.")],
    action: Annotated[str, Field(description="Rule action, e.g., 'js_challenge', 'block', 'challenge', 'log', etc.")],
    expression: Annotated[str, Field(description="Filter expression to match requests.")],
    description: Annotated[Optional[str], Field(description="Optional rule description.")] = None,
    enabled: Annotated[Optional[bool], Field(description="Optional enabled flag; defaults to provider behavior if omitted.")] = None,
    position_before_rule_id: Annotated[Optional[str], Field(description="Optional: place new rule before this rule ID. Use empty string to set as first rule.")] = None,
    position_after_rule_id: Annotated[Optional[str], Field(description="Optional: place new rule after this rule ID. Use empty string to set as last rule.")] = None,
    position_index: Annotated[Optional[int], Field(description="Optional: exact 1-based index position for the new rule.")] = None,
    extra_fields: Annotated[Optional[Dict[str, Any]], Field(description="Optional: additional fields to include in the rule body (advanced).")] = None,
    ctx: Context = None,
) -> Dict[str, Any]:
    """
    Add a single rule to an existing ruleset. Only creates the rule. To get the ruleset ID, use cloudflare_list_rulesets().

    Use either zone scope (scope='zones', scope_id=zone_id) or account scope (scope='accounts', scope_id=account_id).
    [Optional] To control placement, specify at most one of: position_before_rule_id, position_after_rule_id, position_index.

    Returns:
        dict: The response of the client's ``add_rule_to_ruleset`` call. If
        more than one position parameter is given, or the call fails,
        ``{"error": <message>}`` is returned instead of an exception being
        raised.
    """
    try:
        client = ctx.request_context.lifespan_context.client  # type: ignore[attr-defined]
        # Collect whichever placement hints were supplied; an empty string is
        # a meaningful value (first/last), so only None counts as "absent".
        placement_hints = {
            "before": position_before_rule_id,
            "after": position_after_rule_id,
            "index": position_index,
        }
        supplied = {key: value for key, value in placement_hints.items() if value is not None}
        if len(supplied) > 1:
            return {"error": "Only one of position_before_rule_id, position_after_rule_id, position_index may be provided"}
        position: Optional[Dict[str, Any]] = supplied or None
        return await client.add_rule_to_ruleset(
            scope=scope,
            scope_id=scope_id,
            ruleset_id=ruleset_id,
            action=action,
            expression=expression,
            description=description,
            enabled=enabled,
            position=position,
            extra_fields=extra_fields,
        )
    except Exception as exc:
        logger.exception("cloudflare_add_ruleset_rule failed")
        return {"error": str(exc)}
@mcp.tool()
async def cloudflare_list_rulesets(
    scope: Annotated[str, Field(description="Target scope: 'zones' or 'accounts'.")],
    scope_id: Annotated[str, Field(description="Scope identifier: zone_id when scope='zones', or account_id when scope='accounts'.")],
    ctx: Context = None,
) -> Dict[str, Any]:
    """List rulesets for the given scope.

    Returns:
        dict: The response of the client's ``list_rulesets`` call, or
        ``{"error": <message>}`` if the call fails (the failure is logged).
    """
    try:
        client = ctx.request_context.lifespan_context.client  # type: ignore[attr-defined]
        listing = await client.list_rulesets(scope=scope, scope_id=scope_id)
        return listing
    except Exception as exc:
        logger.exception("cloudflare_list_rulesets failed")
        return {"error": str(exc)}
@mcp.tool()
async def cloudflare_get_ruleset(
    scope: Annotated[str, Field(description="Target scope: 'zones' or 'accounts'.")],
    scope_id: Annotated[str, Field(description="Scope identifier: zone_id when scope='zones', or account_id when scope='accounts'.")],
    ruleset_id: Annotated[str, Field(description="Ruleset ID to retrieve.")],
    ctx: Context = None,
) -> Dict[str, Any]:
    """Get a specific ruleset by ID for the given scope.

    Returns:
        dict: The response of the client's ``get_ruleset`` call, or
        ``{"error": <message>}`` if the call fails (the failure is logged).
    """
    try:
        client = ctx.request_context.lifespan_context.client  # type: ignore[attr-defined]
        ruleset = await client.get_ruleset(scope=scope, scope_id=scope_id, ruleset_id=ruleset_id)
        return ruleset
    except Exception as exc:
        logger.exception("cloudflare_get_ruleset failed")
        return {"error": str(exc)}
# Rulesets - Update Rule
@mcp.tool()
async def cloudflare_update_ruleset_rule(
    scope: Annotated[str, Field(description="Target scope: 'zones' for zone rulesets or 'accounts' for account rulesets.")],
    scope_id: Annotated[str, Field(description="The scope identifier: zone_id when scope='zones', or account_id when scope='accounts'.")],
    ruleset_id: Annotated[str, Field(description="Ruleset ID containing the rule to update.")],
    rule_id: Annotated[str, Field(description="Rule ID to update.")],
    action: Annotated[Optional[str], Field(description="Optional new rule action, e.g., 'js_challenge', 'block', 'challenge', 'log', etc.")] = None,
    expression: Annotated[Optional[str], Field(description="Optional new filter expression to match requests.")] = None,
    description: Annotated[Optional[str], Field(description="Optional new rule description.")] = None,
    enabled: Annotated[Optional[bool], Field(description="Optional new enabled flag.")] = None,
    position_before_rule_id: Annotated[Optional[str], Field(description="Optional: place rule before this rule ID. Use empty string to set as first rule.")] = None,
    position_after_rule_id: Annotated[Optional[str], Field(description="Optional: place rule after this rule ID. Use empty string to set as last rule.")] = None,
    position_index: Annotated[Optional[int], Field(description="Optional: exact 1-based index position for the rule.")] = None,
    extra_fields: Annotated[Optional[Dict[str, Any]], Field(description="Optional: additional fields to include in the rule body (advanced).")] = None,
    ctx: Context = None,
) -> Dict[str, Any]:
    """
    Update an existing rule in a ruleset. You can update the rule definition, change its position, or both.

    This tool allows you to:
    - Update rule properties (action, expression, description, enabled status)
    - Reorder the rule within the ruleset using position parameters
    - Add custom fields via the extra_fields parameter

    Position control: Specify at most one of: position_before_rule_id, position_after_rule_id, position_index.
    Use empty string for position_before_rule_id to place as first rule, or empty string for position_after_rule_id to place as last rule.

    Args:
        scope: Target scope - 'zones' for zone rulesets or 'accounts' for account rulesets
        scope_id: The scope identifier (zone_id when scope='zones', or account_id when scope='accounts')
        ruleset_id: Ruleset ID containing the rule to update
        rule_id: Rule ID to update
        action: Optional new rule action (e.g., 'js_challenge', 'block', 'challenge', 'log')
        expression: Optional new filter expression to match requests
        description: Optional new rule description
        enabled: Optional new enabled flag
        position_before_rule_id: Optional - place rule before this rule ID (use empty string for first position)
        position_after_rule_id: Optional - place rule after this rule ID (use empty string for last position)
        position_index: Optional - exact 1-based index position for the rule
        extra_fields: Optional additional fields to include in the rule body (advanced usage)
        ctx: Injected request context (provided by FastMCP).

    Returns:
        dict: The response of the client's ``update_rule_in_ruleset`` call
        (expected to be the complete ruleset after the update). If more than
        one position parameter is given, or the call fails (e.g. invalid
        scope, rule/ruleset not found), ``{"error": <message>}`` is returned
        instead of an exception being raised.
    """
    try:
        client = ctx.request_context.lifespan_context.client  # type: ignore[attr-defined]
        # Collect whichever placement hints were supplied; an empty string is
        # a meaningful value (first/last), so only None counts as "absent".
        placement_hints = {
            "before": position_before_rule_id,
            "after": position_after_rule_id,
            "index": position_index,
        }
        supplied = {key: value for key, value in placement_hints.items() if value is not None}
        if len(supplied) > 1:
            return {"error": "Only one of position_before_rule_id, position_after_rule_id, position_index may be provided"}
        position: Optional[Dict[str, Any]] = supplied or None
        return await client.update_rule_in_ruleset(
            scope=scope,
            scope_id=scope_id,
            ruleset_id=ruleset_id,
            rule_id=rule_id,
            action=action,
            expression=expression,
            description=description,
            enabled=enabled,
            position=position,
            extra_fields=extra_fields,
        )
    except Exception as exc:
        logger.exception("cloudflare_update_ruleset_rule failed")
        return {"error": str(exc)}
# Rulesets - Delete Rule
@mcp.tool()
async def cloudflare_delete_ruleset_rule(
    scope: Annotated[str, Field(description="Target scope: 'zones' for zone rulesets or 'accounts' for account rulesets.")],
    scope_id: Annotated[str, Field(description="The scope identifier: zone_id when scope='zones', or account_id when scope='accounts'.")],
    ruleset_id: Annotated[str, Field(description="Ruleset ID containing the rule to delete.")],
    rule_id: Annotated[str, Field(description="Rule ID to delete.")],
    ctx: Context = None,
) -> Dict[str, Any]:
    """
    Delete a rule from a ruleset. This operation creates a new version of the ruleset.

    The specified rule is permanently removed; the operation is irreversible.

    Args:
        scope: Target scope - 'zones' for zone rulesets or 'accounts' for account rulesets
        scope_id: The scope identifier (zone_id when scope='zones', or account_id when scope='accounts')
        ruleset_id: Ruleset ID containing the rule to delete
        rule_id: Rule ID to delete
        ctx: Injected request context (provided by FastMCP).

    Returns:
        dict: The response of the client's ``delete_rule_from_ruleset`` call
        (expected to be the complete ruleset after the deletion, with all
        remaining rules and metadata). If the call fails (e.g. invalid scope,
        rule/ruleset not found), the failure is logged and
        ``{"error": <message>}`` is returned instead of an exception being
        raised.
    """
    try:
        client = ctx.request_context.lifespan_context.client  # type: ignore[attr-defined]
        target = {
            "scope": scope,
            "scope_id": scope_id,
            "ruleset_id": ruleset_id,
            "rule_id": rule_id,
        }
        return await client.delete_rule_from_ruleset(**target)
    except Exception as exc:
        logger.exception("cloudflare_delete_ruleset_rule failed")
        return {"error": str(exc)}
# Rulesets - List Versions
@mcp.tool()
async def cloudflare_list_ruleset_versions(
    scope: Annotated[str, Field(description="Target scope: 'zones' for zone rulesets or 'accounts' for account rulesets.")],
    scope_id: Annotated[str, Field(description="The scope identifier: zone_id when scope='zones', or account_id when scope='accounts'.")],
    ruleset_id: Annotated[str, Field(description="Ruleset ID to list versions for.")],
    ctx: Context = None,
) -> Dict[str, Any]:
    """
    List all versions of a ruleset. Returns metadata about each version without the actual rules.

    To get the actual rules for a specific version, use the
    cloudflare_get_ruleset_version tool.

    Args:
        scope: Target scope - 'zones' for zone rulesets or 'accounts' for account rulesets
        scope_id: The scope identifier (zone_id when scope='zones', or account_id when scope='accounts')
        ruleset_id: Ruleset ID to list versions for
        ctx: Injected request context (provided by FastMCP).

    Returns:
        dict: The response of the client's ``list_ruleset_versions`` call
        (expected to list versions with version numbers, timestamps and basic
        metadata). If the call fails (e.g. invalid scope, ruleset not found),
        the failure is logged and ``{"error": <message>}`` is returned
        instead of an exception being raised.
    """
    try:
        client = ctx.request_context.lifespan_context.client  # type: ignore[attr-defined]
        target = {
            "scope": scope,
            "scope_id": scope_id,
            "ruleset_id": ruleset_id,
        }
        return await client.list_ruleset_versions(**target)
    except Exception as exc:
        logger.exception("cloudflare_list_ruleset_versions failed")
        return {"error": str(exc)}
# Rulesets - Get Specific Version
@mcp.tool()
async def cloudflare_get_ruleset_version(
    scope: Annotated[str, Field(description="Target scope: 'zones' for zone rulesets or 'accounts' for account rulesets.")],
    scope_id: Annotated[str, Field(description="The scope identifier: zone_id when scope='zones', or account_id when scope='accounts'.")],
    ruleset_id: Annotated[str, Field(description="Ruleset ID to get version for.")],
    version: Annotated[str, Field(description="Version number to retrieve (e.g., '1', '2', '3').")],
    ctx: Context = None,
) -> Dict[str, Any]:
    """
    Get a specific version of a ruleset including all its rules and configuration.

    Useful for reviewing historical configurations or comparing different
    versions of a ruleset.

    Args:
        scope: Target scope - 'zones' for zone rulesets or 'accounts' for account rulesets
        scope_id: The scope identifier (zone_id when scope='zones', or account_id when scope='accounts')
        ruleset_id: Ruleset ID to get version for
        version: Version number to retrieve (e.g., '1', '2', '3')
        ctx: Injected request context (provided by FastMCP).

    Returns:
        dict: The response of the client's ``get_ruleset_version`` call
        (expected to be the complete ruleset version with its rules, actions,
        expressions and metadata). If the call fails (e.g. invalid scope,
        ruleset/version not found), the failure is logged and
        ``{"error": <message>}`` is returned instead of an exception being
        raised.
    """
    try:
        client = ctx.request_context.lifespan_context.client  # type: ignore[attr-defined]
        target = {
            "scope": scope,
            "scope_id": scope_id,
            "ruleset_id": ruleset_id,
            "version": version,
        }
        return await client.get_ruleset_version(**target)
    except Exception as exc:
        logger.exception("cloudflare_get_ruleset_version failed")
        return {"error": str(exc)}
# Rulesets - Get Version by Tag
@mcp.tool()
async def cloudflare_get_ruleset_version_by_tag(
    scope: Annotated[str, Field(description="Target scope: 'zones' for zone rulesets or 'accounts' for account rulesets.")],
    scope_id: Annotated[str, Field(description="The scope identifier: zone_id when scope='zones', or account_id when scope='accounts'.")],
    ruleset_id: Annotated[str, Field(description="Ruleset ID to get version for.")],
    version: Annotated[str, Field(description="Version number to retrieve (e.g., '1', '2', '3').")],
    tag: Annotated[str, Field(description="Tag name to filter rules by (e.g., 'wordpress', 'drupal', 'cve-2023-1234').")],
    ctx: Context = None,
) -> Dict[str, Any]:
    """
    Get a specific version of a ruleset filtered by tag. Returns only rules that match the specified tag.

    Particularly useful for managed rulesets, where rules are categorized by
    tags such as 'wordpress', 'drupal', 'cve-2023-1234', etc., so that only the
    rules relevant to one technology or vulnerability category are shown.

    Args:
        scope: Target scope - 'zones' for zone rulesets or 'accounts' for account rulesets
        scope_id: The scope identifier (zone_id when scope='zones', or account_id when scope='accounts')
        ruleset_id: Ruleset ID to get version for
        version: Version number to retrieve (e.g., '1', '2', '3')
        tag: Tag name to filter rules by (e.g., 'wordpress', 'drupal', 'cve-2023-1234')
        ctx: Injected request context (provided by FastMCP).

    Returns:
        dict: The response of the client's ``get_ruleset_version_by_tag`` call
        (expected to contain only the rules matching the tag, with their
        categories and metadata). If the call fails (e.g. invalid scope,
        ruleset/version/tag not found), the failure is logged and
        ``{"error": <message>}`` is returned instead of an exception being
        raised.
    """
    try:
        client = ctx.request_context.lifespan_context.client  # type: ignore[attr-defined]
        target = {
            "scope": scope,
            "scope_id": scope_id,
            "ruleset_id": ruleset_id,
            "version": version,
            "tag": tag,
        }
        return await client.get_ruleset_version_by_tag(**target)
    except Exception as exc:
        logger.exception("cloudflare_get_ruleset_version_by_tag failed")
        return {"error": str(exc)}
# Device Certificate Provisioning
@mcp.tool()
async def client_certificate_provisioning_enable(
    zone_id: Annotated[str, Field(description="Zone ID for which to enable client certificate provisioning. Obtain this from cloudflare_list_zones().")],
    enabled: Annotated[bool, Field(description="Set to true to enable, false to disable client certificate provisioning.")] = True,
    ctx: Context = None,
) -> Dict[str, Any]:
    """
    Enable or disable client certificate provisioning for Cloudflare WARP Device Information Only mode.

    Device Information Only mode allows you to enforce device posture rules when users connect to
    self-hosted Cloudflare Access applications. This mode relies on client certificates generated
    from your account to establish trust between the Access application and the device.

    When enabled, WARP clients enrolled in Device Information Only mode will automatically:
    - Generate a unique client certificate per device
    - Install the certificate on the device (user store on Windows, System keychain on macOS, etc.)
    - Use the certificate to authenticate with Access applications
    - Allow enforcement of WARP client posture checks

    This is the first required step for setting up Device Information Only mode. After enabling
    this setting, you must:
    1. Configure device profiles to use "Device Information Only" service mode
    2. Enroll devices into your Zero Trust organization
    3. Set up mTLS rules to require valid client certificates
    4. Configure device posture checks and Access policies

    Feature Availability:
    - Supported on: Windows, macOS, Linux, iOS, Android, ChromeOS
    - Works with self-hosted Access applications
    - Certificate name matches the WARP enrollment Device ID
    - Certificates can be viewed in Cloudflare dashboard under SSL/TLS > Client Certificates

    Important Notes:
    - Requires 'SSL and Certificates Write' API token permission
    - Not compatible with Windows pre-login feature (user must be logged in)
    - Certificate is installed in the user certificate store
    - Each enrolled device receives a unique client certificate
    - Certificates are automatically managed by the WARP client

    Args:
        zone_id: The zone ID for which to enable/disable client certificate provisioning.
            This must be a valid zone ID accessible to the authenticated user.
            Obtain zone IDs using cloudflare_list_zones().
        enabled: True to enable client certificate provisioning (default),
            False to disable it.
        ctx: Injected request context (provided by FastMCP).

    Returns:
        dict: The response of the client's ``enable_client_certificate_provisioning``
        call (expected to hold the updated provisioning configuration, including
        the enabled status). If the call fails (e.g. invalid zone ID or
        insufficient permissions), the failure is logged and
        ``{"error": <message>}`` is returned instead of an exception being raised.

    Examples:
        # Enable client certificate provisioning for a zone
        result = client_certificate_provisioning_enable(
            zone_id="abc123def456",
            enabled=True
        )
        # Disable client certificate provisioning
        result = client_certificate_provisioning_enable(
            zone_id="abc123def456",
            enabled=False
        )

    Required API Token Permissions:
    - SSL and Certificates Write

    Related Operations:
    After enabling, configure in Cloudflare One dashboard:
    1. Set device profile service mode to "Device Information Only"
    2. Configure mTLS rules for your Access application hostname
    3. Create WAF custom rule to block requests without valid client certificates
    4. Set up WARP client checks and device posture policies

    Documentation Reference:
    https://developers.cloudflare.com/cloudflare-one/team-and-resources/devices/warp/configure-warp/warp-modes/device-information-only/
    """
    try:
        client = ctx.request_context.lifespan_context.client  # type: ignore[attr-defined]
        return await client.enable_client_certificate_provisioning(zone_id=zone_id, enabled=enabled)
    except Exception as exc:
        logger.exception("client_certificate_provisioning_enable failed")
        return {"error": str(exc)}
def parse_args() -> Any:
    """Parse the server's command-line options from ``sys.argv``.

    Returns:
        The parsed namespace with ``sse`` (bool, default False), ``host``
        (str, default "0.0.0.0") and ``port`` (int, default 8000).
    """
    cli = argparse.ArgumentParser(description="Cloudflare MCP Server")
    option_table = (
        ("--sse", {"action": "store_true", "help": "Run with SSE (HTTP) transport"}),
        ("--host", {"default": "0.0.0.0", "help": "Host for SSE mode"}),
        ("--port", {"type": int, "default": 8000, "help": "Port for SSE mode"}),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def main() -> None:
    """Load the project's ``.env`` file, parse CLI options and run the server.

    With ``--sse`` the server runs on the SSE transport bound to the given
    host and port; otherwise ``mcp.run()`` is called with its defaults.
    """
    # Ensure .env is loaded even if host sets a different CWD: the file is
    # looked up two directory levels above this module's own directory.
    env_file = Path(__file__).resolve().parents[2] / ".env"
    load_dotenv(dotenv_path=env_file)
    options = parse_args()
    if not options.sse:
        mcp.run()
        return
    mcp.run(transport="sse", host=options.host, port=options.port)
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()