# suricata-mcp.py (5.52 kB)
import asyncio
import os
from typing import Any
import config
from CommandExecutionError import CommandExecutionError
from mcp.server.fastmcp import FastMCP
# Create the FastMCP server instance, named "SuricataMCP". The @mcp.tool()
# decorators further down register functions on this object, and mcp.run()
# in the __main__ guard starts it.
mcp = FastMCP("SuricataMCP")
async def run_cmd(cmd_args: list[str], timeout: int = 300) -> dict[str, int | None | bool | Any] | str:
    """
    Run the configured Suricata executable asynchronously and return its stdout.

    The executable path is built from ``config.SURICATA_DIR`` and
    ``config.SURICATA_EXE_FILE``. The process is started without a shell,
    stdout and stderr are captured, and the call is bounded by ``timeout``.

    Args:
        cmd_args (list[str]): Command-line arguments passed to the Suricata executable.
        timeout (int, optional): Seconds to wait before the process is killed. Defaults to 300.

    Returns:
        str: The decoded, whitespace-stripped stdout of the command. (The
        annotation is wider for backward compatibility; this function only
        ever returns a string.)

    Raises:
        CommandExecutionError: If the executable cannot be found, the command
            exits with a non-zero code (``exit_code``, ``stdout`` and ``stderr``
            are attached), the command times out, or any other error occurs.
    """
    suricata_full_path = os.path.join(config.SURICATA_DIR, config.SURICATA_EXE_FILE)
    try:
        process = await asyncio.create_subprocess_exec(
            suricata_full_path,
            *cmd_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            # The process may exit in the instant between the timeout firing
            # and kill(); that must not mask the timeout error itself.
            try:
                process.kill()
            except ProcessLookupError:
                pass
            # Reap the child so it does not linger as a zombie.
            await process.wait()
            raise CommandExecutionError(
                message=f"Command timed out after {timeout} seconds",
                exit_code=None
            )
        # errors="replace" keeps the real result available even when the tool
        # emits bytes that are not valid UTF-8, instead of failing on decode.
        stdout_decoded = stdout.decode(errors="replace").strip()
        stderr_decoded = stderr.decode(errors="replace").strip()
        if process.returncode != 0:
            raise CommandExecutionError(
                message="Non-zero exit code",
                exit_code=process.returncode,
                stdout=stdout_decoded,
                stderr=stderr_decoded
            )
        return stdout_decoded
    except CommandExecutionError:
        # Already structured (timeout / non-zero exit). Re-raise untouched so
        # exit_code, stdout and stderr are not lost by the generic handler below.
        raise
    except FileNotFoundError as exc:
        # Report the executable that is missing, not the arguments passed to it.
        raise CommandExecutionError(message=f"Command not found: {suricata_full_path}") from exc
    except Exception as exc:
        raise CommandExecutionError(message=str(exc)) from exc
@mcp.tool()
async def get_suricata_version() -> dict[str, int | None | bool | Any] | str:
    """
    Returns the version information reported by the Suricata executable.

    Runs Suricata with the `-V` flag and returns whatever it prints to stdout.

    Returns:
        str: The stripped stdout of `suricata -V`.

    Raises:
        CommandExecutionError: If the executable is missing, exits with a
            non-zero code, or times out.
    """
    return await run_cmd(["-V"])
@mcp.tool()
async def get_suricata_help() -> dict[str, int | None | bool | Any] | str:
    """
    Returns the command-line help text of the Suricata executable.

    Runs Suricata with the `-h` flag and returns whatever it prints to stdout.

    Returns:
        str: The stripped stdout of `suricata -h`.

    Raises:
        CommandExecutionError: If the executable is missing, exits with a
            non-zero code, or times out.
    """
    return await run_cmd(["-h"])
@mcp.tool()
async def get_alerts_from_pcap_file(pcap_destination: str, destination_folder_results:str) -> dict[str, int | None | bool | Any] | str:
    """
    Processes a PCAP file with Suricata and returns the generated alert logs.

    Runs Suricata with `-r <pcap>` and `-l <output dir>`, then reads the
    `fast.log` file from the output directory and returns its contents.

    Args:
        pcap_destination (str): The path to the PCAP file to analyze.
        destination_folder_results (str): The directory where Suricata should output its results.

    Returns:
        str: The contents of the `fast.log` file found in the output directory.

    Raises:
        CommandExecutionError: If Suricata fails (the original error, including
            exit code and stderr, is propagated unchanged), if `fast.log` is
            not found, or if any other unexpected error occurs.
    """
    # NOTE(review): if the output directory already holds a fast.log from an
    # earlier run, the returned text may include older entries — confirm
    # against the Suricata output configuration in use.
    try:
        await run_cmd(["-r", pcap_destination, "-l", destination_folder_results])
        log_path = os.path.join(destination_folder_results, "fast.log")
        with open(log_path, 'r', encoding='utf-8') as log_file:
            return log_file.read()
    except CommandExecutionError:
        # run_cmd already raised a structured error; do not re-wrap it and
        # lose its exit code / stdout / stderr.
        raise
    except FileNotFoundError as exc:
        raise CommandExecutionError(message=f"Log file not found at {destination_folder_results}") from exc
    except Exception as exc:
        raise CommandExecutionError(message=f"An error occurred: {exc}") from exc
@mcp.tool()
async def get_stats_from_pcap_file(pcap_destination: str, destination_folder_results:str) -> dict[str, int | None | bool | Any] | str:
    """
    Processes a PCAP file with Suricata and returns the generated statistics logs.

    Runs Suricata with `-r <pcap>` and `-l <output dir>`, then reads the
    `stats.log` file from the output directory and returns its contents.

    Args:
        pcap_destination (str): The path to the PCAP file to analyze.
        destination_folder_results (str): The directory where Suricata should output its results.

    Returns:
        str: The contents of the `stats.log` file found in the output directory.

    Raises:
        CommandExecutionError: If Suricata fails (the original error, including
            exit code and stderr, is propagated unchanged), if `stats.log` is
            not found, or if any other unexpected error occurs.
    """
    # NOTE(review): if the output directory already holds a stats.log from an
    # earlier run, the returned text may include older entries — confirm
    # against the Suricata output configuration in use.
    try:
        await run_cmd(["-r", pcap_destination, "-l", destination_folder_results])
        log_path = os.path.join(destination_folder_results, "stats.log")
        with open(log_path, 'r', encoding='utf-8') as log_file:
            return log_file.read()
    except CommandExecutionError:
        # run_cmd already raised a structured error; do not re-wrap it and
        # lose its exit code / stdout / stderr.
        raise
    except FileNotFoundError as exc:
        raise CommandExecutionError(message=f"Log file not found at {destination_folder_results}") from exc
    except Exception as exc:
        raise CommandExecutionError(message=f"An error occurred: {exc}") from exc
if __name__ == "__main__":
    import sys

    # With transport="stdio", stdout carries the MCP protocol messages.
    # Startup banners therefore go to stderr so they cannot corrupt the
    # message stream the client is parsing.
    print("🚀 Starting Suricata MCP Server", file=sys.stderr)
    print(f"📂 Suricata directory: {config.SURICATA_DIR}", file=sys.stderr)
    mcp.run(transport="stdio")