Skip to main content
Glama
maratsal

Falcosidekick UI MCP Server

by maratsal
falco_mcp_server.py17.4 kB
"""Falcosidekick UI MCP server. This server exposes a single tool that proxies the Falcosidekick UI `/api/v1/events/search` endpoint so MCP clients can query Falco events without leaving their workspace. """ from __future__ import annotations import datetime import os import re from typing import Annotated import httpx from mcp.server.fastmcp import FastMCP from pydantic import Field # ----------------------------------------------------------------------------- # Configuration # ----------------------------------------------------------------------------- DEFAULT_BASE_URL = os.getenv("FALCO_BASE_URL", "http://localhost:8080") EVENTS_ENDPOINT = os.getenv("FALCO_EVENTS_PATH", "/api/v1/events/search") DEFAULT_USERNAME = os.getenv("FALCO_USERNAME", "admin") DEFAULT_PASSWORD = os.getenv("FALCO_PASSWORD", "admin") DEFAULT_TIMEOUT = float(os.getenv("FALCO_HTTP_TIMEOUT", "15")) SEVERITY_LIST = ["Critical", "Error", "Informational", "Notice", "Warning"] DESCRIPTION_WITH_SEVERITY_REGEX = re.compile( r"^[0-9:.]+\s*:\s*(?:" + "|".join(SEVERITY_LIST) + r")\s+(.*?)(?:\s*\(|$)", re.S, ) DESCRIPTION_FALLBACK_REGEX = re.compile(r"^[0-9:.]+\s*:\s*(.*?)(?:\s*\(|$)", re.S) SOURCE_FIELD_WHITELISTS = { "aws_cloudtrail": [ "ct.name", "ct.region", "ct.request.name", "ct.srcip", "ct.user", "ct.user.arn", ], "syscall": [ "container.name", "container.image.repository", "container.image.tag", "k8s.ns.name", "k8s.pod.name", "evt.type", "evt.res", "fd.name", "proc.name", "proc.cmdline", ], "k8s_audit": [ "ka.target.name", "ka.target.namespace", "ka.user.name", "ka.target.subresource", ], } OUTPUT_FIELDS_TO_DROP = {"evt.time"} PROC_CMDLINE_MAX_LEN = 120 TIME_FILTER_GUARD = re.compile(r"\b(time|timestamp)\b", re.IGNORECASE) mcp = FastMCP( name="Falcosidekick Events MCP", instructions=( "Use the `query_falco_events` tool to fetch Falco security events via " "the Falcosidekick UI search API. Provide optional filters such as " "`priority`, `rule`, `source`, `tags`, or `since` to scope results. " "To target a precise time window, supply `start_time` and/or `end_time` " "using ISO 8601 UTC format like `2025-11-24T03:59:59.848208Z`. " "The server will convert the strings to UTC timestamps, apply them " "after fetching results, and only return events inside that interval. " "Responses keep every event (uuid, time, priority, rule, source, description, hostname) " "but streamline `output_fields` per source (syscall, k8s_audit, aws_cloudtrail) and drop " "redundant keys like `evt.time`. Syscall `proc.cmdline` entries are truncated for brevity. " "Use `start_time` / `end_time` arguments instead of crafting `filter` expressions for time ranges. " "Use `falco_full_event_by_id` when you need the exact raw event (all fields) for a given UUID." ), host=os.getenv("MCP_HOST", "0.0.0.0"), port=int(os.getenv("PORT", "8080")), streamable_http_path=os.getenv("MCP_HTTP_PATH", "/mcp"), ) def _clean_base_url(base_url: str) -> str: """Normalize base URLs by trimming trailing slashes.""" if base_url.endswith("/"): return base_url.rstrip("/") return base_url def _comma_join(values: list[str] | None) -> str | None: return ",".join(values) if values else None def _extract_description(output: str | None) -> str | None: if not output: return None match = DESCRIPTION_WITH_SEVERITY_REGEX.search(output) or DESCRIPTION_FALLBACK_REGEX.search(output) if not match: return None return match.group(1).strip() def _rewrite_event(event: dict) -> None: output = event.pop("output", None) description = _extract_description(output) if description: event["description"] = description event.pop("tags", None) def _inject_descriptions(payload: object) -> None: if isinstance(payload, dict): results = payload.get("results") if isinstance(results, dict): for event in results.values(): if isinstance(event, dict): _rewrite_event(event) elif isinstance(results, list): for event in results: if isinstance(event, dict): _rewrite_event(event) else: _rewrite_event(payload) elif isinstance(payload, list): for event in payload: if isinstance(event, dict): _rewrite_event(event) def _parse_iso8601(ts: str | None, label: str) -> datetime.datetime | None: if not ts: return None try: normalized = ts.replace("Z", "+00:00") return datetime.datetime.fromisoformat(normalized) except ValueError as exc: raise ValueError(f"Invalid {label} value '{ts}'. Use ISO 8601 UTC (e.g. 2025-11-24T03:59:59.848208Z).") from exc def _event_in_range(event: dict, start_dt: datetime.datetime | None, end_dt: datetime.datetime | None) -> bool: if not (start_dt or end_dt): return True event_time = event.get("time") or event.get("timestamp") if not isinstance(event_time, str): return False try: evt_dt = datetime.datetime.fromisoformat(event_time.replace("Z", "+00:00")) except ValueError: return False if start_dt and evt_dt < start_dt: return False if end_dt and evt_dt > end_dt: return False return True def _filter_events(results: list[dict], start_dt: datetime.datetime | None, end_dt: datetime.datetime | None) -> list[dict]: return [event for event in results if _event_in_range(event, start_dt, end_dt)] def _filter_payload(payload: object, start_dt: datetime.datetime | None, end_dt: datetime.datetime | None) -> object: if not (start_dt or end_dt): return payload if isinstance(payload, dict): results = payload.get("results") if isinstance(results, list): filtered = _filter_events([event for event in results if isinstance(event, dict)], start_dt, end_dt) results[:] = filtered elif isinstance(results, dict): for key, val in list(results.items()): if not (isinstance(val, dict) and _event_in_range(val, start_dt, end_dt)): results.pop(key) else: if not _event_in_range(payload, start_dt, end_dt): payload.clear() elif isinstance(payload, list): filtered = _filter_events([event for event in payload if isinstance(event, dict)], start_dt, end_dt) payload[:] = filtered return payload def _truncate_cmdline(value: object) -> object: if isinstance(value, str) and len(value) > PROC_CMDLINE_MAX_LEN: return value[:PROC_CMDLINE_MAX_LEN] + "..." return value def _filter_output_fields_for_event(event: dict) -> None: if not isinstance(event, dict): return output_fields = event.get("output_fields") if not isinstance(output_fields, dict): return source_key = (event.get("source") or "").lower() whitelist = SOURCE_FIELD_WHITELISTS.get(source_key) cleaned: dict[str, object] = {} if whitelist: for key in whitelist: if key in OUTPUT_FIELDS_TO_DROP: continue if key in output_fields: cleaned[key] = _truncate_cmdline(output_fields[key]) if key == "proc.cmdline" else output_fields[key] else: for key, value in output_fields.items(): if key in OUTPUT_FIELDS_TO_DROP: continue cleaned[key] = _truncate_cmdline(value) if key == "proc.cmdline" else value if cleaned: event["output_fields"] = cleaned else: event.pop("output_fields", None) def _prune_output_fields(payload: object) -> None: def _process(event: object) -> None: if isinstance(event, dict): _filter_output_fields_for_event(event) if isinstance(payload, dict): results = payload.get("results") if isinstance(results, list): for event in results: _process(event) elif isinstance(results, dict): for event in results.values(): _process(event) else: _process(payload) elif isinstance(payload, list): for event in payload: _process(event) def _validate_filter_query(filter_query: str | None) -> str | None: if not filter_query: return None if TIME_FILTER_GUARD.search(filter_query) and any(token in filter_query for token in (">", "<")): raise ValueError( "Do not embed time/timestamp comparisons inside `filter_query`. " "Use the `start_time` / `end_time` arguments instead." ) return filter_query def _build_uuid_filter(event_uuid: str) -> str: normalized = (event_uuid or "").strip() if not normalized: raise ValueError("event_uuid must not be empty.") escaped = ( normalized.replace("-", "\\") ) return escaped @mcp.tool() async def query_falco_events( since: Annotated[str, Field(description="Falcosidekick relative time window, e.g. 1M, 1h, 24h.")] = "1M", pretty: Annotated[bool, Field(description="Return pretty-printed JSON from the Falcosidekick API.")] = True, limit: Annotated[int, Field(ge=1, le=1000, description="Maximum number of events to return.")] = 1000, priority: Annotated[str | None, Field(description="Optional Falco priority filter (e.g. critical, notice).")] = None, source: Annotated[str | None, Field(description="Restrict results to a single Falco source.")] = None, rule: Annotated[str | None, Field(description="Restrict by rule name. Supports exact matches.")] = None, tags: Annotated[list[str] | None, Field(description="Optional list of tag names; combined as comma-separated.")] = None, filter_query: Annotated[ str | None, Field(description="Falco query DSL filter passed as `filter` (do NOT include time/timestamp comparisons)."), ] = None, base_url: Annotated[ str | None, Field(description="Override Falcosidekick UI base URL. Defaults to FALCO_BASE_URL env or http://localhost:8080."), ] = None, username: Annotated[ str | None, Field(description="Basic auth username. Defaults to FALCO_USERNAME or 'admin'."), ] = None, password: Annotated[ str | None, Field(description="Basic auth password. Defaults to FALCO_PASSWORD or 'admin'."), ] = None, verify_tls: Annotated[ bool, Field(description="Enable TLS certificate verification (set false for self-signed Falcosidekick endpoints)."), ] = False, extra_params: Annotated[ dict[str, str] | None, Field(description="Optional raw query parameters to merge into the request."), ] = None, start_time: Annotated[ str | None, Field(description="Optional inclusive start time in ISO 8601 UTC, e.g. 2025-11-24T03:59:59.848208Z."), ] = None, end_time: Annotated[ str | None, Field(description="Optional inclusive end time in ISO 8601 UTC, e.g. 2025-11-24T04:59:59.848208Z."), ] = None, ) -> dict: """ Query Falco events using the Falcosidekick UI `/api/v1/events/search` API. The tool authenticates with HTTP Basic Auth (default `admin:admin`) and mirrors query parameters documented in `doc.json`. All events (uuid, time, priority, rule, source, description, hostname) are returned, but `output_fields` are normalized per source (syscall, k8s_audit, aws_cloudtrail) while redundant keys like `evt.time` are removed and syscall `proc.cmdline` values are truncated for token efficiency. Use `start_time` / `end_time` for temporal windows; reserve `filter_query` for non-time Falco DSL predicates. """ base = _clean_base_url(base_url or DEFAULT_BASE_URL) url = f"{base}{EVENTS_ENDPOINT}" auth_user = username or DEFAULT_USERNAME auth_pass = password or DEFAULT_PASSWORD params: dict[str, str | int | bool] = { "since": since, "pretty": str(pretty).lower(), "limit": limit, } optional_params = { "priority": priority, "source": source, "rule": rule, "tags": _comma_join(tags), } for key, value in optional_params.items(): if value: params[key] = value if filter_query: params["filter"] = _validate_filter_query(filter_query) if extra_params: params.update({k: v for k, v in extra_params.items() if v is not None}) try: async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, verify=verify_tls) as client: response = await client.get( url, params=params, auth=(auth_user, auth_pass), headers={"Accept": "application/json"}, ) response.raise_for_status() except httpx.HTTPStatusError as exc: body = exc.response.text[:800] raise RuntimeError( f"Falcosidekick UI returned HTTP {exc.response.status_code} for {exc.request.url}: {body}" ) from exc except httpx.HTTPError as exc: # pragma: no cover - network failures raise RuntimeError(f"Failed to reach Falcosidekick UI: {exc}") from exc try: payload = response.json() except ValueError: payload = response.text start_dt = _parse_iso8601(start_time, "start_time") end_dt = _parse_iso8601(end_time, "end_time") if start_dt and end_dt and start_dt > end_dt: raise ValueError("start_time must be earlier than or equal to end_time.") if isinstance(payload, (dict, list)): _inject_descriptions(payload) _filter_payload(payload, start_dt, end_dt) _prune_output_fields(payload) return { "request_url": str(response.url), "status_code": response.status_code, "result": payload, } @mcp.tool() async def falco_full_event_by_id( event_uuid: Annotated[str, Field(description="Exact Falco event UUID to retrieve.")], since: Annotated[str, Field(description="Falcosidekick relative time window, e.g. 1M, 1h, 24h.")] = "1M", pretty: Annotated[bool, Field(description="Return pretty-printed JSON from the Falcosidekick API.")] = True, base_url: Annotated[ str | None, Field(description="Override Falcosidekick UI base URL. Defaults to FALCO_BASE_URL env or http://localhost:8080."), ] = None, username: Annotated[ str | None, Field(description="Basic auth username. Defaults to FALCO_USERNAME or 'admin'."), ] = None, password: Annotated[ str | None, Field(description="Basic auth password. Defaults to FALCO_PASSWORD or 'admin'."), ] = None, verify_tls: Annotated[ bool, Field(description="Enable TLS certificate verification (set false for self-signed Falcosidekick endpoints)."), ] = False, extra_params: Annotated[ dict[str, str] | None, Field(description="Optional raw query parameters to merge into the request."), ] = None, ) -> dict: """ Fetch a single Falco event by UUID while leaving the payload untouched. This helper hits the same `/api/v1/events/search` endpoint but injects a `filter` query that matches the provided `uuid`, ensuring the Falcosidekick API returns the exact raw event (all fields, no pruning). """ base = _clean_base_url(base_url or DEFAULT_BASE_URL) url = f"{base}{EVENTS_ENDPOINT}" auth_user = username or DEFAULT_USERNAME auth_pass = password or DEFAULT_PASSWORD params: dict[str, str | int | bool] = { "since": since, "pretty": str(pretty).lower(), "limit": 1, "filter": _build_uuid_filter(event_uuid), } if extra_params: for key, value in extra_params.items(): if value is None or key == "filter": continue params[key] = value try: async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, verify=verify_tls) as client: response = await client.get( url, params=params, auth=(auth_user, auth_pass), headers={"Accept": "application/json"}, ) response.raise_for_status() except httpx.HTTPStatusError as exc: body = exc.response.text[:800] raise RuntimeError( f"Falcosidekick UI returned HTTP {exc.response.status_code} for {exc.request.url}: {body}" ) from exc except httpx.HTTPError as exc: raise RuntimeError(f"Failed to reach Falcosidekick UI: {exc}") from exc try: payload = response.json() except ValueError: payload = response.text return { "request_url": str(response.url), "status_code": response.status_code, "result": payload, } if __name__ == "__main__": transport_mode = os.getenv("MCP_TRANSPORT", "streamable-http").lower() if transport_mode not in {"streamable-http", "stdio"}: raise ValueError("MCP_TRANSPORT must be either 'streamable-http' or 'stdio'") mcp.run(transport=transport_mode)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/maratsal/falco-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server