import os
import httpx
import json
import logging
from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
BASE_URL = "https://api.tailscale.com/api/v2"
# Initializing FastMCP
mcp = FastMCP("Tailscale", host="0.0.0.0")
def get_api_key() -> str:
api_key = os.environ.get("TAILSCALE_API_KEY")
if not api_key:
raise ValueError("TAILSCALE_API_KEY environment variable is required")
return api_key
def get_tailnet(tailnet: Optional[str] = None) -> str:
t_net = tailnet or os.environ.get("TAILSCALE_TAILNET")
if not t_net:
raise ValueError("TAILSCALE_TAILNET environment variable is required")
return t_net
async def make_request(method: str, endpoint: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
url = f"{BASE_URL}{endpoint}"
auth = (get_api_key(), "") # Basic Auth with empty password for Tailscale API
async with httpx.AsyncClient() as client:
try:
response = await client.request(method, url, json=data, auth=auth, timeout=10.0)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
# Return error details if available
try:
error_body = e.response.json()
raise RuntimeError(f"API Error {e.response.status_code}: {error_body}")
except Exception:
raise RuntimeError(f"API Error {e.response.status_code}: {e.response.text}")
except Exception as e:
raise RuntimeError(f"Request failed: {str(e)}")
# --- Devices Tools ---
@mcp.tool()
async def list_devices(tailnet: Optional[str] = None) -> Dict[str, Any]:
"""Lists the devices in a tailnet."""
t_net = get_tailnet(tailnet)
return await make_request("GET", f"/tailnet/{t_net}/devices")
@mcp.tool()
async def get_device(device_id: str) -> Dict[str, Any]:
"""Retrieves basic properties of a device."""
# Note: Tailscale API documentation says /device/{id}
return await make_request("GET", f"/device/{device_id}")
@mcp.tool()
async def delete_device(device_id: str) -> Dict[str, Any]:
"""Deletes a device from its tailnet."""
return await make_request("DELETE", f"/device/{device_id}")
@mcp.tool()
async def authorize_device(device_id: str, authorized: bool) -> Dict[str, Any]:
"""Authorizes or deauthorizes a device."""
return await make_request("POST", f"/device/{device_id}/authorized", {"authorized": authorized})
@mcp.tool()
async def set_device_tags(device_id: str, tags: List[str]) -> Dict[str, Any]:
"""Sets the tags for a device."""
return await make_request("POST", f"/device/{device_id}/tags", {"tags": tags})
@mcp.tool()
async def set_device_routes(device_id: str, routes: List[str]) -> Dict[str, Any]:
"""Sets the subnet routes that a device is allowed to expose."""
return await make_request("POST", f"/device/{device_id}/routes", {"routes": routes})
@mcp.tool()
async def set_key_expiry(device_id: str, disabled: bool) -> Dict[str, Any]:
"""Sets whether a device's key expires."""
# Check API docs: usually POST /device/{device_id}/key
return await make_request("POST", f"/device/{device_id}/key", {"keyExpiryDisabled": disabled})
# --- ACLs Tools ---
@mcp.tool()
async def get_acl(tailnet: Optional[str] = None) -> Dict[str, Any]:
"""Retrieves the current ACL for a tailnet."""
t_net = get_tailnet(tailnet)
return await make_request("GET", f"/tailnet/{t_net}/acl")
@mcp.tool()
async def set_acl(tailnet: Optional[str] = None, acl_json: Optional[Dict[str, Any]] = None, acl_str: Optional[str] = None) -> Dict[str, Any]:
"""Sets the ACL for a tailnet. Provide acl_json OR acl_str."""
t_net = get_tailnet(tailnet)
data = acl_json
if not data and acl_str:
data = json.loads(acl_str)
if not data:
raise ValueError("Must provide acl_json or acl_str")
# Endpoint requires headers 'If-Match' sometimes for concurrency,
# but for simple implementation we will just POST as per request.
# Note: Tailscale API might treat this as a complete overwrite.
return await make_request("POST", f"/tailnet/{t_net}/acl", data)
@mcp.tool()
async def validate_acl(tailnet: Optional[str] = None, acl_json: Optional[Dict[str, Any]] = None, acl_str: Optional[str] = None) -> Dict[str, Any]:
"""Validates an ACL for a tailnet without saving it."""
t_net = get_tailnet(tailnet)
data = acl_json
if not data and acl_str:
data = json.loads(acl_str)
if not data:
raise ValueError("Must provide acl_json or acl_str")
return await make_request("POST", f"/tailnet/{t_net}/acl/validate", data)
# --- Keys Tools ---
@mcp.tool()
async def list_keys(tailnet: Optional[str] = None) -> Dict[str, Any]:
"""Lists the keys in a tailnet."""
t_net = get_tailnet(tailnet)
return await make_request("GET", f"/tailnet/{t_net}/keys")
@mcp.tool()
async def create_key(
tailnet: Optional[str] = None,
ephemeral: bool = False,
reusable: bool = False,
tags: Optional[List[str]] = None,
expiry_seconds: int = 7776000 # 90 days default
) -> Dict[str, Any]:
"""Creates a new auth key."""
t_net = get_tailnet(tailnet)
payload = {
"capabilities": {
"devices": {
"create": {
"reusable": reusable,
"ephemeral": ephemeral,
"tags": tags or [],
}
}
},
"expirySeconds": expiry_seconds
}
return await make_request("POST", f"/tailnet/{t_net}/keys", payload)
@mcp.tool()
async def get_key(key_id: str, tailnet: Optional[str] = None) -> Dict[str, Any]:
"""Retrieves a key."""
t_net = get_tailnet(tailnet)
return await make_request("GET", f"/tailnet/{t_net}/keys/{key_id}")
@mcp.tool()
async def delete_key(key_id: str, tailnet: Optional[str] = None) -> Dict[str, Any]:
"""Deletes a key."""
t_net = get_tailnet(tailnet)
return await make_request("DELETE", f"/tailnet/{t_net}/keys/{key_id}")
# --- DNS Tools ---
@mcp.tool()
async def get_nameservers(tailnet: Optional[str] = None) -> Dict[str, Any]:
"""Retrieves the list of nameservers."""
t_net = get_tailnet(tailnet)
return await make_request("GET", f"/tailnet/{t_net}/dns/nameservers")
@mcp.tool()
async def set_nameservers(dns_ips: List[str], tailnet: Optional[str] = None) -> Dict[str, Any]:
"""Sets the list of nameservers."""
t_net = get_tailnet(tailnet)
return await make_request("POST", f"/tailnet/{t_net}/dns/nameservers", {"dns": dns_ips})
@mcp.tool()
async def get_dns_preferences(tailnet: Optional[str] = None) -> Dict[str, Any]:
"""Retrieves DNS preferences."""
t_net = get_tailnet(tailnet)
return await make_request("GET", f"/tailnet/{t_net}/dns/preferences")
@mcp.tool()
async def set_dns_preferences(magic_dns: bool, tailnet: Optional[str] = None) -> Dict[str, Any]:
"""Sets DNS preferences (MagicDNS)."""
t_net = get_tailnet(tailnet)
return await make_request("POST", f"/tailnet/{t_net}/dns/preferences", {"magicDNS": magic_dns})
@mcp.tool()
async def get_search_paths(tailnet: Optional[str] = None) -> Dict[str, Any]:
"""Retrieves the search paths."""
t_net = get_tailnet(tailnet)
return await make_request("GET", f"/tailnet/{t_net}/dns/searchpaths")
@mcp.tool()
async def set_search_paths(paths: List[str], tailnet: Optional[str] = None) -> Dict[str, Any]:
"""Sets the search paths."""
t_net = get_tailnet(tailnet)
return await make_request("POST", f"/tailnet/{t_net}/dns/searchpaths", {"searchPaths": paths})
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "stdio":
mcp.run(transport="stdio")
else:
import uvicorn
uvicorn.run(mcp.streamable_http_app(), host="0.0.0.0", port=8000)