Skip to main content
Glama
wrale

mcp-server-tree-sitter

by wrale

run_query

Execute tree-sitter queries on project files to analyze and extract code patterns. Specify project, query, and optional file path or language for targeted results. Returns a list of matches for efficient code analysis.

Instructions

Run a tree-sitter query on project files.

    Args:
        project: Project name
        query: Tree-sitter query string
        file_path: Optional specific file to query
        language: Language to use (required if file_path not provided)
        max_results: Maximum number of results

    Returns:
        List of query matches
    

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
file_pathNo
languageNo
max_resultsNo
projectYes
queryYes

Implementation Reference

  • Handler for the 'run_query' MCP tool. Registers the tool and provides the entry point that delegates to the query_code helper function.
    @mcp_server.tool()
    def run_query(
        project: str,
        query: str,
        file_path: Optional[str] = None,
        language: Optional[str] = None,
        max_results: int = 100,
    ) -> List[Dict[str, Any]]:
        """Run a tree-sitter query on project files.
    
        Args:
            project: Project name
            query: Tree-sitter query string
            file_path: Optional specific file to query
            language: Language to use (required if file_path not provided)
            max_results: Maximum number of results
    
        Returns:
            List of query matches
        """
        from ..tools.search import query_code
    
        config = config_manager.get_config()
    
        return query_code(
            project_registry.get_project(project),
            query,
            language_registry,
            tree_cache,
            file_path,
            language,
            max_results if max_results is not None else config.max_results_default,
        )
  • Core helper function 'query_code' that implements the tree-sitter query execution logic: file parsing, caching, query matching, and result formatting with positions and snippets.
    def query_code(
        project: Any,
        query_string: str,
        language_registry: Any,
        tree_cache: Any,
        file_path: Optional[str] = None,
        language: Optional[str] = None,
        max_results: int = 100,
        include_snippets: bool = True,
    ) -> List[Dict[str, Any]]:
        """
        Run a tree-sitter query on code files.
    
        Args:
            project: Project object
            query_string: Tree-sitter query string
            language_registry: Language registry
            tree_cache: Tree cache instance
            file_path: Optional specific file to query
            language: Language to use (required if file_path not provided)
            max_results: Maximum number of results to return
            include_snippets: Whether to include code snippets in results
    
        Returns:
            List of query matches
        """
        root = project.root_path
        results: List[Dict[str, Any]] = []
    
        if file_path is not None:
            # Query a specific file
            abs_path = project.get_file_path(file_path)
    
            try:
                validate_file_access(abs_path, root)
            except SecurityError as e:
                raise SecurityError(f"Access denied: {e}") from e
    
            # Detect language if not provided
            if not language:
                detected_language = language_registry.language_for_file(file_path)
                if detected_language:
                    language = detected_language
                if not language:
                    raise QueryError(f"Could not detect language for {file_path}")
    
            try:
                # Check if we have a cached tree
                assert language is not None  # For type checking
                cached = tree_cache.get(abs_path, language)
                if cached:
                    tree, source_bytes = cached
                else:
                    # Parse file
                    with open(abs_path, "rb") as f:
                        source_bytes = f.read()
    
                    parser = language_registry.get_parser(language)
                    tree = parser.parse(source_bytes)
    
                    # Cache the tree
                    tree_cache.put(abs_path, language, tree, source_bytes)
    
                # Execute query
                lang = language_registry.get_language(language)
                query = lang.query(query_string)
    
                captures = query.captures(tree.root_node)
    
                # Handle different return formats from query.captures()
                if isinstance(captures, dict):
                    # Dictionary format: {capture_name: [node1, node2, ...], ...}
                    for capture_name, nodes in captures.items():
                        for node in nodes:
                            # Skip if we've reached max results
                            if max_results is not None and len(results) >= max_results:
                                break
    
                            try:
                                from ..utils.tree_sitter_helpers import get_node_text
    
                                text = get_node_text(node, source_bytes, decode=True)
                            except Exception:
                                text = "<binary data>"
    
                            result = {
                                "file": file_path,
                                "capture": capture_name,
                                "start": {
                                    "row": node.start_point[0],
                                    "column": node.start_point[1],
                                },
                                "end": {
                                    "row": node.end_point[0],
                                    "column": node.end_point[1],
                                },
                            }
    
                            if include_snippets:
                                result["text"] = text
    
                            results.append(result)
                else:
                    # List format: [(node1, capture_name1), (node2, capture_name2), ...]
                    for match in captures:
                        # Handle different return types from query.captures()
                        if isinstance(match, tuple) and len(match) == 2:
                            # Direct tuple unpacking
                            node, capture_name = match
                        elif hasattr(match, "node") and hasattr(match, "capture_name"):
                            # Object with node and capture_name attributes
                            node, capture_name = match.node, match.capture_name
                        elif isinstance(match, dict) and "node" in match and "capture" in match:
                            # Dictionary with node and capture keys
                            node, capture_name = match["node"], match["capture"]
                        else:
                            # Skip if format is unknown
                            continue
    
                        # Skip if we've reached max results
                        if max_results is not None and len(results) >= max_results:
                            break
    
                        try:
                            from ..utils.tree_sitter_helpers import get_node_text
    
                            text = get_node_text(node, source_bytes, decode=True)
                        except Exception:
                            text = "<binary data>"
    
                        result = {
                            "file": file_path,
                            "capture": capture_name,
                            "start": {
                                "row": node.start_point[0],
                                "column": node.start_point[1],
                            },
                            "end": {"row": node.end_point[0], "column": node.end_point[1]},
                        }
    
                        if include_snippets:
                            result["text"] = text
    
                        results.append(result)
            except Exception as e:
                raise QueryError(f"Error querying {file_path}: {e}") from e
        else:
            # Query across multiple files
            if not language:
                raise QueryError("Language is required when file_path is not provided")
    
            # Find all matching files for the language
            extensions = [(ext, lang) for ext, lang in language_registry._language_map.items() if lang == language]
    
            if not extensions:
                raise QueryError(f"No file extensions found for language {language}")
    
            # Process files in parallel
            def process_file(rel_path: str) -> List[Dict[str, Any]]:
                try:
                    # Use single-file version of query_code
                    file_results = query_code(
                        project,
                        query_string,
                        language_registry,
                        tree_cache,
                        rel_path,
                        language,
                        max_results if max_results is None else max_results - len(results),
                        include_snippets,
                    )
                    return file_results
                except Exception:
                    # Skip files that can't be queried
                    return []
    
            # Collect files to process
            files_to_process = []
            for ext, _ in extensions:
                for path in root.glob(f"**/*.{ext}"):
                    if path.is_file():
                        files_to_process.append(str(path.relative_to(root)))
    
            # Process files until we reach max_results
            for file in files_to_process:
                try:
                    file_results = process_file(file)
                    results.extend(file_results)
    
                    if max_results is not None and len(results) >= max_results:
                        break
                except Exception:
                    # Skip files that cause errors
                    continue
    
        return results[:max_results] if max_results is not None else results
  • The call to register_tools in the main server setup, which registers all tools including 'run_query'.
    from .tools.registration import register_tools
    
    register_capabilities(mcp)
    register_tools(mcp, container)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wrale/mcp-server-tree-sitter'

If you have feedback or need assistance with the MCP directory API, please join our Discord server