Skip to main content
Glama

nixos_stats

Retrieve NixOS channel statistics including package and option counts to verify system configuration data and prevent AI hallucinations about NixOS resources.

Instructions

Get NixOS statistics for a channel.

Args: channel: NixOS channel to get stats for (e.g., "unstable", "stable", "25.05")

Returns: Plain text statistics including package/option counts

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
channelNounstable

Implementation Reference

  • The primary handler function for the 'nixos_stats' MCP tool. It validates the channel, queries the NixOS Elasticsearch API for package and option counts, handles errors gracefully, and returns formatted plain-text statistics.
    @mcp.tool()
    async def nixos_stats(channel: str = "unstable") -> str:
        """Get NixOS statistics for a channel.
    
        Args:
            channel: NixOS channel to get stats for (e.g., "unstable", "stable", "25.05")
    
        Returns:
            Plain text statistics including package/option counts
        """
        channels = get_channels()
        if channel not in channels:
            suggestions = get_channel_suggestions(channel)
            return error(f"Invalid channel '{channel}'. {suggestions}")
    
        try:
            index = channels[channel]
            url = f"{NIXOS_API}/{index}/_count"
    
            # Get counts with error handling
            try:
                pkg_resp = requests.post(url, json={"query": {"term": {"type": "package"}}}, auth=NIXOS_AUTH, timeout=10)
                pkg_resp.raise_for_status()
                pkg_count = pkg_resp.json().get("count", 0)
            except Exception:
                pkg_count = 0
    
            try:
                opt_resp = requests.post(url, json={"query": {"term": {"type": "option"}}}, auth=NIXOS_AUTH, timeout=10)
                opt_resp.raise_for_status()
                opt_count = opt_resp.json().get("count", 0)
            except Exception:
                opt_count = 0
    
            if pkg_count == 0 and opt_count == 0:
                return error("Failed to retrieve statistics")
    
            return f"""NixOS Statistics for {channel} channel:
    • Packages: {pkg_count:,}
    • Options: {opt_count:,}"""
    
        except Exception as e:
            return error(str(e))
  • Helper function called by nixos_stats to retrieve resolved channel mappings from the cache.
    def get_channels() -> dict[str, str]:
        """Get current channel mappings (cached and resolved)."""
        return channel_cache.get_resolved()
  • Global ChannelCache instance used by nixos_stats for dynamic channel discovery and resolution.
    channel_cache = ChannelCache()
    
    
    def error(msg: str, code: str = "ERROR") -> str:
        """Format error as plain text."""
        # Ensure msg is always a string, even if empty
        msg = str(msg) if msg is not None else ""
        return f"Error ({code}): {msg}"
    
    
    def get_channels() -> dict[str, str]:
        """Get current channel mappings (cached and resolved)."""
        return channel_cache.get_resolved()
    
    
    def validate_channel(channel: str) -> bool:
        """Validate if a channel exists and is accessible."""
        channels = get_channels()
        if channel in channels:
            index = channels[channel]
            try:
                resp = requests.post(
                    f"{NIXOS_API}/{index}/_count", json={"query": {"match_all": {}}}, auth=NIXOS_AUTH, timeout=5
                )
                return resp.status_code == 200 and resp.json().get("count", 0) > 0
            except Exception:
                return False
        return False
    
    
    def get_channel_suggestions(invalid_channel: str) -> str:
        """Get helpful suggestions for invalid channels."""
        channels = get_channels()
        available = list(channels.keys())
        suggestions = []
    
        # Find similar channel names
        invalid_lower = invalid_channel.lower()
        for channel in available:
            if invalid_lower in channel.lower() or channel.lower() in invalid_lower:
                suggestions.append(channel)
    
        if not suggestions:
            # Fallback to most common channels
            common = ["unstable", "stable", "beta"]
            # Also include version numbers
            version_channels = [ch for ch in available if "." in ch and ch.replace(".", "").isdigit()]
            common.extend(version_channels[:2])  # Add up to 2 version channels
            suggestions = [ch for ch in common if ch in available]
            if not suggestions:
                suggestions = available[:4]  # First 4 available
    
        return f"Available channels: {', '.join(suggestions)}"
    
    
    def es_query(index: str, query: dict[str, Any], size: int = 20) -> list[dict[str, Any]]:
        """Execute Elasticsearch query."""
        try:
            resp = requests.post(
                f"{NIXOS_API}/{index}/_search", json={"query": query, "size": size}, auth=NIXOS_AUTH, timeout=10
            )
            resp.raise_for_status()
            data = resp.json()
            # Handle malformed responses gracefully
            if isinstance(data, dict) and "hits" in data:
                hits = data.get("hits", {})
                if isinstance(hits, dict) and "hits" in hits:
                    return list(hits.get("hits", []))
            return []
        except requests.Timeout as exc:
            raise APIError("API error: Connection timed out") from exc
        except requests.HTTPError as exc:
            raise APIError(f"API error: {str(exc)}") from exc
        except Exception as exc:
            raise APIError(f"API error: {str(exc)}") from exc
    
    
    def parse_html_options(url: str, query: str = "", prefix: str = "", limit: int = 100) -> list[dict[str, str]]:
        """Parse options from HTML documentation."""
        try:
            resp = requests.get(url, timeout=30)  # Increase timeout for large docs
            resp.raise_for_status()
            # Use resp.content to let BeautifulSoup handle encoding detection
            # This prevents encoding errors like "unknown encoding: windows-1252"
            soup = BeautifulSoup(resp.content, "html.parser")
            options = []
    
            # Get all dt elements
            dts = soup.find_all("dt")
    
            for dt in dts:
                # Get option name
                name = ""
                if "home-manager" in url:
                    # Home Manager uses anchor IDs like "opt-programs.git.enable"
                    anchor = dt.find("a", id=True)
                    if anchor:
                        anchor_id = anchor.get("id", "")
                        # Remove "opt-" prefix and convert underscores
                        if anchor_id.startswith("opt-"):
                            name = anchor_id[4:]  # Remove "opt-" prefix
                            # Convert _name_ placeholders back to <name>
                            name = name.replace("_name_", "<name>")
                    else:
                        # Fallback to text content
                        name_elem = dt.find(string=True, recursive=False)
                        if name_elem:
                            name = name_elem.strip()
                        else:
                            name = dt.get_text(strip=True)
                else:
                    # Darwin and fallback - use text content
                    name = dt.get_text(strip=True)
    
                # Skip if it doesn't look like an option (must contain a dot)
                # But allow single-word options in some cases
                if "." not in name and len(name.split()) > 1:
                    continue
    
                # Filter by query or prefix
                if query and query.lower() not in name.lower():
                    continue
                if prefix and not (name.startswith(prefix + ".") or name == prefix):
                    continue
    
                # Find the corresponding dd element
                dd = dt.find_next_sibling("dd")
                if dd:
                    # Extract description (first p tag or direct text)
                    desc_elem = dd.find("p")
                    if desc_elem:
                        description = desc_elem.get_text(strip=True)
                    else:
                        # Get first text node, handle None case
                        text = dd.get_text(strip=True)
                        description = text.split("\n")[0] if text else ""
    
                    # Extract type info - look for various patterns
                    type_info = ""
                    # Pattern 1: <span class="term">Type: ...</span>
                    type_elem = dd.find("span", class_="term")
                    if type_elem and "Type:" in type_elem.get_text():
                        type_info = type_elem.get_text(strip=True).replace("Type:", "").strip()
                    # Pattern 2: Look for "Type:" in text
                    elif "Type:" in dd.get_text():
                        text = dd.get_text()
                        type_start = text.find("Type:") + 5
                        type_end = text.find("\n", type_start)
                        if type_end == -1:
                            type_end = len(text)
                        type_info = text[type_start:type_end].strip()
    
                    options.append(
                        {
                            "name": name,
                            "description": description[:200] if len(description) > 200 else description,
                            "type": type_info,
                        }
                    )
    
                    if len(options) >= limit:
                        break
    
            return options
        except Exception as exc:
            raise DocumentParseError(f"Failed to fetch docs: {str(exc)}") from exc
    
    
    @mcp.tool()
    async def nixos_search(query: str, search_type: str = "packages", limit: int = 20, channel: str = "unstable") -> str:
        """Search NixOS packages, options, or programs.
    
        Args:
            query: Search term to look for
            search_type: Type of search - "packages", "options", "programs", or "flakes"
            limit: Maximum number of results to return (1-100)
            channel: NixOS channel to search in (e.g., "unstable", "stable", "25.05")
    
        Returns:
            Plain text results with bullet points or error message
        """
        if search_type not in ["packages", "options", "programs", "flakes"]:
            return error(f"Invalid type '{search_type}'")
        channels = get_channels()
        if channel not in channels:
            suggestions = get_channel_suggestions(channel)
            return error(f"Invalid channel '{channel}'. {suggestions}")
        if not 1 <= limit <= 100:
            return error("Limit must be 1-100")
    
        # Redirect flakes to dedicated function
        if search_type == "flakes":
            return await _nixos_flakes_search_impl(query, limit)
    
        try:
            # Build query with correct field names
            if search_type == "packages":
                q = {
                    "bool": {
                        "must": [{"term": {"type": "package"}}],
                        "should": [
                            {"match": {"package_pname": {"query": query, "boost": 3}}},
                            {"match": {"package_description": query}},
                        ],
                        "minimum_should_match": 1,
                    }
                }
            elif search_type == "options":
                # Use wildcard for option names to handle hierarchical names like services.nginx.enable
                q = {
                    "bool": {
                        "must": [{"term": {"type": "option"}}],
                        "should": [
                            {"wildcard": {"option_name": f"*{query}*"}},
                            {"match": {"option_description": query}},
                        ],
                        "minimum_should_match": 1,
                    }
                }
            else:  # programs
                q = {
                    "bool": {
                        "must": [{"term": {"type": "package"}}],
                        "should": [
                            {"match": {"package_programs": {"query": query, "boost": 2}}},
                            {"match": {"package_pname": query}},
                        ],
                        "minimum_should_match": 1,
                    }
                }
    
            hits = es_query(channels[channel], q, limit)
    
            # Format results as plain text
            if not hits:
                return f"No {search_type} found matching '{query}'"
    
            results = []
            results.append(f"Found {len(hits)} {search_type} matching '{query}':\n")
    
            for hit in hits:
                src = hit.get("_source", {})
                if search_type == "packages":
                    name = src.get("package_pname", "")
                    version = src.get("package_pversion", "")
                    desc = src.get("package_description", "")
                    results.append(f"• {name} ({version})")
                    if desc:
                        results.append(f"  {desc}")
                    results.append("")
                elif search_type == "options":
                    name = src.get("option_name", "")
                    opt_type = src.get("option_type", "")
                    desc = src.get("option_description", "")
                    # Strip HTML tags from description
                    if desc and "<rendered-html>" in desc:
                        # Remove outer rendered-html tags
                        desc = desc.replace("<rendered-html>", "").replace("</rendered-html>", "")
                        # Remove common HTML tags
                        desc = re.sub(r"<[^>]+>", "", desc)
                        desc = desc.strip()
                    results.append(f"• {name}")
                    if opt_type:
                        results.append(f"  Type: {opt_type}")
                    if desc:
                        results.append(f"  {desc}")
                    results.append("")
                else:  # programs
                    programs = src.get("package_programs", [])
                    pkg_name = src.get("package_pname", "")
    
                    # Check if query matches any program exactly (case-insensitive)
                    query_lower = query.lower()
                    matched_programs = [p for p in programs if p.lower() == query_lower]
    
                    for prog in matched_programs:
                        results.append(f"• {prog} (provided by {pkg_name})")
                        results.append("")
    
            return "\n".join(results).strip()
    
        except Exception as e:
            return error(str(e))
    
    
    @mcp.tool()
    async def nixos_info(name: str, type: str = "package", channel: str = "unstable") -> str:  # pylint: disable=redefined-builtin
        """Get detailed info about a NixOS package or option.
    
        Args:
            name: Name of the package or option to look up
            type: Type of lookup - "package" or "option"
            channel: NixOS channel to search in (e.g., "unstable", "stable", "25.05")
    
        Returns:
            Plain text details about the package/option or error message
        """
        info_type = type  # Avoid shadowing built-in
        if info_type not in ["package", "option"]:
            return error("Type must be 'package' or 'option'")
        channels = get_channels()
        if channel not in channels:
            suggestions = get_channel_suggestions(channel)
            return error(f"Invalid channel '{channel}'. {suggestions}")
    
        try:
            # Exact match query with correct field names
            field = "package_pname" if info_type == "package" else "option_name"
            query = {"bool": {"must": [{"term": {"type": info_type}}, {"term": {field: name}}]}}
            hits = es_query(channels[channel], query, 1)
    
            if not hits:
                return error(f"{info_type.capitalize()} '{name}' not found", "NOT_FOUND")
    
            src = hits[0].get("_source", {})
    
            if info_type == "package":
                info = []
                info.append(f"Package: {src.get('package_pname', '')}")
                info.append(f"Version: {src.get('package_pversion', '')}")
    
                desc = src.get("package_description", "")
                if desc:
                    info.append(f"Description: {desc}")
    
                homepage = src.get("package_homepage", [])
                if homepage:
                    if isinstance(homepage, list):
                        homepage = homepage[0] if homepage else ""
                    info.append(f"Homepage: {homepage}")
    
                licenses = src.get("package_license_set", [])
                if licenses:
                    info.append(f"License: {', '.join(licenses)}")
    
                return "\n".join(info)
    
            # Option type
            info = []
            info.append(f"Option: {src.get('option_name', '')}")
    
            opt_type = src.get("option_type", "")
            if opt_type:
                info.append(f"Type: {opt_type}")
    
            desc = src.get("option_description", "")
            if desc:
                # Strip HTML tags from description
                if "<rendered-html>" in desc:
                    desc = desc.replace("<rendered-html>", "").replace("</rendered-html>", "")
                    desc = re.sub(r"<[^>]+>", "", desc)
                    desc = desc.strip()
                info.append(f"Description: {desc}")
    
            default = src.get("option_default", "")
            if default:
                info.append(f"Default: {default}")
    
            example = src.get("option_example", "")
            if example:
                info.append(f"Example: {example}")
    
            return "\n".join(info)
    
        except Exception as e:
            return error(str(e))
    
    
    @mcp.tool()
    async def nixos_channels() -> str:
        """List available NixOS channels with their status.
    
        Returns:
            Plain text list showing channel names, versions, and availability
        """
        try:
            # Get resolved channels and available raw data
            configured = get_channels()
            available = channel_cache.get_available()
    
            results = []
    
            # Show warning if using fallback channels
            if channel_cache.using_fallback:
                results.append("⚠️  WARNING: Using fallback channels (API discovery failed)")
                results.append("    Check network connectivity to search.nixos.org")
                results.append("")
                results.append("NixOS Channels (fallback mode):\n")
            else:
                results.append("NixOS Channels (auto-discovered):\n")
    
            # Show user-friendly channel names
            for name, index in sorted(configured.items()):
                status = "✓ Available" if index in available else "✗ Unavailable"
                doc_count = available.get(index, "Unknown")
    
                # Mark stable channel clearly
                label = f"• {name}"
                if name == "stable":
                    # Extract version from index
                    parts = index.split("-")
                    if len(parts) >= 4:
                        version = parts[3]
                        label = f"• {name} (current: {version})"
    
                results.append(f"{label} → {index}")
                if index in available:
                    results.append(f"  Status: {status} ({doc_count})")
                else:
                    if channel_cache.using_fallback:
                        results.append("  Status: Fallback (may not be current)")
                    else:
                        results.append(f"  Status: {status}")
                results.append("")
    
            # Show additional discovered channels not in our mapping
            if not channel_cache.using_fallback:
                discovered_only = set(available.keys()) - set(configured.values())
                if discovered_only:
                    results.append("Additional available channels:")
                    for index in sorted(discovered_only):
                        results.append(f"• {index} ({available[index]})")
    
            # Add deprecation warnings
            results.append("\nNote: Channels are dynamically discovered.")
            results.append("'stable' always points to the current stable release.")
            if channel_cache.using_fallback:
                results.append("\n⚠️  Fallback channels may not reflect the latest available versions.")
                results.append("   Please check your network connection to search.nixos.org.")
    
            return "\n".join(results).strip()
        except Exception as e:
            return error(str(e))
    
    
    @mcp.tool()
    async def nixos_stats(channel: str = "unstable") -> str:
        """Get NixOS statistics for a channel.
    
        Args:
            channel: NixOS channel to get stats for (e.g., "unstable", "stable", "25.05")
    
        Returns:
            Plain text statistics including package/option counts
        """
        channels = get_channels()
        if channel not in channels:
            suggestions = get_channel_suggestions(channel)
            return error(f"Invalid channel '{channel}'. {suggestions}")
    
        try:
            index = channels[channel]
            url = f"{NIXOS_API}/{index}/_count"
    
            # Get counts with error handling
            try:
                pkg_resp = requests.post(url, json={"query": {"term": {"type": "package"}}}, auth=NIXOS_AUTH, timeout=10)
                pkg_resp.raise_for_status()
                pkg_count = pkg_resp.json().get("count", 0)
            except Exception:
                pkg_count = 0
    
            try:
                opt_resp = requests.post(url, json={"query": {"term": {"type": "option"}}}, auth=NIXOS_AUTH, timeout=10)
                opt_resp.raise_for_status()
                opt_count = opt_resp.json().get("count", 0)
            except Exception:
                opt_count = 0
    
            if pkg_count == 0 and opt_count == 0:
                return error("Failed to retrieve statistics")
    
            return f"""NixOS Statistics for {channel} channel:
    • Packages: {pkg_count:,}
    • Options: {opt_count:,}"""
    
        except Exception as e:
            return error(str(e))
  • FastMCP decorator that registers the nixos_stats function as an MCP tool.
    @mcp.tool()
    async def nixos_stats(channel: str = "unstable") -> str:

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/utensils/mcp-nixos'

If you have feedback or need assistance with the MCP directory API, please join our Discord server