Skip to main content
Glama

SuricataMCP

by Medinios
suricata-mcp.py5.52 kB
import asyncio
import os
import sys
from typing import Any

import config
from CommandExecutionError import CommandExecutionError
from mcp.server.fastmcp import FastMCP

# Create an MCP server
mcp = FastMCP("SuricataMCP")


async def run_cmd(
    cmd_args: list[str], timeout: int = 300
) -> dict[str, int | None | bool | Any] | str:
    """
    Executes the Suricata command asynchronously with the given arguments and returns the output.

    The executable path is built from ``config.SURICATA_DIR`` and
    ``config.SURICATA_EXE_FILE``. The process is started without a shell,
    stdout and stderr are captured, and the process is killed if it does not
    finish within ``timeout`` seconds.

    Args:
        cmd_args (list[str]): Command-line arguments to pass to the Suricata executable.
        timeout (int, optional): Seconds to wait before the process is killed.
            Defaults to 300 seconds.

    Returns:
        str: The decoded, stripped stdout of Suricata when it exits with code 0.
        (The broader return annotation is kept unchanged for compatibility.)

    Raises:
        CommandExecutionError: If the executable is not found, the command
            times out, it exits with a non-zero code (in which case the exit
            code, stdout and stderr are attached), or any other exception
            occurs while running it.
    """
    suricata_full_path = os.path.join(config.SURICATA_DIR, config.SURICATA_EXE_FILE)

    try:
        process = await asyncio.create_subprocess_exec(
            suricata_full_path,
            *cmd_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        try:
            stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
        except asyncio.TimeoutError as exc:
            # Kill and reap the child so it does not linger after the timeout.
            process.kill()
            await process.wait()
            raise CommandExecutionError(
                message=f"Command timed out after {timeout} seconds",
                exit_code=None,
            ) from exc

        # errors="replace" keeps undecodable bytes from masking the real result.
        stdout_decoded = stdout.decode(errors="replace").strip()
        stderr_decoded = stderr.decode(errors="replace").strip()

        if process.returncode != 0:
            raise CommandExecutionError(
                message="Non-zero exit code",
                exit_code=process.returncode,
                stdout=stdout_decoded,
                stderr=stderr_decoded,
            )

        return stdout_decoded

    except CommandExecutionError:
        # Already structured (timeout / non-zero exit): re-raise untouched so
        # the generic handler below does not discard exit_code/stdout/stderr.
        raise
    except FileNotFoundError as exc:
        # The missing file is the executable itself, so report its path.
        raise CommandExecutionError(
            message=f"Command not found: {suricata_full_path}"
        ) from exc
    except Exception as exc:
        raise CommandExecutionError(message=str(exc)) from exc


@mcp.tool()
async def get_suricata_version() -> dict[str, int | None | bool | Any] | str:
    """
    Returns the output of running Suricata with the ``-V`` argument.

    Raises:
        CommandExecutionError: If the command cannot be run or fails.
    """
    return await run_cmd(["-V"])


@mcp.tool()
async def get_suricata_help() -> dict[str, int | None | bool | Any] | str:
    """
    Returns the output of running Suricata with the ``-h`` argument.

    Raises:
        CommandExecutionError: If the command cannot be run or fails.
    """
    return await run_cmd(["-h"])


async def _run_pcap_and_read_log(
    pcap_destination: str, destination_folder_results: str, log_name: str
) -> str:
    """Run Suricata on a PCAP file and return the contents of one log file it wrote."""
    try:
        await run_cmd(["-r", pcap_destination, "-l", destination_folder_results])
        log_path = os.path.join(destination_folder_results, log_name)
        with open(log_path, "r", encoding="utf-8") as log_file:
            return log_file.read()
    except CommandExecutionError:
        # Keep the structured failure from run_cmd instead of flattening it
        # into a generic "An error occurred" message.
        raise
    except FileNotFoundError as exc:
        raise CommandExecutionError(
            message=f"Log file not found at {destination_folder_results}"
        ) from exc
    except Exception as exc:
        raise CommandExecutionError(message=f"An error occurred: {exc}") from exc


@mcp.tool()
async def get_alerts_from_pcap_file(
    pcap_destination: str, destination_folder_results: str
) -> dict[str, int | None | bool | Any] | str:
    """
    Processes a PCAP file with Suricata and returns the generated alert logs.

    This function runs Suricata against a given PCAP file, stores the results in a
    specified directory, and then reads the generated `fast.log` file to return its contents.

    Args:
        pcap_destination (str): The path to the PCAP file to analyze.
        destination_folder_results (str): The directory where Suricata should output its results.

    Returns:
        str: The contents of the `fast.log` file generated by Suricata.

    Raises:
        CommandExecutionError: If Suricata fails, the `fast.log` file is not found,
            or an unexpected error occurs during execution.
    """
    return await _run_pcap_and_read_log(
        pcap_destination, destination_folder_results, "fast.log"
    )


@mcp.tool()
async def get_stats_from_pcap_file(
    pcap_destination: str, destination_folder_results: str
) -> dict[str, int | None | bool | Any] | str:
    """
    Processes a PCAP file with Suricata and returns the generated statistics logs.

    This function runs Suricata against a given PCAP file, stores the results in a
    specified directory, and then reads the generated `stats.log` file to return its contents.

    Args:
        pcap_destination (str): The path to the PCAP file to analyze.
        destination_folder_results (str): The directory where Suricata should output its results.

    Returns:
        str: The contents of the `stats.log` file generated by Suricata.

    Raises:
        CommandExecutionError: If Suricata fails, the `stats.log` file is not found,
            or an unexpected error occurs during execution.
    """
    return await _run_pcap_and_read_log(
        pcap_destination, destination_folder_results, "stats.log"
    )


if __name__ == "__main__":
    # With the stdio transport, stdout is the protocol channel, so the startup
    # banner is written to stderr to avoid corrupting the message stream.
    print("🚀 Starting Suricata MCP Server", file=sys.stderr)
    print(f"📂 Suricata directory: {config.SURICATA_DIR}", file=sys.stderr)
    mcp.run(transport="stdio")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Medinios/SuricataMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.