#!/usr/bin/env python3
import logging
import os
import shutil
import tempfile
from pathlib import Path
from typing import Any
import plumbum
import yaml
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.prompts import base
from mcp.types import ToolAnnotations
from pydantic import BaseModel, Field
from rich.console import Console
from rich.logging import RichHandler
# Route log records through rich and emit them on stderr. The server is
# started with the stdio transport (see main), so this presumably keeps
# stdout free for protocol traffic.
console = Console(stderr=True)
logging.basicConfig(
    handlers=[RichHandler(show_time=True, console=console)],
    datefmt="[%X]",
    format="%(message)s",
    level=logging.INFO,
)
log = logging.getLogger("rich")
# Directory, relative to the project root, under which rendered checkpoints
# and diff outputs are stored.
OUTPUT_DIR = Path(".kustomize-mcp")
# Server-level instructions handed to FastMCP when the server is created.
INSTRUCTIONS = """You are a Kustomize rendering and diffing server.
Configuration can be rendered for a given path, stored in checkpoints. You can
also diff all rendered paths between two checkpoints, or diff two specific paths
inside the same checkpoints.
Use checkpoints to store different versions of the configuration for the same
path. This allows you to track the effects of changes over time.
To understand what files are involved in a Kustomization, you can query for its
dependencies. Changing any of these will affect the rendered output of the
Kustomization. Remember that Kustomizations may depend on other Kustomizations,
so the effect of a change may be indirect.
"""
# Tool usage guide that the prompt functions in this module embed in the
# messages they build.
USAGE = """The following tools are available to you for rendering Kubernetes
manifests from Kustomize configurations and show the differences between paths,
or the effects of changes on the configuration: `create_checkpoint`,
`render`, `diff_checkpoints`, `diff_paths`, `dependencies`.
To compare changes in a Kustomize configuration over time, create a checkpoint,
render the configuration into it, make your changes, render again into a new
checkpoint, and then diff the two checkpoints. This will show you the effects of
your changes, and is especially useful to verify that refactorings do not change
the resulting configuration.
To compare Kustomize configurations in two separate directories, create a
checkpoint, render both directories into it, and then diff the two rendered
paths.
"""
# Create the MCP server instance
mcp = FastMCP("kustomize-mcp", instructions=INSTRUCTIONS, json_response=True)
class ManifestMetadata(BaseModel):
    """Identity of a rendered manifest plus the path it was rendered from.

    The identity round-trips through a file name of the form
    ``<apiVersion>+<kind>+<namespace>+<name>.yaml``, where ``/`` in the API
    version is written as ``#`` and a missing namespace is an empty segment.
    """

    source_path: str
    api_version: str
    kind: str
    name: str
    namespace: str | None = None

    def to_filename(self) -> str:
        """Return the file name that encodes this manifest's identity."""
        parts = [
            self.api_version.replace("/", "#"),
            self.kind,
            self.namespace or "",
            self.name,
        ]
        return "+".join(parts) + ".yaml"

    @classmethod
    def from_manifest(cls, source_path: str, manifest: dict) -> "ManifestMetadata":
        """Build the metadata from a parsed manifest document.

        Missing or null ``apiVersion``, ``kind``, ``metadata`` and
        ``metadata.name`` fall back to empty values instead of raising.

        Args:
            source_path: Path the manifest was rendered from.
            manifest: The parsed manifest mapping.
        """
        # ``metadata:`` with no value parses to None, so ``.get(..., {})``
        # alone is not enough to guarantee a mapping here.
        metadata = manifest.get("metadata") or {}
        return cls(
            source_path=source_path,
            api_version=manifest.get("apiVersion") or "",
            kind=manifest.get("kind") or "",
            name=metadata.get("name") or "",
            namespace=metadata.get("namespace"),
        )

    @classmethod
    def from_filename(cls, filename: str) -> "ManifestMetadata":
        """Parse a path produced with :meth:`to_filename` back into metadata.

        The parent directory of ``filename`` becomes ``source_path``.

        Raises:
            ValueError: If the file name does not end with ``.yaml`` or does
                not contain the four ``+``-separated segments.
        """
        # Explicit exceptions rather than ``assert``: assertions are stripped
        # when Python runs with -O.
        if not filename.endswith(".yaml"):
            raise ValueError(f"Filename must end with .yaml: {filename!r}")
        file_path = Path(filename)
        # Split at most three times so that a "+" inside the name (the last
        # segment) survives the round trip.
        parts = file_path.name.removesuffix(".yaml").split("+", 3)
        if len(parts) != 4:
            raise ValueError(
                f"Filename {filename!r} does not match "
                "'<apiVersion>+<kind>+<namespace>+<name>.yaml'."
            )
        api_version, kind, namespace, name = parts
        return cls(
            source_path=file_path.parent.as_posix(),
            api_version=api_version.replace("#", "/"),
            kind=kind,
            namespace=namespace or None,
            name=name,
        )
class DiffResult(BaseModel):
    # Summary of a diff between two rendered trees, grouped by change type.
    # Differ._summary fills the lists from `git diff --name-status` output:
    # "added" from status A and C, "deleted" from D, "modified" from M and
    # "replaced" from R.
    added: list[ManifestMetadata]
    deleted: list[ManifestMetadata]
    modified: list[ManifestMetadata]
    replaced: list[ManifestMetadata]
    # Location of the file holding the full textual diff, when one was written.
    diff_path: str | None = Field(
        default=None, description="Path to the detailed diff output, if any."
    )
class Renderer:
    """A class to handle Kustomize rendering operations.

    Rendered output is stored under
    ``<root_dir>/<OUTPUT_DIR>/checkpoints/<checkpoint_id>/<path>``, one YAML
    file per rendered manifest.
    """

    def __init__(self, root_dir: Path, load_restrictions: bool = True):
        """Look up the ``kustomize`` command and create the checkpoints directory.

        Args:
            root_dir: Project root; ``kustomize`` is run with it as working
                directory.
            load_restrictions: When False, builds are run with
                ``--load-restrictor=LoadRestrictionsNone``.
        """
        self.kustomize = plumbum.local["kustomize"]
        self.root_dir = root_dir
        self.checkpoints_dir = root_dir / OUTPUT_DIR / "checkpoints"
        self.checkpoints_dir.mkdir(parents=True, exist_ok=True)
        self.load_restrictions = load_restrictions

    def clear(self) -> None:
        """Delete every checkpoint and recreate an empty checkpoints directory."""
        if self.checkpoints_dir.exists():
            shutil.rmtree(self.checkpoints_dir)
        self.checkpoints_dir.mkdir(parents=True, exist_ok=True)

    def create_checkpoint(self) -> str:
        """Create a new empty checkpoint directory and return its ID.

        The ID is the name of a freshly created ``ckp-`` prefixed temporary
        directory inside the checkpoints directory.
        """
        checkpoint_path = Path(
            tempfile.mkdtemp(dir=self.checkpoints_dir.as_posix(), prefix="ckp-")
        )
        return checkpoint_path.name

    def delete_checkpoint(self, checkpoint_id: str) -> None:
        """Delete one checkpoint; a checkpoint that does not exist is ignored.

        Raises:
            ValueError: If ``checkpoint_id`` is not a single path segment.
        """
        # Without this check an ID such as "", ".." or "../.." would make
        # rmtree remove directories outside the requested checkpoint.
        self._validate_checkpoint_id(checkpoint_id)
        checkpoint_path = self.checkpoints_dir / checkpoint_id
        if checkpoint_path.exists():
            shutil.rmtree(checkpoint_path)

    def render(self, checkpoint_id: str, path: Path) -> Path:
        """Render the Kustomization at ``path`` into the given checkpoint.

        Args:
            checkpoint_id: Checkpoint to render into.
            path: Kustomization directory, relative to ``root_dir``.

        Returns:
            The output directory, relative to ``root_dir``.

        Raises:
            ValueError: If ``path`` is absolute or contains ``..`` segments,
                if ``checkpoint_id`` is not a single path segment, or if the
                path has already been rendered in this checkpoint.
        """
        if path.is_absolute():
            raise ValueError("Path has to be relative.")
        # A ".." segment would place the output outside of the checkpoint.
        if ".." in path.parts:
            raise ValueError("Path must not contain '..' segments.")
        self._validate_checkpoint_id(checkpoint_id)
        dest_path = self.checkpoints_dir / checkpoint_id / path
        try:
            dest_path.mkdir(parents=True, exist_ok=False)
        except FileExistsError as err:
            raise ValueError(
                f"Path {path} has already been rendered in checkpoint {checkpoint_id}. "
                "Create a new checkpoint to render again."
            ) from err
        try:
            self._build(path, dest_path)
        except BaseException:
            # Remove the partial output so a failed build is not later taken
            # for a rendered path; the original error is re-raised unchanged.
            shutil.rmtree(dest_path, ignore_errors=True)
            raise
        return dest_path.relative_to(self.root_dir)

    @staticmethod
    def _validate_checkpoint_id(checkpoint_id: str) -> None:
        """Raise ValueError unless ``checkpoint_id`` is a single path segment."""
        if (
            "/" in checkpoint_id
            or "\\" in checkpoint_id
            or checkpoint_id.strip() in ("", ".", "..")
        ):
            raise ValueError("Checkpoint ID has to be a valid single path segment.")

    def _build(self, src_path: Path, dest_path: Path) -> None:
        """Run ``kustomize build`` on ``src_path`` and write one file per document.

        Empty documents in the output are skipped. Each remaining document is
        written to ``dest_path`` under the name derived from its metadata.
        """
        # Run kustomize build
        args = ["build", src_path.as_posix(), "--enable-helm"]
        if not self.load_restrictions:
            args.append("--load-restrictor=LoadRestrictionsNone")
        out = self.kustomize(*args, cwd=self.root_dir)
        for doc in yaml.safe_load_all(out):
            if not doc:
                continue
            filename = self._get_filename(src_path, doc)
            with open(dest_path / filename, "w", encoding="utf-8") as f:
                yaml.safe_dump(doc, f)

    def _get_filename(self, src_path: Path, doc: dict) -> str:
        """Return the output file name for a rendered manifest document."""
        manifest = ManifestMetadata.from_manifest(src_path.as_posix(), doc)
        return manifest.to_filename()
def is_kustomization_file(path: Path) -> bool:
    """Tell whether the file name of ``path`` is a recognized Kustomization file name."""
    recognized_names = ("kustomization.yaml", "kustomization.yml", "Kustomization")
    return any(path.name == candidate for candidate in recognized_names)
# Any value found while walking a parsed Kustomization document. Because the
# union includes Any, the alias is descriptive rather than restrictive.
KustomizationItem = dict | list | str | Any
class DepsResolver:
"""A class to resolve Kustomization dependencies."""
def __init__(self, root_dir: Path):
self.root_dir = root_dir.resolve()
self._collect_paths()
@property
def paths(self) -> list[Path]:
return self.files
@property
def kustomizations(self) -> list[Path]:
return list(self.kustomization_dirs.values())
def compute_dependencies(
self, path: Path, recursive: bool = False, reverse: bool = False
) -> list[Path]:
if not path.is_file():
raise ValueError(f"Path {path} is not a file.")
if reverse:
return self._compute_reverse_dependencies(path, recursive)
kustomization = self._parse_kustomization(path)
paths = self._collect_paths_in_kustomization(path.parent, kustomization)
if recursive:
# Track visited kustomizations to prevent infinite loops
visited = {path.resolve()}
paths = self._expand_recursive_dependencies(paths, visited)
return [p.relative_to(self.root_dir) for p in paths]
def _compute_reverse_dependencies(
self, path: Path, recursive: bool = False
) -> list[Path]:
"""Compute reverse dependencies: which Kustomizations depend on the given file.
Args:
path: Path to a file or Kustomization
recursive: If True, recursively find Kustomizations that depend on
the direct dependents
Returns:
List of Kustomization paths that depend on the given file
"""
# Build a reverse dependency map: file -> set of Kustomizations that depend on it
reverse_deps: dict[Path, set[Path]] = {}
for kustomization_path in self.kustomizations:
try:
kustomization = self._parse_kustomization(kustomization_path)
except ValueError:
# Ignore invalid kustomizations
continue
deps = self._collect_paths_in_kustomization(
kustomization_path.parent, kustomization
)
# For each dependency, record that this kustomization depends on it
for dep in deps:
if dep.resolve() not in reverse_deps:
reverse_deps[dep.resolve()] = set()
reverse_deps[dep.resolve()].add(kustomization_path.resolve())
# Find direct dependents of the given path
target_path = (self.root_dir / path).resolve()
direct_dependents = reverse_deps.get(target_path, set())
if not recursive:
return [p.relative_to(self.root_dir) for p in direct_dependents]
# Recursively find all transitive dependents
all_dependents: set[Path] = set()
visited: set[Path] = set()
to_process = list(direct_dependents)
while to_process:
current = to_process.pop()
if current.resolve() in visited:
continue
visited.add(current.resolve())
all_dependents.add(current)
# Add Kustomizations that depend on the current one
indirect = reverse_deps.get(current.resolve(), set())
to_process.extend(indirect)
return [p.relative_to(self.root_dir) for p in all_dependents]
def _collect_paths(self):
self.files = [p for p in self.root_dir.rglob("*") if p.is_file()]
self.kustomization_dirs = {
p.parent: p for p in self.files if is_kustomization_file(p)
}
def _parse_kustomization(self, path: Path) -> dict:
with open(path) as f:
k = yaml.safe_load(f)
if (
not k
or not k.get("apiVersion", "").startswith("kustomize.config.k8s.io/")
or k.get("kind") not in ("Kustomization", "Component")
):
raise ValueError(f"File {path} is not a valid Kustomization file.")
return k
def _expand_recursive_dependencies(
self, paths: list[Path], visited: set[Path]
) -> list[Path]:
"""Recursively expand dependencies of Kustomizations.
Args:
paths: List of dependency paths to expand
visited: Set of already visited Kustomization paths to prevent cycles
Returns:
Complete list of all transitive dependencies
"""
all_deps = []
for path in paths:
all_deps.append(path)
# If this is a Kustomization file, recursively expand its dependencies
if is_kustomization_file(path) and path.resolve() not in visited:
visited.add(path.resolve())
try:
kustomization = self._parse_kustomization(path)
except ValueError:
# Ignore invalid kustomizations
continue
nested_deps = self._collect_paths_in_kustomization(
path.parent, kustomization
)
# Recursively expand nested dependencies
expanded = self._expand_recursive_dependencies(nested_deps, visited)
all_deps.extend(expanded)
return all_deps
def _collect_paths_in_kustomization(
self, base_path: Path, item: KustomizationItem
) -> list[Path]:
candidates = []
if isinstance(item, str):
candidates.append((base_path / item).resolve())
elif isinstance(item, list):
for subitem in item:
candidates.extend(
self._collect_paths_in_kustomization(base_path, subitem)
)
elif isinstance(item, dict):
for k, v in item.items():
candidates.extend(self._collect_paths_in_kustomization(base_path, k))
candidates.extend(self._collect_paths_in_kustomization(base_path, v))
# Other types are ignored
file_deps = [p for p in candidates if p in self.files]
k_deps = [
self.kustomization_dirs[p] for p in candidates if p in self.kustomization_dirs
]
return file_deps + k_deps
class Differ:
    """Diff two rendered directories with ``git diff --no-index``.

    Both directories are exposed as the symlinks ``1`` and ``2`` inside a
    fresh temporary directory, and git is run from that directory.
    """

    def __init__(
        self,
        path_1: Path,
        path_2: Path,
        root_tmp_dir: Path,
        strip_roots: list[Path] | None = None,
    ):
        """Create the temporary diff directory and its two symlinks.

        Args:
            path_1: Directory used as the "before" side.
            path_2: Directory used as the "after" side.
            root_tmp_dir: Existing directory in which the diff directory is
                created.
            strip_roots: Roots removed from the resolved file paths before
                they are parsed into manifest metadata.
        """
        self.git = plumbum.local["git"]
        self.diff_root = Path(tempfile.mkdtemp(dir=root_tmp_dir.as_posix()))
        self.strip_roots = [p.resolve() for p in (strip_roots or [])]
        self.path_1 = self.diff_root / "1"
        self.path_2 = self.diff_root / "2"
        self.path_1.symlink_to(path_1, target_is_directory=True)
        self.path_2.symlink_to(path_2, target_is_directory=True)

    def diff(self) -> DiffResult:
        """Compute the summary and write the full diff to ``changes.diff``.

        Returns:
            The summary, with ``diff_path`` pointing at the written file.
        """
        res = self._summary()
        diff = self._git_diff()
        diff_path = self.diff_root / "changes.diff"
        # Explicit encoding so the write does not depend on the locale.
        with open(diff_path, "w", encoding="utf-8") as f:
            f.write(diff)
        res.diff_path = diff_path.as_posix()
        return res

    def _git_diff(self, *args) -> str:
        """Run ``git diff --no-index --no-prefix`` on ``1/`` and ``2/``.

        Extra ``args`` are inserted before the two directories.
        """
        # git diff returns exit code 1 when files differ, which is expected
        return self.git(
            "diff",
            "--no-index",
            "--no-prefix",
            *args,
            "1/",
            "2/",
            retcode=(0, 1),
            cwd=self.diff_root,
        )

    def _summary(self) -> DiffResult:
        """Classify changed files by their ``--name-status`` code."""
        res = self._git_diff("--name-status")
        entries = [line.split("\t") for line in res.strip().splitlines()]

        def filter_type(type_code: str, field: int = 1) -> list[Path]:
            # ``field`` selects which path column of the status line to use.
            return [
                Path(fields[field])
                for fields in entries
                if fields[0].startswith(type_code)
            ]

        def to_manifest(path: Path) -> ManifestMetadata:
            # Reconstruct full path by resolving symlinks and stripping roots
            # First, make absolute (current dir might be different than diff root)
            path = (self.diff_root / path).resolve()
            for root in self.strip_roots:
                if path.is_relative_to(root):
                    path = path.relative_to(root)
                    # Once relative, no other (absolute) root can match.
                    break
            return ManifestMetadata.from_filename(path.as_posix())

        # Copy entries read "C<score>\t<source>\t<destination>": the file
        # that was added is the destination, i.e. the last column.
        added = filter_type("A") + filter_type("C", field=-1)
        deleted = filter_type("D")
        modified = filter_type("M")
        # Rename entries are reported by their first (old) path.
        replaced = filter_type("R")
        return DiffResult(
            added=[to_manifest(p) for p in added],
            deleted=[to_manifest(p) for p in deleted],
            modified=[to_manifest(p) for p in modified],
            replaced=[to_manifest(p) for p in replaced],
        )
class Server:
def __init__(self, root_dir: Path, kustomize_load_restrictions: bool = True):
self.root_dir = root_dir
self.diffs_root = root_dir / OUTPUT_DIR / "diffs"
self.resolver = DepsResolver(root_dir)
self.renderer = Renderer(root_dir, load_restrictions=kustomize_load_restrictions)
self._prepare_dirs()
@property
def checkpoints_dir(self) -> Path:
return self.renderer.checkpoints_dir
def create_checkpoint(self) -> str:
return self.renderer.create_checkpoint()
def clear(self, checkpoint_id: str | None = None):
if checkpoint_id is None:
self.renderer.clear()
else:
self.renderer.delete_checkpoint(checkpoint_id)
def render(self, checkpoint_id: str, path: Path) -> Path:
return self.renderer.render(checkpoint_id, path)
def diff_checkpoints(self, checkpoint_id_1: str, checkpoint_id_2: str) -> DiffResult:
self._ensure_rendered(checkpoint_id_1)
self._ensure_rendered(checkpoint_id_2)
differ = Differ(
self.checkpoints_dir / checkpoint_id_1,
self.checkpoints_dir / checkpoint_id_2,
root_tmp_dir=self.diffs_root,
strip_roots=[
self.checkpoints_dir / checkpoint_id_1,
self.checkpoints_dir / checkpoint_id_2,
],
)
return self._fix_diff_path(differ.diff())
def diff_paths(self, checkpoint_id: str, path_1: Path, path_2: Path) -> DiffResult:
# If the given paths were not yet rendered in the checkpoint, do it
# transparently.
self._ensure_rendered(checkpoint_id, path_1)
self._ensure_rendered(checkpoint_id, path_2)
ckpt_path = self.checkpoints_dir / checkpoint_id
differ = Differ(
ckpt_path / path_1,
ckpt_path / path_2,
root_tmp_dir=self.diffs_root,
strip_roots=[ckpt_path],
)
return self._fix_diff_path(differ.diff())
def dependencies(
self, path: Path, recursive: bool = False, reverse: bool = False
) -> list[Path]:
return [
p.relative_to(self.resolver.root_dir)
for p in self.resolver.compute_dependencies(
path, recursive=recursive, reverse=reverse
)
]
def _prepare_dirs(self):
self.diffs_root.mkdir(parents=True, exist_ok=True)
self.checkpoints_dir.mkdir(parents=True, exist_ok=True)
# Ignore the whole thing in git
with open(self.root_dir / OUTPUT_DIR / ".gitignore", "w") as f:
f.write("*\n")
def _ensure_rendered(self, checkpoint_id: str, path: Path | None = None) -> None:
ckpt_path = self.checkpoints_dir / checkpoint_id
if not ckpt_path.exists():
raise ValueError(f"Checkpoint {checkpoint_id} does not exist.")
if path is None:
# Check if the checkpoint is empty
if not any(ckpt_path.iterdir()):
raise ValueError(
f"No paths have been rendered in checkpoint {checkpoint_id}."
)
return
# Ensure the given path is rendered
if not (ckpt_path / path).exists():
self.renderer.render(checkpoint_id, path)
def _fix_diff_path(self, diff_result: DiffResult) -> DiffResult:
"""Convert diff paths to be relative to the root dir."""
if diff_result.diff_path is not None:
diff_result.diff_path = (
Path(diff_result.diff_path).relative_to(self.root_dir).as_posix()
)
return diff_result
# Module-level server rooted at the current working directory. Kustomize load
# restrictions are enabled unless KUSTOMIZE_LOAD_RESTRICTIONS is set to a
# value other than "true", "1" or "yes" (compared case-insensitively).
server = Server(
    Path.cwd(),
    kustomize_load_restrictions=(
        os.environ.get("KUSTOMIZE_LOAD_RESTRICTIONS", "true").lower()
        in ("true", "1", "yes")
    ),
)
@mcp.tool(annotations=ToolAnnotations(readOnlyHint=True))
def create_checkpoint() -> str:
    """Creates a new, empty checkpoint and returns its generated ID.

    This checkpoint can be used to store rendered Kustomize configurations and
    compare them across changes of the source configuration over time.

    Returns:
        str: The ID of the created checkpoint.
    """
    # The ID is generated by the server; this tool takes no arguments.
    return server.create_checkpoint()
@mcp.tool(annotations=ToolAnnotations(readOnlyHint=True))
def clear_checkpoint(checkpoint_id: str | None = None):
    """Clears all checkpoints or a specific checkpoint if an ID is provided.

    Args:
        checkpoint_id (str | None): The ID of the checkpoint to clear.
            If None, clears all checkpoints.
    """
    # Delegate to the server and hand back whatever it returns.
    outcome = server.clear(checkpoint_id)
    return outcome
@mcp.tool(annotations=ToolAnnotations(readOnlyHint=True))
def render(checkpoint_id: str, path: str) -> str:
    """Renders the Kustomize configuration at the given path and saves it in a checkpoint.

    Args:
        checkpoint_id (str): The ID of the checkpoint to save the rendered output.
        path (str): The file system path to the Kustomize configuration.

    Returns:
        str: The location of the saved rendered output.
    """
    # The server works on Path objects; the tool boundary uses POSIX strings.
    rendered_location = server.render(checkpoint_id, Path(path))
    return rendered_location.as_posix()
@mcp.tool(annotations=ToolAnnotations(readOnlyHint=True))
def diff_checkpoints(checkpoint_id_1: str, checkpoint_id_2: str) -> DiffResult:
    """Compares two checkpoints and returns the differences for all rendered
    Kustomize configurations.

    This requires that both checkpoints have rendered configurations for the
    same set of paths.

    Args:
        checkpoint_id_1 (str): The ID of the first checkpoint.
        checkpoint_id_2 (str): The ID of the second checkpoint.

    Returns:
        DiffResult: A summary of the differences between the two configurations.
    """
    summary = server.diff_checkpoints(checkpoint_id_1, checkpoint_id_2)
    return summary
@mcp.tool(annotations=ToolAnnotations(readOnlyHint=True))
def diff_paths(checkpoint_id: str, path_1: str, path_2: str) -> DiffResult:
    """Compares two Kustomize configurations rendered in the same checkpoint
    and returns the differences.

    Args:
        checkpoint_id (str): The ID of the checkpoint in which both paths are
            rendered. A path that has not been rendered in it yet is rendered
            automatically before diffing.
        path_1 (str): The file system path to the first Kustomize configuration.
        path_2 (str): The file system path to the second Kustomize configuration.

    Returns:
        DiffResult: A summary of the differences between the two configurations.
    """
    return server.diff_paths(checkpoint_id, Path(path_1), Path(path_2))
@mcp.tool(annotations=ToolAnnotations(readOnlyHint=True))
def dependencies(path: str, recursive: bool = False, reverse: bool = False) -> list[str]:
    """Returns the list of dependencies for the Kustomization at the given path.

    Args:
        path (str): The file system path to the Kustomization or file.
        recursive (bool): If True, recursively expand dependencies of nested
            Kustomizations. Defaults to False.
        reverse (bool): If True, returns reverse dependencies - all Kustomizations
            that depend on the given file. Can be combined with recursive to find
            all transitive reverse dependencies. Defaults to False.

    Returns:
        list[str]: A list of paths that are dependencies of the Kustomization.
            When recursive is True, includes all transitive dependencies.
            When reverse is True, returns Kustomization paths that depend on the
            given file.
    """
    found = server.dependencies(Path(path), recursive=recursive, reverse=reverse)
    # Convert back to POSIX strings for the tool response.
    return [dep.as_posix() for dep in found]
@mcp.prompt()
def explain(query: str) -> list[base.Message]:
    # Prompt: the usage guide followed by a request to answer `query` about
    # the project's Kustomize configuration.
    # USAGE already ends with ".\n", so no extra period is added after it
    # (that used to leave a line containing only ".").
    return [
        base.UserMessage(
            f"{USAGE}\nExplain the Kustomize configuration in the current project "
            f"by answering the question: {query}"
        ),
    ]
@mcp.prompt()
def refactor(query: str) -> list[base.Message]:
    # Prompt: the usage guide followed by a request to refactor the project's
    # Kustomize configuration according to `query`.
    # USAGE already ends with ".\n", so no extra period is added after it
    # (that used to leave a line containing only ".").
    return [
        base.UserMessage(
            f"{USAGE}\nRefactor the Kustomize configuration in the current project "
            f"by fulfilling the request: {query}"
        ),
    ]
@mcp.prompt()
def diff_dirs(path_1: str, path_2: str) -> list[base.Message]:
    # Prompt made of two user messages: the question about the two
    # directories, then the usage guide with the concrete tool workflow.
    question = base.UserMessage(
        "Explain the differences between the Kustomize directories at "
        f"{path_1} and {path_2}."
    )
    workflow = base.UserMessage(
        USAGE + "To compare the two Kustomize directories, create a checkpoint, "
        "render both directories into it, and then diff the rendered outputs, "
        "by using the `create_checkpoint` and `diff_paths` tools."
    )
    return [question, workflow]
def main() -> None:
    """Run the MCP server over the stdio transport.

    A KeyboardInterrupt is caught and logged as a shutdown message instead of
    propagating.
    """
    try:
        mcp.run(transport="stdio")
    except KeyboardInterrupt:
        log.info("Shutting down MCP server...")
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()