#!/usr/bin/python
# coding: utf-8
import argparse
import os
import sys
import requests
import yaml
import random
import logging
from threading import local
from typing import Optional, Dict, List, Union, Any
from eunomia_mcp.middleware import EunomiaMcpMiddleware
from pydantic import Field
from fastmcp import FastMCP, Context
from fastmcp.server.auth.oidc_proxy import OIDCProxy
from fastmcp.server.auth import OAuthProxy, RemoteAuthProvider
from fastmcp.server.auth.providers.jwt import JWTVerifier, StaticTokenVerifier
from fastmcp.server.middleware import MiddlewareContext, Middleware
from fastmcp.server.middleware.logging import LoggingMiddleware
from fastmcp.server.middleware.timing import TimingMiddleware
from fastmcp.server.middleware.rate_limiting import RateLimitingMiddleware
from fastmcp.server.middleware.error_handling import ErrorHandlingMiddleware
from fastmcp.utilities.logging import get_logger
# Thread-local storage for user token
local = local()
logger = get_logger(name="SearXNG.TokenMiddleware")
logger.setLevel(logging.DEBUG)
def to_boolean(string: Union[str, bool] = None) -> bool:
if isinstance(string, bool):
return string
if not string:
return False
normalized = str(string).strip().lower()
true_values = {"t", "true", "y", "yes", "1"}
false_values = {"f", "false", "n", "no", "0"}
if normalized in true_values:
return True
elif normalized in false_values:
return False
else:
raise ValueError(f"Cannot convert '{string}' to boolean")
# Global variables for SearXNG configuration
SEARXNG_INSTANCE_URL = os.environ.get("SEARXNG_INSTANCE_URL", None)
SEARXNG_USERNAME = os.environ.get("SEARXNG_USERNAME", None)
SEARXNG_PASSWORD = os.environ.get("SEARXNG_PASSWORD", None)
HAS_BASIC_AUTH = bool(SEARXNG_USERNAME and SEARXNG_PASSWORD)
INSTANCES_LIST_URL = "https://raw.githubusercontent.com/searxng/searx-instances/refs/heads/master/searxinstances/instances.yml"
USE_RANDOM_INSTANCE = to_boolean(os.environ.get("USE_RANDOM_INSTANCE", "false").lower())
config = {
"enable_delegation": to_boolean(os.environ.get("ENABLE_DELEGATION", "False")),
"audience": os.environ.get("AUDIENCE", None),
"delegated_scopes": os.environ.get("DELEGATED_SCOPES", "api"),
"token_endpoint": None, # Will be fetched dynamically from OIDC config
"oidc_client_id": os.environ.get("OIDC_CLIENT_ID", None),
"oidc_client_secret": os.environ.get("OIDC_CLIENT_SECRET", None),
"oidc_config_url": os.environ.get("OIDC_CONFIG_URL", None),
"jwt_jwks_uri": os.getenv("FASTMCP_SERVER_AUTH_JWT_JWKS_URI", None),
"jwt_issuer": os.getenv("FASTMCP_SERVER_AUTH_JWT_ISSUER", None),
"jwt_audience": os.getenv("FASTMCP_SERVER_AUTH_JWT_AUDIENCE", None),
"jwt_algorithm": os.getenv("FASTMCP_SERVER_AUTH_JWT_ALGORITHM", None),
"jwt_secret": os.getenv("FASTMCP_SERVER_AUTH_JWT_PUBLIC_KEY", None),
"jwt_required_scopes": os.getenv("FASTMCP_SERVER_AUTH_JWT_REQUIRED_SCOPES", None),
}
class UserTokenMiddleware(Middleware):
async def on_request(self, context: MiddlewareContext, call_next):
logger.debug(f"Delegation enabled: {config['enable_delegation']}")
if config["enable_delegation"]:
headers = getattr(context.message, "headers", {})
auth = headers.get("Authorization")
if auth and auth.startswith("Bearer "):
token = auth.split(" ")[1]
local.user_token = token
local.user_claims = None # Will be populated by JWTVerifier
# Extract claims if JWTVerifier already validated
if hasattr(context, "auth") and hasattr(context.auth, "claims"):
local.user_claims = context.auth.claims
logger.info(
"Stored JWT claims for delegation",
extra={"subject": context.auth.claims.get("sub")},
)
else:
logger.debug("JWT claims not yet available (will be after auth)")
logger.info("Extracted Bearer token for delegation")
else:
logger.error("Missing or invalid Authorization header")
raise ValueError("Missing or invalid Authorization header")
return await call_next(context)
class JWTClaimsLoggingMiddleware(Middleware):
async def on_response(self, context: MiddlewareContext, call_next):
response = await call_next(context)
logger.info(f"JWT Response: {response}")
if hasattr(context, "auth") and hasattr(context.auth, "claims"):
logger.info(
"JWT Authentication Success",
extra={
"subject": context.auth.claims.get("sub"),
"client_id": context.auth.claims.get("client_id"),
"scopes": context.auth.claims.get("scope"),
},
)
# Function to fetch and select a random SearXNG instance
def get_random_searxng_instance() -> str:
logger = logging.getLogger("SearXNG")
logger.debug("[SearXNG] Fetching list of SearXNG instances...")
try:
response = requests.get(INSTANCES_LIST_URL)
response.raise_for_status()
instances_data = yaml.safe_load(response.text)
# Filter for standard internet instances (not onion or hidden)
standard_instances: List[str] = []
for url, data in instances_data.items():
instance_data = data or {}
comments = instance_data.get("comments", [])
network_type = instance_data.get("network_type")
if (
not comments or ("hidden" not in comments and "onion" not in comments)
) and (not network_type or network_type == "normal"):
standard_instances.append(url)
logger.debug(f"[SearXNG] Found {len(standard_instances)} standard instances")
if not standard_instances:
raise ValueError("No standard SearXNG instances found")
# Select a random instance
random_instance = random.choice(standard_instances)
logger.debug(f"[SearXNG] Selected random instance: {random_instance}")
return random_instance
except Exception as e:
logger.error(f"[SearXNG] Error fetching instances: {str(e)}")
raise ValueError("Failed to fetch SearXNG instances list") from e
def register_tools(mcp: FastMCP):
@mcp.tool(
annotations={
"title": "SearXNG Search",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False,
},
tags={"search"},
)
async def web_search(
query: str = Field(description="Search query", default=None),
language: str = Field(
description="Language code for search results (e.g., 'en', 'de', 'fr'). Default: 'en'",
default="en",
),
time_range: Optional[str] = Field(
description="Time range for search results. Options: 'day', 'week', 'month', 'year'. Default: null (no time restriction).",
default=None,
),
categories: Optional[List[str]] = Field(
description="Categories to search in (e.g., 'general', 'images', 'news'). Default: null (all categories).",
default=None,
),
engines: Optional[List[str]] = Field(
description="Specific search engines to use. Default: null (all available engines).",
default=None,
),
safesearch: int = Field(
description="Safe search level: 0 (off), 1 (moderate), 2 (strict). Default: 1 (moderate).",
default=1,
),
pageno: int = Field(
description="Page number for results. Must be minimum 1. Default: 1.",
default=1,
ge=1,
),
max_results: int = Field(
description="Maximum number of search results to return. Range: 1-50. Default: 10.",
default=10,
ge=1,
le=50,
),
ctx: Context = Field(
description="MCP context for progress reporting.", default=None
),
) -> Dict[str, Any]:
"""
Perform web searches using SearXNG, a privacy-respecting metasearch engine. Returns relevant web content with customizable parameters.
Returns a Dictionary response with status, message, data (search results), and error if any.
"""
logger = logging.getLogger("SearXNG")
logger.debug(f"[SearXNG] Searching for: {query}")
try:
if not query:
return {
"status": 400,
"message": "Invalid input: query must not be empty",
"data": None,
"error": "query must not be empty",
}
# Prepare search parameters
search_params = {
"q": query,
"format": "json",
"language": language,
"safesearch": safesearch,
"pageno": pageno,
}
if time_range:
search_params["time_range"] = time_range
if categories:
search_params["categories"] = ",".join(categories)
if engines:
search_params["engines"] = ",".join(engines)
# Report initial progress if ctx is available
if ctx:
await ctx.report_progress(progress=0, total=100)
logger.debug("Reported initial progress: 0/100")
# Make request to SearXNG
auth = (SEARXNG_USERNAME, SEARXNG_PASSWORD) if HAS_BASIC_AUTH else None
response = requests.get(
f"{SEARXNG_INSTANCE_URL}/search", params=search_params, auth=auth
)
response.raise_for_status()
search_response: Dict[str, Any] = response.json()
# Limit results
limited_results = search_response.get("results", [])[:max_results]
# Construct final response
final_response = {
**search_response,
"results": limited_results,
"number_of_results": len(limited_results),
}
# Report completion
if ctx:
await ctx.report_progress(progress=100, total=100)
logger.debug("Reported final progress: 100/100")
logger.debug(f"[SearXNG] Search completed for query: {query}")
return {
"status": 200,
"message": "Search completed successfully",
"data": final_response,
"error": None,
}
except requests.exceptions.HTTPError as e:
status_code = e.response.status_code if e.response else None
if status_code == 401:
error_msg = "Authentication failed. Please check your SearXNG username and password."
else:
error_msg = f"SearXNG API error: {e.response.json().get('message', str(e)) if e.response else str(e)}"
logger.error(f"[SearXNG Error] {error_msg}")
return {
"status": status_code or 500,
"message": "Failed to perform search",
"data": None,
"error": error_msg,
}
except Exception as e:
logger.error(f"[SearXNG Error] {str(e)}")
return {
"status": 500,
"message": "Failed to perform search",
"data": None,
"error": str(e),
}
def register_prompts(mcp: FastMCP):
# Prompts
@mcp.prompt
def search(topic) -> str:
return f"Searching the web for: {topic}."
def searxng_mcp():
parser = argparse.ArgumentParser(description="SearXNG MCP Server")
parser.add_argument(
"-t",
"--transport",
default="stdio",
choices=["stdio", "http", "sse"],
help="Transport method: 'stdio', 'http', or 'sse' [legacy] (default: stdio)",
)
parser.add_argument(
"-s",
"--host",
default="0.0.0.0",
help="Host address for HTTP transport (default: 0.0.0.0)",
)
parser.add_argument(
"-p",
"--port",
type=int,
default=8000,
help="Port number for HTTP transport (default: 8000)",
)
parser.add_argument(
"--auth-type",
default="none",
choices=["none", "static", "jwt", "oauth-proxy", "oidc-proxy", "remote-oauth"],
help="Authentication type for MCP server: 'none' (disabled), 'static' (internal), 'jwt' (external token verification), 'oauth-proxy', 'oidc-proxy', 'remote-oauth' (external) (default: none)",
)
# JWT/Token params
parser.add_argument(
"--token-jwks-uri", default=None, help="JWKS URI for JWT verification"
)
parser.add_argument(
"--token-issuer", default=None, help="Issuer for JWT verification"
)
parser.add_argument(
"--token-audience", default=None, help="Audience for JWT verification"
)
parser.add_argument(
"--token-algorithm",
default=os.getenv("FASTMCP_SERVER_AUTH_JWT_ALGORITHM"),
choices=[
"HS256",
"HS384",
"HS512",
"RS256",
"RS384",
"RS512",
"ES256",
"ES384",
"ES512",
],
help="JWT signing algorithm (required for HMAC or static key). Auto-detected for JWKS.",
)
parser.add_argument(
"--token-secret",
default=os.getenv("FASTMCP_SERVER_AUTH_JWT_PUBLIC_KEY"),
help="Shared secret for HMAC (HS*) or PEM public key for static asymmetric verification.",
)
parser.add_argument(
"--token-public-key",
default=os.getenv("FASTMCP_SERVER_AUTH_JWT_PUBLIC_KEY"),
help="Path to PEM public key file or inline PEM string (for static asymmetric keys).",
)
parser.add_argument(
"--required-scopes",
default=os.getenv("FASTMCP_SERVER_AUTH_JWT_REQUIRED_SCOPES"),
help="Comma-separated list of required scopes (e.g., searxng.read,searxng.write).",
)
# OAuth Proxy params
parser.add_argument(
"--oauth-upstream-auth-endpoint",
default=None,
help="Upstream authorization endpoint for OAuth Proxy",
)
parser.add_argument(
"--oauth-upstream-token-endpoint",
default=None,
help="Upstream token endpoint for OAuth Proxy",
)
parser.add_argument(
"--oauth-upstream-client-id",
default=None,
help="Upstream client ID for OAuth Proxy",
)
parser.add_argument(
"--oauth-upstream-client-secret",
default=None,
help="Upstream client secret for OAuth Proxy",
)
parser.add_argument(
"--oauth-base-url", default=None, help="Base URL for OAuth Proxy"
)
# OIDC Proxy params
parser.add_argument(
"--oidc-config-url", default=None, help="OIDC configuration URL"
)
parser.add_argument("--oidc-client-id", default=None, help="OIDC client ID")
parser.add_argument("--oidc-client-secret", default=None, help="OIDC client secret")
parser.add_argument("--oidc-base-url", default=None, help="Base URL for OIDC Proxy")
# Remote OAuth params
parser.add_argument(
"--remote-auth-servers",
default=None,
help="Comma-separated list of authorization servers for Remote OAuth",
)
parser.add_argument(
"--remote-base-url", default=None, help="Base URL for Remote OAuth"
)
# Common
parser.add_argument(
"--allowed-client-redirect-uris",
default=None,
help="Comma-separated list of allowed client redirect URIs",
)
# Eunomia params
parser.add_argument(
"--eunomia-type",
default="none",
choices=["none", "embedded", "remote"],
help="Eunomia authorization type: 'none' (disabled), 'embedded' (built-in), 'remote' (external) (default: none)",
)
parser.add_argument(
"--eunomia-policy-file",
default="mcp_policies.json",
help="Policy file for embedded Eunomia (default: mcp_policies.json)",
)
parser.add_argument(
"--eunomia-remote-url", default=None, help="URL for remote Eunomia server"
)
# Delegation params
parser.add_argument(
"--enable-delegation",
action="store_true",
default=to_boolean(os.environ.get("ENABLE_DELEGATION", "False")),
help="Enable OIDC token delegation",
)
parser.add_argument(
"--audience",
default=os.environ.get("AUDIENCE", None),
help="Audience for the delegated token",
)
parser.add_argument(
"--delegated-scopes",
default=os.environ.get("DELEGATED_SCOPES", "api"),
help="Scopes for the delegated token (space-separated)",
)
parser.add_argument(
"--openapi-file",
default=None,
help="Path to the OpenAPI JSON file to import additional tools from",
)
parser.add_argument(
"--openapi-base-url",
default=None,
help="Base URL for the OpenAPI client (overrides instance URL)",
)
parser.add_argument(
"--openapi-use-token",
action="store_true",
help="Use the incoming Bearer token (from MCP request) to authenticate OpenAPI import",
)
parser.add_argument(
"--openapi-username",
default=os.getenv("OPENAPI_USERNAME"),
help="Username for basic auth during OpenAPI import",
)
parser.add_argument(
"--openapi-password",
default=os.getenv("OPENAPI_PASSWORD"),
help="Password for basic auth during OpenAPI import",
)
parser.add_argument(
"--openapi-client-id",
default=os.getenv("OPENAPI_CLIENT_ID"),
help="OAuth client ID for OpenAPI import",
)
parser.add_argument(
"--openapi-client-secret",
default=os.getenv("OPENAPI_CLIENT_SECRET"),
help="OAuth client secret for OpenAPI import",
)
args = parser.parse_args()
if args.port < 0 or args.port > 65535:
print(f"Error: Port {args.port} is out of valid range (0-65535).")
sys.exit(1)
# Update config with CLI arguments
config["enable_delegation"] = args.enable_delegation
config["audience"] = args.audience or config["audience"]
config["delegated_scopes"] = args.delegated_scopes or config["delegated_scopes"]
config["oidc_config_url"] = args.oidc_config_url or config["oidc_config_url"]
config["oidc_client_id"] = args.oidc_client_id or config["oidc_client_id"]
config["oidc_client_secret"] = (
args.oidc_client_secret or config["oidc_client_secret"]
)
# Configure delegation if enabled
if config["enable_delegation"]:
if args.auth_type != "oidc-proxy":
logger.error("Token delegation requires auth-type=oidc-proxy")
sys.exit(1)
if not config["audience"]:
logger.error("audience is required for delegation")
sys.exit(1)
if not all(
[
config["oidc_config_url"],
config["oidc_client_id"],
config["oidc_client_secret"],
]
):
logger.error(
"Delegation requires complete OIDC configuration (oidc-config-url, oidc-client-id, oidc-client-secret)"
)
sys.exit(1)
# Fetch OIDC configuration to get token_endpoint
try:
logger.info(
"Fetching OIDC configuration",
extra={"oidc_config_url": config["oidc_config_url"]},
)
oidc_config_resp = requests.get(config["oidc_config_url"])
oidc_config_resp.raise_for_status()
oidc_config = oidc_config_resp.json()
config["token_endpoint"] = oidc_config.get("token_endpoint")
if not config["token_endpoint"]:
logger.error("No token_endpoint found in OIDC configuration")
raise ValueError("No token_endpoint found in OIDC configuration")
logger.info(
"OIDC configuration fetched successfully",
extra={"token_endpoint": config["token_endpoint"]},
)
except Exception as e:
print(f"Failed to fetch OIDC configuration: {e}")
logger.error(
"Failed to fetch OIDC configuration",
extra={"error_type": type(e).__name__, "error_message": str(e)},
)
sys.exit(1)
# Set auth based on type
auth = None
allowed_uris = (
args.allowed_client_redirect_uris.split(",")
if args.allowed_client_redirect_uris
else None
)
if args.auth_type == "none":
auth = None
elif args.auth_type == "static":
auth = StaticTokenVerifier(
tokens={
"test-token": {"client_id": "test-user", "scopes": ["read", "write"]},
"admin-token": {"client_id": "admin", "scopes": ["admin"]},
}
)
elif args.auth_type == "jwt":
# Fallback to env vars if not provided via CLI
jwks_uri = args.token_jwks_uri or os.getenv("FASTMCP_SERVER_AUTH_JWT_JWKS_URI")
issuer = args.token_issuer or os.getenv("FASTMCP_SERVER_AUTH_JWT_ISSUER")
audience = args.token_audience or os.getenv("FASTMCP_SERVER_AUTH_JWT_AUDIENCE")
algorithm = args.token_algorithm
secret_or_key = args.token_secret or args.token_public_key
public_key_pem = None
if not (jwks_uri or secret_or_key):
logger.error(
"JWT auth requires either --token-jwks-uri or --token-secret/--token-public-key"
)
sys.exit(1)
if not (issuer and audience):
logger.error("JWT requires --token-issuer and --token-audience")
sys.exit(1)
# Load static public key from file if path is given
if args.token_public_key and os.path.isfile(args.token_public_key):
try:
with open(args.token_public_key, "r") as f:
public_key_pem = f.read()
logger.info(f"Loaded static public key from {args.token_public_key}")
except Exception as e:
print(f"Failed to read public key file: {e}")
logger.error(f"Failed to read public key file: {e}")
sys.exit(1)
elif args.token_public_key:
public_key_pem = args.token_public_key # Inline PEM
# Validation: Conflicting options
if jwks_uri and (algorithm or secret_or_key):
logger.warning(
"JWKS mode ignores --token-algorithm and --token-secret/--token-public-key"
)
# HMAC mode
if algorithm and algorithm.startswith("HS"):
if not secret_or_key:
logger.error(f"HMAC algorithm {algorithm} requires --token-secret")
sys.exit(1)
if jwks_uri:
logger.error("Cannot use --token-jwks-uri with HMAC")
sys.exit(1)
public_key = secret_or_key
else:
public_key = public_key_pem
# Required scopes
required_scopes = None
if args.required_scopes:
required_scopes = [
s.strip() for s in args.required_scopes.split(",") if s.strip()
]
try:
auth = JWTVerifier(
jwks_uri=jwks_uri,
public_key=public_key,
issuer=issuer,
audience=audience,
algorithm=(
algorithm if algorithm and algorithm.startswith("HS") else None
),
required_scopes=required_scopes,
)
logger.info(
"JWTVerifier configured",
extra={
"mode": (
"JWKS"
if jwks_uri
else (
"HMAC"
if algorithm and algorithm.startswith("HS")
else "Static Key"
)
),
"algorithm": algorithm,
"required_scopes": required_scopes,
},
)
except Exception as e:
print(f"Failed to initialize JWTVerifier: {e}")
logger.error(f"Failed to initialize JWTVerifier: {e}")
sys.exit(1)
elif args.auth_type == "oauth-proxy":
if not (
args.oauth_upstream_auth_endpoint
and args.oauth_upstream_token_endpoint
and args.oauth_upstream_client_id
and args.oauth_upstream_client_secret
and args.oauth_base_url
and args.token_jwks_uri
and args.token_issuer
and args.token_audience
):
print(
"oauth-proxy requires oauth-upstream-auth-endpoint, oauth-upstream-token-endpoint, "
"oauth-upstream-client-id, oauth-upstream-client-secret, oauth-base-url, token-jwks-uri, "
"token-issuer, token-audience"
)
logger.error(
"oauth-proxy requires oauth-upstream-auth-endpoint, oauth-upstream-token-endpoint, "
"oauth-upstream-client-id, oauth-upstream-client-secret, oauth-base-url, token-jwks-uri, "
"token-issuer, token-audience",
extra={
"auth_endpoint": args.oauth_upstream_auth_endpoint,
"token_endpoint": args.oauth_upstream_token_endpoint,
"client_id": args.oauth_upstream_client_id,
"base_url": args.oauth_base_url,
"jwks_uri": args.token_jwks_uri,
"issuer": args.token_issuer,
"audience": args.token_audience,
},
)
sys.exit(1)
token_verifier = JWTVerifier(
jwks_uri=args.token_jwks_uri,
issuer=args.token_issuer,
audience=args.token_audience,
)
auth = OAuthProxy(
upstream_authorization_endpoint=args.oauth_upstream_auth_endpoint,
upstream_token_endpoint=args.oauth_upstream_token_endpoint,
upstream_client_id=args.oauth_upstream_client_id,
upstream_client_secret=args.oauth_upstream_client_secret,
token_verifier=token_verifier,
base_url=args.oauth_base_url,
allowed_client_redirect_uris=allowed_uris,
)
elif args.auth_type == "oidc-proxy":
if not (
args.oidc_config_url
and args.oidc_client_id
and args.oidc_client_secret
and args.oidc_base_url
):
logger.error(
"oidc-proxy requires oidc-config-url, oidc-client-id, oidc-client-secret, oidc-base-url",
extra={
"config_url": args.oidc_config_url,
"client_id": args.oidc_client_id,
"base_url": args.oidc_base_url,
},
)
sys.exit(1)
auth = OIDCProxy(
config_url=args.oidc_config_url,
client_id=args.oidc_client_id,
client_secret=args.oidc_client_secret,
base_url=args.oidc_base_url,
allowed_client_redirect_uris=allowed_uris,
)
elif args.auth_type == "remote-oauth":
if not (
args.remote_auth_servers
and args.remote_base_url
and args.token_jwks_uri
and args.token_issuer
and args.token_audience
):
logger.error(
"remote-oauth requires remote-auth-servers, remote-base-url, token-jwks-uri, token-issuer, token-audience",
extra={
"auth_servers": args.remote_auth_servers,
"base_url": args.remote_base_url,
"jwks_uri": args.token_jwks_uri,
"issuer": args.token_issuer,
"audience": args.token_audience,
},
)
sys.exit(1)
auth_servers = [url.strip() for url in args.remote_auth_servers.split(",")]
token_verifier = JWTVerifier(
jwks_uri=args.token_jwks_uri,
issuer=args.token_issuer,
audience=args.token_audience,
)
auth = RemoteAuthProvider(
token_verifier=token_verifier,
authorization_servers=auth_servers,
base_url=args.remote_base_url,
)
# === 2. Build Middleware List ===
middlewares: List[
Union[
UserTokenMiddleware,
ErrorHandlingMiddleware,
RateLimitingMiddleware,
TimingMiddleware,
LoggingMiddleware,
JWTClaimsLoggingMiddleware,
EunomiaMcpMiddleware,
]
] = [
ErrorHandlingMiddleware(include_traceback=True, transform_errors=True),
RateLimitingMiddleware(max_requests_per_second=10.0, burst_capacity=20),
TimingMiddleware(),
LoggingMiddleware(),
JWTClaimsLoggingMiddleware(),
]
if config["enable_delegation"] or args.auth_type == "jwt":
middlewares.insert(0, UserTokenMiddleware()) # Must be first
if args.eunomia_type in ["embedded", "remote"]:
try:
from eunomia_mcp import create_eunomia_middleware
policy_file = args.eunomia_policy_file or "mcp_policies.json"
eunomia_endpoint = (
args.eunomia_remote_url if args.eunomia_type == "remote" else None
)
eunomia_mw = create_eunomia_middleware(
policy_file=policy_file, eunomia_endpoint=eunomia_endpoint
)
middlewares.append(eunomia_mw)
logger.info(f"Eunomia middleware enabled ({args.eunomia_type})")
except Exception as e:
print(f"Failed to load Eunomia middleware: {e}")
logger.error("Failed to load Eunomia middleware", extra={"error": str(e)})
sys.exit(1)
mcp = FastMCP(name="SearXNGServer", auth=auth)
register_tools(mcp)
register_prompts(mcp)
for mw in middlewares:
mcp.add_middleware(mw)
print("\nStarting SearXNG MCP Server")
print(f" Transport: {args.transport.upper()}")
print(f" Auth: {args.auth_type}")
print(f" Delegation: {'ON' if config['enable_delegation'] else 'OFF'}")
print(f" Eunomia: {args.eunomia_type}")
if args.transport == "stdio":
mcp.run(transport="stdio")
elif args.transport == "http":
mcp.run(transport="http", host=args.host, port=args.port)
elif args.transport == "sse":
mcp.run(transport="sse", host=args.host, port=args.port)
else:
logger.error("Invalid transport", extra={"transport": args.transport})
sys.exit(1)
if __name__ == "__main__":
searxng_mcp()