We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/sarthaksiddha/Wireshark-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""
HTTP protocol analyzer implementation.
"""
from typing import Dict, List, Any, Optional
from collections import defaultdict
import re
from urllib.parse import urlparse
from .base import BaseProtocolAnalyzer
class HTTPProtocolAnalyzer(BaseProtocolAnalyzer):
    """
    HTTP protocol analyzer for extracting and analyzing HTTP traffic.

    The class-level constants below are shared by the analysis methods:
    ``STATUS_CATEGORIES`` names each status-code class and
    ``SECURITY_HEADERS`` lists the response headers whose absence is
    reported as a finding.
    """
    # Protocol label for this analyzer.
    # NOTE(review): presumably read by BaseProtocolAnalyzer - confirm.
    protocol_name = "HTTP"
    # Common HTTP status code categories, keyed by "<first digit>xx"
    STATUS_CATEGORIES = {
        "1xx": "Informational",
        "2xx": "Success",
        "3xx": "Redirection",
        "4xx": "Client Error",
        "5xx": "Server Error"
    }
    # Security-sensitive HTTP response headers (lower-case names, because
    # extracted header names are lower-cased before they are compared)
    SECURITY_HEADERS = [
        "content-security-policy",
        "x-xss-protection",
        "x-content-type-options",
        "strict-transport-security",
        "x-frame-options",
        "referrer-policy"
    ]
def extract_features(self,
                     packets: List[Dict[str, Any]],
                     include_headers: bool = True,
                     include_body: bool = False,
                     **kwargs) -> Dict[str, Any]:
    """
    Build per-conversation HTTP exchanges plus aggregate statistics.

    Args:
        packets: List of packet dictionaries
        include_headers: Whether extracted requests/responses carry headers
        include_body: Whether extracted requests/responses carry bodies
        **kwargs: Additional extraction parameters (not used here)

    Returns:
        Dictionary with three keys: "conversations" (matched exchanges and
        unmatched requests/responses per conversation id), "statistics"
        (request/response totals, counters per status code, method, host
        and content type, and a byte total) and "security_findings".
    """
    # Narrow to HTTP traffic, then group it by conversation.
    conversations = self._extract_conversations(self._filter_packets(packets))

    stats = {
        "total_requests": 0,
        "total_responses": 0,
        "status_codes": defaultdict(int),
        "methods": defaultdict(int),
        "hosts": defaultdict(int),
        "content_types": defaultdict(int),
        "total_bytes": 0
    }
    findings = []
    http_data = {
        "conversations": {},
        "statistics": stats,
        "security_findings": findings
    }

    for conv_id, conv_packets in conversations.items():
        requests, responses = [], []

        for packet in conv_packets:
            http = packet.get('http', {})

            if 'request' in http:
                requests.append(
                    self._extract_http_request(packet, include_headers, include_body)
                )
                stats["total_requests"] += 1

                method = http.get('request_method', '')
                if method:
                    stats["methods"][method] += 1

                # Only the first Host header is considered.
                host = next(
                    (entry.get('value', '')
                     for entry in http.get('request_header', [])
                     if entry.get('name', '').lower() == 'host'),
                    None
                )
                if host:
                    stats["hosts"][host] += 1

            elif 'response' in http:
                response = self._extract_http_response(packet, include_headers, include_body)
                responses.append(response)
                stats["total_responses"] += 1

                status_code = http.get('response_code', '')
                if status_code:
                    stats["status_codes"][status_code] += 1

                # First Content-Type header, with parameters such as
                # "; charset=..." dropped.
                content_type = next(
                    (entry.get('value', '').split(';')[0].strip()
                     for entry in http.get('response_header', [])
                     if entry.get('name', '').lower() == 'content-type'),
                    None
                )
                if content_type:
                    stats["content_types"][content_type] += 1

                self._check_security_headers(response, findings)

            # Every packet of the conversation contributes to the byte total.
            stats["total_bytes"] += int(packet.get('length', 0))

        exchanges = self._match_requests_responses(requests, responses)

        # The matcher tags paired entries with a temporary '_matched' flag;
        # read it to find the leftovers, then strip it from the output.
        leftover_requests = [item for item in requests if not item.get('_matched')]
        leftover_responses = [item for item in responses if not item.get('_matched')]
        for item in requests:
            item.pop('_matched', None)
        for item in responses:
            item.pop('_matched', None)

        http_data["conversations"][conv_id] = {
            "exchanges": exchanges,
            "unmatched_requests": leftover_requests,
            "unmatched_responses": leftover_responses
        }

    return http_data
def generate_context(self,
features: Dict[str, Any],
detail_level: int = 2,
max_conversations: int = 10,
**kwargs) -> Dict[str, Any]:
"""
Generate AI-friendly context from HTTP features.
Args:
features: Dictionary of extracted HTTP features
detail_level: Level of detail (1-3, where 3 is most detailed)
max_conversations: Maximum number of conversations to include
**kwargs: Additional context parameters
Returns:
Dictionary with formatted HTTP context
"""
conversations = features.get("conversations", {})
statistics = features.get("statistics", {})
security_findings = features.get("security_findings", [])
# Prepare context
context = {
"protocol": "HTTP",
"summary": {
"total_requests": statistics.get("total_requests", 0),
"total_responses": statistics.get("total_responses", 0),
"total_conversations": len(conversations),
"total_bytes_transferred": statistics.get("total_bytes", 0)
},
"top_statistics": {
"status_codes": self._get_top_items(statistics.get("status_codes", {}), 10),
"methods": self._get_top_items(statistics.get("methods", {}), 5),
"hosts": self._get_top_items(statistics.get("hosts", {}), 5),
"content_types": self._get_top_items(statistics.get("content_types", {}), 5)
},
"security": {
"findings": security_findings[:10] # Limit to top 10 findings
},
"conversations": {}
}
# Add HTTP status code descriptions
status_descriptions = {}
for status, count in statistics.get("status_codes", {}).items():
category = self._get_status_category(status)
status_descriptions[status] = f"{status} - {category}"
context["status_code_descriptions"] = status_descriptions
# Add conversation details based on detail level
sorted_conversations = sorted(
conversations.items(),
key=lambda x: len(x[1]["exchanges"]),
reverse=True
)[:max_conversations]
for conv_id, conv_data in sorted_conversations:
exchanges = conv_data["exchanges"]
context["conversations"][conv_id] = {
"exchanges_count": len(exchanges),
"exchanges": []
}
# Add exchange details based on detail level
for exchange in exchanges:
exchange_summary = {
"request": {
"method": exchange["request"].get("method", ""),
"uri": exchange["request"].get("uri", ""),
"version": exchange["request"].get("version", "")
},
"response": {
"status_code": exchange["response"].get("status_code", ""),
"status_phrase": exchange["response"].get("status_phrase", ""),
"content_length": exchange["response"].get("content_length", 0),
"content_type": exchange["response"].get("content_type", "")
}
}
# Add headers for higher detail levels
if detail_level >= 2:
exchange_summary["request"]["headers"] = exchange["request"].get("headers", {})
exchange_summary["response"]["headers"] = exchange["response"].get("headers", {})
# Add body preview for highest detail level
if detail_level >= 3:
# Include a preview of the body (truncated)
req_body = exchange["request"].get("body", "")
resp_body = exchange["response"].get("body", "")
exchange_summary["request"]["body_preview"] = req_body[:200] + "..." if len(req_body) > 200 else req_body
exchange_summary["response"]["body_preview"] = resp_body[:200] + "..." if len(resp_body) > 200 else resp_body
context["conversations"][conv_id]["exchanges"].append(exchange_summary)
return context
def extract_insights(self,
                     packets: List[Dict[str, Any]],
                     extract_queries: bool = True,
                     analyze_response_codes: bool = True,
                     detect_tunneling: bool = False,
                     **kwargs) -> Dict[str, Any]:
    """
    Extract deeper HTTP-specific insights.

    Args:
        packets: List of packet dictionaries
        extract_queries: Whether to extract URL query patterns
        analyze_response_codes: Whether to analyze response code patterns
        detect_tunneling: Whether to look for HTTP tunneling
        **kwargs: Additional parameters (not used here)

    Returns:
        Dictionary with "protocol", "findings" (always an empty list in
        this implementation) and "patterns"; a "tunneling" entry is added
        when detect_tunneling is set.
    """
    # The tunneling heuristics compare request/response "body_length",
    # which extraction only records when bodies are included. Ask for
    # bodies whenever tunneling detection is on, otherwise the size-based
    # checks could never fire.
    features = self.extract_features(packets, include_body=detect_tunneling)

    insights = {
        "protocol": "HTTP",
        "findings": [],
        "patterns": {}
    }

    # Extract URL and query patterns
    if extract_queries:
        insights["patterns"]["queries"] = self._analyze_query_patterns(features)

    # Analyze response code patterns
    if analyze_response_codes:
        insights["patterns"]["response_codes"] = self._analyze_response_codes(features)

    # Detect possible HTTP tunneling
    if detect_tunneling:
        tunneling = self._detect_tunneling(features)
        if tunneling:
            insights["tunneling"] = tunneling

    return insights
def _extract_http_request(self,
packet: Dict[str, Any],
include_headers: bool = True,
include_body: bool = False) -> Dict[str, Any]:
"""Extract HTTP request details from a packet."""
http = packet.get('http', {})
request = {
"method": http.get('request_method', ''),
"uri": http.get('request_uri', ''),
"version": http.get('request_version', ''),
"timestamp": packet.get('timestamp', 0),
"frame_number": packet.get('frame_number', '')
}
# Parse URI components if available
uri = request["uri"]
if uri:
parsed_uri = urlparse(uri)
request["uri_path"] = parsed_uri.path
request["uri_query"] = parsed_uri.query
# Extract headers if requested
if include_headers:
headers = {}
for header in http.get('request_header', []):
name = header.get('name', '').lower()
value = header.get('value', '')
headers[name] = value
request["headers"] = headers
# Extract body if requested
if include_body and 'request_body' in http:
request["body"] = http.get('request_body', '')
request["body_length"] = len(request["body"])
return request
def _extract_http_response(self,
packet: Dict[str, Any],
include_headers: bool = True,
include_body: bool = False) -> Dict[str, Any]:
"""Extract HTTP response details from a packet."""
http = packet.get('http', {})
response = {
"status_code": http.get('response_code', ''),
"status_phrase": http.get('response_phrase', ''),
"version": http.get('response_version', ''),
"timestamp": packet.get('timestamp', 0),
"frame_number": packet.get('frame_number', '')
}
# Extract headers if requested
if include_headers:
headers = {}
for header in http.get('response_header', []):
name = header.get('name', '').lower()
value = header.get('value', '')
headers[name] = value
response["headers"] = headers
# Extract common useful headers
response["content_type"] = headers.get('content-type', '').split(';')[0].strip()
try:
response["content_length"] = int(headers.get('content-length', 0))
except (ValueError, TypeError):
response["content_length"] = 0
# Extract body if requested
if include_body and 'response_body' in http:
response["body"] = http.get('response_body', '')
response["body_length"] = len(response["body"])
return response
def _match_requests_responses(self,
requests: List[Dict[str, Any]],
responses: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Match HTTP requests with their corresponding responses."""
exchanges = []
# Sort by timestamp
sorted_requests = sorted(requests, key=lambda x: float(x.get('timestamp', 0)))
sorted_responses = sorted(responses, key=lambda x: float(x.get('timestamp', 0)))
# Match requests with responses
for req in sorted_requests:
req_time = float(req.get('timestamp', 0))
# Find the first response that comes after this request
for resp in sorted_responses:
if resp.get('_matched'):
continue
resp_time = float(resp.get('timestamp', 0))
if resp_time > req_time:
# Match found
exchanges.append({
"request": req,
"response": resp,
"time_delta": resp_time - req_time
})
# Mark as matched
req['_matched'] = True
resp['_matched'] = True
break
return exchanges
def _check_security_headers(self,
response: Dict[str, Any],
findings: List[str]) -> None:
"""Check HTTP response for missing security headers."""
if 'headers' not in response:
return
headers = response.get('headers', {})
for header in self.SECURITY_HEADERS:
if header not in headers:
findings.append(f"Missing security header: {header}")
def _get_status_category(self, status_code: str) -> str:
"""Get the category for an HTTP status code."""
try:
category_key = status_code[0] + "xx"
return self.STATUS_CATEGORIES.get(category_key, "Unknown")
except (IndexError, TypeError):
return "Unknown"
def _get_top_items(self,
items: Dict[str, int],
limit: int = 10) -> Dict[str, int]:
"""Get the top N items from a dictionary by count."""
return dict(sorted(items.items(), key=lambda x: x[1], reverse=True)[:limit])
def _analyze_query_patterns(self, features: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze HTTP query patterns."""
query_patterns = {
"common_parameters": defaultdict(int),
"parameter_types": defaultdict(set),
"suspicious_patterns": []
}
# Extract all request URIs with queries
for conv_id, conv_data in features.get("conversations", {}).items():
for exchange in conv_data.get("exchanges", []):
request = exchange.get("request", {})
uri = request.get("uri", "")
if "?" in uri:
parsed = urlparse(uri)
query = parsed.query
if query:
# Extract parameters
params = query.split("&")
for param in params:
if "=" in param:
name, value = param.split("=", 1)
query_patterns["common_parameters"][name] += 1
# Detect parameter types
param_type = self._detect_parameter_type(value)
query_patterns["parameter_types"][name].add(param_type)
# Check for suspicious patterns
if self._is_suspicious_parameter(name, value):
query_patterns["suspicious_patterns"].append({
"parameter": name,
"value_sample": value,
"uri": uri
})
# Convert parameter types from sets to lists
query_patterns["parameter_types"] = {
k: list(v) for k, v in query_patterns["parameter_types"].items()
}
# Get top parameters
query_patterns["top_parameters"] = dict(
sorted(query_patterns["common_parameters"].items(),
key=lambda x: x[1],
reverse=True)[:10]
)
return query_patterns
def _detect_parameter_type(self, value: str) -> str:
"""Detect the data type of a parameter value."""
if not value:
return "empty"
# Check for numeric types
if value.isdigit():
return "integer"
if re.match(r'^-?\d+(\.\d+)?$', value):
return "number"
# Check for dates
if re.match(r'^\d{4}-\d{2}-\d{2}', value):
return "date"
# Check for UUIDs
if re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', value, re.I):
return "uuid"
# Check for email addresses
if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', value):
return "email"
# Check for URLs
if re.match(r'^https?://', value):
return "url"
# Otherwise, assume it's a string
return "string"
def _is_suspicious_parameter(self, name: str, value: str) -> bool:
"""Check if a parameter name or value looks suspicious."""
# Check for common injection patterns
suspicious_patterns = [
r"['\"].*--", # SQL injection
r"<script.*>", # XSS
r"/etc/passwd", # Path traversal
r"\.\.(/|\\)", # Directory traversal
r";.*\s*\w+\s*=", # Command injection
r"(exec|eval|system)\(", # Code injection
]
for pattern in suspicious_patterns:
if re.search(pattern, value, re.I):
return True
# Check for suspicious parameter names
suspicious_names = [
"passwd", "password", "pwd",
"token", "key", "secret",
"command", "cmd", "exec",
"query", "sql", "debug"
]
name_lower = name.lower()
for sus_name in suspicious_names:
if sus_name in name_lower:
return True
return False
def _analyze_response_codes(self, features: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze HTTP response code patterns."""
status_codes = features.get("statistics", {}).get("status_codes", {})
code_patterns = {
"success_rate": 0,
"error_rate": 0,
"redirection_rate": 0,
"top_codes": {},
"unusual_codes": []
}
total_responses = features.get("statistics", {}).get("total_responses", 0)
if total_responses > 0:
success_count = sum(count for code, count in status_codes.items()
if code.startswith('2'))
error_count = sum(count for code, count in status_codes.items()
if code.startswith('4') or code.startswith('5'))
redirect_count = sum(count for code, count in status_codes.items()
if code.startswith('3'))
code_patterns["success_rate"] = success_count / total_responses
code_patterns["error_rate"] = error_count / total_responses
code_patterns["redirection_rate"] = redirect_count / total_responses
# Get top status codes
code_patterns["top_codes"] = dict(
sorted(status_codes.items(), key=lambda x: x[1], reverse=True)[:5]
)
# Identify unusual status codes
common_codes = {'200', '301', '302', '304', '400', '401', '403', '404', '500'}
for code in status_codes:
if code not in common_codes:
code_patterns["unusual_codes"].append(code)
return code_patterns
def _detect_tunneling(self, features: Dict[str, Any]) -> Dict[str, Any]:
"""Detect potential HTTP tunneling."""
tunneling_indicators = []
# Look for indicators of HTTP tunneling
for conv_id, conv_data in features.get("conversations", {}).items():
for exchange in conv_data.get("exchanges", []):
request = exchange.get("request", {})
response = exchange.get("response", {})
# Check for CONNECT method
if request.get("method") == "CONNECT":
tunneling_indicators.append({
"type": "CONNECT method",
"uri": request.get("uri", ""),
"frame": request.get("frame_number")
})
# Check for unusually large request or response bodies
if request.get("body_length", 0) > 10000:
tunneling_indicators.append({
"type": "Large request body",
"size": request.get("body_length", 0),
"uri": request.get("uri", ""),
"frame": request.get("frame_number")
})
# Check for unusual content types in large responses
if response.get("body_length", 0) > 10000:
content_type = response.get("content_type", "")
if content_type and content_type not in ["text/html", "application/json", "text/javascript"]:
tunneling_indicators.append({
"type": "Unusual content type for large response",
"content_type": content_type,
"size": response.get("body_length", 0),
"frame": response.get("frame_number")
})
if tunneling_indicators:
return {
"detected": True,
"indicators": tunneling_indicators
}
return {"detected": False}