"""
HTTP Summary Tool
Summarizes HTTP traffic: URLs, status codes, host headers, methods,
user agents, content types, error rates, and response times.
"""
import logging
import subprocess
from collections import Counter
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
def _run_tshark_fields(pcap_path: str, display_filter: str, fields: List[str], max_packets: int = 10000) -> List[List[str]]:
    """Run tshark in field-extraction mode and return one row per packet.

    Args:
        pcap_path: Capture file passed to ``tshark -r``.
        display_filter: Wireshark display filter (``-Y``); skipped if empty.
        fields: Field names passed as repeated ``-e`` options; output columns
            appear tab-separated in this order.
        max_packets: Stop after reading this many packets (``-c``).

    Returns:
        List of rows, each the tab-split field values of one packet.

    Raises:
        subprocess.CalledProcessError: If tshark exits non-zero.
        subprocess.TimeoutExpired: If tshark exceeds the 120-second timeout.
    """
    cmd = [
        "tshark", "-r", pcap_path,
        "-T", "fields",
        "-c", str(max_packets),
    ]
    if display_filter:
        cmd.extend(["-Y", display_filter])
    for field in fields:
        cmd.extend(["-e", field])
    result = subprocess.run(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        check=True, timeout=120
    )
    # errors="replace" prevents a UnicodeDecodeError when extracted fields
    # (e.g. user agents or URIs) contain bytes that are not valid UTF-8.
    text = result.stdout.decode("utf-8", errors="replace")
    # splitlines() also copes with \r\n line endings from Windows tshark.
    return [line.split("\t") for line in text.splitlines() if line.strip()]
def _safe_run(pcap_path: str, display_filter: str, fields: List[str]) -> List[List[str]]:
try:
return _run_tshark_fields(pcap_path, display_filter, fields)
except Exception:
return []
def http_summary_execute(
    project_name: str,
    pcap_name: str,
) -> Dict[str, Any]:
    """
    Summarize HTTP traffic from a locally synced PCAP file.

    Extracts request methods, URLs, host headers, status codes, user agents,
    content types, error rates, and response timing to help troubleshoot
    web and API traffic.

    Args:
        project_name: Name of the project containing the PCAP
        pcap_name: Name of the PCAP file

    Returns:
        Dict with ``ok=True`` and the summary fields on success (or a
        ``message`` when no HTTP traffic is present), or ``ok=False`` and
        an ``error`` string on failure.
    """
    log = logging.getLogger(__name__)
    try:
        from .workspace_sync import get_pcap_path as get_project_pcap_path
        pcap_path = get_project_pcap_path(project_name, pcap_name)
        if not pcap_path:
            return {"ok": False, "error": f"PCAP '{pcap_name}' not found in project '{project_name}'"}
        log.info("Analyzing HTTP traffic in: %s", pcap_path)

        # ── Requests ──
        # Field order here fixes the column indices used when parsing below.
        req_fields = [
            "frame.number", "frame.time_relative",
            "ip.src", "ip.dst",
            "http.request.method", "http.request.uri", "http.request.full_uri",
            "http.host", "http.user_agent", "http.content_type",
            "tcp.stream",
        ]
        req_rows = _safe_run(pcap_path, "http.request", req_fields)

        # ── Responses ──
        resp_fields = [
            "frame.number", "frame.time_relative",
            "ip.src", "ip.dst",
            "http.response.code", "http.response.phrase",
            "http.content_type", "http.content_length",
            "http.time", "tcp.stream",
        ]
        resp_rows = _safe_run(pcap_path, "http.response", resp_fields)

        total_requests = len(req_rows)
        total_responses = len(resp_rows)
        if total_requests == 0 and total_responses == 0:
            return {
                "ok": True,
                "project_name": project_name,
                "pcap_name": pcap_name,
                "total_requests": 0,
                "total_responses": 0,
                "message": "No HTTP traffic found in capture. Traffic may be encrypted (HTTPS/TLS).",
            }

        # ── Parse requests ──
        methods = Counter()
        hosts = Counter()
        uris = Counter()
        user_agents = Counter()
        request_content_types = Counter()
        client_ips = Counter()
        server_ips = Counter()
        requests_by_host = {}
        for row in req_rows:
            # Need at least frame..host columns; trailing fields may be absent.
            if len(row) < 8:
                continue
            method = row[4] if row[4] else ""
            uri = row[5] if row[5] else ""
            full_uri = row[6] if row[6] else ""
            host = row[7] if row[7] else ""
            ua = row[8] if len(row) > 8 and row[8] else ""
            ct = row[9] if len(row) > 9 and row[9] else ""
            src_ip = row[2] if row[2] else ""
            dst_ip = row[3] if row[3] else ""
            if method:
                methods[method] += 1
            if host:
                hosts[host] += 1
            # Prefer the full URI; otherwise synthesize host+path when possible.
            display_uri = full_uri or (f"{host}{uri}" if host and uri else uri)
            if display_uri:
                uris[display_uri] += 1
            if ua:
                user_agents[ua] += 1
            if ct:
                request_content_types[ct] += 1
            if src_ip:
                client_ips[src_ip] += 1
            if dst_ip:
                server_ips[dst_ip] += 1
            if host:
                bucket = requests_by_host.setdefault(host, {"methods": Counter(), "uris": []})
                # Guard against counting an empty-string method key (bug fix).
                if method:
                    bucket["methods"][method] += 1
                # Cap sample URIs per host to bound the output size.
                if uri and len(bucket["uris"]) < 20:
                    bucket["uris"].append(uri)

        # ── Parse responses ──
        status_codes = Counter()
        response_content_types = Counter()
        response_times = []
        response_sizes = []
        errors = []
        for row in resp_rows:
            if len(row) < 6:
                continue
            code = row[4] if row[4] else ""
            phrase = row[5] if row[5] else ""
            ct = row[6] if len(row) > 6 and row[6] else ""
            content_len = row[7] if len(row) > 7 and row[7] else ""
            http_time = row[8] if len(row) > 8 and row[8] else ""
            stream = row[9] if len(row) > 9 and row[9] else ""
            if code:
                status_codes[code] += 1
                code_int = 0
                try:
                    code_int = int(code)
                except ValueError:
                    pass
                # 4xx/5xx responses are collected individually for reporting.
                if code_int >= 400:
                    errors.append({
                        "status_code": code,
                        "reason": phrase,
                        "frame": row[0],
                        "stream": stream,
                        "src_ip": row[2],
                        "dst_ip": row[3],
                    })
            if ct:
                response_content_types[ct] += 1
            if content_len:
                try:
                    response_sizes.append(int(content_len))
                except ValueError:
                    pass
            if http_time:
                try:
                    response_times.append(float(http_time))
                except ValueError:
                    pass

        # ── Response time stats (http.time is in seconds; report ms) ──
        time_stats = {}
        if response_times:
            response_times.sort()
            time_stats = {
                "min_ms": round(min(response_times) * 1000, 2),
                "max_ms": round(max(response_times) * 1000, 2),
                "avg_ms": round((sum(response_times) / len(response_times)) * 1000, 2),
                "median_ms": round(response_times[len(response_times) // 2] * 1000, 2),
                "p95_ms": round(response_times[int(len(response_times) * 0.95)] * 1000, 2),
                "samples": len(response_times),
            }

        # ── Response size stats ──
        size_stats = {}
        if response_sizes:
            size_stats = {
                "min_bytes": min(response_sizes),
                "max_bytes": max(response_sizes),
                "avg_bytes": round(sum(response_sizes) / len(response_sizes), 0),
                "total_bytes": sum(response_sizes),
            }

        # ── Error rate (share of responses with status >= 400) ──
        error_packets = sum(v for c, v in status_codes.items() if c.isdigit() and int(c) >= 400)
        error_rate = round((error_packets / total_responses) * 100, 1) if total_responses > 0 else 0

        # ── Status code categories ──
        status_categories = {
            "1xx_informational": sum(v for c, v in status_codes.items() if c.startswith("1")),
            "2xx_success": sum(v for c, v in status_codes.items() if c.startswith("2")),
            "3xx_redirect": sum(v for c, v in status_codes.items() if c.startswith("3")),
            "4xx_client_error": sum(v for c, v in status_codes.items() if c.startswith("4")),
            "5xx_server_error": sum(v for c, v in status_codes.items() if c.startswith("5")),
        }

        # ── Per-host breakdown (top 15 hosts by request count) ──
        host_details = []
        for host, data in sorted(hosts.items(), key=lambda x: x[1], reverse=True)[:15]:
            entry = {
                "host": host,
                "total_requests": data,
                "methods": dict(requests_by_host.get(host, {}).get("methods", {})),
                "sample_uris": requests_by_host.get(host, {}).get("uris", [])[:10],
            }
            host_details.append(entry)

        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_name,
            "total_requests": total_requests,
            "total_responses": total_responses,
            "request_methods": dict(methods),
            "status_codes": dict(status_codes.most_common(20)),
            "status_categories": status_categories,
            "error_rate_percent": error_rate,
            "hosts": host_details,
            "top_uris": dict(uris.most_common(20)),
            "user_agents": dict(user_agents.most_common(10)),
            "request_content_types": dict(request_content_types.most_common(10)),
            "response_content_types": dict(response_content_types.most_common(10)),
            "top_client_ips": dict(client_ips.most_common(10)),
            "top_server_ips": dict(server_ips.most_common(10)),
            "response_time_stats": time_stats,
            "response_size_stats": size_stats,
            "errors": errors[:30],
            "total_errors": error_packets,
        }
    except subprocess.TimeoutExpired:
        log.error("tshark command timed out")
        return {"ok": False, "error": "Analysis timed out"}
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr.decode("utf-8") if e.stderr else str(e)
        log.error("tshark failed: %s", error_msg)
        return {"ok": False, "error": f"tshark failed: {error_msg}"}
    except Exception as e:
        log.error("Error in HTTP analysis: %s", e, exc_info=True)
        return {"ok": False, "error": f"Failed: {str(e)}"}