Skip to main content
Glama

nixos_search

Search NixOS packages, options, programs, or flakes to find accurate system configurations. Specify query, type, limit, and channel for precise results. Streamline NixOS resource discovery with structured output.

Instructions

Search NixOS packages, options, programs, or flakes.

Args:
- query: Search term to look for
- search_type: Type of search - "packages", "options", "programs", or "flakes"
- limit: Maximum number of results to return (1-100)
- channel: NixOS channel to search in (e.g., "unstable", "stable", "25.05")

Returns: Plain text results with bullet points or error message

Input Schema

Table | JSON Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| channel | No | | unstable |
| limit | No | | |
| query | Yes | | |
| search_type | No | | packages |

Implementation Reference

  • The primary handler function for the 'nixos_search' tool, registered via @mcp.tool(). Handles searches for NixOS packages, options, programs, and flakes using Elasticsearch API. Includes input validation, channel resolution, query building, API calls, and plain-text result formatting.
    @mcp.tool()
    async def nixos_search(query: str, search_type: str = "packages", limit: int = 20, channel: str = "unstable") -> str:
        """Look up NixOS packages, options, programs, or flakes.

        Args:
            query: Term to search for.
            search_type: One of "packages", "options", "programs", or "flakes".
            limit: Maximum number of results to request (1-100).
            channel: NixOS channel to search in (e.g., "unstable", "stable", "25.05").

        Returns:
            Plain-text results formatted as a bulleted list, or an error message.
        """
        # Validate inputs in a fixed order: search type, then channel, then limit.
        if search_type not in ("packages", "options", "programs", "flakes"):
            return error(f"Invalid type '{search_type}'")

        channel_map = get_channels()
        if channel not in channel_map:
            hint = get_channel_suggestions(channel)
            return error(f"Invalid channel '{channel}'. {hint}")

        if not (1 <= limit <= 100):
            return error("Limit must be 1-100")

        # Flake lookups are delegated to their own implementation.
        if search_type == "flakes":
            return await _nixos_flakes_search_impl(query, limit)

        try:
            # Every search shares the same bool-query skeleton; only the document
            # type and the "should" clauses differ per search type.
            if search_type == "options":
                doc_type = "option"
                # Wildcard on the name so hierarchical option names such as
                # services.nginx.enable match on any fragment.
                should_clauses = [
                    {"wildcard": {"option_name": f"*{query}*"}},
                    {"match": {"option_description": query}},
                ]
            elif search_type == "packages":
                doc_type = "package"
                should_clauses = [
                    {"match": {"package_pname": {"query": query, "boost": 3}}},
                    {"match": {"package_description": query}},
                ]
            else:  # programs
                doc_type = "package"
                should_clauses = [
                    {"match": {"package_programs": {"query": query, "boost": 2}}},
                    {"match": {"package_pname": query}},
                ]

            es_filter = {
                "bool": {
                    "must": [{"term": {"type": doc_type}}],
                    "should": should_clauses,
                    "minimum_should_match": 1,
                }
            }
            matches = es_query(channel_map[channel], es_filter, limit)

            if not matches:
                return f"No {search_type} found matching '{query}'"

            lines = [f"Found {len(matches)} {search_type} matching '{query}':\n"]
            for match in matches:
                doc = match.get("_source", {})

                if search_type == "packages":
                    summary = doc.get("package_description", "")
                    lines.append(f"• {doc.get('package_pname', '')} ({doc.get('package_pversion', '')})")
                    if summary:
                        lines.append(f" {summary}")
                    lines.append("")

                elif search_type == "options":
                    kind = doc.get("option_type", "")
                    text = doc.get("option_description", "")
                    # Descriptions wrapped in <rendered-html> carry markup: drop
                    # the wrapper first, then any remaining tags.
                    if text and "<rendered-html>" in text:
                        text = text.replace("<rendered-html>", "").replace("</rendered-html>", "")
                        text = re.sub(r"<[^>]+>", "", text).strip()
                    lines.append(f"• {doc.get('option_name', '')}")
                    if kind:
                        lines.append(f" Type: {kind}")
                    if text:
                        lines.append(f" {text}")
                    lines.append("")

                else:  # programs
                    provider = doc.get("package_pname", "")
                    needle = query.lower()
                    # Only programs whose name equals the query (ignoring case)
                    # are listed; a hit with no such program adds no lines.
                    for program in doc.get("package_programs", []):
                        if program.lower() == needle:
                            lines.extend((f"• {program} (provided by {provider})", ""))

            return "\n".join(lines).strip()
        except Exception as exc:
            return error(str(exc))
  • Helper function used by nixos_search to execute Elasticsearch queries against NixOS search API indices.
    def es_query(index: str, query: dict[str, Any], size: int = 20) -> list[dict[str, Any]]:
        """Run a search against one index and return the raw hit list.

        Args:
            index: Index name appended to the NIXOS_API base URL.
            query: Elasticsearch query body, sent under the "query" key.
            size: Number of hits to request.

        Returns:
            The list found at ``hits.hits`` in the JSON response, or an empty
            list when the response does not have that shape.

        Raises:
            APIError: On a timeout, an HTTP error status, or any other failure
                while sending the request or decoding the response.
        """
        try:
            response = requests.post(
                f"{NIXOS_API}/{index}/_search",
                json={"query": query, "size": size},
                auth=NIXOS_AUTH,
                timeout=10,
            )
            response.raise_for_status()
            payload = response.json()

            # Malformed responses yield "no results" instead of an error.
            if not isinstance(payload, dict) or "hits" not in payload:
                return []
            outer = payload.get("hits", {})
            if not isinstance(outer, dict) or "hits" not in outer:
                return []
            return list(outer.get("hits", []))
        except requests.Timeout as exc:
            raise APIError("API error: Connection timed out") from exc
        except Exception as exc:
            # HTTP status errors and every other failure are reported the same way.
            raise APIError(f"API error: {str(exc)}") from exc
  • Helper class for caching and dynamically discovering/resolving NixOS channel indices used by nixos_search for channel validation and queries.
    class ChannelCache:
        """Lazily discovered NixOS channel indices and their friendly-name aliases."""

        def __init__(self) -> None:
            """Start with nothing discovered and nothing resolved."""
            self.available_channels: dict[str, str] | None = None
            self.resolved_channels: dict[str, str] | None = None
            self.using_fallback: bool = False

        def get_available(self) -> dict[str, str]:
            """Return the discovered indices, running discovery on first use."""
            channels = self.available_channels
            if channels is None:
                channels = self.available_channels = self._discover_available_channels()
            return {} if channels is None else channels

        def get_resolved(self) -> dict[str, str]:
            """Return the name -> index mapping, resolving it on first use."""
            mapping = self.resolved_channels
            if mapping is None:
                mapping = self.resolved_channels = self._resolve_channels()
            return {} if mapping is None else mapping

        def _discover_available_channels(self) -> dict[str, str]:
            """Probe candidate index names and keep those that report documents.

            Returns:
                Mapping of index name to a "<count> documents" label for every
                probed index that answered 200 with a positive count.
            """
            # Candidate index generations and channel versions to probe; some
            # entries are there only so future releases are picked up.
            generations = (43, 44, 45, 46)
            versions = ("unstable", "25.05", "25.11", "26.05", "30.05")

            found = {}
            for generation in generations:
                for release in versions:
                    index_name = f"latest-{generation}-nixos-{release}"
                    try:
                        reply = requests.post(
                            f"{NIXOS_API}/{index_name}/_count",
                            json={"query": {"match_all": {}}},
                            auth=NIXOS_AUTH,
                            timeout=10,
                        )
                        if reply.status_code != 200:
                            continue
                        doc_count = reply.json().get("count", 0)
                        if doc_count > 0:
                            found[index_name] = f"{doc_count:,} documents"
                    except Exception:
                        # Best-effort probe: any failure just means this index is skipped.
                        continue
            return found

        def _resolve_channels(self) -> dict[str, str]:
            """Map user-facing channel names to concrete index names.

            Returns:
                Mapping containing "unstable" (when found), "stable", "beta" and
                one entry per discovered version, or a copy of FALLBACK_CHANNELS
                when nothing could be resolved (``using_fallback`` is then set).
            """
            available = self.get_available()
            if not available:
                self.using_fallback = True
                return FALLBACK_CHANNELS.copy()

            resolved = {}

            # "unstable" is the first discovered index whose name mentions it.
            unstable_index = next((name for name in available if "unstable" in name), None)
            if unstable_index:
                resolved["unstable"] = unstable_index

            # Collect (major, minor, version, index, doc_count) for versioned indices.
            candidates = []
            for index_name, doc_label in available.items():
                if "unstable" in index_name:
                    continue
                pieces = index_name.split("-")
                if len(pieces) < 4:
                    continue
                release = pieces[3]  # e.g. "25.05" from "latest-43-nixos-25.05"
                try:
                    major, minor = map(int, release.split("."))
                    doc_count = int(doc_label.replace(",", "").replace(" documents", ""))
                except (ValueError, IndexError):
                    continue
                candidates.append((major, minor, release, index_name, doc_count))

            if candidates:
                # Highest version first; the document count breaks ties.
                ranked = sorted(candidates, key=lambda item: (item[0], item[1], item[4]), reverse=True)
                _, _, newest_release, newest_index, _ = ranked[0]
                resolved["stable"] = newest_index
                resolved[newest_release] = newest_index

                # For every version keep the index holding the most documents.
                best_by_release: dict[str, tuple[str, int]] = {}
                for _major, _minor, release, index_name, doc_count in ranked:
                    known = best_by_release.get(release)
                    if known is None or doc_count > known[1]:
                        best_by_release[release] = (index_name, doc_count)
                for release, (index_name, _docs) in best_by_release.items():
                    resolved[release] = index_name

            # "beta" is an alias for whatever "stable" resolved to.
            if "stable" in resolved:
                resolved["beta"] = resolved["stable"]

            if not resolved:
                self.using_fallback = True
                return FALLBACK_CHANNELS.copy()
            return resolved
  • Internal helper implementation for flake searches, called from nixos_search when search_type='flakes'. Handles flake-specific Elasticsearch queries and result grouping.
    async def _nixos_flakes_search_impl(query: str, limit: int = 20, channel: str = "unstable") -> str:
        """Search the flake index and return results grouped by flake.

        Args:
            query: Search term. An empty/whitespace-only string or "*" matches
                every document.
            limit: Requested result count (1-100). Up to ``limit * 5`` hits are
                fetched so that grouping duplicates still leaves enough flakes.
            channel: Accepted for interface compatibility; not used, because
                flakes are always read from one fixed index.

        Returns:
            Plain-text bulleted results, a "no flakes found" hint, or an error
            message.
        """
        if not 1 <= limit <= 100:
            return error("Limit must be 1-100")

        try:
            # Same alias the web UI uses, so only flake packages are returned.
            flake_index = "latest-43-group-manual"

            if query.strip() == "" or query == "*":
                # Empty or wildcard query: list everything.
                q: dict[str, Any] = {"match_all": {}}
            else:
                lowered = query.lower()
                q = {
                    "bool": {
                        "should": [
                            {"match": {"flake_name": {"query": query, "boost": 3}}},
                            {"match": {"flake_description": {"query": query, "boost": 2}}},
                            {"match": {"package_pname": {"query": query, "boost": 1.5}}},
                            {"match": {"package_description": query}},
                            {"wildcard": {"flake_name": {"value": f"*{query}*", "boost": 2.5}}},
                            {"wildcard": {"package_pname": {"value": f"*{query}*", "boost": 1}}},
                            {"prefix": {"flake_name": {"value": query, "boost": 2}}},
                            # flake_resolved is a nested field, so it needs nested queries.
                            {
                                "nested": {
                                    "path": "flake_resolved",
                                    "query": {"term": {"flake_resolved.owner": lowered}},
                                    "boost": 2,
                                }
                            },
                            {
                                "nested": {
                                    "path": "flake_resolved",
                                    "query": {"term": {"flake_resolved.repo": lowered}},
                                    "boost": 2,
                                }
                            },
                        ],
                        "minimum_should_match": 1,
                    }
                }

            # Restrict to package documents, matching the web UI.
            search_query = {"bool": {"filter": [{"term": {"type": "package"}}], "must": [q]}}

            try:
                resp = requests.post(
                    f"{NIXOS_API}/{flake_index}/_search",
                    json={"query": search_query, "size": limit * 5, "track_total_hits": True},
                    auth=NIXOS_AUTH,
                    timeout=10,
                )
                resp.raise_for_status()
                data = resp.json()
                hits = data.get("hits", {}).get("hits", [])
                total = data.get("hits", {}).get("total", {}).get("value", 0)
            except requests.HTTPError as e:
                # A Response is falsy whenever its status is 4xx/5xx, so a plain
                # truthiness test would never let the 404 case through; compare
                # against None explicitly.
                if e.response is not None and e.response.status_code == 404:
                    return error("Flake indices not found. Flake search may be temporarily unavailable.")
                raise

            if not hits:
                return (
                    f"No flakes found matching '{query}'. Try searching for: "
                    "• Popular flakes: nixpkgs, home-manager, flake-utils, devenv "
                    "• By owner: nix-community, numtide, cachix "
                    "• By topic: python, rust, nodejs, devops "
                    "Browse flakes at: "
                    "• GitHub: https://github.com/topics/nix-flakes "
                    "• FlakeHub: https://flakehub.com/"
                )

            # Group hits by flake so each flake is listed once with its packages.
            flakes: dict[str, dict[str, Any]] = {}
            for hit in hits:
                src = hit.get("_source", {})
                # "or ''" tolerates a null flake_name instead of failing on .strip().
                flake_name = (src.get("flake_name") or "").strip()
                package_pname = src.get("package_pname", "")
                resolved = src.get("flake_resolved", {})

                # Nothing to show for entries without any usable name.
                if not flake_name and not package_pname:
                    continue

                has_metadata = isinstance(resolved, dict) and (
                    resolved.get("owner") or resolved.get("repo") or resolved.get("url")
                )
                if has_metadata:
                    owner = resolved.get("owner", "")
                    repo = resolved.get("repo", "")
                    url = resolved.get("url", "")
                    flake_type = resolved.get("type", "")
                    if owner and repo:
                        flake_key = f"{owner}/{repo}"
                        display_name = flake_name or repo or package_pname
                    elif url:
                        flake_key = url
                        if "/" in url:
                            # Fall back to the last path segment of the URL.
                            url_tail = url.rstrip("/").split("/")[-1].replace(".git", "")
                            display_name = flake_name or url_tail or package_pname
                        else:
                            display_name = flake_name or package_pname
                    else:
                        flake_key = flake_name or package_pname
                        display_name = flake_key
                elif flake_name:
                    # Named flake without resolved metadata: group by name alone.
                    owner = repo = url = flake_type = ""
                    flake_key = display_name = flake_name
                else:
                    # Plain package with no flake information: not listed.
                    continue

                if flake_key not in flakes:
                    flakes[flake_key] = {
                        "name": display_name,
                        "description": src.get("flake_description") or src.get("package_description", ""),
                        "owner": owner,
                        "repo": repo,
                        "url": url,
                        "type": flake_type,
                        "packages": set(),  # set: the same attribute may appear in several hits
                    }

                attr_name = src.get("package_attr_name", "")
                if attr_name:
                    flakes[flake_key]["packages"].add(attr_name)

            # NOTE(review): the number of flakes listed is not capped at `limit`
            # (up to limit * 5 hits are grouped) - confirm this is intended.
            results = []
            if total > len(flakes):
                results.append(f"Found {total:,} total matches ({len(flakes)} unique flakes) matching '{query}':\n")
            else:
                results.append(f"Found {len(flakes)} unique flakes matching '{query}':\n")

            for flake in flakes.values():
                results.append(f"• {flake['name']}")
                if flake.get("owner") and flake.get("repo"):
                    results.append(
                        f" Repository: {flake['owner']}/{flake['repo']}"
                        + (f" ({flake['type']})" if flake.get("type") else "")
                    )
                elif flake.get("url"):
                    results.append(f" URL: {flake['url']}")
                if flake.get("description"):
                    desc = flake["description"]
                    if len(desc) > 200:
                        desc = desc[:200] + "..."
                    results.append(f" {desc}")
                if flake["packages"]:
                    # Show at most five package names, in sorted order.
                    packages = sorted(flake["packages"])[:5]
                    if len(flake["packages"]) > 5:
                        results.append(f" Packages: {', '.join(packages)}, ... ({len(flake['packages'])} total)")
                    else:
                        results.append(f" Packages: {', '.join(packages)}")
                results.append("")

            return "\n".join(results).strip()
        except Exception as e:
            return error(str(e))
  • Initialization of the FastMCP server instance where tools like nixos_search are registered via decorators.
    # FastMCP server instance named "mcp-nixos"; tool functions attach to it
    # through the @mcp.tool() decorator.
    mcp = FastMCP("mcp-nixos")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/utensils/mcp-nixos'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.