"""Search operations for LocalFS MCP."""
import re
from pathlib import Path
from typing import Iterator, List
from mcp.server.fastmcp import Context
from .schemas.search import SearchMatch, SearchResult
from .safety import (
validate_path,
check_depth,
is_binary_file,
get_relative_path,
DEFAULT_ROOT,
)
def _walk_directory(
path: Path, root: Path, max_depth: int, current_depth: int = 0
) -> Iterator[tuple[Path, int]]:
"""
Recursively walk directory tree with depth control.
Args:
path: Directory to walk
root: Root directory for relative path calculation
max_depth: Maximum depth to traverse
current_depth: Current depth (for recursion)
Yields:
Tuple of (path, depth) for each item
"""
check_depth(current_depth, max_depth)
try:
for item in path.iterdir():
yield (item, current_depth)
if item.is_dir() and current_depth < max_depth:
yield from _walk_directory(item, root, max_depth, current_depth + 1)
except PermissionError:
# Skip directories we can't access
pass
def register_search_tools(server):
"""Register all search tools with the MCP server."""
@server.tool()
def search_by_name(
name: str,
path: str = ".",
recursive: bool = True,
case_sensitive: bool = False,
ctx: Context = None,
) -> dict:
"""
Search for files and directories by exact name match.
Args:
name: Exact name to search for
path: Starting directory for search (default: root)
recursive: Search subdirectories recursively (default: True)
case_sensitive: Perform case-sensitive matching (default: False)
ctx: MCP context with session config
Returns:
Dictionary with search results
"""
config = ctx.session_config
root = DEFAULT_ROOT
max_depth = config.max_search_depth if recursive else 0
# Validate path
search_path = validate_path(path, root)
if not search_path.exists():
raise FileNotFoundError(f"Search path not found: {path}")
# Prepare search name
search_name = name if case_sensitive else name.lower()
matches = []
max_depth_reached = 0
# Walk directory
for item, depth in _walk_directory(search_path, root, max_depth):
max_depth_reached = max(max_depth_reached, depth)
# Check name match
item_name = item.name if case_sensitive else item.name.lower()
if item_name == search_name:
stat = item.stat()
matches.append(
SearchMatch(
path=get_relative_path(item, root),
name=item.name,
is_directory=item.is_dir(),
size_bytes=stat.st_size if item.is_file() else 0,
)
)
return SearchResult(
matches=matches,
total_matches=len(matches),
search_depth_reached=max_depth_reached,
truncated=False,
).model_dump()
@server.tool()
def search_by_glob(
pattern: str, path: str = ".", recursive: bool = True, ctx: Context = None
) -> dict:
"""
Search for files and directories using glob patterns.
Supports wildcards: * (any chars), ** (recursive), ? (single char), [abc] (char class)
Args:
pattern: Glob pattern (e.g., "*.txt", "**/*.py")
path: Starting directory for search (default: root)
recursive: Use recursive globbing (default: True)
ctx: MCP context with session config
Returns:
Dictionary with search results
"""
config = ctx.session_config
root = DEFAULT_ROOT
# Validate path
search_path = validate_path(path, root)
if not search_path.exists():
raise FileNotFoundError(f"Search path not found: {path}")
matches = []
# Use glob or rglob based on recursive flag
if recursive:
items = search_path.rglob(pattern)
else:
items = search_path.glob(pattern)
# Collect matches
for item in items:
# Validate depth for recursive searches
try:
rel_path = item.relative_to(search_path)
depth = len(rel_path.parts) - 1
if depth > config.max_search_depth:
continue
except ValueError:
continue
stat = item.stat()
matches.append(
SearchMatch(
path=get_relative_path(item, root),
name=item.name,
is_directory=item.is_dir(),
size_bytes=stat.st_size if item.is_file() else 0,
)
)
# Calculate max depth reached
if matches:
max_depth = max(len(Path(m.path).parts) - 1 for m in matches)
else:
max_depth = 0
return SearchResult(
matches=matches,
total_matches=len(matches),
search_depth_reached=max_depth,
truncated=False,
).model_dump()
@server.tool()
def search_by_filename_regex(
regex_pattern: str,
path: str = ".",
recursive: bool = True,
case_sensitive: bool = False,
ctx: Context = None,
) -> dict:
"""
Search for files and directories using regex pattern on filename.
Args:
regex_pattern: Regular expression pattern
path: Starting directory for search (default: root)
recursive: Search subdirectories recursively (default: True)
case_sensitive: Perform case-sensitive matching (default: False)
ctx: MCP context with session config
Returns:
Dictionary with search results
"""
config = ctx.session_config
root = DEFAULT_ROOT
max_depth = config.max_search_depth if recursive else 0
# Validate path
search_path = validate_path(path, root)
if not search_path.exists():
raise FileNotFoundError(f"Search path not found: {path}")
# Compile regex
flags = 0 if case_sensitive else re.IGNORECASE
try:
pattern = re.compile(regex_pattern, flags)
except re.error as e:
raise ValueError(f"Invalid regex pattern: {e}")
matches = []
max_depth_reached = 0
# Walk directory
for item, depth in _walk_directory(search_path, root, max_depth):
max_depth_reached = max(max_depth_reached, depth)
# Check regex match
if pattern.search(item.name):
stat = item.stat()
matches.append(
SearchMatch(
path=get_relative_path(item, root),
name=item.name,
is_directory=item.is_dir(),
size_bytes=stat.st_size if item.is_file() else 0,
)
)
return SearchResult(
matches=matches,
total_matches=len(matches),
search_depth_reached=max_depth_reached,
truncated=False,
).model_dump()
@server.tool()
def search_by_content_text(
query: str,
path: str = ".",
recursive: bool = True,
case_sensitive: bool = False,
max_results: int = 100,
ctx: Context = None,
) -> dict:
"""
Search for files containing specific text.
Only searches text files (binary files are skipped).
Args:
query: Text to search for
path: Starting directory for search (default: root)
recursive: Search subdirectories recursively (default: True)
case_sensitive: Perform case-sensitive matching (default: False)
max_results: Maximum number of results to return (default: 100)
ctx: MCP context with session config
Returns:
Dictionary with search results
"""
config = ctx.session_config
root = DEFAULT_ROOT
max_depth = config.max_search_depth if recursive else 0
max_file_size = config.max_file_size_mb * 1024 * 1024
# Validate path
search_path = validate_path(path, root)
if not search_path.exists():
raise FileNotFoundError(f"Search path not found: {path}")
# Prepare search query
search_query = query if case_sensitive else query.lower()
matches: List[SearchMatch] = []
max_depth_reached = 0
truncated = False
# Walk directory
for item, depth in _walk_directory(search_path, root, max_depth):
if len(matches) >= max_results:
truncated = True
break
max_depth_reached = max(max_depth_reached, depth)
# Only search files
if not item.is_file():
continue
# Skip files that are too large
if item.stat().st_size > max_file_size:
continue
try:
# Read file and check if binary
with open(item, "rb") as f:
data = f.read()
if is_binary_file(data):
continue
# Decode as text
content = data.decode("utf-8", errors="ignore")
# Search for query
search_content = content if case_sensitive else content.lower()
if search_query in search_content:
# Find first occurrence for context
index = search_content.index(search_query)
# Get line number
line_num = content[:index].count("\n") + 1
# Get context (50 chars before and after)
context_start = max(0, index - 50)
context_end = min(len(content), index + len(query) + 50)
context = content[context_start:context_end].replace("\n", " ")
matches.append(
SearchMatch(
path=get_relative_path(item, root),
name=item.name,
is_directory=False,
size_bytes=item.stat().st_size,
match_context=f"...{context}...",
line_number=line_num,
)
)
except (PermissionError, UnicodeDecodeError, OSError):
# Skip files we can't read
continue
return SearchResult(
matches=matches,
total_matches=len(matches),
search_depth_reached=max_depth_reached,
truncated=truncated,
).model_dump()
@server.tool()
def search_by_content_regex(
regex_pattern: str,
path: str = ".",
recursive: bool = True,
case_sensitive: bool = False,
max_results: int = 100,
ctx: Context = None,
) -> dict:
"""
Search for files containing text matching a regex pattern.
Only searches text files (binary files are skipped).
Args:
regex_pattern: Regular expression pattern
path: Starting directory for search (default: root)
recursive: Search subdirectories recursively (default: True)
case_sensitive: Perform case-sensitive matching (default: False)
max_results: Maximum number of results to return (default: 100)
ctx: MCP context with session config
Returns:
Dictionary with search results
"""
config = ctx.session_config
root = DEFAULT_ROOT
max_depth = config.max_search_depth if recursive else 0
max_file_size = config.max_file_size_mb * 1024 * 1024
# Validate path
search_path = validate_path(path, root)
if not search_path.exists():
raise FileNotFoundError(f"Search path not found: {path}")
# Compile regex
flags = 0 if case_sensitive else re.IGNORECASE
try:
pattern = re.compile(regex_pattern, flags)
except re.error as e:
raise ValueError(f"Invalid regex pattern: {e}")
matches: List[SearchMatch] = []
max_depth_reached = 0
truncated = False
# Walk directory
for item, depth in _walk_directory(search_path, root, max_depth):
if len(matches) >= max_results:
truncated = True
break
max_depth_reached = max(max_depth_reached, depth)
# Only search files
if not item.is_file():
continue
# Skip files that are too large
if item.stat().st_size > max_file_size:
continue
try:
# Read file and check if binary
with open(item, "rb") as f:
data = f.read()
if is_binary_file(data):
continue
# Decode as text
content = data.decode("utf-8", errors="ignore")
# Search for pattern
match = pattern.search(content)
if match:
# Get line number
index = match.start()
line_num = content[:index].count("\n") + 1
# Get context (50 chars before and after)
context_start = max(0, index - 50)
context_end = min(len(content), match.end() + 50)
context = content[context_start:context_end].replace("\n", " ")
matches.append(
SearchMatch(
path=get_relative_path(item, root),
name=item.name,
is_directory=False,
size_bytes=item.stat().st_size,
match_context=f"...{context}...",
line_number=line_num,
)
)
except (PermissionError, UnicodeDecodeError, OSError):
# Skip files we can't read
continue
return SearchResult(
matches=matches,
total_matches=len(matches),
search_depth_reached=max_depth_reached,
truncated=truncated,
).model_dump()