Skip to main content
Glama
searxng_mcp.py31.3 kB
#!/usr/bin/python # coding: utf-8 import argparse import os import sys import requests import yaml import random import logging from threading import local from typing import Optional, Dict, List, Union, Any from eunomia_mcp.middleware import EunomiaMcpMiddleware from pydantic import Field from fastmcp import FastMCP, Context from fastmcp.server.auth.oidc_proxy import OIDCProxy from fastmcp.server.auth import OAuthProxy, RemoteAuthProvider from fastmcp.server.auth.providers.jwt import JWTVerifier, StaticTokenVerifier from fastmcp.server.middleware import MiddlewareContext, Middleware from fastmcp.server.middleware.logging import LoggingMiddleware from fastmcp.server.middleware.timing import TimingMiddleware from fastmcp.server.middleware.rate_limiting import RateLimitingMiddleware from fastmcp.server.middleware.error_handling import ErrorHandlingMiddleware from fastmcp.utilities.logging import get_logger # Thread-local storage for user token local = local() logger = get_logger(name="SearXNG.TokenMiddleware") logger.setLevel(logging.DEBUG) def to_boolean(string: Union[str, bool] = None) -> bool: if isinstance(string, bool): return string if not string: return False normalized = str(string).strip().lower() true_values = {"t", "true", "y", "yes", "1"} false_values = {"f", "false", "n", "no", "0"} if normalized in true_values: return True elif normalized in false_values: return False else: raise ValueError(f"Cannot convert '{string}' to boolean") # Global variables for SearXNG configuration SEARXNG_INSTANCE_URL = os.environ.get("SEARXNG_INSTANCE_URL", None) SEARXNG_USERNAME = os.environ.get("SEARXNG_USERNAME", None) SEARXNG_PASSWORD = os.environ.get("SEARXNG_PASSWORD", None) HAS_BASIC_AUTH = bool(SEARXNG_USERNAME and SEARXNG_PASSWORD) INSTANCES_LIST_URL = "https://raw.githubusercontent.com/searxng/searx-instances/refs/heads/master/searxinstances/instances.yml" USE_RANDOM_INSTANCE = to_boolean(os.environ.get("USE_RANDOM_INSTANCE", "false").lower()) config = { "enable_delegation": to_boolean(os.environ.get("ENABLE_DELEGATION", "False")), "audience": os.environ.get("AUDIENCE", None), "delegated_scopes": os.environ.get("DELEGATED_SCOPES", "api"), "token_endpoint": None, # Will be fetched dynamically from OIDC config "oidc_client_id": os.environ.get("OIDC_CLIENT_ID", None), "oidc_client_secret": os.environ.get("OIDC_CLIENT_SECRET", None), "oidc_config_url": os.environ.get("OIDC_CONFIG_URL", None), "jwt_jwks_uri": os.getenv("FASTMCP_SERVER_AUTH_JWT_JWKS_URI", None), "jwt_issuer": os.getenv("FASTMCP_SERVER_AUTH_JWT_ISSUER", None), "jwt_audience": os.getenv("FASTMCP_SERVER_AUTH_JWT_AUDIENCE", None), "jwt_algorithm": os.getenv("FASTMCP_SERVER_AUTH_JWT_ALGORITHM", None), "jwt_secret": os.getenv("FASTMCP_SERVER_AUTH_JWT_PUBLIC_KEY", None), "jwt_required_scopes": os.getenv("FASTMCP_SERVER_AUTH_JWT_REQUIRED_SCOPES", None), } class UserTokenMiddleware(Middleware): async def on_request(self, context: MiddlewareContext, call_next): logger.debug(f"Delegation enabled: {config['enable_delegation']}") if config["enable_delegation"]: headers = getattr(context.message, "headers", {}) auth = headers.get("Authorization") if auth and auth.startswith("Bearer "): token = auth.split(" ")[1] local.user_token = token local.user_claims = None # Will be populated by JWTVerifier # Extract claims if JWTVerifier already validated if hasattr(context, "auth") and hasattr(context.auth, "claims"): local.user_claims = context.auth.claims logger.info( "Stored JWT claims for delegation", extra={"subject": context.auth.claims.get("sub")}, ) else: logger.debug("JWT claims not yet available (will be after auth)") logger.info("Extracted Bearer token for delegation") else: logger.error("Missing or invalid Authorization header") raise ValueError("Missing or invalid Authorization header") return await call_next(context) class JWTClaimsLoggingMiddleware(Middleware): async def on_response(self, context: MiddlewareContext, call_next): response = await call_next(context) logger.info(f"JWT Response: {response}") if hasattr(context, "auth") and hasattr(context.auth, "claims"): logger.info( "JWT Authentication Success", extra={ "subject": context.auth.claims.get("sub"), "client_id": context.auth.claims.get("client_id"), "scopes": context.auth.claims.get("scope"), }, ) # Function to fetch and select a random SearXNG instance def get_random_searxng_instance() -> str: logger = logging.getLogger("SearXNG") logger.debug("[SearXNG] Fetching list of SearXNG instances...") try: response = requests.get(INSTANCES_LIST_URL) response.raise_for_status() instances_data = yaml.safe_load(response.text) # Filter for standard internet instances (not onion or hidden) standard_instances: List[str] = [] for url, data in instances_data.items(): instance_data = data or {} comments = instance_data.get("comments", []) network_type = instance_data.get("network_type") if ( not comments or ("hidden" not in comments and "onion" not in comments) ) and (not network_type or network_type == "normal"): standard_instances.append(url) logger.debug(f"[SearXNG] Found {len(standard_instances)} standard instances") if not standard_instances: raise ValueError("No standard SearXNG instances found") # Select a random instance random_instance = random.choice(standard_instances) logger.debug(f"[SearXNG] Selected random instance: {random_instance}") return random_instance except Exception as e: logger.error(f"[SearXNG] Error fetching instances: {str(e)}") raise ValueError("Failed to fetch SearXNG instances list") from e def register_tools(mcp: FastMCP): @mcp.tool( annotations={ "title": "SearXNG Search", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, tags={"search"}, ) async def web_search( query: str = Field(description="Search query", default=None), language: str = Field( description="Language code for search results (e.g., 'en', 'de', 'fr'). Default: 'en'", default="en", ), time_range: Optional[str] = Field( description="Time range for search results. Options: 'day', 'week', 'month', 'year'. Default: null (no time restriction).", default=None, ), categories: Optional[List[str]] = Field( description="Categories to search in (e.g., 'general', 'images', 'news'). Default: null (all categories).", default=None, ), engines: Optional[List[str]] = Field( description="Specific search engines to use. Default: null (all available engines).", default=None, ), safesearch: int = Field( description="Safe search level: 0 (off), 1 (moderate), 2 (strict). Default: 1 (moderate).", default=1, ), pageno: int = Field( description="Page number for results. Must be minimum 1. Default: 1.", default=1, ge=1, ), max_results: int = Field( description="Maximum number of search results to return. Range: 1-50. Default: 10.", default=10, ge=1, le=50, ), ctx: Context = Field( description="MCP context for progress reporting.", default=None ), ) -> Dict[str, Any]: """ Perform web searches using SearXNG, a privacy-respecting metasearch engine. Returns relevant web content with customizable parameters. Returns a Dictionary response with status, message, data (search results), and error if any. """ logger = logging.getLogger("SearXNG") logger.debug(f"[SearXNG] Searching for: {query}") try: if not query: return { "status": 400, "message": "Invalid input: query must not be empty", "data": None, "error": "query must not be empty", } # Prepare search parameters search_params = { "q": query, "format": "json", "language": language, "safesearch": safesearch, "pageno": pageno, } if time_range: search_params["time_range"] = time_range if categories: search_params["categories"] = ",".join(categories) if engines: search_params["engines"] = ",".join(engines) # Report initial progress if ctx is available if ctx: await ctx.report_progress(progress=0, total=100) logger.debug("Reported initial progress: 0/100") # Make request to SearXNG auth = (SEARXNG_USERNAME, SEARXNG_PASSWORD) if HAS_BASIC_AUTH else None response = requests.get( f"{SEARXNG_INSTANCE_URL}/search", params=search_params, auth=auth ) response.raise_for_status() search_response: Dict[str, Any] = response.json() # Limit results limited_results = search_response.get("results", [])[:max_results] # Construct final response final_response = { **search_response, "results": limited_results, "number_of_results": len(limited_results), } # Report completion if ctx: await ctx.report_progress(progress=100, total=100) logger.debug("Reported final progress: 100/100") logger.debug(f"[SearXNG] Search completed for query: {query}") return { "status": 200, "message": "Search completed successfully", "data": final_response, "error": None, } except requests.exceptions.HTTPError as e: status_code = e.response.status_code if e.response else None if status_code == 401: error_msg = "Authentication failed. Please check your SearXNG username and password." else: error_msg = f"SearXNG API error: {e.response.json().get('message', str(e)) if e.response else str(e)}" logger.error(f"[SearXNG Error] {error_msg}") return { "status": status_code or 500, "message": "Failed to perform search", "data": None, "error": error_msg, } except Exception as e: logger.error(f"[SearXNG Error] {str(e)}") return { "status": 500, "message": "Failed to perform search", "data": None, "error": str(e), } def register_prompts(mcp: FastMCP): # Prompts @mcp.prompt def search(topic) -> str: return f"Searching the web for: {topic}." def searxng_mcp(): parser = argparse.ArgumentParser(description="SearXNG MCP Server") parser.add_argument( "-t", "--transport", default="stdio", choices=["stdio", "http", "sse"], help="Transport method: 'stdio', 'http', or 'sse' [legacy] (default: stdio)", ) parser.add_argument( "-s", "--host", default="0.0.0.0", help="Host address for HTTP transport (default: 0.0.0.0)", ) parser.add_argument( "-p", "--port", type=int, default=8000, help="Port number for HTTP transport (default: 8000)", ) parser.add_argument( "--auth-type", default="none", choices=["none", "static", "jwt", "oauth-proxy", "oidc-proxy", "remote-oauth"], help="Authentication type for MCP server: 'none' (disabled), 'static' (internal), 'jwt' (external token verification), 'oauth-proxy', 'oidc-proxy', 'remote-oauth' (external) (default: none)", ) # JWT/Token params parser.add_argument( "--token-jwks-uri", default=None, help="JWKS URI for JWT verification" ) parser.add_argument( "--token-issuer", default=None, help="Issuer for JWT verification" ) parser.add_argument( "--token-audience", default=None, help="Audience for JWT verification" ) parser.add_argument( "--token-algorithm", default=os.getenv("FASTMCP_SERVER_AUTH_JWT_ALGORITHM"), choices=[ "HS256", "HS384", "HS512", "RS256", "RS384", "RS512", "ES256", "ES384", "ES512", ], help="JWT signing algorithm (required for HMAC or static key). Auto-detected for JWKS.", ) parser.add_argument( "--token-secret", default=os.getenv("FASTMCP_SERVER_AUTH_JWT_PUBLIC_KEY"), help="Shared secret for HMAC (HS*) or PEM public key for static asymmetric verification.", ) parser.add_argument( "--token-public-key", default=os.getenv("FASTMCP_SERVER_AUTH_JWT_PUBLIC_KEY"), help="Path to PEM public key file or inline PEM string (for static asymmetric keys).", ) parser.add_argument( "--required-scopes", default=os.getenv("FASTMCP_SERVER_AUTH_JWT_REQUIRED_SCOPES"), help="Comma-separated list of required scopes (e.g., searxng.read,searxng.write).", ) # OAuth Proxy params parser.add_argument( "--oauth-upstream-auth-endpoint", default=None, help="Upstream authorization endpoint for OAuth Proxy", ) parser.add_argument( "--oauth-upstream-token-endpoint", default=None, help="Upstream token endpoint for OAuth Proxy", ) parser.add_argument( "--oauth-upstream-client-id", default=None, help="Upstream client ID for OAuth Proxy", ) parser.add_argument( "--oauth-upstream-client-secret", default=None, help="Upstream client secret for OAuth Proxy", ) parser.add_argument( "--oauth-base-url", default=None, help="Base URL for OAuth Proxy" ) # OIDC Proxy params parser.add_argument( "--oidc-config-url", default=None, help="OIDC configuration URL" ) parser.add_argument("--oidc-client-id", default=None, help="OIDC client ID") parser.add_argument("--oidc-client-secret", default=None, help="OIDC client secret") parser.add_argument("--oidc-base-url", default=None, help="Base URL for OIDC Proxy") # Remote OAuth params parser.add_argument( "--remote-auth-servers", default=None, help="Comma-separated list of authorization servers for Remote OAuth", ) parser.add_argument( "--remote-base-url", default=None, help="Base URL for Remote OAuth" ) # Common parser.add_argument( "--allowed-client-redirect-uris", default=None, help="Comma-separated list of allowed client redirect URIs", ) # Eunomia params parser.add_argument( "--eunomia-type", default="none", choices=["none", "embedded", "remote"], help="Eunomia authorization type: 'none' (disabled), 'embedded' (built-in), 'remote' (external) (default: none)", ) parser.add_argument( "--eunomia-policy-file", default="mcp_policies.json", help="Policy file for embedded Eunomia (default: mcp_policies.json)", ) parser.add_argument( "--eunomia-remote-url", default=None, help="URL for remote Eunomia server" ) # Delegation params parser.add_argument( "--enable-delegation", action="store_true", default=to_boolean(os.environ.get("ENABLE_DELEGATION", "False")), help="Enable OIDC token delegation", ) parser.add_argument( "--audience", default=os.environ.get("AUDIENCE", None), help="Audience for the delegated token", ) parser.add_argument( "--delegated-scopes", default=os.environ.get("DELEGATED_SCOPES", "api"), help="Scopes for the delegated token (space-separated)", ) parser.add_argument( "--openapi-file", default=None, help="Path to the OpenAPI JSON file to import additional tools from", ) parser.add_argument( "--openapi-base-url", default=None, help="Base URL for the OpenAPI client (overrides instance URL)", ) parser.add_argument( "--openapi-use-token", action="store_true", help="Use the incoming Bearer token (from MCP request) to authenticate OpenAPI import", ) parser.add_argument( "--openapi-username", default=os.getenv("OPENAPI_USERNAME"), help="Username for basic auth during OpenAPI import", ) parser.add_argument( "--openapi-password", default=os.getenv("OPENAPI_PASSWORD"), help="Password for basic auth during OpenAPI import", ) parser.add_argument( "--openapi-client-id", default=os.getenv("OPENAPI_CLIENT_ID"), help="OAuth client ID for OpenAPI import", ) parser.add_argument( "--openapi-client-secret", default=os.getenv("OPENAPI_CLIENT_SECRET"), help="OAuth client secret for OpenAPI import", ) args = parser.parse_args() if args.port < 0 or args.port > 65535: print(f"Error: Port {args.port} is out of valid range (0-65535).") sys.exit(1) # Update config with CLI arguments config["enable_delegation"] = args.enable_delegation config["audience"] = args.audience or config["audience"] config["delegated_scopes"] = args.delegated_scopes or config["delegated_scopes"] config["oidc_config_url"] = args.oidc_config_url or config["oidc_config_url"] config["oidc_client_id"] = args.oidc_client_id or config["oidc_client_id"] config["oidc_client_secret"] = ( args.oidc_client_secret or config["oidc_client_secret"] ) # Configure delegation if enabled if config["enable_delegation"]: if args.auth_type != "oidc-proxy": logger.error("Token delegation requires auth-type=oidc-proxy") sys.exit(1) if not config["audience"]: logger.error("audience is required for delegation") sys.exit(1) if not all( [ config["oidc_config_url"], config["oidc_client_id"], config["oidc_client_secret"], ] ): logger.error( "Delegation requires complete OIDC configuration (oidc-config-url, oidc-client-id, oidc-client-secret)" ) sys.exit(1) # Fetch OIDC configuration to get token_endpoint try: logger.info( "Fetching OIDC configuration", extra={"oidc_config_url": config["oidc_config_url"]}, ) oidc_config_resp = requests.get(config["oidc_config_url"]) oidc_config_resp.raise_for_status() oidc_config = oidc_config_resp.json() config["token_endpoint"] = oidc_config.get("token_endpoint") if not config["token_endpoint"]: logger.error("No token_endpoint found in OIDC configuration") raise ValueError("No token_endpoint found in OIDC configuration") logger.info( "OIDC configuration fetched successfully", extra={"token_endpoint": config["token_endpoint"]}, ) except Exception as e: print(f"Failed to fetch OIDC configuration: {e}") logger.error( "Failed to fetch OIDC configuration", extra={"error_type": type(e).__name__, "error_message": str(e)}, ) sys.exit(1) # Set auth based on type auth = None allowed_uris = ( args.allowed_client_redirect_uris.split(",") if args.allowed_client_redirect_uris else None ) if args.auth_type == "none": auth = None elif args.auth_type == "static": auth = StaticTokenVerifier( tokens={ "test-token": {"client_id": "test-user", "scopes": ["read", "write"]}, "admin-token": {"client_id": "admin", "scopes": ["admin"]}, } ) elif args.auth_type == "jwt": # Fallback to env vars if not provided via CLI jwks_uri = args.token_jwks_uri or os.getenv("FASTMCP_SERVER_AUTH_JWT_JWKS_URI") issuer = args.token_issuer or os.getenv("FASTMCP_SERVER_AUTH_JWT_ISSUER") audience = args.token_audience or os.getenv("FASTMCP_SERVER_AUTH_JWT_AUDIENCE") algorithm = args.token_algorithm secret_or_key = args.token_secret or args.token_public_key public_key_pem = None if not (jwks_uri or secret_or_key): logger.error( "JWT auth requires either --token-jwks-uri or --token-secret/--token-public-key" ) sys.exit(1) if not (issuer and audience): logger.error("JWT requires --token-issuer and --token-audience") sys.exit(1) # Load static public key from file if path is given if args.token_public_key and os.path.isfile(args.token_public_key): try: with open(args.token_public_key, "r") as f: public_key_pem = f.read() logger.info(f"Loaded static public key from {args.token_public_key}") except Exception as e: print(f"Failed to read public key file: {e}") logger.error(f"Failed to read public key file: {e}") sys.exit(1) elif args.token_public_key: public_key_pem = args.token_public_key # Inline PEM # Validation: Conflicting options if jwks_uri and (algorithm or secret_or_key): logger.warning( "JWKS mode ignores --token-algorithm and --token-secret/--token-public-key" ) # HMAC mode if algorithm and algorithm.startswith("HS"): if not secret_or_key: logger.error(f"HMAC algorithm {algorithm} requires --token-secret") sys.exit(1) if jwks_uri: logger.error("Cannot use --token-jwks-uri with HMAC") sys.exit(1) public_key = secret_or_key else: public_key = public_key_pem # Required scopes required_scopes = None if args.required_scopes: required_scopes = [ s.strip() for s in args.required_scopes.split(",") if s.strip() ] try: auth = JWTVerifier( jwks_uri=jwks_uri, public_key=public_key, issuer=issuer, audience=audience, algorithm=( algorithm if algorithm and algorithm.startswith("HS") else None ), required_scopes=required_scopes, ) logger.info( "JWTVerifier configured", extra={ "mode": ( "JWKS" if jwks_uri else ( "HMAC" if algorithm and algorithm.startswith("HS") else "Static Key" ) ), "algorithm": algorithm, "required_scopes": required_scopes, }, ) except Exception as e: print(f"Failed to initialize JWTVerifier: {e}") logger.error(f"Failed to initialize JWTVerifier: {e}") sys.exit(1) elif args.auth_type == "oauth-proxy": if not ( args.oauth_upstream_auth_endpoint and args.oauth_upstream_token_endpoint and args.oauth_upstream_client_id and args.oauth_upstream_client_secret and args.oauth_base_url and args.token_jwks_uri and args.token_issuer and args.token_audience ): print( "oauth-proxy requires oauth-upstream-auth-endpoint, oauth-upstream-token-endpoint, " "oauth-upstream-client-id, oauth-upstream-client-secret, oauth-base-url, token-jwks-uri, " "token-issuer, token-audience" ) logger.error( "oauth-proxy requires oauth-upstream-auth-endpoint, oauth-upstream-token-endpoint, " "oauth-upstream-client-id, oauth-upstream-client-secret, oauth-base-url, token-jwks-uri, " "token-issuer, token-audience", extra={ "auth_endpoint": args.oauth_upstream_auth_endpoint, "token_endpoint": args.oauth_upstream_token_endpoint, "client_id": args.oauth_upstream_client_id, "base_url": args.oauth_base_url, "jwks_uri": args.token_jwks_uri, "issuer": args.token_issuer, "audience": args.token_audience, }, ) sys.exit(1) token_verifier = JWTVerifier( jwks_uri=args.token_jwks_uri, issuer=args.token_issuer, audience=args.token_audience, ) auth = OAuthProxy( upstream_authorization_endpoint=args.oauth_upstream_auth_endpoint, upstream_token_endpoint=args.oauth_upstream_token_endpoint, upstream_client_id=args.oauth_upstream_client_id, upstream_client_secret=args.oauth_upstream_client_secret, token_verifier=token_verifier, base_url=args.oauth_base_url, allowed_client_redirect_uris=allowed_uris, ) elif args.auth_type == "oidc-proxy": if not ( args.oidc_config_url and args.oidc_client_id and args.oidc_client_secret and args.oidc_base_url ): logger.error( "oidc-proxy requires oidc-config-url, oidc-client-id, oidc-client-secret, oidc-base-url", extra={ "config_url": args.oidc_config_url, "client_id": args.oidc_client_id, "base_url": args.oidc_base_url, }, ) sys.exit(1) auth = OIDCProxy( config_url=args.oidc_config_url, client_id=args.oidc_client_id, client_secret=args.oidc_client_secret, base_url=args.oidc_base_url, allowed_client_redirect_uris=allowed_uris, ) elif args.auth_type == "remote-oauth": if not ( args.remote_auth_servers and args.remote_base_url and args.token_jwks_uri and args.token_issuer and args.token_audience ): logger.error( "remote-oauth requires remote-auth-servers, remote-base-url, token-jwks-uri, token-issuer, token-audience", extra={ "auth_servers": args.remote_auth_servers, "base_url": args.remote_base_url, "jwks_uri": args.token_jwks_uri, "issuer": args.token_issuer, "audience": args.token_audience, }, ) sys.exit(1) auth_servers = [url.strip() for url in args.remote_auth_servers.split(",")] token_verifier = JWTVerifier( jwks_uri=args.token_jwks_uri, issuer=args.token_issuer, audience=args.token_audience, ) auth = RemoteAuthProvider( token_verifier=token_verifier, authorization_servers=auth_servers, base_url=args.remote_base_url, ) # === 2. Build Middleware List === middlewares: List[ Union[ UserTokenMiddleware, ErrorHandlingMiddleware, RateLimitingMiddleware, TimingMiddleware, LoggingMiddleware, JWTClaimsLoggingMiddleware, EunomiaMcpMiddleware, ] ] = [ ErrorHandlingMiddleware(include_traceback=True, transform_errors=True), RateLimitingMiddleware(max_requests_per_second=10.0, burst_capacity=20), TimingMiddleware(), LoggingMiddleware(), JWTClaimsLoggingMiddleware(), ] if config["enable_delegation"] or args.auth_type == "jwt": middlewares.insert(0, UserTokenMiddleware()) # Must be first if args.eunomia_type in ["embedded", "remote"]: try: from eunomia_mcp import create_eunomia_middleware policy_file = args.eunomia_policy_file or "mcp_policies.json" eunomia_endpoint = ( args.eunomia_remote_url if args.eunomia_type == "remote" else None ) eunomia_mw = create_eunomia_middleware( policy_file=policy_file, eunomia_endpoint=eunomia_endpoint ) middlewares.append(eunomia_mw) logger.info(f"Eunomia middleware enabled ({args.eunomia_type})") except Exception as e: print(f"Failed to load Eunomia middleware: {e}") logger.error("Failed to load Eunomia middleware", extra={"error": str(e)}) sys.exit(1) mcp = FastMCP(name="SearXNGServer", auth=auth) register_tools(mcp) register_prompts(mcp) for mw in middlewares: mcp.add_middleware(mw) print("\nStarting SearXNG MCP Server") print(f" Transport: {args.transport.upper()}") print(f" Auth: {args.auth_type}") print(f" Delegation: {'ON' if config['enable_delegation'] else 'OFF'}") print(f" Eunomia: {args.eunomia_type}") if args.transport == "stdio": mcp.run(transport="stdio") elif args.transport == "http": mcp.run(transport="http", host=args.host, port=args.port) elif args.transport == "sse": mcp.run(transport="sse", host=args.host, port=args.port) else: logger.error("Invalid transport", extra={"transport": args.transport}) sys.exit(1) if __name__ == "__main__": searxng_mcp()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Knuckles-Team/searxng-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server