Skip to main content
Glama
benchmark_porthunter.py (3.86 kB)
"""Benchmark the PortHunter MCP server over stdio.

Times three tool calls (``get_info``, ``scan_overview`` and
``first_scan_event``) against a single capture file and prints the timings,
plus a few fields of the overview, as JSON on stdout.
"""
import argparse
import asyncio
import json
import os
import sys
import time
from pathlib import Path
from typing import Any, Dict

from mcp import StdioServerParameters, types
from mcp.client.session import ClientSession
from mcp.client.stdio import stdio_client

# Directory one level above the directory that contains this file.
ROOT_DIR = Path(__file__).resolve().parents[1]
DEFAULT_PCAP_DIR = ROOT_DIR / "captures"

# NOTE(review): the fallback token is hard-coded; presumably it has to match
# the server's own development default. Set PORT_HUNTER_TOKEN for real use.
TOKEN = os.getenv("PORT_HUNTER_TOKEN", "MiTOKENultraSecreto123")
PCAP_DIR = Path(os.getenv("PORT_HUNTER_ALLOWED_DIR", str(DEFAULT_PCAP_DIR))).resolve()

# Environment handed to the server parameters below. Every value is taken
# from the current environment when set, otherwise from the default shown.
SERVER_ENV = {
    "PORT_HUNTER_TOKEN": TOKEN,
    "PORT_HUNTER_ALLOWED_DIR": str(PCAP_DIR),
    "PORT_HUNTER_ALLOW_PRIVATE": os.getenv("PORT_HUNTER_ALLOW_PRIVATE", "false"),
    "PORT_HUNTER_CACHE_DIR": os.getenv("PORT_HUNTER_CACHE_DIR", ".cache/porthunter"),
    "PORT_HUNTER_REQUIRE_TOKEN": os.getenv("PORT_HUNTER_REQUIRE_TOKEN", "true"),
    "PORT_HUNTER_MAX_PCAP_MB": os.getenv("PORT_HUNTER_MAX_PCAP_MB", "200"),
    "OTX_API_KEY": os.getenv("OTX_API_KEY", ""),
    "GREYNOISE_API_KEY": os.getenv("GREYNOISE_API_KEY", ""),
    "GEOLITE2_CITY_DB": os.getenv("GEOLITE2_CITY_DB") or os.getenv("GEOIP_DB_PATH", ""),
}

PORT_HUNTER = StdioServerParameters(
    command=sys.executable,
    args=["-m", "porthunter.server"],  # <- same as what your CLI uses
    env=SERVER_ENV,
)


async def _call_tool(name: str, arguments: Dict[str, Any], timeout_s: float = 30.0) -> Any:
    """Open a stdio session to PORT_HUNTER, call one tool and decode its result.

    Args:
        name: Tool name passed to ``session.call_tool``.
        arguments: Tool arguments passed to ``session.call_tool``.
        timeout_s: Maximum number of seconds to wait for the tool call.

    Returns:
        The response's ``structuredContent`` when it is a dict. Otherwise the
        concatenated text of all ``TextContent`` blocks, parsed as JSON when
        possible and returned as a plain string when not. ``None`` when the
        response carries no text.

    Raises:
        asyncio.TimeoutError: If the tool call exceeds ``timeout_s``.
    """
    async with stdio_client(PORT_HUNTER) as (read, write):
        async with ClientSession(read, write) as session:
            # Only the tool call itself is bounded by timeout_s; initialize()
            # is awaited without a timeout.
            await session.initialize()
            rsp = await asyncio.wait_for(
                session.call_tool(name=name, arguments=arguments),
                timeout=timeout_s,
            )

            structured = getattr(rsp, "structuredContent", None)
            if isinstance(structured, dict):
                return structured

            text = "".join(
                block.text
                for block in (rsp.content or [])
                if isinstance(block, types.TextContent)
            )
            if not text:
                return None
            try:
                return json.loads(text)
            except ValueError:
                # Not JSON (json.JSONDecodeError is a ValueError): hand back
                # the raw text so the caller can still inspect it.
                return text


def _abs_pcap(path: str) -> str:
    """Return ``path`` as a string, resolving relative paths against PCAP_DIR.

    Absolute paths are returned as given (not resolved).
    """
    p = Path(path)
    if not p.is_absolute():
        p = (PCAP_DIR / p).resolve()
    return str(p)


def _overview_fields(overview: Any) -> Dict[str, Any]:
    """Extract the summary fields from a ``scan_overview`` result.

    Returns an empty dict when ``overview`` (or its ``"overview"`` entry) is
    not a dict, e.g. when the tool returned plain text or nothing at all.
    """
    fields: Dict[str, Any] = {}
    if not isinstance(overview, dict):
        return fields
    ov = overview.get("overview", {})
    if not isinstance(ov, dict):
        return fields

    fields["total_pkts"] = ov.get("total_pkts")
    fields["interval_s"] = ov.get("interval_s")
    fields["suspected_patterns"] = ov.get("suspected_patterns", [])
    try:
        fields["scanners_count"] = len(ov.get("scanners", []))
    except TypeError:
        # "scanners" is present but has no length (e.g. null); leave the
        # count out rather than invent a value.
        pass
    return fields


async def run_bench(pcap_name: str) -> Dict[str, Any]:
    """Time the three PortHunter tool calls for one capture file.

    Args:
        pcap_name: Capture path; relative paths are resolved against PCAP_DIR.

    Returns:
        A dict with ``pcap``, the three timings in seconds rounded to three
        decimals (``get_info_s``, ``overview_s``, ``first_event_s``), the
        overview summary fields when available, and ``ok`` which is true when
        both ``scan_overview`` and ``first_scan_event`` returned something
        truthy.
    """
    results: Dict[str, Any] = {"pcap": pcap_name}

    # Each timing includes opening the stdio session, because _call_tool
    # creates a new session on every call.
    t0 = time.perf_counter()
    await _call_tool("get_info", {"auth_token": TOKEN})
    t1 = time.perf_counter()

    p = _abs_pcap(pcap_name)

    ov0 = time.perf_counter()
    overview = await _call_tool("scan_overview", {"path": p, "auth_token": TOKEN})
    ov1 = time.perf_counter()

    fe0 = time.perf_counter()
    first = await _call_tool("first_scan_event", {"path": p, "auth_token": TOKEN})
    fe1 = time.perf_counter()

    results["get_info_s"] = round(t1 - t0, 3)
    results["overview_s"] = round(ov1 - ov0, 3)
    results["first_event_s"] = round(fe1 - fe0, 3)

    results.update(_overview_fields(overview))

    results["ok"] = bool(overview and first)
    return results


def main():
    """Parse the command line, run the benchmark and print the result as JSON."""
    ap = argparse.ArgumentParser(description="Benchmark PortHunter (MCP stdio)")
    ap.add_argument("pcap", help="Archivo .pcap o .pcapng dentro de PORT_HUNTER_ALLOWED_DIR")
    args = ap.parse_args()

    # asyncio.run creates the event loop and also closes it when done.
    res = asyncio.run(run_bench(args.pcap))
    print(json.dumps(res, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Abysswalkr/porthunter-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.