# We provide all the information about MCP servers via our MCP API.
# curl -X GET 'https://glama.ai/api/mcp/v1/servers/sarthaksiddha/Wireshark-mcp'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""
Security Analyzer module for Wireshark MCP.
This module provides advanced security analysis for network packet captures,
detecting potential security issues, suspicious patterns, and anomalies.
"""
import ipaddress
import itertools
import logging
from typing import List, Dict, Any, Set, Tuple
logger = logging.getLogger(__name__)
class SecurityAnalyzer:
    """
    Performs security-focused analysis on network packet captures.

    Features include detection of:
    - Port scanning patterns
    - Known malware communication patterns
    - Unusual port usage
    - Unencrypted protocols
    - Suspicious behavioral patterns

    Packets are plain dictionaries. The keys this class reads are:
    ``ip`` (with ``src``/``dst``), ``tcp``/``udp`` (with ``srcport``/
    ``dstport`` and, for TCP, an optional ``flags`` mapping), ``length`` and
    ``layers`` (a list of dicts carrying a ``protocol`` name). All keys are
    optional; missing ones are simply skipped.
    """

    # Heuristic thresholds. The values are arbitrary starting points and are
    # exposed as class attributes so that they can be tuned per deployment.
    VERTICAL_SCAN_MIN_PORTS = 10          # ports on ONE host (inclusive)
    HORIZONTAL_SCAN_DST_IP_THRESHOLD = 10  # distinct peers (exclusive)
    HORIZONTAL_SCAN_HOST_THRESHOLD = 5     # hosts per port (exclusive)
    MAX_SCAN_EXAMPLES = 20                 # cap on example ports/hosts
    HIGH_PORT_FLOOR = 10000                # "high-numbered" starts above this
    HIGH_PORT_PACKET_THRESHOLD = 5         # packets on a high port (exclusive)

    def __init__(self, packets: List[Dict[str, Any]]):
        """
        Initialize the security analyzer with packet data.

        Args:
            packets: List of packet dictionaries
        """
        self.packets = packets
        self.ip_data = {}  # ip -> per-host counters and sets
        self.connection_data = {}  # "src:sport-dst:dport" -> flow record
        self.port_data = {}  # port (str) -> usage record

        # Known suspicious ports (example list, would be more comprehensive
        # in production)
        self.suspicious_ports = {
            '4444': 'Metasploit default',
            '1080': 'SOCKS proxy',
            '6667': 'IRC (often used by botnets)',
            '6666': 'IRC alternate',
            '31337': 'Back Orifice',
            '12345': 'NetBus',
            '5900': 'VNC',
            '3389': 'RDP',
            '8080': 'Common alternate HTTP',
            '8443': 'Common alternate HTTPS',
            '6000': 'X11',
        }

        # Common well-known ports that are less suspicious
        self.common_ports = {
            '80': 'HTTP',
            '443': 'HTTPS',
            '22': 'SSH',
            '23': 'Telnet',
            '25': 'SMTP',
            '53': 'DNS',
            '110': 'POP3',
            '143': 'IMAP',
            '161': 'SNMP',
            '389': 'LDAP',
            '3306': 'MySQL',
            '5432': 'PostgreSQL',
            '27017': 'MongoDB',
        }

        # Initialize data structures from packets
        self._process_packets()

    @staticmethod
    def _normalize_port(value: Any) -> Any:
        """Return *value* as a string, or ``None`` when it is missing/falsy.

        The port lookup tables are keyed by strings, so integer ports must be
        converted or every lookup silently misses.
        """
        return str(value) if value else None

    @staticmethod
    def _port_sort_key(port: Any) -> Tuple[int, str]:
        """Sort key that orders numeric port strings numerically."""
        text = str(port)
        return (len(text), text)

    def _ip_record(self, ip: Any) -> Dict[str, Any]:
        """Return the ``ip_data`` record for *ip*, creating it if needed."""
        record = self.ip_data.get(ip)
        if record is None:
            record = {
                'sent_packets': 0,
                'received_packets': 0,
                'dst_ips': set(),
                'dst_ports': {},  # port -> count
                'protocols': set(),
            }
            self.ip_data[ip] = record
        return record

    def _process_packets(self):
        """Process packets to extract security-relevant information.

        Populates ``ip_data``, ``port_data`` and ``connection_data``.
        """
        for packet in self.packets:
            # Reset per packet so that a packet without an IP layer is never
            # attributed to the addresses of the packet before it.
            src_ip = dst_ip = None

            # Process IP layer data
            if 'ip' in packet:
                src_ip = packet['ip'].get('src')
                dst_ip = packet['ip'].get('dst')
                if src_ip:
                    src_record = self._ip_record(src_ip)
                    src_record['sent_packets'] += 1
                    if dst_ip:
                        src_record['dst_ips'].add(dst_ip)
                if dst_ip:
                    self._ip_record(dst_ip)['received_packets'] += 1

            # Process TCP and UDP port data
            for proto in ('tcp', 'udp'):
                if proto not in packet:
                    continue
                proto_name = proto.upper()
                src_port = self._normalize_port(packet[proto].get('srcport'))
                dst_port = self._normalize_port(packet[proto].get('dstport'))

                if src_ip:
                    src_record = self.ip_data[src_ip]
                    src_record['protocols'].add(proto_name)
                    # Track destination ports contacted by this source
                    if dst_port:
                        dst_ports = src_record['dst_ports']
                        dst_ports[dst_port] = dst_ports.get(dst_port, 0) + 1

                # Track general port usage (both ends of the packet)
                for port in (src_port, dst_port):
                    if not port:
                        continue
                    usage = self.port_data.get(port)
                    if usage is None:
                        usage = {
                            'count': 0,
                            # Protocol of the first packet seen on this port
                            'protocol': proto_name,
                            'ips': set(),
                        }
                        self.port_data[port] = usage
                    usage['count'] += 1
                    if src_ip:
                        usage['ips'].add(src_ip)
                    if dst_ip:
                        usage['ips'].add(dst_ip)

            # Process protocol information from the dissected layers
            layer_protocols = set()
            for layer in packet.get('layers', []):
                protocol = layer.get('protocol')
                if protocol:
                    layer_protocols.add(protocol.upper())
            if src_ip:
                self.ip_data[src_ip]['protocols'].update(layer_protocols)

            # Connection tracking (for flow analysis); TCP over IP only
            if 'tcp' not in packet or not (src_ip and dst_ip):
                continue
            tcp = packet['tcp']
            src_port = self._normalize_port(tcp.get('srcport'))
            dst_port = self._normalize_port(tcp.get('dstport'))
            if not (src_port and dst_port):
                continue

            # Keys are directional: replies form a separate record.
            conn_key = f"{src_ip}:{src_port}-{dst_ip}:{dst_port}"
            conn = self.connection_data.get(conn_key)
            if conn is None:
                conn = {
                    'packets': 0,
                    'bytes': 0,
                    'src_ip': src_ip,
                    'dst_ip': dst_ip,
                    'src_port': src_port,
                    'dst_port': dst_port,
                    'protocol': 'TCP',
                    'flags': set(),
                    # Layer protocols seen on THIS flow; used to decide
                    # whether the flow itself is encrypted.
                    'protocols': set(),
                }
                self.connection_data[conn_key] = conn
            conn['packets'] += 1
            conn['protocols'].update(layer_protocols)

            # Track packet length
            if 'length' in packet:
                conn['bytes'] += int(packet['length'])

            # Track TCP flags that are set (value == 1)
            if 'flags' in tcp:
                for flag, value in tcp['flags'].items():
                    if int(value) == 1:
                        conn['flags'].add(flag)

    def analyze(self,
                detect_scanning: bool = True,
                detect_malware_patterns: bool = True,
                highlight_unusual_ports: bool = True,
                check_encryption: bool = True) -> Dict[str, Any]:
        """
        Perform security analysis on the packet capture.

        Args:
            detect_scanning: Whether to look for port scanning patterns
            detect_malware_patterns: Whether to check for known malware patterns
            highlight_unusual_ports: Whether to highlight unusual port usage
            check_encryption: Whether to analyze encryption usage

        Returns:
            Dictionary containing security analysis results. It always has
            ``summary`` and ``alerts`` (sorted high -> medium -> low); one
            extra section is added per enabled check.
        """
        results = {
            'summary': {
                'total_packets': len(self.packets),
                'unique_ips': len(self.ip_data),
                'unique_ports': len(self.port_data),
                'connections': len(self.connection_data),
            },
            'alerts': [],
        }
        alerts = results['alerts']

        # Detect port scanning
        if detect_scanning:
            scanning_results = self._detect_scanning()
            results['port_scanning'] = scanning_results
            for scanner in scanning_results.get('potential_scanners', []):
                alerts.append({
                    'type': 'port_scan',
                    'severity': 'medium',
                    'source_ip': scanner['ip'],
                    'description': f"Potential port scan: {scanner['unique_ports']} ports across {scanner['unique_hosts']} hosts",
                    'details': scanner
                })

        # Check for malware patterns
        if detect_malware_patterns:
            malware_results = self._detect_malware_patterns()
            results['malware_indicators'] = malware_results
            for indicator in malware_results.get('indicators', []):
                alerts.append({
                    'type': 'malware_indicator',
                    'severity': 'high',
                    'source_ip': indicator['ip'],
                    'description': indicator['description'],
                    'details': indicator
                })

        # Highlight unusual ports
        if highlight_unusual_ports:
            port_results = self._highlight_unusual_ports()
            results['unusual_ports'] = port_results
            for port_info in port_results.get('suspicious_ports', []):
                alerts.append({
                    'type': 'suspicious_port',
                    'severity': 'low',
                    'port': port_info['port'],
                    'protocol': port_info['protocol'],
                    'description': f"Suspicious port {port_info['port']}/{port_info['protocol']} ({port_info['service_name']}) used by {len(port_info['ips'])} hosts",
                    'details': port_info
                })

        # Check encryption
        if check_encryption:
            encryption_results = self._check_encryption()
            results['encryption_analysis'] = encryption_results
            for unencrypted in encryption_results.get('unencrypted_services', []):
                alerts.append({
                    'type': 'unencrypted_traffic',
                    'severity': 'medium',
                    'port': unencrypted['port'],
                    'protocol': unencrypted['protocol'],
                    'description': f"Unencrypted {unencrypted['service']} traffic detected on port {unencrypted['port']}",
                    'details': unencrypted
                })

        # Sort alerts by severity; sorted() is stable, so alerts of equal
        # severity keep the order in which the checks produced them.
        severity_rank = {'high': 0, 'medium': 1, 'low': 2}
        results['alerts'] = sorted(
            alerts,
            key=lambda alert: severity_rank.get(alert['severity'], 100)
        )
        return results

    def _detect_scanning(self) -> Dict[str, Any]:
        """
        Detect potential port scanning activity.

        Returns:
            Dictionary with ``potential_scanners`` (vertical and horizontal
            scan candidates) and ``scan_targets`` (hosts hit by more than one
            vertical scanner).
        """
        results = {
            'potential_scanners': [],
            'scan_targets': [],
        }

        # Group the connection table once instead of rescanning it for every
        # known IP address.
        ports_by_pair = {}  # src_ip -> {dst_ip: {dst_port, ...}}
        hosts_by_port = {}  # src_ip -> {dst_port: {dst_ip, ...}}
        for conn in self.connection_data.values():
            src = conn['src_ip']
            dst = conn['dst_ip']
            port = conn['dst_port']
            ports_by_pair.setdefault(src, {}).setdefault(dst, set()).add(port)
            hosts_by_port.setdefault(src, {}).setdefault(port, set()).add(dst)

        # Method 1: IP that connects to many ports on a single host
        for src_ip in self.ip_data:
            for dst_ip, ports in ports_by_pair.get(src_ip, {}).items():
                if len(ports) >= self.VERTICAL_SCAN_MIN_PORTS:
                    ordered = sorted(ports, key=self._port_sort_key)
                    results['potential_scanners'].append({
                        'ip': src_ip,
                        'target': dst_ip,
                        'unique_ports': len(ports),
                        'ports': ordered[:self.MAX_SCAN_EXAMPLES],
                        'unique_hosts': 1,
                        'scan_type': 'vertical'
                    })

        # Method 2: IP that connects to the same port on many hosts
        for src_ip, data in self.ip_data.items():
            # Only sources that talk to many peers overall are considered.
            if len(data['dst_ips']) <= self.HORIZONTAL_SCAN_DST_IP_THRESHOLD:
                continue
            for port, hosts in hosts_by_port.get(src_ip, {}).items():
                if len(hosts) > self.HORIZONTAL_SCAN_HOST_THRESHOLD:
                    ordered = sorted(hosts, key=str)
                    results['potential_scanners'].append({
                        'ip': src_ip,
                        'port': port,
                        'unique_hosts': len(hosts),
                        'hosts': ordered[:self.MAX_SCAN_EXAMPLES],
                        'unique_ports': 1,
                        'scan_type': 'horizontal'
                    })

        # Method 3: Identify scan targets (only vertical entries name one)
        target_count = {}
        for scanner in results['potential_scanners']:
            if 'target' in scanner:
                target = scanner['target']
                target_count[target] = target_count.get(target, 0) + 1

        # Hosts targeted by multiple scanners
        for target, count in target_count.items():
            if count > 1:
                results['scan_targets'].append({
                    'ip': target,
                    'scanner_count': count
                })
        return results

    def _detect_malware_patterns(self) -> Dict[str, Any]:
        """
        Detect common malware communication patterns.

        Only one pattern is implemented: TCP connections whose destination
        port is listed in ``suspicious_ports``. Beaconing and protocol-anomaly
        detection are not implemented.

        Returns:
            Dictionary with an ``indicators`` list, one entry per matching
            connection.
        """
        results = {
            'indicators': [],
        }
        for conn in self.connection_data.values():
            dst_port = conn['dst_port']
            if dst_port not in self.suspicious_ports:
                continue
            service = self.suspicious_ports[dst_port]
            details = {
                'src_ip': conn['src_ip'],
                'dst_ip': conn['dst_ip'],
                'port': dst_port,
                'service': service,
                'packets': conn['packets']
            }
            results['indicators'].append({
                'ip': conn['src_ip'],
                'type': 'suspicious_port',
                'description': f"Connection to suspicious service: {service} on port {dst_port}",
                'details': details
            })
        return results

    def _highlight_unusual_ports(self) -> Dict[str, Any]:
        """
        Highlight unusual port usage.

        Returns:
            Dictionary with ``high_numbered_ports``, ``suspicious_ports`` and
            ``uncommon_service_ports`` (the last is reserved and always
            empty).
        """
        results = {
            'high_numbered_ports': [],
            'suspicious_ports': [],
            'uncommon_service_ports': [],
        }
        for port, data in self.port_data.items():
            port_num = int(port)
            ips = sorted(data['ips'], key=str)  # stable, reproducible order

            # High-numbered ports carrying more than a handful of packets
            if (port_num > self.HIGH_PORT_FLOOR
                    and data['count'] > self.HIGH_PORT_PACKET_THRESHOLD):
                results['high_numbered_ports'].append({
                    'port': port,
                    'protocol': data['protocol'],
                    'packet_count': data['count'],
                    'ips': ips
                })

            # Known suspicious ports (matched on either end of a packet)
            if port in self.suspicious_ports:
                results['suspicious_ports'].append({
                    'port': port,
                    'protocol': data['protocol'],
                    'packet_count': data['count'],
                    'ips': list(ips),
                    'service_name': self.suspicious_ports[port]
                })
        return results

    def _check_encryption(self) -> Dict[str, Any]:
        """
        Analyze encryption usage in the traffic.

        A connection counts as encrypted when one of its own packets carried
        a layer whose protocol name is a known encrypted protocol. Unencrypted
        connections to a well-known plaintext service port are reported.

        Returns:
            Dictionary with ``encrypted_protocols`` (sorted names observed),
            ``unencrypted_services`` and ``encryption_statistics``.
        """
        results = {
            'encrypted_protocols': [],
            'unencrypted_services': [],
            'encryption_statistics': {
                'encrypted_packets': 0,
                'unencrypted_packets': 0,
                'encrypted_bytes': 0,
                'unencrypted_bytes': 0,
            }
        }
        stats = results['encryption_statistics']

        # Known encrypted protocols
        encrypted_protocols = {'TLS', 'SSL', 'SSH', 'HTTPS', 'SMTPS', 'FTPS', 'SFTP', 'IMAPS', 'POP3S', 'LDAPS'}

        # Known unencrypted protocols that should be encrypted
        sensitive_unencrypted = {
            'HTTP': 80,
            'FTP': 21,
            'TELNET': 23,
            'SMTP': 25,
            'POP3': 110,
            'IMAP': 143,
            'SNMP': 161,
            'LDAP': 389,
        }
        # Reverse index so each connection needs one lookup, not a scan.
        service_by_port = {
            str(port): service
            for service, port in sensitive_unencrypted.items()
        }

        observed_services = {}  # service -> unencrypted connection count
        seen_encrypted = set()

        for conn in self.connection_data.values():
            # Judge the flow by its own layers; judging by the source host
            # would let one TLS session mask that host's plaintext flows.
            matched = conn.get('protocols', set()) & encrypted_protocols
            if matched:
                seen_encrypted |= matched
                stats['encrypted_packets'] += conn['packets']
                stats['encrypted_bytes'] += conn['bytes']
                continue

            stats['unencrypted_packets'] += conn['packets']
            stats['unencrypted_bytes'] += conn['bytes']

            # Check if this is a sensitive unencrypted service
            service = service_by_port.get(conn['dst_port'])
            if service is not None:
                observed_services[service] = observed_services.get(service, 0) + 1

        results['encrypted_protocols'] = sorted(seen_encrypted)

        # Add results for unencrypted sensitive services
        for service, count in observed_services.items():
            results['unencrypted_services'].append({
                'service': service,
                'port': str(sensitive_unencrypted[service]),
                'protocol': 'TCP',  # connection tracking is TCP-only
                'connection_count': count
            })
        return results