nixos_stats
Retrieve NixOS channel statistics including package and option counts to verify system configuration data and prevent AI hallucinations about NixOS resources.
Instructions
Get NixOS statistics for a channel.
Args: channel: NixOS channel to get stats for (e.g., "unstable", "stable", "25.05")
Returns: Plain text statistics including package/option counts
Input Schema
Table / JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| channel | No | NixOS channel to get stats for (e.g., "unstable", "stable", "25.05") | unstable |
Implementation Reference
# mcp_nixos/server.py:605-648 (handler)
# The primary handler function for the 'nixos_stats' MCP tool. It validates
# the channel, queries the NixOS Elasticsearch API for package and option
# counts, handles errors gracefully, and returns formatted plain-text
# statistics.
@mcp.tool()
async def nixos_stats(channel: str = "unstable") -> str:
    """Report package and option counts for one NixOS channel.

    Args:
        channel: NixOS channel to get stats for (e.g., "unstable", "stable", "25.05")

    Returns:
        Plain text statistics including package/option counts, or a formatted
        error string when the channel is unknown, when both counts come back
        as zero, or when an unexpected exception is raised.
    """
    known_channels = get_channels()
    if channel not in known_channels:
        hint = get_channel_suggestions(channel)
        return error(f"Invalid channel '{channel}'. {hint}")

    try:
        es_index = known_channels[channel]
        count_url = f"{NIXOS_API}/{es_index}/_count"

        def count_documents(doc_type):
            # Best effort: any failure (network, HTTP status, JSON decoding)
            # is reported as a count of 0 rather than propagated.
            try:
                response = requests.post(
                    count_url,
                    json={"query": {"term": {"type": doc_type}}},
                    auth=NIXOS_AUTH,
                    timeout=10,
                )
                response.raise_for_status()
                return response.json().get("count", 0)
            except Exception:
                return 0

        # Packages are queried first, then options.
        pkg_count = count_documents("package")
        opt_count = count_documents("option")

        # Two zero counts are treated as a failed lookup.
        if pkg_count == 0 and opt_count == 0:
            return error("Failed to retrieve statistics")

        # NOTE(review): the triple-quoted literal suggests the original had
        # line breaks that were lost when this snippet was flattened --
        # confirm against server.py before relying on the exact layout.
        return f"""NixOS Statistics for {channel} channel: • Packages: {pkg_count:,} • Options: {opt_count:,}"""
    except Exception as exc:
        return error(str(exc))
# mcp_nixos/server.py:180-183 (helper)
# Helper function called by nixos_stats to retrieve resolved channel mappings
# from the cache.
def get_channels() -> dict[str, str]:
    """Return the current channel mappings, as resolved by the channel cache."""
    resolved = channel_cache.get_resolved()
    return resolved
- mcp_nixos/server.py:170-648 (helper)Global ChannelCache instance used by nixos_stats for dynamic channel discovery and resolution.channel_cache = ChannelCache() def error(msg: str, code: str = "ERROR") -> str: """Format error as plain text.""" # Ensure msg is always a string, even if empty msg = str(msg) if msg is not None else "" return f"Error ({code}): {msg}" def get_channels() -> dict[str, str]: """Get current channel mappings (cached and resolved).""" return channel_cache.get_resolved() def validate_channel(channel: str) -> bool: """Validate if a channel exists and is accessible.""" channels = get_channels() if channel in channels: index = channels[channel] try: resp = requests.post( f"{NIXOS_API}/{index}/_count", json={"query": {"match_all": {}}}, auth=NIXOS_AUTH, timeout=5 ) return resp.status_code == 200 and resp.json().get("count", 0) > 0 except Exception: return False return False def get_channel_suggestions(invalid_channel: str) -> str: """Get helpful suggestions for invalid channels.""" channels = get_channels() available = list(channels.keys()) suggestions = [] # Find similar channel names invalid_lower = invalid_channel.lower() for channel in available: if invalid_lower in channel.lower() or channel.lower() in invalid_lower: suggestions.append(channel) if not suggestions: # Fallback to most common channels common = ["unstable", "stable", "beta"] # Also include version numbers version_channels = [ch for ch in available if "." 
in ch and ch.replace(".", "").isdigit()] common.extend(version_channels[:2]) # Add up to 2 version channels suggestions = [ch for ch in common if ch in available] if not suggestions: suggestions = available[:4] # First 4 available return f"Available channels: {', '.join(suggestions)}" def es_query(index: str, query: dict[str, Any], size: int = 20) -> list[dict[str, Any]]: """Execute Elasticsearch query.""" try: resp = requests.post( f"{NIXOS_API}/{index}/_search", json={"query": query, "size": size}, auth=NIXOS_AUTH, timeout=10 ) resp.raise_for_status() data = resp.json() # Handle malformed responses gracefully if isinstance(data, dict) and "hits" in data: hits = data.get("hits", {}) if isinstance(hits, dict) and "hits" in hits: return list(hits.get("hits", [])) return [] except requests.Timeout as exc: raise APIError("API error: Connection timed out") from exc except requests.HTTPError as exc: raise APIError(f"API error: {str(exc)}") from exc except Exception as exc: raise APIError(f"API error: {str(exc)}") from exc def parse_html_options(url: str, query: str = "", prefix: str = "", limit: int = 100) -> list[dict[str, str]]: """Parse options from HTML documentation.""" try: resp = requests.get(url, timeout=30) # Increase timeout for large docs resp.raise_for_status() # Use resp.content to let BeautifulSoup handle encoding detection # This prevents encoding errors like "unknown encoding: windows-1252" soup = BeautifulSoup(resp.content, "html.parser") options = [] # Get all dt elements dts = soup.find_all("dt") for dt in dts: # Get option name name = "" if "home-manager" in url: # Home Manager uses anchor IDs like "opt-programs.git.enable" anchor = dt.find("a", id=True) if anchor: anchor_id = anchor.get("id", "") # Remove "opt-" prefix and convert underscores if anchor_id.startswith("opt-"): name = anchor_id[4:] # Remove "opt-" prefix # Convert _name_ placeholders back to <name> name = name.replace("_name_", "<name>") else: # Fallback to text content name_elem = 
dt.find(string=True, recursive=False) if name_elem: name = name_elem.strip() else: name = dt.get_text(strip=True) else: # Darwin and fallback - use text content name = dt.get_text(strip=True) # Skip if it doesn't look like an option (must contain a dot) # But allow single-word options in some cases if "." not in name and len(name.split()) > 1: continue # Filter by query or prefix if query and query.lower() not in name.lower(): continue if prefix and not (name.startswith(prefix + ".") or name == prefix): continue # Find the corresponding dd element dd = dt.find_next_sibling("dd") if dd: # Extract description (first p tag or direct text) desc_elem = dd.find("p") if desc_elem: description = desc_elem.get_text(strip=True) else: # Get first text node, handle None case text = dd.get_text(strip=True) description = text.split("\n")[0] if text else "" # Extract type info - look for various patterns type_info = "" # Pattern 1: <span class="term">Type: ...</span> type_elem = dd.find("span", class_="term") if type_elem and "Type:" in type_elem.get_text(): type_info = type_elem.get_text(strip=True).replace("Type:", "").strip() # Pattern 2: Look for "Type:" in text elif "Type:" in dd.get_text(): text = dd.get_text() type_start = text.find("Type:") + 5 type_end = text.find("\n", type_start) if type_end == -1: type_end = len(text) type_info = text[type_start:type_end].strip() options.append( { "name": name, "description": description[:200] if len(description) > 200 else description, "type": type_info, } ) if len(options) >= limit: break return options except Exception as exc: raise DocumentParseError(f"Failed to fetch docs: {str(exc)}") from exc @mcp.tool() async def nixos_search(query: str, search_type: str = "packages", limit: int = 20, channel: str = "unstable") -> str: """Search NixOS packages, options, or programs. 
Args: query: Search term to look for search_type: Type of search - "packages", "options", "programs", or "flakes" limit: Maximum number of results to return (1-100) channel: NixOS channel to search in (e.g., "unstable", "stable", "25.05") Returns: Plain text results with bullet points or error message """ if search_type not in ["packages", "options", "programs", "flakes"]: return error(f"Invalid type '{search_type}'") channels = get_channels() if channel not in channels: suggestions = get_channel_suggestions(channel) return error(f"Invalid channel '{channel}'. {suggestions}") if not 1 <= limit <= 100: return error("Limit must be 1-100") # Redirect flakes to dedicated function if search_type == "flakes": return await _nixos_flakes_search_impl(query, limit) try: # Build query with correct field names if search_type == "packages": q = { "bool": { "must": [{"term": {"type": "package"}}], "should": [ {"match": {"package_pname": {"query": query, "boost": 3}}}, {"match": {"package_description": query}}, ], "minimum_should_match": 1, } } elif search_type == "options": # Use wildcard for option names to handle hierarchical names like services.nginx.enable q = { "bool": { "must": [{"term": {"type": "option"}}], "should": [ {"wildcard": {"option_name": f"*{query}*"}}, {"match": {"option_description": query}}, ], "minimum_should_match": 1, } } else: # programs q = { "bool": { "must": [{"term": {"type": "package"}}], "should": [ {"match": {"package_programs": {"query": query, "boost": 2}}}, {"match": {"package_pname": query}}, ], "minimum_should_match": 1, } } hits = es_query(channels[channel], q, limit) # Format results as plain text if not hits: return f"No {search_type} found matching '{query}'" results = [] results.append(f"Found {len(hits)} {search_type} matching '{query}':\n") for hit in hits: src = hit.get("_source", {}) if search_type == "packages": name = src.get("package_pname", "") version = src.get("package_pversion", "") desc = src.get("package_description", "") 
results.append(f"• {name} ({version})") if desc: results.append(f" {desc}") results.append("") elif search_type == "options": name = src.get("option_name", "") opt_type = src.get("option_type", "") desc = src.get("option_description", "") # Strip HTML tags from description if desc and "<rendered-html>" in desc: # Remove outer rendered-html tags desc = desc.replace("<rendered-html>", "").replace("</rendered-html>", "") # Remove common HTML tags desc = re.sub(r"<[^>]+>", "", desc) desc = desc.strip() results.append(f"• {name}") if opt_type: results.append(f" Type: {opt_type}") if desc: results.append(f" {desc}") results.append("") else: # programs programs = src.get("package_programs", []) pkg_name = src.get("package_pname", "") # Check if query matches any program exactly (case-insensitive) query_lower = query.lower() matched_programs = [p for p in programs if p.lower() == query_lower] for prog in matched_programs: results.append(f"• {prog} (provided by {pkg_name})") results.append("") return "\n".join(results).strip() except Exception as e: return error(str(e)) @mcp.tool() async def nixos_info(name: str, type: str = "package", channel: str = "unstable") -> str: # pylint: disable=redefined-builtin """Get detailed info about a NixOS package or option. Args: name: Name of the package or option to look up type: Type of lookup - "package" or "option" channel: NixOS channel to search in (e.g., "unstable", "stable", "25.05") Returns: Plain text details about the package/option or error message """ info_type = type # Avoid shadowing built-in if info_type not in ["package", "option"]: return error("Type must be 'package' or 'option'") channels = get_channels() if channel not in channels: suggestions = get_channel_suggestions(channel) return error(f"Invalid channel '{channel}'. 
{suggestions}") try: # Exact match query with correct field names field = "package_pname" if info_type == "package" else "option_name" query = {"bool": {"must": [{"term": {"type": info_type}}, {"term": {field: name}}]}} hits = es_query(channels[channel], query, 1) if not hits: return error(f"{info_type.capitalize()} '{name}' not found", "NOT_FOUND") src = hits[0].get("_source", {}) if info_type == "package": info = [] info.append(f"Package: {src.get('package_pname', '')}") info.append(f"Version: {src.get('package_pversion', '')}") desc = src.get("package_description", "") if desc: info.append(f"Description: {desc}") homepage = src.get("package_homepage", []) if homepage: if isinstance(homepage, list): homepage = homepage[0] if homepage else "" info.append(f"Homepage: {homepage}") licenses = src.get("package_license_set", []) if licenses: info.append(f"License: {', '.join(licenses)}") return "\n".join(info) # Option type info = [] info.append(f"Option: {src.get('option_name', '')}") opt_type = src.get("option_type", "") if opt_type: info.append(f"Type: {opt_type}") desc = src.get("option_description", "") if desc: # Strip HTML tags from description if "<rendered-html>" in desc: desc = desc.replace("<rendered-html>", "").replace("</rendered-html>", "") desc = re.sub(r"<[^>]+>", "", desc) desc = desc.strip() info.append(f"Description: {desc}") default = src.get("option_default", "") if default: info.append(f"Default: {default}") example = src.get("option_example", "") if example: info.append(f"Example: {example}") return "\n".join(info) except Exception as e: return error(str(e)) @mcp.tool() async def nixos_channels() -> str: """List available NixOS channels with their status. 
Returns: Plain text list showing channel names, versions, and availability """ try: # Get resolved channels and available raw data configured = get_channels() available = channel_cache.get_available() results = [] # Show warning if using fallback channels if channel_cache.using_fallback: results.append("⚠️ WARNING: Using fallback channels (API discovery failed)") results.append(" Check network connectivity to search.nixos.org") results.append("") results.append("NixOS Channels (fallback mode):\n") else: results.append("NixOS Channels (auto-discovered):\n") # Show user-friendly channel names for name, index in sorted(configured.items()): status = "✓ Available" if index in available else "✗ Unavailable" doc_count = available.get(index, "Unknown") # Mark stable channel clearly label = f"• {name}" if name == "stable": # Extract version from index parts = index.split("-") if len(parts) >= 4: version = parts[3] label = f"• {name} (current: {version})" results.append(f"{label} → {index}") if index in available: results.append(f" Status: {status} ({doc_count})") else: if channel_cache.using_fallback: results.append(" Status: Fallback (may not be current)") else: results.append(f" Status: {status}") results.append("") # Show additional discovered channels not in our mapping if not channel_cache.using_fallback: discovered_only = set(available.keys()) - set(configured.values()) if discovered_only: results.append("Additional available channels:") for index in sorted(discovered_only): results.append(f"• {index} ({available[index]})") # Add deprecation warnings results.append("\nNote: Channels are dynamically discovered.") results.append("'stable' always points to the current stable release.") if channel_cache.using_fallback: results.append("\n⚠️ Fallback channels may not reflect the latest available versions.") results.append(" Please check your network connection to search.nixos.org.") return "\n".join(results).strip() except Exception as e: return error(str(e)) @mcp.tool() 
async def nixos_stats(channel: str = "unstable") -> str:
    """Report package and option counts for one NixOS channel.

    Args:
        channel: NixOS channel to get stats for (e.g., "unstable", "stable", "25.05")

    Returns:
        Plain text statistics including package/option counts, or a formatted
        error string when the channel is unknown, when both counts come back
        as zero, or when an unexpected exception is raised.
    """
    known_channels = get_channels()
    if channel not in known_channels:
        hint = get_channel_suggestions(channel)
        return error(f"Invalid channel '{channel}'. {hint}")

    try:
        es_index = known_channels[channel]
        count_url = f"{NIXOS_API}/{es_index}/_count"

        def count_documents(doc_type):
            # Best effort: any failure (network, HTTP status, JSON decoding)
            # is reported as a count of 0 rather than propagated.
            try:
                response = requests.post(
                    count_url,
                    json={"query": {"term": {"type": doc_type}}},
                    auth=NIXOS_AUTH,
                    timeout=10,
                )
                response.raise_for_status()
                return response.json().get("count", 0)
            except Exception:
                return 0

        # Packages are queried first, then options.
        pkg_count = count_documents("package")
        opt_count = count_documents("option")

        # Two zero counts are treated as a failed lookup.
        if pkg_count == 0 and opt_count == 0:
            return error("Failed to retrieve statistics")

        # NOTE(review): the triple-quoted literal suggests the original had
        # line breaks that were lost when this snippet was flattened --
        # confirm against server.py before relying on the exact layout.
        return f"""NixOS Statistics for {channel} channel: • Packages: {pkg_count:,} • Options: {opt_count:,}"""
    except Exception as exc:
        return error(str(exc))
- mcp_nixos/server.py:605-606 (registration) FastMCP decorator that registers the nixos_stats function as an MCP tool.
  @mcp.tool()
  async def nixos_stats(channel: str = "unstable") -> str: