import fnmatch
import hashlib
import logging
import os
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from .base import BaseTool

logger = logging.getLogger(__name__)
@dataclass
class SearchResult:
"""Container for search results"""
path: str
match_type: str # file, content, pattern
line_number: Optional[int] = None
content: Optional[str] = None
context: Optional[Dict[str, Any]] = None
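# Illustrative construction (this container is not yet wired into the
# operations below, which currently return plain dicts):
#   SearchResult(path="src/app.py", match_type="content",
#                line_number=42, content="TODO: fix")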
class PathFinder(BaseTool):
"""Advanced file and directory search tool"""
async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
search_path = arguments.get('path', '.')
operation = arguments.get('operation', 'find')
operations = {
'find': self._find_files,
'glob': self._glob_search,
'pattern': self._pattern_search,
'recent': self._find_recent
}
        if operation not in operations:
            return {"success": False, "error": f"Unknown operation: {operation}"}
try:
result = await operations[operation](Path(search_path), arguments)
return {"success": True, "data": result}
except Exception as e:
logger.error(f"PathFinder operation failed: {e}")
return {"success": False, "error": str(e)}
async def _find_files(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Find files based on criteria"""
filters = args.get('filters', {})
max_depth = args.get('max_depth', None)
exclude_patterns = set(args.get('exclude', []))
results = []
total_scanned = 0
try:
for root, dirs, files in self._walk_with_depth(path, max_depth):
# Apply directory exclusions
dirs[:] = [d for d in dirs if not any(
fnmatch.fnmatch(d, pattern) for pattern in exclude_patterns
)]
for file in files:
total_scanned += 1
file_path = Path(root) / file
if self._should_skip(file_path):
continue
                    if self._matches_filters(file_path, filters):
                        try:
                            stat = file_path.stat()
                        except OSError:
                            # Broken symlinks or permission errors should not
                            # abort the whole scan
                            continue
                        results.append({
                            "path": str(file_path),
                            "name": file_path.name,
                            "extension": file_path.suffix,
                            "size": stat.st_size,
                            "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
                            # st_ctime is inode-change time on Unix, creation time on Windows
                            "created": datetime.fromtimestamp(stat.st_ctime).isoformat()
                        })
return {
"results": results,
"summary": {
"total_found": len(results),
"total_scanned": total_scanned,
"search_path": str(path)
}
}
except Exception as e:
raise RuntimeError(f"File search failed: {e}")
async def _glob_search(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Search using glob patterns"""
patterns = args.get('patterns', ['*'])
recursive = args.get('recursive', True)
results = []
total_matches = 0
try:
for pattern in patterns:
if recursive:
matches = path.rglob(pattern)
else:
matches = path.glob(pattern)
                for match in matches:
                    # A path matching several patterns is reported once per pattern
                    if self._should_skip(match):
                        continue
                    try:
                        stat = match.stat()
                    except OSError:
                        continue
                    results.append({
                        "path": str(match),
                        "pattern": pattern,
                        "type": "directory" if match.is_dir() else "file",
                        "size": stat.st_size if match.is_file() else None,
                        "modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
                    })
                    total_matches += 1
return {
"results": results,
"summary": {
"patterns": patterns,
"total_matches": total_matches,
"search_path": str(path)
}
}
except Exception as e:
raise RuntimeError(f"Glob search failed: {e}")
async def _pattern_search(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Search for files matching complex patterns"""
pattern_rules = args.get('rules', {})
max_results = args.get('max_results', None)
results = []
try:
for file_path in self._recursive_search(path):
if self._should_skip(file_path):
continue
                if self._matches_pattern_rules(file_path, pattern_rules):
                    try:
                        stat = file_path.stat()
                    except OSError:
                        continue
                    results.append({
                        "path": str(file_path),
                        "name": file_path.name,
                        "matches": self._get_matching_rules(file_path, pattern_rules),
                        "size": stat.st_size,
                        "modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
                    })
                    if max_results and len(results) >= max_results:
                        break
return {
"results": results,
"summary": {
"total_matches": len(results),
"rules_applied": list(pattern_rules.keys()),
"search_path": str(path)
}
}
except Exception as e:
raise RuntimeError(f"Pattern search failed: {e}")
async def _find_recent(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Find recently modified files"""
hours = args.get('hours', 24)
file_types = set(args.get('file_types', []))
min_size = args.get('min_size', 0)
max_size = args.get('max_size', float('inf'))
results = []
cutoff_time = datetime.now().timestamp() - (hours * 3600)
try:
            for file_path in self._recursive_search(path):
                if self._should_skip(file_path):
                    continue
                try:
                    stat = file_path.stat()
                except OSError:
                    # Skip files that disappear or become unreadable mid-scan
                    continue
                if (stat.st_mtime >= cutoff_time
                        and (not file_types or file_path.suffix in file_types)
                        and min_size <= stat.st_size <= max_size):
                    results.append({
                        "path": str(file_path),
                        "name": file_path.name,
                        "size": stat.st_size,
                        "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
                        "hours_ago": (datetime.now().timestamp() - stat.st_mtime) / 3600
                    })
# Sort by modification time
results.sort(key=lambda x: x["modified"], reverse=True)
return {
"results": results,
"summary": {
"total_found": len(results),
"time_range_hours": hours,
"search_path": str(path)
}
}
except Exception as e:
raise RuntimeError(f"Recent files search failed: {e}")
    def _walk_with_depth(self, path: Path, max_depth: Optional[int] = None):
        """Walk the directory tree, pruning descent past an optional depth limit"""
        # os.walk() is available on all supported Python versions
        # (Path.walk() requires 3.12+)
        base_depth = len(Path(path).resolve().parts)
        for root, dirs, files in os.walk(path):
            current_depth = len(Path(root).resolve().parts) - base_depth
            if max_depth is not None and current_depth >= max_depth:
                # Yield this level, but do not descend any further
                dirs.clear()
            yield root, dirs, files
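    # Depth semantics (illustrative): with max_depth=1 the search path itself
    # (depth 0) and its immediate subdirectories (depth 1) are yielded, and
    # nothing deeper is visited.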
def _matches_filters(self, path: Path, filters: Dict[str, Any]) -> bool:
"""Check if file matches all filters"""
try:
stat = path.stat()
            for key, value in filters.items():
                # 'extension' must include the leading dot, e.g. '.py'
                if key == 'extension' and path.suffix != value:
                    return False
elif key == 'name' and path.name != value:
return False
elif key == 'min_size' and stat.st_size < value:
return False
elif key == 'max_size' and stat.st_size > value:
return False
elif key == 'modified_after' and stat.st_mtime < value:
return False
elif key == 'modified_before' and stat.st_mtime > value:
return False
return True
except Exception:
return False
def _matches_pattern_rules(self, path: Path, rules: Dict[str, Any]) -> bool:
"""Check if file matches pattern rules"""
try:
for rule_type, pattern in rules.items():
if rule_type == 'name_pattern':
if not fnmatch.fnmatch(path.name, pattern):
return False
elif rule_type == 'path_pattern':
if not fnmatch.fnmatch(str(path), pattern):
return False
elif rule_type == 'regex':
if not re.search(pattern, str(path)):
return False
return True
except Exception:
return False
def _get_matching_rules(self, path: Path, rules: Dict[str, Any]) -> List[str]:
"""Get list of matching rules for a file"""
matches = []
for rule_type, pattern in rules.items():
if rule_type == 'name_pattern' and fnmatch.fnmatch(path.name, pattern):
matches.append(rule_type)
elif rule_type == 'path_pattern' and fnmatch.fnmatch(str(path), pattern):
matches.append(rule_type)
elif rule_type == 'regex' and re.search(pattern, str(path)):
matches.append(rule_type)
return matches
    def _recursive_search(self, path: Path):
        """Lazily yield every regular file under a directory tree"""
        try:
            for entry in path.rglob('*'):
                if entry.is_file():
                    yield entry
        except OSError:
            return
class ContentScanner(BaseTool):
"""Advanced content search and analysis tool"""
    def __init__(self):
        super().__init__()
        self._file_cache = {}  # content cache; grows unbounded for the life of the instance
        self.max_workers = 4  # currently unused by the operations below
async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
operation = arguments.get('operation', 'search')
target_path = arguments.get('path', '.')
operations = {
'search': self._search_content,
'analyze': self._analyze_content,
'regex': self._regex_search,
'similar': self._find_similar
}
        if operation not in operations:
            return {"success": False, "error": f"Unknown operation: {operation}"}
try:
result = await operations[operation](Path(target_path), arguments)
return {"success": True, "data": result}
except Exception as e:
logger.error(f"ContentScanner operation failed: {e}")
return {"success": False, "error": str(e)}
async def _search_content(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Search file contents for text"""
search_text = args.get('text')
case_sensitive = args.get('case_sensitive', False)
whole_word = args.get('whole_word', False)
file_pattern = args.get('file_pattern', '*')
        if not search_text:
            # Raise so execute() reports a consistent {"success": False, ...}
            raise ValueError("Search text is required")
results = []
total_files = 0
matches_found = 0
try:
# Prepare search pattern
if whole_word:
pattern = r'\b' + re.escape(search_text) + r'\b'
else:
pattern = re.escape(search_text)
flags = 0 if case_sensitive else re.IGNORECASE
regex = re.compile(pattern, flags)
# Search files
for file_path in path.rglob(file_pattern):
if self._should_skip(file_path) or not file_path.is_file():
continue
total_files += 1
try:
matches = await self._find_matches(file_path, regex)
if matches:
matches_found += len(matches)
results.append({
"file": str(file_path),
"matches": matches
})
except Exception as e:
logger.error(f"Error searching {file_path}: {e}")
return {
"results": results,
"summary": {
"total_files_searched": total_files,
"files_with_matches": len(results),
"total_matches": matches_found,
"search_pattern": {
"text": search_text,
"case_sensitive": case_sensitive,
"whole_word": whole_word
}
}
}
except Exception as e:
raise RuntimeError(f"Content search failed: {e}")
async def _analyze_content(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze file contents"""
file_pattern = args.get('file_pattern', '*')
analysis_types = set(args.get('types', ['duplicates', 'statistics', 'patterns']))
        try:
            analysis_results: Dict[str, Any] = {
                "files_analyzed": 0,
                "total_size": 0,
                "analysis": {}
            }
            if 'duplicates' in analysis_types:
                analysis_results["analysis"]["duplicates"] = await self._find_duplicate_content(path, file_pattern)
            if 'statistics' in analysis_types:
                statistics = await self._generate_content_statistics(path, file_pattern)
                analysis_results["analysis"]["statistics"] = statistics
                # Surface headline totals alongside the detailed breakdown
                analysis_results["files_analyzed"] = statistics.get("total_files", 0)
                analysis_results["total_size"] = statistics.get("total_size", 0)
            if 'patterns' in analysis_types:
                analysis_results["analysis"]["patterns"] = await self._analyze_content_patterns(path, file_pattern)
            return analysis_results
except Exception as e:
raise RuntimeError(f"Content analysis failed: {e}")
async def _regex_search(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Search using regular expressions"""
pattern = args.get('pattern')
file_pattern = args.get('file_pattern', '*')
multiline = args.get('multiline', False)
        if not pattern:
            # Raise so execute() reports a consistent {"success": False, ...}
            raise ValueError("Regex pattern is required")
try:
flags = re.MULTILINE if multiline else 0
regex = re.compile(pattern, flags)
            results = []
            total_files = 0
            for file_path in path.rglob(file_pattern):
                if self._should_skip(file_path) or not file_path.is_file():
                    continue
                total_files += 1
                matches = await self._find_matches(file_path, regex)
                if matches:
                    results.append({
                        "file": str(file_path),
                        "matches": matches
                    })
return {
"results": results,
"summary": {
"total_files_searched": len(list(path.rglob(file_pattern))),
"files_with_matches": len(results),
"pattern": pattern,
"multiline": multiline
}
}
except Exception as e:
raise RuntimeError(f"Regex search failed: {e}")
async def _find_similar(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Find files with similar content"""
threshold = args.get('similarity_threshold', 0.8)
file_pattern = args.get('file_pattern', '*')
min_size = args.get('min_size', 0)
try:
file_groups = []
content_hashes = {}
# First pass: collect file contents
            for file_path in path.rglob(file_pattern):
                if self._should_skip(file_path) or not file_path.is_file():
                    continue
                try:
                    if file_path.stat().st_size < min_size:
                        continue
                except OSError:
                    continue
try:
content = await self._read_file_content(file_path)
if content:
content_hashes[str(file_path)] = self._calculate_similarity_hash(content)
except Exception as e:
logger.error(f"Error reading {file_path}: {e}")
# Second pass: compare files
analyzed_files = set()
for file1, hash1 in content_hashes.items():
if file1 in analyzed_files:
continue
similar_files = []
for file2, hash2 in content_hashes.items():
if file1 != file2 and file2 not in analyzed_files:
similarity = self._calculate_hash_similarity(hash1, hash2)
if similarity >= threshold:
similar_files.append({
"path": file2,
"similarity": similarity
})
analyzed_files.add(file2)
if similar_files:
analyzed_files.add(file1)
file_groups.append({
"base_file": file1,
"similar_files": similar_files
})
return {
"groups": file_groups,
"summary": {
"total_files": len(content_hashes),
"similarity_groups": len(file_groups),
"threshold": threshold
}
}
except Exception as e:
raise RuntimeError(f"Similarity analysis failed: {e}")
async def _find_matches(self, file_path: Path, pattern: re.Pattern) -> List[Dict[str, Any]]:
"""Find pattern matches in file"""
matches = []
try:
            content = await self._read_file_content(file_path)
            if not content:
                return matches
            # Split once and reuse for both matching and context extraction
            lines = content.splitlines()
            for i, line in enumerate(lines, 1):
                for match in pattern.finditer(line):
                    matches.append({
                        "line": i,
                        "start": match.start(),
                        "end": match.end(),
                        "text": match.group(),
                        "context": self._get_line_context(lines, i)
                    })
except Exception as e:
logger.error(f"Error finding matches in {file_path}: {e}")
return matches
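    # Shape of a single match entry (illustrative values), e.g. for "foo"
    # starting at column 8 of line 3:
    #
    #   {"line": 3, "start": 8, "end": 11, "text": "foo",
    #    "context": {"before": ["line 1", "line 2"],
    #                "after": ["line 4", "line 5"]}}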
async def _find_duplicate_content(self, path: Path, pattern: str) -> Dict[str, Any]:
"""Find duplicate content across files"""
content_map = {}
duplicates = []
try:
for file_path in path.rglob(pattern):
if self._should_skip(file_path) or not file_path.is_file():
continue
content = await self._read_file_content(file_path)
if not content:
continue
content_hash = self._calculate_content_hash(content)
                if content_hash in content_map:
                    # Attach this file to the duplicate group for its hash,
                    # creating the group on the first collision
                    original = content_map[content_hash]
                    group = next((g for g in duplicates if g["original"] == original), None)
                    if group is None:
                        group = {"original": original, "duplicates": []}
                        duplicates.append(group)
                    group["duplicates"].append(str(file_path))
                else:
                    content_map[content_hash] = str(file_path)
return {
"duplicate_groups": duplicates,
"total_duplicates": sum(len(group["duplicates"]) for group in duplicates)
}
except Exception as e:
logger.error(f"Error finding duplicates: {e}")
return {"error": str(e)}
async def _generate_content_statistics(self, path: Path, pattern: str) -> Dict[str, Any]:
"""Generate statistics about file contents"""
stats = {
"total_files": 0,
"total_lines": 0,
"total_size": 0,
"average_line_length": 0,
"file_types": {},
"encoding_types": {},
"line_endings": {
"unix": 0,
"windows": 0,
"mixed": 0
}
}
try:
line_lengths = []
for file_path in path.rglob(pattern):
if self._should_skip(file_path) or not file_path.is_file():
continue
stats["total_files"] += 1
stats["total_size"] += file_path.stat().st_size
# Track file types
ext = file_path.suffix
stats["file_types"][ext] = stats["file_types"].get(ext, 0) + 1
content = await self._read_file_content(file_path)
if not content:
continue
lines = content.splitlines()
stats["total_lines"] += len(lines)
line_lengths.extend(len(line) for line in lines)
                # Detect line endings from raw bytes: read_text() normalizes
                # newlines, so '\r\n' never survives into `content`
                try:
                    raw = file_path.read_bytes()
                except OSError:
                    raw = b''
                if b'\r\n' in raw and b'\n' in raw.replace(b'\r\n', b''):
                    stats["line_endings"]["mixed"] += 1
                elif b'\r\n' in raw:
                    stats["line_endings"]["windows"] += 1
                else:
                    stats["line_endings"]["unix"] += 1
# Track encoding
encoding = self._detect_encoding(file_path)
stats["encoding_types"][encoding] = stats["encoding_types"].get(encoding, 0) + 1
if line_lengths:
stats["average_line_length"] = sum(line_lengths) / len(line_lengths)
return stats
except Exception as e:
logger.error(f"Error generating statistics: {e}")
return {"error": str(e)}
async def _analyze_content_patterns(self, path: Path, pattern: str) -> Dict[str, Any]:
"""Analyze content for common patterns"""
patterns = {
"common_words": {},
"line_patterns": [],
"structure_patterns": []
}
try:
word_freq = {}
line_patterns = set()
for file_path in path.rglob(pattern):
if self._should_skip(file_path) or not file_path.is_file():
continue
content = await self._read_file_content(file_path)
if not content:
continue
# Analyze words
words = re.findall(r'\w+', content.lower())
for word in words:
word_freq[word] = word_freq.get(word, 0) + 1
                # Analyze line patterns
                for line in content.splitlines():
                    # Record indentation levels; take only the first group so
                    # the file's `content` variable is not shadowed
                    pattern_match = re.match(r'^(\s*)(.+?)(\s*)$', line)
                    if pattern_match:
                        indent = pattern_match.group(1)
                        if indent:
                            line_patterns.add(f"indent:{len(indent)}")
                # Analyze structure patterns
                if file_path.suffix == '.py':
                    await self._analyze_python_patterns(content, patterns)
# Process word frequencies
patterns["common_words"] = dict(sorted(
word_freq.items(),
key=lambda x: x[1],
reverse=True
)[:100])
patterns["line_patterns"] = list(line_patterns)
return patterns
except Exception as e:
logger.error(f"Error analyzing patterns: {e}")
return {"error": str(e)}
    async def _read_file_content(self, path: Path) -> Optional[str]:
        """Read file content, caching by path (entries are never invalidated,
        which is acceptable for one-shot scans)"""
        if str(path) in self._file_cache:
            return self._file_cache[str(path)]
try:
content = path.read_text(encoding=self._detect_encoding(path))
self._file_cache[str(path)] = content
return content
except Exception as e:
logger.error(f"Error reading {path}: {e}")
return None
    def _detect_encoding(self, path: Path) -> str:
        """Detect file encoding; falls back to UTF-8 when chardet is missing
        or detection fails"""
        try:
            import chardet  # optional dependency, imported lazily
            with open(path, 'rb') as f:
                raw = f.read(65536)  # a sample is enough for detection
            result = chardet.detect(raw)
            return result['encoding'] or 'utf-8'
        except Exception:
            return 'utf-8'
    def _calculate_content_hash(self, content: str) -> str:
        """Calculate a content fingerprint (MD5 is acceptable here: it is used
        for deduplication, not for security)"""
        return hashlib.md5(content.encode()).hexdigest()
    def _calculate_similarity_hash(self, content: str) -> List[int]:
        """Calculate a similarity signature: one hash per whitespace-delimited
        token. Built-in hash() is salted per process, so signatures are
        comparable only within a single run."""
        return [hash(word) for word in content.split()]
    def _calculate_hash_similarity(self, hash1: List[int], hash2: List[int]) -> float:
        """Calculate similarity between two signatures (0.0 for empty input)"""
        if not hash1 or not hash2:
            return 0.0
        common = set(hash1) & set(hash2)
        return len(common) / max(len(hash1), len(hash2))
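    # Worked example (illustrative): if hash1 has 4 tokens, hash2 has 5, and
    # the signatures share 3 distinct values, similarity = 3 / max(4, 5) = 0.6.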
def _get_line_context(self, lines: List[str], line_number: int, context_lines: int = 2) -> Dict[str, List[str]]:
"""Get context lines around a match"""
start = max(0, line_number - context_lines - 1)
end = min(len(lines), line_number + context_lines)
return {
"before": lines[start:line_number-1],
"after": lines[line_number:end]
}
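    # Example (illustrative): for line_number=5 with the default
    # context_lines=2, 'before' holds lines 3-4 and 'after' lines 6-7 (1-based).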
async def _analyze_python_patterns(self, content: str, patterns: Dict[str, Any]) -> None:
"""Analyze Python-specific patterns"""
import ast
try:
tree = ast.parse(content)
# Analyze structure patterns
class_patterns = []
function_patterns = []
            for node in ast.walk(tree):
                if isinstance(node, ast.ClassDef):
                    # Count both sync and async methods
                    methods = len([n for n in node.body
                                   if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))])
                    class_patterns.append(f"class_with_{methods}_methods")
                elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    args = len(node.args.args)
                    function_patterns.append(f"function_with_{args}_args")
if class_patterns:
patterns["structure_patterns"].extend(class_patterns)
if function_patterns:
patterns["structure_patterns"].extend(function_patterns)
except Exception as e:
logger.error(f"Error analyzing Python patterns: {e}")