"""Have I Been Pwned API client for breach checking."""
import httpx
from typing import Optional, Any
from dataclasses import dataclass
@dataclass
class HIBPClient:
"""Client for Have I Been Pwned API."""
api_key: Optional[str] = None
base_url: str = "https://haveibeenpwned.com/api/v3"
_client: Optional[httpx.AsyncClient] = None
async def start(self):
"""Initialize HTTP client."""
if not self._client and self.api_key:
self._client = httpx.AsyncClient(
base_url=self.base_url,
headers={
"hibp-api-key": self.api_key,
"User-Agent": "OSINT-MCP",
},
timeout=30.0,
)
async def close(self):
"""Close HTTP client."""
if self._client:
await self._client.aclose()
self._client = None
async def check_breaches(self, email: str) -> dict[str, Any]:
"""
Check if email has been in data breaches.
Args:
email: Email address to check
Returns:
List of breaches and risk assessment
"""
if not self.api_key:
return {"error": "HIBP API key not configured"}
await self.start()
try:
resp = await self._client.get(
f"/breachedaccount/{email}",
params={"truncateResponse": "false"},
)
if resp.status_code == 404:
return {
"email": email,
"breached": False,
"breach_count": 0,
"breaches": [],
"risk_level": "low",
}
resp.raise_for_status()
breaches = resp.json()
# Calculate risk level
sensitive_count = sum(1 for b in breaches if b.get("IsSensitive"))
password_count = sum(1 for b in breaches if "Passwords" in b.get("DataClasses", []))
if sensitive_count > 0 or password_count > 3:
risk_level = "critical"
elif password_count > 0 or len(breaches) > 5:
risk_level = "high"
elif len(breaches) > 2:
risk_level = "medium"
else:
risk_level = "low"
return {
"email": email,
"breached": True,
"breach_count": len(breaches),
"risk_level": risk_level,
"breaches": [
{
"name": b.get("Name"),
"title": b.get("Title"),
"domain": b.get("Domain"),
"breach_date": b.get("BreachDate"),
"added_date": b.get("AddedDate"),
"pwn_count": b.get("PwnCount"),
"data_classes": b.get("DataClasses", []),
"is_verified": b.get("IsVerified"),
"is_sensitive": b.get("IsSensitive"),
}
for b in breaches
],
}
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
return {"error": "Invalid HIBP API key"}
return {"error": f"HIBP API error: {e.response.status_code}"}
except Exception as e:
return {"error": str(e)}
async def check_pastes(self, email: str) -> dict[str, Any]:
"""
Check if email appears in paste sites.
Args:
email: Email address to check
Returns:
List of pastes containing the email
"""
if not self.api_key:
return {"error": "HIBP API key not configured"}
await self.start()
try:
resp = await self._client.get(f"/pasteaccount/{email}")
if resp.status_code == 404:
return {
"email": email,
"paste_count": 0,
"pastes": [],
}
resp.raise_for_status()
pastes = resp.json()
return {
"email": email,
"paste_count": len(pastes),
"pastes": [
{
"source": p.get("Source"),
"id": p.get("Id"),
"title": p.get("Title"),
"date": p.get("Date"),
"email_count": p.get("EmailCount"),
}
for p in pastes
],
}
except httpx.HTTPStatusError as e:
return {"error": f"HIBP API error: {e.response.status_code}"}
except Exception as e:
return {"error": str(e)}