import re
from typing import Any
from mcp.server.fastmcp import FastMCP
from ..tshark.client import TSharkClient
from .envelope import error_response, normalize_tool_result, parse_tool_result, success_response
def _parse_io_graph(output: str) -> list[tuple[float, int]]:
"""
Parse output of `tshark -z io,stat,interval`.
Returns list of (time_start, packet_count).
"""
data = []
lines = output.splitlines()
for line in lines:
line = line.strip()
# Skip headers/footers
if "=|" in line or "|---" in line or "Duration" in line or "Interval:" in line:
continue
# Regex to find float range and int
# Matches: "000.000-001.000 154" or "| 0.0- 1.0| 20|"
# Group 1: Start Time, Group 2: Count
match = re.search(r"([\d\.]+)\s*-\s*[\d\.]+\s+[|]?\s*(\d+)", line)
if match:
try:
t = float(match.group(1))
count = int(match.group(2))
data.append((t, count))
except ValueError:
continue
return data
def _render_ascii_bar_chart(data: list[tuple[float, int]], height: int = 15) -> str:
if not data:
return "No traffic data found."
max_val = max(d[1] for d in data) if data else 0
if max_val == 0:
return "No packets found in this capture."
# Scaling
scale = max_val / height if max_val > height else 1
# Build chart rows (top to bottom)
lines = []
lines.append(f"[Traffic I/O Graph] Max: {max_val} pkts/interval")
# Y-axis + Bars
for h in range(height, -1, -1):
threshold = h * scale
row = f"{str(int(threshold)).rjust(5)} | "
for _, count in data:
if count >= threshold and count > 0: # Only draw if count > 0
if count >= threshold + (scale * 0.5):
row += "█" # Full block
else:
row += "▄" # Half block
elif h == 0:
row += "_" # Baseline
else:
row += " "
lines.append(row)
# X-axis Labels (Start and End time)
start_time = data[0][0]
duration = (data[1][0] - data[0][0]) * len(data) if len(data) > 1 else 0
end_time = start_time + duration
# Footer
footer = " " * 8
# Try to align labels
footer += f"{start_time}s"
spacer_len = max(0, len(data) - len(str(start_time)) - len(str(end_time)) - 2)
footer += " " * spacer_len
footer += f"{end_time}s"
lines.append(footer)
total_pkts = sum(d[1] for d in data)
avg_rate = total_pkts / duration if duration > 0 else 0
lines.append(f"\nStats: Total Packets: {total_pkts} | Avg Rate: {avg_rate:.2f} pkts/s")
return "\n".join(lines)
def _parse_protocol_hierarchy(output: str) -> dict[str, Any]:
"""Parse tshark -z io,phs output into a nested dict."""
lines = output.splitlines()
# Find root indentation logic
# TShark output usually starts with the protocol name. hierarchy is by indentation.
# We need a dummy root to hold everything.
root = {"name": "root", "frames": 0, "bytes": 0, "children": []}
stack = [(root, -1)] # (node, indent_level)
for line in lines:
if "frames:bytes" in line or "Filter:" in line or "Statistics" in line or "====" in line:
continue
if not line.strip():
continue
# Calculate indent: number of leading spaces
indent = len(line) - len(line.lstrip())
# Regex: protocol name can contain dots, dashes, underscores
# "ip 100:150000"
match = re.search(r"([a-zA-Z0-9\-\._]+)\s+(\d+):(\d+)", line)
if match:
name = match.group(1)
frames = int(match.group(2))
bytes_val = int(match.group(3))
node = {"name": name, "frames": frames, "bytes": bytes_val, "children": []}
# Find parent: parent indent must be less than current indent
while stack and stack[-1][1] >= indent:
stack.pop()
if stack:
parent = stack[-1][0]
parent["children"].append(node)
stack.append((node, indent))
else:
# Should not happen if we have a root, but as fallback
root["children"].append(node)
stack.append((node, indent))
return root
def _render_ascii_tree(node: dict[str, Any], total_frames: int, prefix: str = "", is_last: bool = True) -> list[str]:
lines = []
if node["name"] != "root":
percent = (node["frames"] / total_frames * 100) if total_frames > 0 else 0
connector = "└── " if is_last else "├── "
# Format: └── ip (99.5%) [100 pkts]
line = f"{prefix}{connector}{node['name']} ({percent:.1f}%) [{node['frames']} pkts]"
lines.append(line)
prefix += " " if is_last else "│ "
children = node["children"]
count = len(children)
for i, child in enumerate(children):
lines.extend(_render_ascii_tree(child, total_frames, prefix, i == count - 1))
return lines
def register_visualize_tools(mcp: FastMCP, client: TSharkClient):
@mcp.tool()
async def wireshark_plot_traffic(pcap_file: str, interval: int = 1) -> str:
"""
[Visualization] Generate an ASCII bar chart of traffic volume (I/O Graph).
Useful for identifying traffic spikes, DDoS start times, or silence patterns.
Args:
pcap_file: Path to pcap file
interval: Time interval bucket in seconds (default: 1)
Returns:
String containing the ASCII chart
"""
raw_output_result = parse_tool_result(await client.get_io_graph_data(pcap_file, interval))
if not raw_output_result["success"]:
return normalize_tool_result(raw_output_result)
raw_output = raw_output_result.get("data")
if not isinstance(raw_output, str):
return error_response(
"Unexpected data format from I/O graph tool",
error_type="DependencyError",
details={"expected": "string", "received": raw_output.__class__.__name__},
)
data = _parse_io_graph(raw_output)
return success_response(_render_ascii_bar_chart(data))
@mcp.tool()
async def wireshark_plot_protocols(pcap_file: str) -> str:
"""
[Visualization] Generate an ASCII tree of protocol hierarchy.
Shows the distribution of protocols (e.g., how much is HTTP vs DNS).
Args:
pcap_file: Path to pcap file
Returns:
String containing the ASCII tree
"""
raw_output_result = parse_tool_result(await client.get_protocol_stats_data(pcap_file))
if not raw_output_result["success"]:
return normalize_tool_result(raw_output_result)
raw_output = raw_output_result.get("data")
if not isinstance(raw_output, str):
return error_response(
"Unexpected data format from protocol hierarchy tool",
error_type="DependencyError",
details={"expected": "string", "received": raw_output.__class__.__name__},
)
root = _parse_protocol_hierarchy(raw_output)
# Calculate total frames from top-level children
total_frames = sum(c["frames"] for c in root["children"]) if root["children"] else 0
if total_frames == 0 and root["children"]:
# If root sum is 0 (unlikely), try max of children
total_frames = max(c["frames"] for c in root["children"])
tree_lines = _render_ascii_tree(root, total_frames)
if not tree_lines:
return success_response("No protocol hierarchy data found.")
header = "[Protocol Hierarchy Statistics]\n"
return success_response(header + "\n".join(tree_lines))