Skip to main content
Glama

nixos_search

Search NixOS packages, options, programs, or flakes to find accurate system configuration resources and prevent AI hallucinations about NixOS.

Instructions

Search NixOS packages, options, or programs.

Args: query: Search term to look for; search_type: Type of search - "packages", "options", "programs", or "flakes"; limit: Maximum number of results to return (1-100); channel: NixOS channel to search in (e.g., "unstable", "stable", "25.05")

Returns: Plain text results with bullet points or error message

Input Schema

Table | JSON Schema

Name | Required | Description | Default
query | Yes | |
search_type | No | | packages
limit | No | |
channel | No | | unstable

Implementation Reference

  • Core handler function for the 'nixos_search' MCP tool. Performs searches across NixOS packages, options, programs, and flakes using dynamic channel discovery and Elasticsearch API.
    @mcp.tool()
    async def nixos_search(query: str, search_type: str = "packages", limit: int = 20, channel: str = "unstable") -> str:
        """Search NixOS packages, options, or programs.
    
        Args:
            query: Search term to look for
            search_type: Type of search - "packages", "options", "programs", or "flakes"
            limit: Maximum number of results to return (1-100)
            channel: NixOS channel to search in (e.g., "unstable", "stable", "25.05")
    
        Returns:
            Plain text results with bullet points or error message
        """
        # NOTE(review): the docstring above is left untouched on purpose; it looks
        # like @mcp.tool() publishes it as the tool's instructions - confirm before
        # rewording it.
    
        # Validate arguments up front; every failure is reported as an error string.
        if search_type not in ("packages", "options", "programs", "flakes"):
            return error(f"Invalid type '{search_type}'")
    
        channels = get_channels()
        if channel not in channels:
            hint = get_channel_suggestions(channel)
            return error(f"Invalid channel '{channel}'. {hint}")
    
        if not (1 <= limit <= 100):
            return error("Limit must be 1-100")
    
        # Flake searches use a separate index and formatter.
        if search_type == "flakes":
            return await _nixos_flakes_search_impl(query, limit)
    
        try:
            # Pick the document type and the scoring clauses for this search type.
            if search_type == "options":
                doc_type = "option"
                # Wildcard on the name so dotted paths such as services.nginx.enable match.
                should_clauses = [
                    {"wildcard": {"option_name": f"*{query}*"}},
                    {"match": {"option_description": query}},
                ]
            elif search_type == "packages":
                doc_type = "package"
                should_clauses = [
                    {"match": {"package_pname": {"query": query, "boost": 3}}},
                    {"match": {"package_description": query}},
                ]
            else:  # programs are looked up on package documents
                doc_type = "package"
                should_clauses = [
                    {"match": {"package_programs": {"query": query, "boost": 2}}},
                    {"match": {"package_pname": query}},
                ]
    
            es_request = {
                "bool": {
                    "must": [{"term": {"type": doc_type}}],
                    "should": should_clauses,
                    "minimum_should_match": 1,
                }
            }
    
            hits = es_query(channels[channel], es_request, limit)
            if not hits:
                return f"No {search_type} found matching '{query}'"
    
            # Render each hit as a bullet followed by a blank separator line.
            lines = [f"Found {len(hits)} {search_type} matching '{query}':\n"]
            for hit in hits:
                source = hit.get("_source", {})
    
                if search_type == "packages":
                    pname = source.get("package_pname", "")
                    pversion = source.get("package_pversion", "")
                    summary = source.get("package_description", "")
                    lines.append(f"• {pname} ({pversion})")
                    if summary:
                        lines.append(f"  {summary}")
                    lines.append("")
                    continue
    
                if search_type == "options":
                    option = source.get("option_name", "")
                    type_label = source.get("option_type", "")
                    summary = source.get("option_description", "")
                    if summary and "<rendered-html>" in summary:
                        # Drop the wrapper element first, then any remaining tags.
                        summary = summary.replace("<rendered-html>", "").replace("</rendered-html>", "")
                        summary = re.sub(r"<[^>]+>", "", summary).strip()
                    lines.append(f"• {option}")
                    if type_label:
                        lines.append(f"  Type: {type_label}")
                    if summary:
                        lines.append(f"  {summary}")
                    lines.append("")
                    continue
    
                # programs: list only executables whose name equals the query,
                # ignoring case; other hits contribute no lines.
                provided = source.get("package_programs", [])
                provider = source.get("package_pname", "")
                wanted = query.lower()
                exact = [prog for prog in provided if prog.lower() == wanted]
                for prog in exact:
                    lines.extend((f"• {prog} (provided by {provider})", ""))
    
            return "\n".join(lines).strip()
    
        except Exception as e:
            # Query or formatting failures are surfaced as an error string, not raised.
            return error(str(e))
  • ChannelCache class responsible for dynamically discovering available NixOS channels, resolving user-friendly names to Elasticsearch indices, and providing fallback mappings. Used extensively by nixos_search for channel validation and queries.
    class ChannelCache:
        """Lazily populated store of discovered channel indices and their aliases."""
    
        def __init__(self) -> None:
            """Start with nothing cached and the fallback flag cleared."""
            # index name -> "<count> documents"; None until discovery has run
            self.available_channels: dict[str, str] | None = None
            # friendly channel name -> index name; None until resolution has run
            self.resolved_channels: dict[str, str] | None = None
            # set to True when _resolve_channels had to return FALLBACK_CHANNELS
            self.using_fallback: bool = False
    
        def get_available(self) -> dict[str, str]:
            """Return the discovered indices, running discovery on first use."""
            cached = self.available_channels
            if cached is None:
                cached = self.available_channels = self._discover_available_channels()
            return {} if cached is None else cached
    
        def get_resolved(self) -> dict[str, str]:
            """Return the friendly-name mapping, resolving it on first use."""
            cached = self.resolved_channels
            if cached is None:
                cached = self.resolved_channels = self._resolve_channels()
            return {} if cached is None else cached
    
        def _discover_available_channels(self) -> dict[str, str]:
            """Probe candidate index names and keep those reporting documents.
    
            Returns:
                Mapping of index name to a "<count> documents" label for every
                probed index that answered 200 with a positive count.
            """
            # Generations 43 through 46 are probed; the later ones are speculative.
            generations = [43, 44, 45, 46]
            # Current and anticipated release names.
            versions = ["unstable", "25.05", "25.11", "26.05", "30.05"]
    
            candidates = (f"latest-{gen}-nixos-{version}" for gen in generations for version in versions)
    
            found = {}
            for index_name in candidates:
                try:
                    resp = requests.post(
                        f"{NIXOS_API}/{index_name}/_count",
                        json={"query": {"match_all": {}}},
                        auth=NIXOS_AUTH,
                        timeout=10,
                    )
                    if resp.status_code != 200:
                        continue
                    doc_count = resp.json().get("count", 0)
                    if doc_count > 0:
                        found[index_name] = f"{doc_count:,} documents"
                except Exception:
                    # Best-effort probe: any failure just means this index is skipped.
                    continue
    
            return found
    
        def _resolve_channels(self) -> dict[str, str]:
            """Map friendly names ("unstable", "stable", "beta", versions) to indices.
    
            Returns:
                The resolved mapping, or a copy of FALLBACK_CHANNELS (with
                ``using_fallback`` set) when nothing could be resolved.
            """
            available = self.get_available()
            if not available:
                self.using_fallback = True
                return FALLBACK_CHANNELS.copy()
    
            resolved = {}
    
            # "unstable" is the first discovered index whose name contains the word.
            unstable_index = next((name for name in available if "unstable" in name), None)
            if unstable_index:
                resolved["unstable"] = unstable_index
    
            # Collect (major, minor, version, index, count) for every release index.
            candidates = []
            for index_name, label in available.items():
                if "unstable" in index_name:
                    continue
                pieces = index_name.split("-")
                if len(pieces) < 4:
                    continue
                release = pieces[3]  # e.g. "25.05" from "latest-43-nixos-25.05"
                try:
                    major, minor = map(int, release.split("."))
                    doc_count = int(label.replace(",", "").replace(" documents", ""))
                except (ValueError, IndexError):
                    continue
                candidates.append((major, minor, release, index_name, doc_count))
    
            if candidates:
                # Newest release first; document count breaks ties within a release.
                ranked = sorted(candidates, key=lambda c: (c[0], c[1], c[4]), reverse=True)
                _, _, newest_release, newest_index, _ = ranked[0]
    
                resolved["stable"] = newest_index
                resolved[newest_release] = newest_index
    
                # For each release keep the index holding the most documents.
                best_for_release: dict[str, tuple[str, int]] = {}
                for _major, _minor, release, index_name, doc_count in ranked:
                    current = best_for_release.get(release)
                    if current is None or doc_count > current[1]:
                        best_for_release[release] = (index_name, doc_count)
    
                resolved.update({release: index_name for release, (index_name, _n) in best_for_release.items()})
    
            # "beta" is an alias for whatever "stable" resolved to.
            if "stable" in resolved:
                resolved["beta"] = resolved["stable"]
    
            if resolved:
                return resolved
    
            # Nothing usable was derived from the discovered indices.
            self.using_fallback = True
            return FALLBACK_CHANNELS.copy()
  • Core helper function that executes Elasticsearch queries to the NixOS search API. Called by nixos_search to fetch search results.
    def es_query(index: str, query: dict[str, Any], size: int = 20) -> list[dict[str, Any]]:
        """Run a search against one index and return the raw hit documents.
    
        Args:
            index: Name of the index to search.
            query: Query body placed under the "query" key of the request.
            size: Value sent as the request's "size".
    
        Returns:
            The list found at ``hits.hits`` in the response, or an empty list
            when the response does not have that shape.
    
        Raises:
            APIError: On timeout, HTTP error status, or any other failure.
        """
        try:
            response = requests.post(
                f"{NIXOS_API}/{index}/_search",
                json={"query": query, "size": size},
                auth=NIXOS_AUTH,
                timeout=10,
            )
            response.raise_for_status()
            payload = response.json()
    
            # Walk payload["hits"]["hits"] defensively; any other shape means no results.
            outer = payload["hits"] if isinstance(payload, dict) and "hits" in payload else None
            if isinstance(outer, dict) and "hits" in outer:
                return list(outer["hits"])
            return []
        except requests.Timeout as exc:
            raise APIError("API error: Connection timed out") from exc
        except Exception as exc:
            # HTTP status errors and every other failure share one message format.
            raise APIError(f"API error: {str(exc)}") from exc
  • Helper function that provides channel suggestions for invalid channels in nixos_search error responses.
    def get_channel_suggestions(invalid_channel: str) -> str:
        """Build an "Available channels: ..." hint for an unrecognised channel name.
    
        Args:
            invalid_channel: The channel name that failed validation.
    
        Returns:
            A sentence listing channels whose name contains, or is contained in,
            the invalid name (case-insensitive). When none match, it lists the
            common aliases plus up to two version channels, or failing that the
            first four known channels.
        """
        known = list(get_channels().keys())
        needle = invalid_channel.lower()
    
        # Substring match in either direction.
        picks = [name for name in known if needle in name.lower() or name.lower() in needle]
    
        if not picks:
            # Version channels look like dotted digits, e.g. "25.05".
            versioned = [name for name in known if "." in name and name.replace(".", "").isdigit()]
            preferred = ["unstable", "stable", "beta", *versioned[:2]]
            picks = [name for name in preferred if name in known] or known[:4]
    
        return f"Available channels: {', '.join(picks)}"
  • Internal helper implementation for flake searches, delegated to by nixos_search when search_type='flakes'. Handles flake-specific indexing and deduplication.
    async def _nixos_flakes_search_impl(query: str, limit: int = 20, channel: str = "unstable") -> str:
        """Search the flake index and return a plain-text summary grouped by flake.
    
        Args:
            query: Search term. An empty/whitespace-only string or "*" matches
                every flake package.
            limit: Requested result count (1-100). ``limit * 5`` raw hits are
                requested because several hits can collapse into a single flake.
            channel: Accepted for signature compatibility; not used here, the
                flake index name is fixed.
    
        Returns:
            A bulleted listing of unique flakes, a "no flakes found" help text,
            or the result of ``error(...)`` when validation or the request fails.
        """
        if not 1 <= limit <= 100:
            return error("Limit must be 1-100")
    
        try:
            # Use the same alias as the web UI to get only flake packages
            flake_index = "latest-43-group-manual"
    
            # Build query for flakes
            if query.strip() == "" or query == "*":
                # Empty or wildcard query - get all flakes
                q: dict[str, Any] = {"match_all": {}}
            else:
                # Search query with multiple fields, including nested queries for flake_resolved
                q = {
                    "bool": {
                        "should": [
                            {"match": {"flake_name": {"query": query, "boost": 3}}},
                            {"match": {"flake_description": {"query": query, "boost": 2}}},
                            {"match": {"package_pname": {"query": query, "boost": 1.5}}},
                            {"match": {"package_description": query}},
                            {"wildcard": {"flake_name": {"value": f"*{query}*", "boost": 2.5}}},
                            {"wildcard": {"package_pname": {"value": f"*{query}*", "boost": 1}}},
                            {"prefix": {"flake_name": {"value": query, "boost": 2}}},
                            # Nested queries for flake_resolved fields
                            {
                                "nested": {
                                    "path": "flake_resolved",
                                    "query": {"term": {"flake_resolved.owner": query.lower()}},
                                    "boost": 2,
                                }
                            },
                            {
                                "nested": {
                                    "path": "flake_resolved",
                                    "query": {"term": {"flake_resolved.repo": query.lower()}},
                                    "boost": 2,
                                }
                            },
                        ],
                        "minimum_should_match": 1,
                    }
                }
    
            # Execute search with package filter to match web UI
            search_query = {"bool": {"filter": [{"term": {"type": "package"}}], "must": [q]}}
    
            try:
                resp = requests.post(
                    f"{NIXOS_API}/{flake_index}/_search",
                    json={"query": search_query, "size": limit * 5, "track_total_hits": True},  # Get more results
                    auth=NIXOS_AUTH,
                    timeout=10,
                )
                resp.raise_for_status()
                data = resp.json()
                hits = data.get("hits", {}).get("hits", [])
                total = data.get("hits", {}).get("total", {}).get("value", 0)
            except requests.HTTPError as e:
                # A requests Response is falsy for 4xx/5xx statuses, so a plain
                # truthiness test would never recognise a 404; compare with None.
                if e.response is not None and e.response.status_code == 404:
                    # No flake indices found
                    return error("Flake indices not found. Flake search may be temporarily unavailable.")
                raise
    
            # Format results as plain text
            if not hits:
                return f"""No flakes found matching '{query}'.
    
    Try searching for:
    • Popular flakes: nixpkgs, home-manager, flake-utils, devenv
    • By owner: nix-community, numtide, cachix
    • By topic: python, rust, nodejs, devops
    
    Browse flakes at:
    • GitHub: https://github.com/topics/nix-flakes
    • FlakeHub: https://flakehub.com/"""
    
            # Group hits by flake to avoid duplicates
            flakes: dict[str, dict[str, Any]] = {}
    
            for hit in hits:
                src = hit.get("_source", {})
    
                # Get flake information; a null flake_name is treated as missing
                # so one odd document cannot fail the whole search.
                flake_name = (src.get("flake_name") or "").strip()
                package_pname = src.get("package_pname", "")
                resolved = src.get("flake_resolved", {})
    
                # Skip entries without any useful name
                if not flake_name and not package_pname:
                    continue
    
                owner = repo = url = flake_type = ""
    
                if isinstance(resolved, dict) and (resolved.get("owner") or resolved.get("repo") or resolved.get("url")):
                    # Resolved metadata is present: derive a unique key from it.
                    owner = resolved.get("owner", "")
                    repo = resolved.get("repo", "")
                    url = resolved.get("url", "")
                    flake_type = resolved.get("type", "")
    
                    if owner and repo:
                        flake_key = f"{owner}/{repo}"
                        display_name = flake_name or repo or package_pname
                    elif url:
                        # Extract name from URL for git repos
                        flake_key = url
                        if "/" in url:
                            display_name = flake_name or url.rstrip("/").split("/")[-1].replace(".git", "") or package_pname
                        else:
                            display_name = flake_name or package_pname
                    else:
                        flake_key = flake_name or package_pname
                        display_name = flake_key
                elif flake_name:
                    # Has flake_name but no resolved metadata
                    flake_key = flake_name
                    display_name = flake_name
                else:
                    # Package without any flake metadata: it is not part of the
                    # flake listing, so there is nothing to record for it.
                    continue
    
                # Initialize flake entry if not seen
                if flake_key not in flakes:
                    flakes[flake_key] = {
                        "name": display_name,
                        "description": src.get("flake_description") or src.get("package_description", ""),
                        "owner": owner,
                        "repo": repo,
                        "url": url,
                        "type": flake_type,
                        "packages": set(),  # Use set to avoid duplicates
                    }
    
                # Add package if available
                attr_name = src.get("package_attr_name", "")
                if attr_name:
                    flakes[flake_key]["packages"].add(attr_name)
    
            # Build results
            # NOTE(review): every grouped flake is listed; the output is not cut
            # down to `limit` entries - confirm whether that is intended.
            results = []
            # Show both total hits and unique flakes
            if total > len(flakes):
                results.append(f"Found {total:,} total matches ({len(flakes)} unique flakes) matching '{query}':\n")
            else:
                results.append(f"Found {len(flakes)} unique flakes matching '{query}':\n")
    
            for flake in flakes.values():
                results.append(f"• {flake['name']}")
                if flake.get("owner") and flake.get("repo"):
                    results.append(
                        f"  Repository: {flake['owner']}/{flake['repo']}"
                        + (f" ({flake['type']})" if flake.get("type") else "")
                    )
                elif flake.get("url"):
                    results.append(f"  URL: {flake['url']}")
                if flake.get("description"):
                    desc = flake["description"]
                    if len(desc) > 200:
                        desc = desc[:200] + "..."
                    results.append(f"  {desc}")
                if flake["packages"]:
                    # Show max 5 packages, sorted
                    packages = sorted(flake["packages"])[:5]
                    if len(flake["packages"]) > 5:
                        results.append(f"  Packages: {', '.join(packages)}, ... ({len(flake['packages'])} total)")
                    else:
                        results.append(f"  Packages: {', '.join(packages)}")
                results.append("")
    
            return "\n".join(results).strip()
    
        except Exception as e:
            # Any remaining failure (network, JSON shape, formatting) becomes an error string.
            return error(str(e))

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/utensils/mcp-nixos'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.