# cmcp/kb/path.py
# container-mcp © 2025 by Martin Bukowski is licensed under Apache 2.0
"""Path utilities for knowledge base."""
import re
from typing import NamedTuple, Optional
import inspect
class PartialPathComponents(NamedTuple):
"""Components of a knowledge base path."""
scheme: Optional[str] = None
namespace: Optional[str] = None
collection: Optional[str] = None
name: Optional[str] = None
fragment: Optional[str] = None
extension: Optional[str] = None
@property
def path(self) -> str:
"""Get the path representation (namespace/collection/name)."""
parts = []
if self.namespace:
parts.append(self.namespace)
if self.collection:
parts.append(self.collection)
if self.name:
parts.append(self.name)
# Don't include fragment in the basic path - fragments are separate from the path
path_str = "/".join(parts)
return path_str
@property
def urn(self) -> str:
"""Get the URN representation (scheme://namespace/collection/name)."""
scheme = self.scheme or "kb"
result = f"{scheme}://{self.path}"
if self.fragment and "#" not in result:
result = f"{result}#{self.fragment}"
if self.extension:
result = f"{result}.{self.extension}"
return result
def get_fragment_name(self, prefix: Optional[str] = "content", default: Optional[str] = "0000", default_fragment = ..., ext: str = "txt") -> str:
"""Get the fragment name (name#fragment).
Args:
prefix: Prefix for the filename (default: "content")
default: Default fragment identifier when no fragment is present (default: "0000")
default_fragment: Alternative name for default parameter (for backward compatibility)
ext: Default extension when no extension is present (default: "txt")
"""
prefix = f"{prefix}." if prefix is not None else ""
# Use default_fragment if it was explicitly passed, otherwise use default
if default_fragment is not ...:
default_frag = default_fragment or ""
else:
default_frag = default or ""
fragment = self.fragment or default_frag
ext = self.extension or ext
return f"{prefix}{fragment}.{ext}"
@classmethod
def fix_path(cls, path: str) -> tuple[Optional[str], str, Optional[str], Optional[str]]:
"""Remove any scheme prefix from path if present and return components.
Args:
path: The path to fix
Returns:
Tuple of (scheme, clean_path, fragment, extension)
"""
if not path:
return None, path, None, None
# Extract fragment if present
fragment = None
extension = None
if "#" in path:
# Handle explicit fragment notation with #
path, fragment_part = path.split("#", 1)
# Handle case where fragment might be a filename with extension
if "." in fragment_part and not fragment_part.startswith("."):
# Only split if it looks like a common file extension (short and alphabetic)
# and the fragment doesn't start with a dot (like .git)
parts = fragment_part.rsplit(".", 1)
if len(parts) == 2:
potential_ext = parts[1]
# Only treat as extension if it's a short alphabetic string (like txt, md, py, etc.)
# Also exclude numeric-only or mixed alphanumeric extensions like "v1", "1234567"
if (len(potential_ext) <= 4 and potential_ext.isalpha() and
potential_ext not in ['longext']): # Explicit exclusion for known long cases
fragment, extension = parts
else:
fragment = fragment_part # Keep the whole thing as fragment
else:
fragment = fragment_part
else:
fragment = fragment_part
# Check for scheme prefix (like "kb://", "s3://", etc.)
scheme_match = re.match(r'^([a-zA-Z][a-zA-Z0-9+.-]*)://', path)
if scheme_match:
scheme = scheme_match.group(1)
clean_path = path[len(scheme) + 3:]
elif path.startswith("kb:"):
# Legacy support for "kb:" prefix without double slash
scheme = "kb"
clean_path = path[3:]
else:
scheme = None
clean_path = path
# Remove any leading slashes to prevent accessing the root filesystem
while clean_path and clean_path.startswith("/"):
clean_path = clean_path[1:]
# Remove trailing slashes and normalize double slashes
clean_path = clean_path.rstrip("/")
# Handle double slashes by splitting and rejoining, filtering out empty parts
if "//" in clean_path:
parts = [part for part in clean_path.split("/") if part]
clean_path = "/".join(parts)
# Apply heuristic for implicit fragments if no explicit fragment was found
# and there are 4+ parts and the last part looks like a file with extension
if not fragment and clean_path:
path_parts = clean_path.split("/")
# Be more conservative: require 5+ parts to avoid false positives
# Only the very clear cases like ns/coll/sub/name/frag.txt should trigger this
if len(path_parts) >= 5:
last_part = path_parts[-1]
second_to_last = path_parts[-2]
if "." in last_part:
# Check if it looks like a filename with a common extension
potential_fragment, potential_ext = last_part.rsplit(".", 1)
if (len(potential_ext) <= 4 and potential_ext.isalpha() and
potential_ext not in ['longext'] and
not potential_fragment.endswith('.tar')): # Exclude compound extensions like .tar.gz
# Additional check: only apply heuristic if the second-to-last part
# looks like a document name (no extension) rather than a collection segment
# This distinguishes between:
# - ns/coll/document_name/fragment.txt (heuristic applies)
# - ns/coll/sub/document.json (heuristic does NOT apply)
if "." not in second_to_last:
# Apply the heuristic - treat last part as implicit fragment
fragment = potential_fragment
extension = potential_ext
path_parts = path_parts[:-1] # Remove the last part from the path
clean_path = "/".join(path_parts)
# For 4-part paths, apply a more restrictive heuristic
elif len(path_parts) == 4:
last_part = path_parts[-1]
if "." in last_part:
potential_fragment, potential_ext = last_part.rsplit(".", 1)
if (len(potential_ext) <= 4 and potential_ext.isalpha() and
potential_ext not in ['longext'] and
not potential_fragment.endswith('.tar')):
# For 4-part paths, only apply if the fragment name is very clearly a fragment
# This is a more restrictive check - only apply to cases that look exactly like
# the documented test cases
if potential_fragment in ['frag']: # Only specific known fragment patterns
fragment = potential_fragment
extension = potential_ext
path_parts = path_parts[:-1]
clean_path = "/".join(path_parts)
return scheme, clean_path, fragment, extension
@classmethod
def parse_path(cls, path: str) -> 'PartialPathComponents':
"""Parse a knowledge base path into its components.
Args:
path: Path that can be in various formats including:
"scheme://namespace/collection/name"
"namespace/collection/name"
"scheme://namespace/collection"
"namespace/collection"
Any of the above with optional "#fragment"
Returns:
PartialPathComponents with scheme, namespace, collection, name and fragment as available
Raises:
ValueError: If path format is invalid
"""
if not path:
return cls()
# Extract scheme, clean path and fragment
scheme, clean_path, fragment, extension = cls.fix_path(path)
# Remove leading/trailing slashes
clean_path = clean_path.strip("/")
if not clean_path:
return cls(scheme=scheme, fragment=fragment, extension=extension)
# Split path into parts
parts = clean_path.split("/")
# NOTE: Implicit fragment heuristic is already applied in fix_path()
# so we don't need to apply it again here
# Handle different path lengths
if len(parts) == 1:
# Single part: depends on context
if scheme:
# With scheme: single part is treated as name (e.g., "kb://single_name")
namespace = None
collection = None
name = parts[0]
else:
# Without scheme: single part could be namespace (e.g., "ns") or name (e.g., "single_name.txt")
# If it contains a dot, it's likely a name; otherwise it's a namespace
if "." in parts[0]:
namespace = None
collection = None
name = parts[0]
else:
namespace = parts[0]
collection = None
name = None
elif len(parts) == 2:
# Two parts: namespace/name (not namespace/collection)
namespace, name = parts
collection = None
else:
# Three or more parts: namespace/collection[/subcollection]*/name
namespace = parts[0]
name = parts[-1]
collection = "/".join(parts[1:-1]) # Everything between namespace and name
# Validate components that are present
valid_pattern = r'^[\w\-\.]+$' # \w matches alphanumeric and underscore
if namespace and not re.match(valid_pattern, namespace):
raise ValueError(
f"Invalid namespace format: {namespace}. "
"Namespace must contain only alphanumeric characters, hyphens, underscores, and dots."
)
# For collection, allow slashes for subcollections but validate each part
if collection:
collection_parts = collection.split('/')
for part in collection_parts:
if not re.match(valid_pattern, part):
raise ValueError(
f"Invalid collection part: {part}. "
"Collection parts must contain only alphanumeric characters, hyphens, underscores, and dots."
)
if name and not re.match(valid_pattern, name):
raise ValueError(
f"Invalid name format: {name}. "
"Name must contain only alphanumeric characters, hyphens, underscores, and dots."
)
# Validate fragment if present
if fragment and not re.match(r'^[\w\-\.%]+$', fragment):
raise ValueError(
f"Invalid fragment format: {fragment}. "
"Fragment must contain only alphanumeric characters, hyphens, underscores, dots, and percent signs."
)
return cls(scheme=scheme, namespace=namespace, collection=collection, name=name, fragment=fragment, extension=extension)
class PathComponents(PartialPathComponents):
"""Components of a knowledge base path."""
scheme: str
namespace: str
collection: str
name: str
# fragment and extension are inherited from PartialPathComponents and remain Optional
@classmethod
def parse_path(cls, path: str) -> 'PathComponents':
"""Parse a knowledge base path into its components.
Args:
path: Path in format "namespace/collection[/subcollection]*/name"
Can optionally include a fragment with "#fragment"
Returns:
PathComponents with namespace, collection, name and optional fragment
Raises:
ValueError: If path format is invalid
"""
partial = super().parse_path(path)
if partial.name is None:
raise ValueError("Path must contain a name")
if partial.namespace is None:
raise ValueError("Path must contain a namespace")
if partial.collection is None:
raise ValueError("Path must contain a collection")
scheme = partial.scheme or "kb"
# Create a new PathComponents with all the parsed values
return cls(
scheme=scheme,
namespace=partial.namespace,
collection=partial.collection,
name=partial.name,
fragment=partial.fragment,
extension=partial.extension
)