"""Falcosidekick UI MCP server.
This server exposes two tools, `query_falco_events` and
`falco_full_event_by_id`, that proxy the Falcosidekick UI
`/api/v1/events/search` endpoint so MCP clients can query Falco events
without leaving their workspace.
"""
from __future__ import annotations
import datetime
import os
import re
from typing import Annotated
import httpx
from mcp.server.fastmcp import FastMCP
from pydantic import Field
# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
# Connection defaults. Each value can be overridden through the named
# environment variable; the second argument is the fallback used otherwise.
DEFAULT_BASE_URL = os.environ.get("FALCO_BASE_URL", "http://localhost:8080")
EVENTS_ENDPOINT = os.environ.get("FALCO_EVENTS_PATH", "/api/v1/events/search")
DEFAULT_USERNAME = os.environ.get("FALCO_USERNAME", "admin")
DEFAULT_PASSWORD = os.environ.get("FALCO_PASSWORD", "admin")
# Passed to httpx as the client timeout; a non-numeric value fails at import.
DEFAULT_TIMEOUT = float(os.environ.get("FALCO_HTTP_TIMEOUT", "15"))
# Priority names that can follow the timestamp in a Falco `output` line.
# All eight Falco priorities are listed (kept alphabetical); if one is missing
# the fallback regex below would leave the priority word inside the extracted
# description (e.g. "Debug Shell spawned" instead of "Shell spawned").
SEVERITY_LIST = [
    "Alert",
    "Critical",
    "Debug",
    "Emergency",
    "Error",
    "Informational",
    "Notice",
    "Warning",
]
# Matches "<time>: <Priority> <description> (<fields...>)" and captures only
# the description, stopping at the first "(" or at the end of the string.
DESCRIPTION_WITH_SEVERITY_REGEX = re.compile(
    r"^[0-9:.]+\s*:\s*(?:"
    + "|".join(re.escape(name) for name in SEVERITY_LIST)
    + r")\s+(.*?)(?:\s*\(|$)",
    re.S,
)
# Same shape without a recognised priority word: captures everything after the
# timestamp up to the first "(" or the end of the string.
DESCRIPTION_FALLBACK_REGEX = re.compile(r"^[0-9:.]+\s*:\s*(.*?)(?:\s*\(|$)", re.S)
# Per-source allow-lists: for an event whose lower-cased `source` matches a key,
# only these `output_fields` entries are kept, in this order.
SOURCE_FIELD_WHITELISTS = dict(
    aws_cloudtrail=[
        "ct.name",
        "ct.region",
        "ct.request.name",
        "ct.srcip",
        "ct.user",
        "ct.user.arn",
    ],
    syscall=[
        "container.name",
        "container.image.repository",
        "container.image.tag",
        "k8s.ns.name",
        "k8s.pod.name",
        "evt.type",
        "evt.res",
        "fd.name",
        "proc.name",
        "proc.cmdline",
    ],
    k8s_audit=[
        "ka.target.name",
        "ka.target.namespace",
        "ka.user.name",
        "ka.target.subresource",
    ],
)

# Keys always removed from `output_fields`, whatever the source.
OUTPUT_FIELDS_TO_DROP = {"evt.time"}

# `proc.cmdline` values longer than this many characters get cut and "..." appended.
PROC_CMDLINE_MAX_LEN = 120

# Spots the whole words "time" / "timestamp" (any case) inside a filter string.
TIME_FILTER_GUARD = re.compile(r"\b(time|timestamp)\b", re.I)
# Module-level FastMCP server instance. The tool functions in this file
# register themselves on it through the `@mcp.tool()` decorator, and the
# `__main__` guard at the bottom of the file starts it with `mcp.run(...)`.
mcp = FastMCP(
    name="Falcosidekick Events MCP",
    # Free-text guidance passed to FastMCP as the server's `instructions`.
    # It is a runtime string: keep it in sync with the tools' real behaviour.
    instructions=(
        "Use the `query_falco_events` tool to fetch Falco security events via "
        "the Falcosidekick UI search API. Provide optional filters such as "
        "`priority`, `rule`, `source`, `tags`, or `since` to scope results. "
        "To target a precise time window, supply `start_time` and/or `end_time` "
        "using ISO 8601 UTC format like `2025-11-24T03:59:59.848208Z`. "
        "The server will convert the strings to UTC timestamps, apply them "
        "after fetching results, and only return events inside that interval. "
        "Responses keep every event (uuid, time, priority, rule, source, description, hostname) "
        "but streamline `output_fields` per source (syscall, k8s_audit, aws_cloudtrail) and drop "
        "redundant keys like `evt.time`. Syscall `proc.cmdline` entries are truncated for brevity. "
        "Use `start_time` / `end_time` arguments instead of crafting `filter` expressions for time ranges. "
        "Use `falco_full_event_by_id` when you need the exact raw event (all fields) for a given UUID."
    ),
    # Listener settings, each overridable via the named environment variable.
    host=os.getenv("MCP_HOST", "0.0.0.0"),
    # NOTE(review): the default port 8080 is the same port used in the default
    # Falcosidekick base URL (http://localhost:8080); presumably one of them
    # must be overridden when both run on the same host - confirm.
    port=int(os.getenv("PORT", "8080")),
    streamable_http_path=os.getenv("MCP_HTTP_PATH", "/mcp"),
)
def _clean_base_url(base_url: str) -> str:
    """Return *base_url* with every trailing ``/`` removed."""
    # rstrip() is a no-op when there is no trailing slash, so no guard is needed.
    return base_url.rstrip("/")
def _comma_join(values: list[str] | None) -> str | None:
    """Join *values* with commas, or return ``None`` for a missing/empty list."""
    if not values:
        return None
    return ",".join(values)
def _extract_description(output: str | None) -> str | None:
    """Pull the human-readable description out of a Falco ``output`` line.

    The severity-aware pattern is tried first and the generic fallback second;
    the first capture group of whichever matches is returned with surrounding
    whitespace stripped. Returns ``None`` for empty input or when neither
    pattern matches.
    """
    if not output:
        return None
    for pattern in (DESCRIPTION_WITH_SEVERITY_REGEX, DESCRIPTION_FALLBACK_REGEX):
        found = pattern.search(output)
        if found:
            return found.group(1).strip()
    return None
def _rewrite_event(event: dict) -> None:
    """Replace an event's raw ``output`` with a short ``description``, in place.

    ``output`` is always removed; ``description`` is only added when one could
    be extracted from it. The ``tags`` key is removed as well.
    """
    summary = _extract_description(event.pop("output", None))
    if summary:
        event["description"] = summary
    event.pop("tags", None)
def _inject_descriptions(payload: object) -> None:
    """Apply ``_rewrite_event`` to every event dict found in *payload*, in place.

    Accepted shapes: a list of events, a dict whose ``results`` entry is a
    list or a dict of events, or a dict that is itself treated as one event
    (when ``results`` is neither a list nor a dict). Anything else is ignored.
    """
    if isinstance(payload, dict):
        results = payload.get("results")
        if isinstance(results, dict):
            candidates = results.values()
        elif isinstance(results, list):
            candidates = results
        else:
            # No usable results container: the payload itself is the event.
            candidates = (payload,)
    elif isinstance(payload, list):
        candidates = payload
    else:
        return

    for candidate in candidates:
        if isinstance(candidate, dict):
            _rewrite_event(candidate)
def _parse_iso8601(ts: str | None, label: str) -> datetime.datetime | None:
    """Parse an ISO 8601 timestamp string into a timezone-aware datetime.

    Args:
        ts: Timestamp text such as ``2025-11-24T03:59:59.848208Z``. ``None``
            or an empty string means "no bound".
        label: Name of the argument being parsed; only used in the error text.

    Returns:
        ``None`` when *ts* is empty, otherwise an aware ``datetime``. A value
        without any UTC offset is interpreted as UTC, so the result can always
        be compared with other aware datetimes.

    Raises:
        ValueError: If *ts* is not a parseable ISO 8601 timestamp.
    """
    if not ts:
        return None

    text = ts.strip()
    # fromisoformat() only understands the "Z" suffix from Python 3.11 on.
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"
    # Before Python 3.11 only 3 or 6 fractional digits are accepted; pad or
    # truncate to microseconds so e.g. nanosecond timestamps still parse.
    text = re.sub(r"\.(\d+)", lambda m: "." + m.group(1)[:6].ljust(6, "0"), text, count=1)

    try:
        parsed = datetime.datetime.fromisoformat(text)
    except ValueError as exc:
        raise ValueError(f"Invalid {label} value '{ts}'. Use ISO 8601 UTC (e.g. 2025-11-24T03:59:59.848208Z).") from exc

    # A naive value cannot be ordered against aware event times (TypeError),
    # so an offset-less input is pinned to UTC as the error message advertises.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=datetime.timezone.utc)
    return parsed
def _event_in_range(event: dict, start_dt: datetime.datetime | None, end_dt: datetime.datetime | None) -> bool:
    """Tell whether an event's timestamp lies inside ``[start_dt, end_dt]``.

    Args:
        event: Event dict; its time is read from ``time`` or, failing that,
            ``timestamp`` and must be an ISO 8601 string.
        start_dt: Inclusive lower bound, or ``None`` for no lower bound.
        end_dt: Inclusive upper bound, or ``None`` for no upper bound.

    Returns:
        ``True`` when no bound is given or the event time satisfies every
        given bound; ``False`` when the event has no string timestamp, the
        timestamp cannot be parsed, or it falls outside the window.
    """
    if start_dt is None and end_dt is None:
        return True

    event_time = event.get("time") or event.get("timestamp")
    if not isinstance(event_time, str):
        return False

    text = event_time.strip()
    # fromisoformat() only understands the "Z" suffix from Python 3.11 on.
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"
    # Before Python 3.11 only 3 or 6 fractional digits parse; normalise to
    # microseconds so nanosecond-precision timestamps are not silently dropped.
    text = re.sub(r"\.(\d+)", lambda m: "." + m.group(1)[:6].ljust(6, "0"), text, count=1)
    try:
        evt_dt = datetime.datetime.fromisoformat(text)
    except ValueError:
        return False

    def _as_utc_aware(moment: datetime.datetime) -> datetime.datetime:
        # Naive and aware datetimes cannot be ordered (TypeError); treat a
        # value without offset as UTC so every comparison below is valid.
        if moment.tzinfo is None:
            return moment.replace(tzinfo=datetime.timezone.utc)
        return moment

    evt_dt = _as_utc_aware(evt_dt)
    if start_dt is not None and evt_dt < _as_utc_aware(start_dt):
        return False
    if end_dt is not None and evt_dt > _as_utc_aware(end_dt):
        return False
    return True
def _filter_events(results: list[dict], start_dt: datetime.datetime | None, end_dt: datetime.datetime | None) -> list[dict]:
    """Return a new list holding the events of *results* that fall inside the window.

    Input order is preserved and *results* itself is not modified.
    """
    kept: list[dict] = []
    for candidate in results:
        if _event_in_range(candidate, start_dt, end_dt):
            kept.append(candidate)
    return kept
def _filter_payload(payload: object, start_dt: datetime.datetime | None, end_dt: datetime.datetime | None) -> object:
    """Drop events outside ``[start_dt, end_dt]`` from *payload*, in place.

    Handles the same shapes as the other payload walkers: a list of events, a
    dict whose ``results`` is a list or dict of events, or a dict treated as a
    single event (cleared entirely when it is out of range). Entries that are
    not dicts are removed from result containers. The same *payload* object is
    returned; without any bound it is returned untouched.
    """
    if not start_dt and not end_dt:
        return payload

    def _keep(candidate: object) -> bool:
        # Only dict entries can be events; everything else is discarded.
        return isinstance(candidate, dict) and _event_in_range(candidate, start_dt, end_dt)

    if isinstance(payload, list):
        payload[:] = [item for item in payload if _keep(item)]
        return payload
    if not isinstance(payload, dict):
        return payload

    results = payload.get("results")
    if isinstance(results, list):
        # Slice assignment keeps the list object referenced by the payload.
        results[:] = [item for item in results if _keep(item)]
    elif isinstance(results, dict):
        rejected = [key for key, val in results.items() if not _keep(val)]
        for key in rejected:
            del results[key]
    elif not _event_in_range(payload, start_dt, end_dt):
        # No results container: the payload is a lone event that is out of range.
        payload.clear()
    return payload
def _truncate_cmdline(value: object) -> object:
    """Shorten an over-long command line string.

    Strings longer than ``PROC_CMDLINE_MAX_LEN`` are cut to that length and
    get ``...`` appended; shorter strings and non-string values are returned
    unchanged.
    """
    if not isinstance(value, str):
        return value
    if len(value) <= PROC_CMDLINE_MAX_LEN:
        return value
    return value[:PROC_CMDLINE_MAX_LEN] + "..."
def _filter_output_fields_for_event(event: dict) -> None:
    """Slim down one event's ``output_fields`` mapping, in place.

    For a source listed in ``SOURCE_FIELD_WHITELISTS`` only the whitelisted
    keys that are present are kept, in whitelist order; for any other source
    every key is kept in its original order. Keys in ``OUTPUT_FIELDS_TO_DROP``
    are always removed and ``proc.cmdline`` is truncated. When nothing
    remains, ``output_fields`` is removed from the event. Events without a
    dict ``output_fields`` are left untouched.
    """
    if not isinstance(event, dict):
        return
    fields = event.get("output_fields")
    if not isinstance(fields, dict):
        return

    allowed = SOURCE_FIELD_WHITELISTS.get((event.get("source") or "").lower())
    # Whitelist order for known sources, original order otherwise.
    candidate_keys = allowed if allowed else list(fields)

    kept: dict[str, object] = {
        name: (_truncate_cmdline(fields[name]) if name == "proc.cmdline" else fields[name])
        for name in candidate_keys
        if name not in OUTPUT_FIELDS_TO_DROP and name in fields
    }

    if kept:
        event["output_fields"] = kept
    else:
        event.pop("output_fields", None)
def _prune_output_fields(payload: object) -> None:
    """Run ``_filter_output_fields_for_event`` over every event dict in *payload*.

    Accepts a list of events, a dict whose ``results`` is a list or dict of
    events, or a dict that is itself treated as a single event. Non-dict
    entries and any other payload type are ignored. Works in place.
    """
    if isinstance(payload, list):
        events = payload
    elif isinstance(payload, dict):
        results = payload.get("results")
        if isinstance(results, list):
            events = results
        elif isinstance(results, dict):
            events = list(results.values())
        else:
            # No results container: the payload itself is the event.
            events = [payload]
    else:
        events = []

    for candidate in events:
        if isinstance(candidate, dict):
            _filter_output_fields_for_event(candidate)
def _validate_filter_query(filter_query: str | None) -> str | None:
    """Reject filter expressions that try to compare on time.

    Returns ``None`` for an empty filter and the filter unchanged otherwise.

    Raises:
        ValueError: If the filter contains ``<`` or ``>`` together with the
            word ``time`` or ``timestamp`` (matched by ``TIME_FILTER_GUARD``).
    """
    if not filter_query:
        return None
    has_comparison = ">" in filter_query or "<" in filter_query
    if has_comparison and TIME_FILTER_GUARD.search(filter_query):
        raise ValueError(
            "Do not embed time/timestamp comparisons inside `filter_query`. "
            "Use the `start_time` / `end_time` arguments instead."
        )
    return filter_query
def _build_uuid_filter(event_uuid: str) -> str:
    """Turn an event UUID into the string sent as the ``filter`` parameter.

    Surrounding whitespace is stripped and every ``-`` is replaced by a single
    backslash character.

    Raises:
        ValueError: If *event_uuid* is empty or whitespace only.
    """
    trimmed = event_uuid.strip() if event_uuid else ""
    if not trimmed:
        raise ValueError("event_uuid must not be empty.")
    # NOTE(review): each hyphen becomes a lone backslash ("a-b" -> "a\b"), it is
    # not prefixed with one ("a\-b"). Presumably this is meant as escaping for
    # the search backend - confirm against the Falcosidekick UI before changing.
    return trimmed.replace("-", "\\")
@mcp.tool()
async def query_falco_events(
    since: Annotated[str, Field(description="Falcosidekick relative time window, e.g. 1M, 1h, 24h.")] = "1M",
    pretty: Annotated[bool, Field(description="Return pretty-printed JSON from the Falcosidekick API.")] = True,
    limit: Annotated[int, Field(ge=1, le=1000, description="Maximum number of events to return.")] = 1000,
    priority: Annotated[str | None, Field(description="Optional Falco priority filter (e.g. critical, notice).")] = None,
    source: Annotated[str | None, Field(description="Restrict results to a single Falco source.")] = None,
    rule: Annotated[str | None, Field(description="Restrict by rule name. Supports exact matches.")] = None,
    tags: Annotated[list[str] | None, Field(description="Optional list of tag names; combined as comma-separated.")] = None,
    filter_query: Annotated[
        str | None,
        Field(description="Falco query DSL filter passed as `filter` (do NOT include time/timestamp comparisons)."),
    ] = None,
    base_url: Annotated[
        str | None,
        Field(description="Override Falcosidekick UI base URL. Defaults to FALCO_BASE_URL env or http://localhost:8080."),
    ] = None,
    username: Annotated[
        str | None,
        Field(description="Basic auth username. Defaults to FALCO_USERNAME or 'admin'."),
    ] = None,
    password: Annotated[
        str | None,
        Field(description="Basic auth password. Defaults to FALCO_PASSWORD or 'admin'."),
    ] = None,
    verify_tls: Annotated[
        bool,
        Field(description="Enable TLS certificate verification (set false for self-signed Falcosidekick endpoints)."),
    ] = False,
    extra_params: Annotated[
        dict[str, str] | None,
        Field(description="Optional raw query parameters to merge into the request."),
    ] = None,
    start_time: Annotated[
        str | None,
        Field(description="Optional inclusive start time in ISO 8601 UTC, e.g. 2025-11-24T03:59:59.848208Z."),
    ] = None,
    end_time: Annotated[
        str | None,
        Field(description="Optional inclusive end time in ISO 8601 UTC, e.g. 2025-11-24T04:59:59.848208Z."),
    ] = None,
) -> dict:
    """
    Query Falco events using the Falcosidekick UI `/api/v1/events/search` API.
    The tool authenticates with HTTP Basic Auth (default `admin:admin`) and
    mirrors query parameters documented in `doc.json`. All events (uuid, time,
    priority, rule, source, description, hostname) are returned, but
    `output_fields` are normalized per source (syscall, k8s_audit,
    aws_cloudtrail) while redundant keys like `evt.time` are removed and
    syscall `proc.cmdline` values are truncated for token efficiency. Use
    `start_time` / `end_time` for temporal windows; reserve `filter_query`
    for non-time Falco DSL predicates.
    """
    # Returns a dict with `request_url`, `status_code` and `result` (the
    # post-processed JSON payload, or the raw body text when it is not JSON).
    # Raises ValueError for invalid time bounds or a time comparison inside
    # `filter_query`, and RuntimeError for HTTP / transport failures.
    # NOTE(review): the docstring above is kept verbatim; FastMCP presumably
    # surfaces it to clients as the tool description - confirm before editing.

    # Validate the caller-supplied window first so malformed input fails
    # before any network round-trip is made.
    start_dt = _parse_iso8601(start_time, "start_time")
    end_dt = _parse_iso8601(end_time, "end_time")
    # Bounds given without a UTC offset are treated as UTC; mixing naive and
    # aware datetimes would otherwise raise TypeError on comparison.
    if start_dt is not None and start_dt.tzinfo is None:
        start_dt = start_dt.replace(tzinfo=datetime.timezone.utc)
    if end_dt is not None and end_dt.tzinfo is None:
        end_dt = end_dt.replace(tzinfo=datetime.timezone.utc)
    if start_dt and end_dt and start_dt > end_dt:
        raise ValueError("start_time must be earlier than or equal to end_time.")

    base = _clean_base_url(base_url or DEFAULT_BASE_URL)
    url = f"{base}{EVENTS_ENDPOINT}"
    auth_user = username or DEFAULT_USERNAME
    auth_pass = password or DEFAULT_PASSWORD

    params: dict[str, str | int | bool] = {
        "since": since,
        "pretty": str(pretty).lower(),
        "limit": limit,
    }
    optional_params = {
        "priority": priority,
        "source": source,
        "rule": rule,
        "tags": _comma_join(tags),
    }
    # Only forward the optional filters that were actually provided.
    for key, value in optional_params.items():
        if value:
            params[key] = value
    if filter_query:
        params["filter"] = _validate_filter_query(filter_query)
    # Raw extras are merged last, so they can override any key set above.
    if extra_params:
        params.update({k: v for k, v in extra_params.items() if v is not None})

    # NOTE(review): `verify_tls` defaults to False, so certificate
    # verification is off unless the caller opts in.
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, verify=verify_tls) as client:
            response = await client.get(
                url,
                params=params,
                auth=(auth_user, auth_pass),
                headers={"Accept": "application/json"},
            )
            response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        # Include a bounded slice of the body so the error stays readable.
        body = exc.response.text[:800]
        raise RuntimeError(
            f"Falcosidekick UI returned HTTP {exc.response.status_code} for {exc.request.url}: {body}"
        ) from exc
    except httpx.HTTPError as exc:  # pragma: no cover - network failures
        raise RuntimeError(f"Failed to reach Falcosidekick UI: {exc}") from exc

    try:
        payload = response.json()
    except ValueError:
        # Not JSON: hand the raw text back unprocessed.
        payload = response.text

    if isinstance(payload, (dict, list)):
        _inject_descriptions(payload)
        _filter_payload(payload, start_dt, end_dt)
        _prune_output_fields(payload)

    return {
        "request_url": str(response.url),
        "status_code": response.status_code,
        "result": payload,
    }
@mcp.tool()
async def falco_full_event_by_id(
    event_uuid: Annotated[str, Field(description="Exact Falco event UUID to retrieve.")],
    since: Annotated[str, Field(description="Falcosidekick relative time window, e.g. 1M, 1h, 24h.")] = "1M",
    pretty: Annotated[bool, Field(description="Return pretty-printed JSON from the Falcosidekick API.")] = True,
    base_url: Annotated[
        str | None,
        Field(description="Override Falcosidekick UI base URL. Defaults to FALCO_BASE_URL env or http://localhost:8080."),
    ] = None,
    username: Annotated[
        str | None,
        Field(description="Basic auth username. Defaults to FALCO_USERNAME or 'admin'."),
    ] = None,
    password: Annotated[
        str | None,
        Field(description="Basic auth password. Defaults to FALCO_PASSWORD or 'admin'."),
    ] = None,
    verify_tls: Annotated[
        bool,
        Field(description="Enable TLS certificate verification (set false for self-signed Falcosidekick endpoints)."),
    ] = False,
    extra_params: Annotated[
        dict[str, str] | None,
        Field(description="Optional raw query parameters to merge into the request."),
    ] = None,
) -> dict:
    """
    Fetch a single Falco event by UUID while leaving the payload untouched.
    This helper hits the same `/api/v1/events/search` endpoint but injects a
    `filter` query that matches the provided `uuid`, ensuring the Falcosidekick
    API returns the exact raw event (all fields, no pruning).
    """
    # Returns a dict with `request_url`, `status_code` and `result` (parsed
    # JSON, or the raw body text when the response is not JSON).
    # Raises ValueError for an empty UUID and RuntimeError for HTTP failures.
    endpoint = f"{_clean_base_url(base_url or DEFAULT_BASE_URL)}{EVENTS_ENDPOINT}"
    credentials = (username or DEFAULT_USERNAME, password or DEFAULT_PASSWORD)

    query: dict[str, str | int | bool] = {
        "since": since,
        "pretty": str(pretty).lower(),
        "limit": 1,
        "filter": _build_uuid_filter(event_uuid),
    }
    if extra_params:
        # Extras may override anything except the UUID filter itself.
        query.update(
            {name: val for name, val in extra_params.items() if val is not None and name != "filter"}
        )

    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, verify=verify_tls) as client:
            response = await client.get(
                endpoint,
                params=query,
                auth=credentials,
                headers={"Accept": "application/json"},
            )
            response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        # Keep the error readable by quoting at most 800 characters of the body.
        snippet = exc.response.text[:800]
        raise RuntimeError(
            f"Falcosidekick UI returned HTTP {exc.response.status_code} for {exc.request.url}: {snippet}"
        ) from exc
    except httpx.HTTPError as exc:
        raise RuntimeError(f"Failed to reach Falcosidekick UI: {exc}") from exc

    try:
        result = response.json()
    except ValueError:
        # Not JSON: return the body text as-is.
        result = response.text

    return {
        "request_url": str(response.url),
        "status_code": response.status_code,
        "result": result,
    }
if __name__ == "__main__":
    # Transport is chosen through MCP_TRANSPORT (case-insensitive); anything
    # other than the two supported values aborts start-up.
    transport_mode = os.getenv("MCP_TRANSPORT", "streamable-http").lower()
    supported = transport_mode == "streamable-http" or transport_mode == "stdio"
    if not supported:
        raise ValueError("MCP_TRANSPORT must be either 'streamable-http' or 'stdio'")
    mcp.run(transport=transport_mode)