"""
DNS Analysis Tool
Analyzes DNS traffic: queried domains, response codes (NXDOMAIN rates),
query types, response times, and suspicious patterns.
"""
import logging
import subprocess
from collections import Counter
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
def _run_tshark_fields(
    pcap_path: str,
    display_filter: str,
    fields: List[str],
    max_packets: int = 10000,
    timeout: int = 120,
) -> List[List[str]]:
    """Run tshark in field-extraction mode and return parsed rows.

    Args:
        pcap_path: Path to the capture file (passed to ``tshark -r``).
        display_filter: Wireshark display filter (``-Y``) selecting packets.
        fields: Field names extracted with ``-e``; each output row contains
            one (possibly empty) string per field, in the same order.
        max_packets: Stop after reading this many packets (``-c``).
        timeout: Seconds to wait for tshark before aborting. Previously a
            hard-coded 120s; now a parameter with the same default so
            existing callers are unaffected.

    Returns:
        One list of field strings per matching packet; blank output
        lines are dropped.

    Raises:
        subprocess.CalledProcessError: If tshark exits non-zero.
        subprocess.TimeoutExpired: If tshark exceeds ``timeout`` seconds.
    """
    cmd = [
        "tshark", "-r", pcap_path,
        "-Y", display_filter,
        "-T", "fields",
        "-c", str(max_packets),
    ]
    for field in fields:
        cmd.extend(["-e", field])
    result = subprocess.run(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        check=True, timeout=timeout,
    )
    # Fields are tab-separated; empty fields still occupy a slot, so
    # positional indexes stay aligned with the requested field list.
    rows: List[List[str]] = []
    for line in result.stdout.decode("utf-8").strip().split("\n"):
        if line.strip():
            rows.append(line.split("\t"))
    return rows
def dns_analysis_execute(
    project_name: str,
    pcap_name: str,
) -> Dict[str, Any]:
    """
    Analyze DNS traffic from a locally synced PCAP file.

    Performs three tshark passes over the capture:
      1. all DNS packets (queries and responses) for domain/type/endpoint stats,
      2. responses only, for response codes, answer records, and TTLs,
      3. tshark's computed ``dns.time`` field for query/response latency.

    Flags suspicious patterns: high NXDOMAIN rate, unusually long domain
    names, many unique subdomains under one base domain, a high TXT-query
    rate (all possible DGA/tunneling indicators), and slow responses.

    Args:
        project_name: Name of the project containing the PCAP
        pcap_name: Name of the PCAP file

    Returns:
        Dict with ``ok: True`` plus aggregated statistics and an
        ``anomalies`` list, or ``ok: False`` with an ``error`` message.
    """
    try:
        # Lazy relative import: resolves the PCAP name to a local path.
        from .workspace_sync import get_pcap_path as get_project_pcap_path
        pcap_path = get_project_pcap_path(project_name, pcap_name)
        if not pcap_path:
            return {"ok": False, "error": f"PCAP '{pcap_name}' not found in project '{project_name}'"}
        logger.info(f"Analyzing DNS for: {pcap_path}")
        # --- DNS Queries ---
        # Row layout (positional — the parsing below indexes into this):
        # [0]=time, [1]=src ip, [2]=dst ip, [3]=query name,
        # [4]=query type (numeric string), [5]=response flag ("0"=query, "1"=response).
        query_fields = [
            "frame.time_relative",
            "ip.src", "ip.dst",
            "dns.qry.name", "dns.qry.type",
            "dns.flags.response",
        ]
        query_rows = _run_tshark_fields(pcap_path, "dns", query_fields)
        # tshark can emit a single blank row when nothing matched; treat
        # that the same as no rows at all.
        if not query_rows or (len(query_rows) == 1 and not query_rows[0][0]):
            return {
                "ok": True,
                "project_name": project_name,
                "pcap_name": pcap_name,
                "total_dns_packets": 0,
                "message": "No DNS traffic found in capture",
            }
        # --- DNS Responses with rcodes ---
        # Row layout: [3]=query name, [4]=rcode, [6]=dns.a, [7]=dns.aaaa,
        # [9]=dns.resp.ttl (multiple answers are comma-aggregated by tshark).
        response_fields = [
            "frame.time_relative",
            "ip.src", "ip.dst",
            "dns.qry.name",
            "dns.flags.rcode",
            "dns.resp.name", "dns.a", "dns.aaaa", "dns.cname",
            "dns.resp.ttl",
        ]
        response_rows = _run_tshark_fields(pcap_path, "dns.flags.response == 1", response_fields)
        # Parse queries
        total_queries = 0
        total_responses = 0
        queried_domains = Counter()  # query name -> times queried
        query_types = Counter()      # numeric qtype string -> count
        client_ips = Counter()       # query sources
        server_ips = Counter()       # query destinations (resolvers)
        for row in query_rows:
            # Guard against short/malformed rows before positional indexing.
            if len(row) < 6:
                continue
            is_response = row[5].strip()
            domain = row[3].strip() if row[3] else ""
            qtype = row[4].strip() if row[4] else ""
            src_ip = row[1].strip() if row[1] else ""
            dst_ip = row[2].strip() if row[2] else ""
            if is_response == "0":
                # Queries: only these contribute to domain/type/endpoint stats.
                total_queries += 1
                if domain:
                    queried_domains[domain] += 1
                if qtype:
                    query_types[qtype] += 1
                if src_ip:
                    client_ips[src_ip] += 1
                if dst_ip:
                    server_ips[dst_ip] += 1
            else:
                total_responses += 1
        # Parse responses for rcodes and answer data
        rcode_counts = Counter()      # rcode mnemonic -> count
        nxdomain_domains = Counter()  # domains that got NXDOMAIN
        answer_ips = Counter()        # resolved A/AAAA addresses
        ttl_values = []               # all answer TTLs, for min/max/avg
        for row in response_rows:
            if len(row) < 5:
                continue
            domain = row[3].strip() if row[3] else ""
            rcode = row[4].strip() if row[4] else ""
            if rcode:
                rcode_name = _rcode_to_name(rcode)
                rcode_counts[rcode_name] += 1
                if rcode == "3":  # 3 = NXDOMAIN
                    if domain:
                        nxdomain_domains[domain] += 1
            # Collect resolved IPs
            # Columns 6 and 7 are dns.a and dns.aaaa; multiple answers in
            # one response arrive comma-joined.
            for field_idx in [6, 7]:
                if len(row) > field_idx and row[field_idx]:
                    for ip in row[field_idx].split(","):
                        ip = ip.strip()
                        if ip:
                            answer_ips[ip] += 1
            # Collect TTLs
            # Column 9 is dns.resp.ttl, also comma-joined per answer record.
            if len(row) > 9 and row[9]:
                for ttl_str in row[9].split(","):
                    try:
                        ttl_values.append(int(ttl_str.strip()))
                    except ValueError:
                        pass  # skip non-numeric fragments
        # --- DNS Response Time ---
        # Use dns.time field which tshark calculates automatically
        # (seconds between a query and its matched response). The extra
        # "&& dns.time" clause restricts to responses tshark could match.
        time_fields = ["dns.time"]
        try:
            time_rows = _run_tshark_fields(pcap_path, "dns.flags.response == 1 && dns.time", time_fields)
            response_times = []
            for row in time_rows:
                if row and row[0]:
                    try:
                        response_times.append(float(row[0]))
                    except ValueError:
                        pass
        except subprocess.CalledProcessError:
            # Latency stats are best-effort; an unsupported filter/field
            # should not fail the whole analysis.
            response_times = []
        response_time_stats = {}
        if response_times:
            response_times.sort()
            response_time_stats = {
                "min_ms": round(min(response_times) * 1000, 2),
                "max_ms": round(max(response_times) * 1000, 2),
                "avg_ms": round((sum(response_times) / len(response_times)) * 1000, 2),
                # Upper median for even-length lists; nearest-rank style p95.
                "median_ms": round(response_times[len(response_times) // 2] * 1000, 2),
                "p95_ms": round(response_times[int(len(response_times) * 0.95)] * 1000, 2),
                "samples": len(response_times),
            }
        # --- Anomaly Detection ---
        anomalies = []
        nxdomain_count = rcode_counts.get("NXDOMAIN", 0)
        total_resp = sum(rcode_counts.values())
        if total_resp > 0:
            # NXDOMAIN-rate thresholds: >20% high severity, >5% medium.
            nxdomain_rate = (nxdomain_count / total_resp) * 100
            if nxdomain_rate > 20:
                anomalies.append({
                    "type": "high_nxdomain_rate",
                    "severity": "high",
                    "detail": f"NXDOMAIN rate is {nxdomain_rate:.1f}% ({nxdomain_count}/{total_resp}) - possible misconfiguration, DGA malware, or DNS tunneling",
                    "top_nxdomains": dict(nxdomain_domains.most_common(10)),
                })
            elif nxdomain_rate > 5:
                anomalies.append({
                    "type": "elevated_nxdomain_rate",
                    "severity": "medium",
                    "detail": f"NXDOMAIN rate is {nxdomain_rate:.1f}% ({nxdomain_count}/{total_resp})",
                    "top_nxdomains": dict(nxdomain_domains.most_common(10)),
                })
        # Check for unusually long domain names (possible tunneling)
        long_domains = [d for d in queried_domains if len(d) > 60]
        if long_domains:
            anomalies.append({
                "type": "long_domain_names",
                "severity": "high",
                "detail": f"{len(long_domains)} domains exceed 60 characters - possible DNS tunneling or exfiltration",
                "examples": long_domains[:10],
            })
        # Check for high subdomain entropy / many unique subdomains of same base
        # Counts UNIQUE queried names per 2-label base (e.g. "example.com");
        # note this treats ccTLD-style bases like "co.uk" as the base itself.
        base_domain_counts = Counter()
        for domain in queried_domains:
            parts = domain.rstrip(".").split(".")
            if len(parts) >= 3:
                base = ".".join(parts[-2:])
                base_domain_counts[base] += 1
        tunneling_suspects = {base: count for base, count in base_domain_counts.items() if count > 50}
        if tunneling_suspects:
            anomalies.append({
                "type": "high_subdomain_variety",
                "severity": "high",
                "detail": f"Domains with many unique subdomains (>50) - possible DNS tunneling",
                "suspects": tunneling_suspects,
            })
        # Check for TXT queries (often used in tunneling)
        # Flag only when both absolute (>10) and relative (>10%) thresholds hit.
        txt_count = query_types.get("16", 0)  # type 16 = TXT
        if total_queries > 0 and txt_count > 10 and (txt_count / total_queries) > 0.1:
            anomalies.append({
                "type": "high_txt_query_rate",
                "severity": "medium",
                "detail": f"{txt_count} TXT queries ({(txt_count/total_queries)*100:.1f}% of queries) - TXT records are sometimes used for DNS tunneling",
            })
        # Slow responses
        if response_time_stats and response_time_stats.get("p95_ms", 0) > 500:
            anomalies.append({
                "type": "slow_dns_responses",
                "severity": "medium",
                "detail": f"P95 response time is {response_time_stats['p95_ms']}ms - DNS resolution is slow",
            })
        # Map numeric query types to names
        qtype_named = {}
        for qtype_num, count in query_types.most_common(20):
            qtype_named[_qtype_to_name(qtype_num)] = count
        return {
            "ok": True,
            "project_name": project_name,
            "pcap_name": pcap_name,
            # NOTE: capped by _run_tshark_fields' max_packets (10000).
            "total_dns_packets": len(query_rows),
            "total_queries": total_queries,
            "total_responses": total_responses,
            "top_queried_domains": dict(queried_domains.most_common(25)),
            "query_types": qtype_named,
            "response_codes": dict(rcode_counts),
            "nxdomain_count": nxdomain_count,
            "nxdomain_domains": dict(nxdomain_domains.most_common(15)),
            "top_dns_clients": dict(client_ips.most_common(10)),
            "top_dns_servers": dict(server_ips.most_common(10)),
            "top_resolved_ips": dict(answer_ips.most_common(15)),
            "response_time_stats": response_time_stats,
            "ttl_stats": {
                "min": min(ttl_values) if ttl_values else None,
                "max": max(ttl_values) if ttl_values else None,
                "avg": round(sum(ttl_values) / len(ttl_values), 1) if ttl_values else None,
            },
            "anomalies": anomalies,
            "unique_domains_queried": len(queried_domains),
        }
    except subprocess.TimeoutExpired:
        logger.error("tshark command timed out")
        return {"ok": False, "error": "Analysis timed out"}
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr.decode("utf-8") if e.stderr else str(e)
        logger.error(f"tshark failed: {error_msg}")
        return {"ok": False, "error": f"tshark failed: {error_msg}"}
    except Exception as e:
        # Broad boundary handler: this function is a tool entry point and
        # must always return a result dict rather than raise.
        logger.error(f"Error in DNS analysis: {e}", exc_info=True)
        return {"ok": False, "error": f"Failed: {str(e)}"}
def _rcode_to_name(rcode: str) -> str:
"""Map DNS response code number to name."""
rcode_map = {
"0": "NOERROR",
"1": "FORMERR",
"2": "SERVFAIL",
"3": "NXDOMAIN",
"4": "NOTIMP",
"5": "REFUSED",
"6": "YXDOMAIN",
"7": "YXRRSET",
"8": "NXRRSET",
"9": "NOTAUTH",
"10": "NOTZONE",
}
return rcode_map.get(rcode.strip(), f"RCODE_{rcode}")
def _qtype_to_name(qtype: str) -> str:
"""Map DNS query type number to name."""
qtype_map = {
"1": "A",
"2": "NS",
"5": "CNAME",
"6": "SOA",
"12": "PTR",
"15": "MX",
"16": "TXT",
"28": "AAAA",
"33": "SRV",
"35": "NAPTR",
"43": "DS",
"46": "RRSIG",
"47": "NSEC",
"48": "DNSKEY",
"52": "TLSA",
"65": "HTTPS",
"99": "SPF",
"255": "ANY",
"256": "URI",
"257": "CAA",
}
return qtype_map.get(qtype.strip(), f"TYPE{qtype}")