Skip to main content
Glama
server.py15.1 kB
"""MCP Server implementation for grep functionality using system grep binary.""" from pathlib import Path import json import subprocess import shutil import os import fnmatch from typing import Dict, List, Optional, Union, Any from mcp.server.fastmcp import FastMCP from mcp_grep.core import MCPGrep # Create an MCP server mcp = FastMCP("grep-server") def get_grep_info() -> Dict[str, Optional[str]]: """Get information about the system grep binary.""" info = { "path": None, "version": None, "supports_pcre": False, "supports_color": False } # Find grep path grep_path = shutil.which("grep") if grep_path: info["path"] = grep_path # Get version try: version_output = subprocess.check_output([grep_path, "--version"], text=True) info["version"] = version_output.split("\n")[0].strip() # Check for PCRE support try: subprocess.check_output([grep_path, "--perl-regexp", "test", "-"], input="test", text=True, stderr=subprocess.DEVNULL) info["supports_pcre"] = True except subprocess.CalledProcessError: pass # Check for color support try: subprocess.check_output([grep_path, "--color=auto", "test", "-"], input="test", text=True) info["supports_color"] = True except subprocess.CalledProcessError: pass except subprocess.CalledProcessError: pass return info # Register grep info as a resource @mcp.resource("grep://info") def grep_info() -> str: """Resource providing information about the grep binary.""" return json.dumps(get_grep_info(), indent=2) def _format_results(results: List[Dict[str, Any]], count: int) -> Dict: """Format grep results for the MCP response.""" # Truncate results if there are too many matches to avoid response size issues MAX_RESULTS = 50 if len(results) > MAX_RESULTS: truncated_results = results[:MAX_RESULTS] truncated_message = f"Found {count} matches, showing first {MAX_RESULTS}." 
results_json = json.dumps(truncated_results, indent=2) return { "content": [ { "type": "text", "text": truncated_message + "\n\n" + results_json } ], "isError": False } else: results_json = json.dumps(results, indent=2) return { "content": [ { "type": "text", "text": results_json } ], "isError": False } @mcp.tool() def grep( pattern: str, paths: Union[str, List[str]], ignore_case: bool = False, before_context: int = 0, after_context: int = 0, context: Optional[int] = None, max_count: int = 0, fixed_strings: bool = False, recursive: bool = False, regexp: bool = True, invert_match: bool = False, line_number: bool = True, file_pattern: Optional[str] = None ) -> Dict: """Search for pattern in files using system grep. Args: pattern: Pattern to search for paths: File or directory paths to search in (string or list of strings) ignore_case: Case-insensitive matching (-i) before_context: Number of lines before match (-B) after_context: Number of lines after match (-A) context: Number of context lines around match (equal before/after) max_count: Stop after N matches (-m) fixed_strings: Treat pattern as literal text, not regex (-F) recursive: Search directories recursively (-r) regexp: Use regular expressions for pattern matching invert_match: Select non-matching lines (-v) line_number: Show line numbers (-n) file_pattern: Pattern to filter files (e.g., "*.txt") Returns: JSON string with search results """ try: # Convert single path to list and expand user paths if isinstance(paths, str): paths = [os.path.expanduser(paths)] else: paths = [os.path.expanduser(p) for p in paths] # Use our MCPGrep implementation for more consistent and flexible searching grep_tool = MCPGrep( pattern=pattern, ignore_case=ignore_case, fixed_strings=fixed_strings, regexp=regexp, invert_match=invert_match, line_number=line_number, before_context=before_context, after_context=after_context, context=context, max_count=max_count ) # Search for matches results = [] match_count = 0 # If any path contains 
a wildcard, handle it at the paths level wildcarded_paths = [] standard_paths = [] for path in paths: if "*" in path or "?" in path: wildcarded_paths.append(path) else: standard_paths.append(path) # Process standard paths if standard_paths: try: for result in grep_tool.search_files(standard_paths, recursive, file_pattern): results.append(result) match_count += 1 if max_count > 0 and match_count >= max_count: break except Exception as e: return { "content": [ { "type": "text", "text": f"Error searching files: {str(e)}" } ], "isError": True } # Process wildcard paths if wildcarded_paths and match_count < (max_count if max_count > 0 else float('inf')): # For each wildcarded path for wild_path in wildcarded_paths: # Split into directory and pattern dir_path = os.path.dirname(wild_path) or "." base_pattern = os.path.basename(wild_path) # Find all matching files in the directory try: dir_obj = Path(dir_path) if dir_obj.exists() and dir_obj.is_dir(): matching_files = [] # Gather files recursively if needed if recursive: for root, _, files in os.walk(dir_path): for file in files: if fnmatch.fnmatch(file, base_pattern): # Also check file_pattern if specified if file_pattern and not fnmatch.fnmatch(file, file_pattern): continue matching_files.append(os.path.join(root, file)) else: # Non-recursive search for item in dir_obj.iterdir(): if item.is_file() and fnmatch.fnmatch(item.name, base_pattern): # Also check file_pattern if specified if file_pattern and not fnmatch.fnmatch(item.name, file_pattern): continue matching_files.append(str(item)) # Search in the matching files for file_path in matching_files: try: for result in grep_tool.search_file(file_path): results.append(result) match_count += 1 if max_count > 0 and match_count >= max_count: break if max_count > 0 and match_count >= max_count: break except Exception as e: print(f"Error searching {file_path}: {e}") if max_count > 0 and match_count >= max_count: break else: print(f"Directory not found: {dir_path}") except 
Exception as e: print(f"Error processing wildcard path {wild_path}: {e}") # No results case if not results: return { "content": [ { "type": "text", "text": "No matches found" } ], "isError": False } # Return the formatted results return _format_results(results, match_count) except Exception as e: return { "content": [ { "type": "text", "text": f"Error executing grep: {str(e)}" } ], "isError": True } def parse_grep_query(query: str) -> Dict: """Parse a natural language query for grep operations. Args: query: Natural language query for grep operations Returns: Dictionary with extracted parameters for the grep tool """ # Default values params = { "pattern": "", "paths": ["."], # Default to current directory "ignore_case": False, "before_context": 0, "after_context": 0, "context": None, "max_count": 0, "fixed_strings": False, "recursive": False, "regexp": True, "invert_match": False, "line_number": True, "file_pattern": None } # Extract pattern from quotation marks if present import re pattern_match = re.search(r"['\"](.*?)['\"]", query) if pattern_match: params["pattern"] = pattern_match.group(1) # Detect case insensitivity if re.search(r"case\s*insensitive|ignor.*case|regardless\s*of\s*case", query, re.IGNORECASE): params["ignore_case"] = True # Detect context lines context_match = re.search(r"(\d+)\s*lines\s*(before|after|of\s*context|context)", query) if context_match: context_num = int(context_match.group(1)) context_type = context_match.group(2).lower() if "before" in context_type: params["before_context"] = context_num elif "after" in context_type: params["after_context"] = context_num else: # General context params["context"] = context_num # Detect both before and after context both_context_match = re.search(r"(\d+)\s*lines\s*before\s*and\s*(\d+)\s*lines\s*after", query) if both_context_match: params["before_context"] = int(both_context_match.group(1)) params["after_context"] = int(both_context_match.group(2)) # Look for equal context on both sides 
equal_context_match = re.search(r"show\s*(\d+)\s*lines\s*before\s*and\s*after", query) if equal_context_match: params["context"] = int(equal_context_match.group(1)) # Detect recursive search if re.search(r"recursiv|subdirector|all\s*files|and\s*its\s*subdirectories", query, re.IGNORECASE): params["recursive"] = True # Detect fixed string (exact match) if re.search(r"exact\s*string|exact\s*text|literal|fixed\s*string", query, re.IGNORECASE): params["fixed_strings"] = True # Detect max count max_count_match = re.search(r"(first|only|just|limit\s*to)\s*(\d+)", query, re.IGNORECASE) if max_count_match: params["max_count"] = int(max_count_match.group(2)) # Detect regex flag if re.search(r"regex|regular\s*expression", query, re.IGNORECASE): params["regexp"] = True # Detect invert match if re.search(r"don't\s*contain|doesn't\s*contain|not\s*contain|invert\s*match|lines\s*that\s*don't", query, re.IGNORECASE): params["invert_match"] = True # Detect line number display if re.search(r"without\s*line\s*numbers|no\s*line\s*numbers", query, re.IGNORECASE): params["line_number"] = False # Detect file pattern file_pattern_match = re.search(r"in\s*all\s*([*\.a-zA-Z0-9]+)\s*files", query, re.IGNORECASE) if file_pattern_match: params["file_pattern"] = file_pattern_match.group(1) if not params["file_pattern"].startswith("*"): params["file_pattern"] = f"*.{params['file_pattern']}" # Multiple file extensions multi_ext_match = re.search(r"in\s*all\s*([a-zA-Z0-9]+)\s*and\s*([a-zA-Z0-9]+)\s*files", query, re.IGNORECASE) if multi_ext_match: ext1 = multi_ext_match.group(1) ext2 = multi_ext_match.group(2) # Create a pattern for multiple extensions params["paths"] = [f"*.{ext1}", f"*.{ext2}"] params["recursive"] = True # Extract the file path(s) file_match = re.search(r"in\s*([a-zA-Z0-9_\-./\\]+\.[a-zA-Z0-9]+)", query) if file_match: params["paths"] = [file_match.group(1)] # Extract directory dir_match = re.search(r"in\s*the\s*([a-zA-Z0-9_\-./\\]+)\s*directory", query) if dir_match: 
params["paths"] = [dir_match.group(1)] # If searching in a directory and recursive not explicitly specified, default to recursive if not re.search(r"recursiv|subdirector", query, re.IGNORECASE): params["recursive"] = True # If no pattern found in quotes, try to extract it from the query if not params["pattern"]: # Look for common patterns like "find X in Y" or "search for X in Y" find_pattern = re.search(r"(?:find|search\s*for|looking\s*for)\s*['\"]*([^'\"]+?)['\"]*\s*(?:in|across)", query, re.IGNORECASE) if find_pattern: params["pattern"] = find_pattern.group(1).strip() else: # Just grab the first word that isn't a common verb or preposition words = query.split() for word in words: if word.lower() not in ["search", "find", "for", "in", "grep", "look", "show", "get", "with", "and", "the"]: params["pattern"] = word break # Clean up pattern (remove unnecessary quotes) if params["pattern"].startswith(("'", '"')) and params["pattern"].endswith(("'", '"')): params["pattern"] = params["pattern"][1:-1] return params if __name__ == "__main__": # Run the server with stdio transport for MCP mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/erniebrodeur/mcp-grep'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.