nixos_search

Search NixOS packages, options, programs, or flakes to find accurate system configuration resources and prevent AI hallucinations about NixOS.

Instructions

Search NixOS packages, options, or programs.

Args:
• query: Search term to look for
• search_type: Type of search - "packages", "options", "programs", or "flakes"
• limit: Maximum number of results to return (1-100)
• channel: NixOS channel to search in (e.g., "unstable", "stable", "25.05")

Returns: Plain text results with bullet points or error message
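
As a usage sketch only (not taken from the server's own documentation): the tool can be invoked through the MCP Python SDK's stdio client. The `uvx mcp-nixos` launch command and the example argument values are assumptions; the argument names mirror the Input Schema below.

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Assumption: the server can be launched locally with `uvx mcp-nixos`.
    params = StdioServerParameters(command="uvx", args=["mcp-nixos"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "nixos_search",
                {"query": "nginx", "search_type": "options", "limit": 10, "channel": "unstable"},
            )
            # The tool returns plain text with bullet points (see Returns above).
            for block in result.content:
                if getattr(block, "type", None) == "text":
                    print(block.text)


asyncio.run(main())
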

Input Schema

Name         Required  Default    Description
query        Yes                  Search term to look for
search_type  No        packages   Type of search: "packages", "options", "programs", or "flakes"
limit        No                   Maximum number of results to return (1-100)
channel      No        unstable   NixOS channel to search in (e.g., "unstable", "stable", "25.05")

Implementation Reference

  • Core handler function for the 'nixos_search' MCP tool. Performs searches across NixOS packages, options, programs, and flakes using dynamic channel discovery and the NixOS search Elasticsearch API.
@mcp.tool()
async def nixos_search(query: str, search_type: str = "packages", limit: int = 20, channel: str = "unstable") -> str:
    """Search NixOS packages, options, or programs.

    Args:
        query: Search term to look for
        search_type: Type of search - "packages", "options", "programs", or "flakes"
        limit: Maximum number of results to return (1-100)
        channel: NixOS channel to search in (e.g., "unstable", "stable", "25.05")

    Returns:
        Plain text results with bullet points or error message
    """
    if search_type not in ["packages", "options", "programs", "flakes"]:
        return error(f"Invalid type '{search_type}'")
    channels = get_channels()
    if channel not in channels:
        suggestions = get_channel_suggestions(channel)
        return error(f"Invalid channel '{channel}'. {suggestions}")
    if not 1 <= limit <= 100:
        return error("Limit must be 1-100")

    # Redirect flakes to dedicated function
    if search_type == "flakes":
        return await _nixos_flakes_search_impl(query, limit)

    try:
        # Build query with correct field names
        if search_type == "packages":
            q = {
                "bool": {
                    "must": [{"term": {"type": "package"}}],
                    "should": [
                        {"match": {"package_pname": {"query": query, "boost": 3}}},
                        {"match": {"package_description": query}},
                    ],
                    "minimum_should_match": 1,
                }
            }
        elif search_type == "options":
            # Use wildcard for option names to handle hierarchical names like services.nginx.enable
            q = {
                "bool": {
                    "must": [{"term": {"type": "option"}}],
                    "should": [
                        {"wildcard": {"option_name": f"*{query}*"}},
                        {"match": {"option_description": query}},
                    ],
                    "minimum_should_match": 1,
                }
            }
        else:  # programs
            q = {
                "bool": {
                    "must": [{"term": {"type": "package"}}],
                    "should": [
                        {"match": {"package_programs": {"query": query, "boost": 2}}},
                        {"match": {"package_pname": query}},
                    ],
                    "minimum_should_match": 1,
                }
            }

        hits = es_query(channels[channel], q, limit)

        # Format results as plain text
        if not hits:
            return f"No {search_type} found matching '{query}'"

        results = []
        results.append(f"Found {len(hits)} {search_type} matching '{query}':\n")
        for hit in hits:
            src = hit.get("_source", {})
            if search_type == "packages":
                name = src.get("package_pname", "")
                version = src.get("package_pversion", "")
                desc = src.get("package_description", "")
                results.append(f"• {name} ({version})")
                if desc:
                    results.append(f" {desc}")
                results.append("")
            elif search_type == "options":
                name = src.get("option_name", "")
                opt_type = src.get("option_type", "")
                desc = src.get("option_description", "")
                # Strip HTML tags from description
                if desc and "<rendered-html>" in desc:
                    # Remove outer rendered-html tags
                    desc = desc.replace("<rendered-html>", "").replace("</rendered-html>", "")
                    # Remove common HTML tags
                    desc = re.sub(r"<[^>]+>", "", desc)
                    desc = desc.strip()
                results.append(f"• {name}")
                if opt_type:
                    results.append(f" Type: {opt_type}")
                if desc:
                    results.append(f" {desc}")
                results.append("")
            else:  # programs
                programs = src.get("package_programs", [])
                pkg_name = src.get("package_pname", "")
                # Check if query matches any program exactly (case-insensitive)
                query_lower = query.lower()
                matched_programs = [p for p in programs if p.lower() == query_lower]
                for prog in matched_programs:
                    results.append(f"• {prog} (provided by {pkg_name})")
                    results.append("")

        return "\n".join(results).strip()
    except Exception as e:
        return error(str(e))
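
For concreteness, this is the Elasticsearch query body the handler above builds for an options search such as nixos_search(query="services.nginx", search_type="options"); it is simply the "options" branch with the query substituted in.

# Query dict produced by the "options" branch for query="services.nginx"
q = {
    "bool": {
        "must": [{"term": {"type": "option"}}],
        "should": [
            {"wildcard": {"option_name": "*services.nginx*"}},
            {"match": {"option_description": "services.nginx"}},
        ],
        "minimum_should_match": 1,
    }
}
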
  • ChannelCache class responsible for dynamically discovering available NixOS channels, resolving user-friendly names to Elasticsearch indices, and providing fallback mappings. Used extensively by nixos_search for channel validation and queries.
class ChannelCache:
    """Cache for discovered channels and resolved mappings."""

    def __init__(self) -> None:
        """Initialize empty cache."""
        self.available_channels: dict[str, str] | None = None
        self.resolved_channels: dict[str, str] | None = None
        self.using_fallback: bool = False

    def get_available(self) -> dict[str, str]:
        """Get available channels, discovering if needed."""
        if self.available_channels is None:
            self.available_channels = self._discover_available_channels()
        return self.available_channels if self.available_channels is not None else {}

    def get_resolved(self) -> dict[str, str]:
        """Get resolved channel mappings, resolving if needed."""
        if self.resolved_channels is None:
            self.resolved_channels = self._resolve_channels()
        return self.resolved_channels if self.resolved_channels is not None else {}

    def _discover_available_channels(self) -> dict[str, str]:
        """Discover available NixOS channels by testing API patterns."""
        # Test multiple generation patterns (43, 44, 45) and versions
        generations = [43, 44, 45, 46]  # Future-proof
        # Removed deprecated versions (20.09, 24.11 - EOL June 2025)
        versions = ["unstable", "25.05", "25.11", "26.05", "30.05"]  # Current and future

        available = {}
        for gen in generations:
            for version in versions:
                pattern = f"latest-{gen}-nixos-{version}"
                try:
                    resp = requests.post(
                        f"{NIXOS_API}/{pattern}/_count",
                        json={"query": {"match_all": {}}},
                        auth=NIXOS_AUTH,
                        timeout=10,  # Increased from 5s to 10s for slow connections
                    )
                    if resp.status_code == 200:
                        count = resp.json().get("count", 0)
                        if count > 0:
                            available[pattern] = f"{count:,} documents"
                except Exception:
                    continue
        return available

    def _resolve_channels(self) -> dict[str, str]:
        """Resolve user-friendly channel names to actual indices."""
        available = self.get_available()

        # If no channels were discovered, use fallback channels
        if not available:
            self.using_fallback = True
            return FALLBACK_CHANNELS.copy()

        resolved = {}

        # Find unstable (should be consistent)
        unstable_pattern = None
        for pattern in available:
            if "unstable" in pattern:
                unstable_pattern = pattern
                break
        if unstable_pattern:
            resolved["unstable"] = unstable_pattern

        # Find stable release (highest version number with most documents)
        stable_candidates = []
        for pattern, count_str in available.items():
            if "unstable" not in pattern:
                # Extract version (e.g., "25.05" from "latest-43-nixos-25.05")
                parts = pattern.split("-")
                if len(parts) >= 4:
                    version = parts[3]  # "25.05"
                    try:
                        # Parse version for comparison (25.05 -> 25.05)
                        major, minor = map(int, version.split("."))
                        count = int(count_str.replace(",", "").replace(" documents", ""))
                        stable_candidates.append((major, minor, version, pattern, count))
                    except (ValueError, IndexError):
                        continue

        if stable_candidates:
            # Sort by version (descending), then by document count (descending) as tiebreaker
            stable_candidates.sort(key=lambda x: (x[0], x[1], x[4]), reverse=True)
            current_stable = stable_candidates[0]
            resolved["stable"] = current_stable[3]  # pattern
            resolved[current_stable[2]] = current_stable[3]  # version -> pattern

            # Add other version mappings (prefer higher generation/count for same version)
            version_patterns: dict[str, tuple[str, int]] = {}
            for _major, _minor, version, pattern, count in stable_candidates:
                if version not in version_patterns or count > version_patterns[version][1]:
                    version_patterns[version] = (pattern, count)
            for version, (pattern, _count) in version_patterns.items():
                resolved[version] = pattern

        # Add beta (alias for stable)
        if "stable" in resolved:
            resolved["beta"] = resolved["stable"]

        # If we still have no channels after all that, use fallback
        if not resolved:
            self.using_fallback = True
            return FALLBACK_CHANNELS.copy()

        return resolved
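
A minimal sketch of how the cache is used. The index names follow the latest-{generation}-nixos-{version} pattern probed above, but the exact mapping depends on what the API currently exposes, so the commented output is illustrative only.

cache = ChannelCache()
print(cache.get_resolved())
# Illustrative output - real index names and generations vary over time:
# {"unstable": "latest-43-nixos-unstable",
#  "stable": "latest-43-nixos-25.05",
#  "25.05": "latest-43-nixos-25.05",
#  "beta": "latest-43-nixos-25.05"}
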
  • Core helper function that executes Elasticsearch queries to the NixOS search API. Called by nixos_search to fetch search results.
def es_query(index: str, query: dict[str, Any], size: int = 20) -> list[dict[str, Any]]:
    """Execute Elasticsearch query."""
    try:
        resp = requests.post(
            f"{NIXOS_API}/{index}/_search",
            json={"query": query, "size": size},
            auth=NIXOS_AUTH,
            timeout=10,
        )
        resp.raise_for_status()
        data = resp.json()
        # Handle malformed responses gracefully
        if isinstance(data, dict) and "hits" in data:
            hits = data.get("hits", {})
            if isinstance(hits, dict) and "hits" in hits:
                return list(hits.get("hits", []))
        return []
    except requests.Timeout as exc:
        raise APIError("API error: Connection timed out") from exc
    except requests.HTTPError as exc:
        raise APIError(f"API error: {str(exc)}") from exc
    except Exception as exc:
        raise APIError(f"API error: {str(exc)}") from exc
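
A minimal sketch of calling es_query directly, assuming the module-level NIXOS_API and NIXOS_AUTH constants are configured as in the rest of the server; the index name is illustrative and would normally come from the resolved channel mapping.

# Top 5 package documents whose pname matches "ripgrep" (index name illustrative)
query = {
    "bool": {
        "must": [{"term": {"type": "package"}}],
        "should": [{"match": {"package_pname": {"query": "ripgrep", "boost": 3}}}],
        "minimum_should_match": 1,
    }
}
hits = es_query("latest-43-nixos-unstable", query, size=5)
for hit in hits:
    src = hit.get("_source", {})
    print(src.get("package_pname"), src.get("package_pversion"))
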
  • Helper function that provides channel suggestions for invalid channels in nixos_search error responses.
def get_channel_suggestions(invalid_channel: str) -> str:
    """Get helpful suggestions for invalid channels."""
    channels = get_channels()
    available = list(channels.keys())
    suggestions = []

    # Find similar channel names
    invalid_lower = invalid_channel.lower()
    for channel in available:
        if invalid_lower in channel.lower() or channel.lower() in invalid_lower:
            suggestions.append(channel)

    if not suggestions:
        # Fallback to most common channels
        common = ["unstable", "stable", "beta"]
        # Also include version numbers
        version_channels = [ch for ch in available if "." in ch and ch.replace(".", "").isdigit()]
        common.extend(version_channels[:2])  # Add up to 2 version channels
        suggestions = [ch for ch in common if ch in available]

    if not suggestions:
        suggestions = available[:4]  # First 4 available

    return f"Available channels: {', '.join(suggestions)}"
  • Internal helper implementation for flake searches, delegated to by nixos_search when search_type='flakes'. Handles flake-specific indexing and deduplication.
async def _nixos_flakes_search_impl(query: str, limit: int = 20, channel: str = "unstable") -> str:
    """Internal implementation for flakes search."""
    if not 1 <= limit <= 100:
        return error("Limit must be 1-100")

    try:
        # Use the same alias as the web UI to get only flake packages
        flake_index = "latest-43-group-manual"

        # Build query for flakes
        if query.strip() == "" or query == "*":
            # Empty or wildcard query - get all flakes
            q: dict[str, Any] = {"match_all": {}}
        else:
            # Search query with multiple fields, including nested queries for flake_resolved
            q = {
                "bool": {
                    "should": [
                        {"match": {"flake_name": {"query": query, "boost": 3}}},
                        {"match": {"flake_description": {"query": query, "boost": 2}}},
                        {"match": {"package_pname": {"query": query, "boost": 1.5}}},
                        {"match": {"package_description": query}},
                        {"wildcard": {"flake_name": {"value": f"*{query}*", "boost": 2.5}}},
                        {"wildcard": {"package_pname": {"value": f"*{query}*", "boost": 1}}},
                        {"prefix": {"flake_name": {"value": query, "boost": 2}}},
                        # Nested queries for flake_resolved fields
                        {
                            "nested": {
                                "path": "flake_resolved",
                                "query": {"term": {"flake_resolved.owner": query.lower()}},
                                "boost": 2,
                            }
                        },
                        {
                            "nested": {
                                "path": "flake_resolved",
                                "query": {"term": {"flake_resolved.repo": query.lower()}},
                                "boost": 2,
                            }
                        },
                    ],
                    "minimum_should_match": 1,
                }
            }

        # Execute search with package filter to match web UI
        search_query = {"bool": {"filter": [{"term": {"type": "package"}}], "must": [q]}}

        try:
            resp = requests.post(
                f"{NIXOS_API}/{flake_index}/_search",
                json={"query": search_query, "size": limit * 5, "track_total_hits": True},  # Get more results
                auth=NIXOS_AUTH,
                timeout=10,
            )
            resp.raise_for_status()
            data = resp.json()
            hits = data.get("hits", {}).get("hits", [])
            total = data.get("hits", {}).get("total", {}).get("value", 0)
        except requests.HTTPError as e:
            if e.response and e.response.status_code == 404:
                # No flake indices found
                return error("Flake indices not found. Flake search may be temporarily unavailable.")
            raise

        # Format results as plain text
        if not hits:
            return f"""No flakes found matching '{query}'.

Try searching for:
• Popular flakes: nixpkgs, home-manager, flake-utils, devenv
• By owner: nix-community, numtide, cachix
• By topic: python, rust, nodejs, devops

Browse flakes at:
• GitHub: https://github.com/topics/nix-flakes
• FlakeHub: https://flakehub.com/"""

        # Group hits by flake to avoid duplicates
        flakes = {}
        packages_only = []  # For entries without flake metadata

        for hit in hits:
            src = hit.get("_source", {})

            # Get flake information
            flake_name = src.get("flake_name", "").strip()
            package_pname = src.get("package_pname", "")
            resolved = src.get("flake_resolved", {})

            # Skip entries without any useful name
            if not flake_name and not package_pname:
                continue

            # If we have flake metadata (resolved), use it to create unique key
            if isinstance(resolved, dict) and (resolved.get("owner") or resolved.get("repo") or resolved.get("url")):
                owner = resolved.get("owner", "")
                repo = resolved.get("repo", "")
                url = resolved.get("url", "")

                # Create a unique key based on available info
                if owner and repo:
                    flake_key = f"{owner}/{repo}"
                    display_name = flake_name or repo or package_pname
                elif url:
                    # Extract name from URL for git repos
                    flake_key = url
                    if "/" in url:
                        display_name = flake_name or url.rstrip("/").split("/")[-1].replace(".git", "") or package_pname
                    else:
                        display_name = flake_name or package_pname
                else:
                    flake_key = flake_name or package_pname
                    display_name = flake_key

                # Initialize flake entry if not seen
                if flake_key not in flakes:
                    flakes[flake_key] = {
                        "name": display_name,
                        "description": src.get("flake_description") or src.get("package_description", ""),
                        "owner": owner,
                        "repo": repo,
                        "url": url,
                        "type": resolved.get("type", ""),
                        "packages": set(),  # Use set to avoid duplicates
                    }

                # Add package if available
                attr_name = src.get("package_attr_name", "")
                if attr_name:
                    flakes[flake_key]["packages"].add(attr_name)
            elif flake_name:
                # Has flake_name but no resolved metadata
                flake_key = flake_name
                if flake_key not in flakes:
                    flakes[flake_key] = {
                        "name": flake_name,
                        "description": src.get("flake_description") or src.get("package_description", ""),
                        "owner": "",
                        "repo": "",
                        "type": "",
                        "packages": set(),
                    }
                # Add package if available
                attr_name = src.get("package_attr_name", "")
                if attr_name:
                    flakes[flake_key]["packages"].add(attr_name)
            else:
                # Package without flake metadata - might still be relevant
                packages_only.append(
                    {
                        "name": package_pname,
                        "description": src.get("package_description", ""),
                        "attr_name": src.get("package_attr_name", ""),
                    }
                )

        # Build results
        results = []

        # Show both total hits and unique flakes
        if total > len(flakes):
            results.append(f"Found {total:,} total matches ({len(flakes)} unique flakes) matching '{query}':\n")
        else:
            results.append(f"Found {len(flakes)} unique flakes matching '{query}':\n")

        for flake in flakes.values():
            results.append(f"• {flake['name']}")
            if flake.get("owner") and flake.get("repo"):
                results.append(
                    f" Repository: {flake['owner']}/{flake['repo']}"
                    + (f" ({flake['type']})" if flake.get("type") else "")
                )
            elif flake.get("url"):
                results.append(f" URL: {flake['url']}")
            if flake.get("description"):
                desc = flake["description"]
                if len(desc) > 200:
                    desc = desc[:200] + "..."
                results.append(f" {desc}")
            if flake["packages"]:
                # Show max 5 packages, sorted
                packages = sorted(flake["packages"])[:5]
                if len(flake["packages"]) > 5:
                    results.append(f" Packages: {', '.join(packages)}, ... ({len(flake['packages'])} total)")
                else:
                    results.append(f" Packages: {', '.join(packages)}")
            results.append("")

        return "\n".join(results).strip()
    except Exception as e:
        return error(str(e))
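
A minimal usage sketch: because this is an internal helper, it can be awaited directly in tests or scripts. The query string is arbitrary and the commented output is only an illustration of the grouped format built above, not actual results.

import asyncio

# search_type="flakes" in nixos_search routes here; calling the helper directly is equivalent.
text = asyncio.run(_nixos_flakes_search_impl("home-manager", limit=5))
print(text)
# Illustrative shape of the output (one bullet per unique owner/repo, up to 5 packages listed):
# Found 42 total matches (5 unique flakes) matching 'home-manager':
#
# • home-manager
#  Repository: nix-community/home-manager (github)
#  Packages: default, docs-html, ...
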

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/utensils/mcp-nixos'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.