"""Local OSINT tools - Sherlock, Holehe, etc."""
import asyncio
import subprocess
import json
from typing import Any
from dataclasses import dataclass
@dataclass
class SherlockClient:
    """Client for Sherlock username enumeration.

    Runs the ``sherlock`` command-line tool as a subprocess and collects the
    profile URLs it prints.

    Attributes:
        timeout: Upper bound, in seconds, on one whole ``sherlock`` run.
    """

    timeout: int = 60

    @staticmethod
    def _extract_urls(output: str) -> list[str]:
        """Return every whitespace-delimited token of *output* starting with ``http``."""
        return [token for token in output.split() if token.startswith("http")]

    async def _run(self, *argv: str) -> str:
        """Run *argv* and return its stdout, killing the child on timeout.

        Raises:
            FileNotFoundError: The executable is not installed.
            asyncio.TimeoutError: The run exceeded ``self.timeout`` seconds.
        """
        proc = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, _ = await asyncio.wait_for(
                proc.communicate(),
                timeout=self.timeout,
            )
        except asyncio.TimeoutError:
            # wait_for() only cancels communicate(); without an explicit kill
            # the child would keep running after the timeout is reported.
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # The child exited on its own in the meantime.
            await proc.wait()
            raise
        # Tool output is not guaranteed to be valid UTF-8; never fail on it.
        return stdout.decode(errors="replace")

    async def search(self, username: str) -> dict[str, Any]:
        """Search for a username across social networks.

        Args:
            username: Username to search for.

        Returns:
            On success, a dict with ``username``, ``profiles_found`` (count)
            and ``profiles`` (list of URLs). On failure, a dict with an
            ``error`` message and the ``username``.
        """
        # A leading "-" would be parsed by sherlock as a command-line option.
        if not username or username.startswith("-"):
            return {"error": f"Invalid username: {username!r}", "username": username}

        try:
            output = await self._run(
                "sherlock", username, "--print-found", "--timeout", "10",
            )
        except FileNotFoundError:
            return {
                "error": "Sherlock not installed. Run: pip install sherlock-project",
                "username": username,
            }
        except asyncio.TimeoutError:
            return {
                "error": f"Sherlock search timed out after {self.timeout}s",
                "username": username,
            }
        except Exception as e:
            return {"error": str(e), "username": username}

        found_profiles = self._extract_urls(output)
        return {
            "username": username,
            "profiles_found": len(found_profiles),
            "profiles": found_profiles,
        }
@dataclass
class HoleheCl:
    """Client for Holehe email service detection.

    Runs the ``holehe`` command-line tool as a subprocess and collects the
    services it marks with ``[+]``.

    Attributes:
        timeout: Upper bound, in seconds, on one whole ``holehe`` run.
    """

    timeout: int = 60

    @staticmethod
    def _parse_services(output: str) -> list[str]:
        """Return the service names from the ``[+]`` lines of *output*."""
        services = []
        for raw_line in output.split("\n"):
            line = raw_line.strip()
            if "[+]" not in line:
                continue
            # Holehe closes its report with a legend line that lists every
            # marker ("[+] Email used, [-] Email not used, ..."); it is not a hit.
            if "[-]" in line or "[x]" in line:
                continue
            parts = line.replace("[+]", "").split()
            if parts:
                services.append(parts[0])
        return services

    async def _run(self, *argv: str) -> str:
        """Run *argv* and return its stdout, killing the child on timeout.

        Raises:
            FileNotFoundError: The executable is not installed.
            asyncio.TimeoutError: The run exceeded ``self.timeout`` seconds.
        """
        proc = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, _ = await asyncio.wait_for(
                proc.communicate(),
                timeout=self.timeout,
            )
        except asyncio.TimeoutError:
            # wait_for() only cancels communicate(); without an explicit kill
            # the child would keep running after the timeout is reported.
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # The child exited on its own in the meantime.
            await proc.wait()
            raise
        # Tool output is not guaranteed to be valid UTF-8; never fail on it.
        return stdout.decode(errors="replace")

    async def check(self, email: str) -> dict[str, Any]:
        """Check which services an email is registered with.

        Args:
            email: Email address to check.

        Returns:
            On success, a dict with ``email``, ``services_found`` (count) and
            ``services`` (list of service names). On failure, a dict with an
            ``error`` message and the ``email``.
        """
        # A leading "-" would be parsed by holehe as a command-line option.
        if not email or email.startswith("-"):
            return {"error": f"Invalid email: {email!r}", "email": email}

        try:
            output = await self._run("holehe", email, "--no-color")
        except FileNotFoundError:
            return {
                "error": "Holehe not installed. Run: pip install holehe",
                "email": email,
            }
        except asyncio.TimeoutError:
            return {
                "error": f"Holehe check timed out after {self.timeout}s",
                "email": email,
            }
        except Exception as e:
            return {"error": str(e), "email": email}

        services = self._parse_services(output)
        return {
            "email": email,
            "services_found": len(services),
            "services": services,
        }
@dataclass
class DNSClient:
    """Simple DNS lookup client.

    A records come from the system resolver; the other record types are
    queried with the ``dig`` command-line tool when it is available.

    Attributes:
        timeout: Upper bound, in seconds, on each individual ``dig`` query.
    """

    timeout: int = 10

    @staticmethod
    def _parse_dig(output: str) -> list[str]:
        """Return the non-empty record lines of ``dig +short`` *output*.

        Lines starting with ``;`` are dig diagnostics (for example a
        "connection timed out" notice), not records, and are dropped.
        """
        stripped = (line.strip() for line in output.split("\n"))
        return [line for line in stripped if line and not line.startswith(";")]

    async def _dig(self, record_type: str, domain: str) -> list[str]:
        """Query one record type with ``dig``, killing the child on timeout."""
        proc = await asyncio.create_subprocess_exec(
            "dig", "+short", record_type, domain,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, _ = await asyncio.wait_for(
                proc.communicate(),
                timeout=self.timeout,
            )
        except asyncio.TimeoutError:
            # wait_for() only cancels communicate(); without an explicit kill
            # the child would keep running after the timeout.
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # The child exited on its own in the meantime.
            await proc.wait()
            raise
        return self._parse_dig(stdout.decode(errors="replace"))

    async def lookup(self, domain: str) -> dict[str, Any]:
        """Perform DNS lookups for a domain.

        Args:
            domain: Domain to look up.

        Returns:
            A dict with ``domain`` and the lists ``a_records``,
            ``aaaa_records``, ``mx_records``, ``ns_records`` and
            ``txt_records``. A record type that cannot be resolved yields an
            empty list. On an unexpected failure, a dict with an ``error``
            message and the ``domain``.
        """
        import socket

        # dig reads "@x" as a server, "+x" as a query option and "-x" as a flag.
        if not domain or domain[0] in "-+@":
            return {"error": f"Invalid domain: {domain!r}", "domain": domain}

        results: dict[str, Any] = {"domain": domain}
        try:
            # gethostbyname_ex blocks, so run it off the event loop.
            try:
                resolved = await asyncio.to_thread(socket.gethostbyname_ex, domain)
                results["a_records"] = resolved[2]
            except socket.gaierror:
                results["a_records"] = []

            # Use dig for other records if available.
            for record_type in ("AAAA", "MX", "NS", "TXT"):
                key = f"{record_type.lower()}_records"
                try:
                    results[key] = await self._dig(record_type, domain)
                except Exception:
                    # Best effort: missing dig, timeout, etc. Catching
                    # Exception (not a bare except) lets task cancellation
                    # and KeyboardInterrupt propagate.
                    results[key] = []
            return results
        except Exception as e:
            return {"error": str(e), "domain": domain}
@dataclass
class WHOISClient:
    """WHOIS lookup client.

    Runs the ``whois`` command-line tool as a subprocess and extracts a few
    well-known fields from its text output.

    Attributes:
        timeout: Upper bound, in seconds, on one ``whois`` run.
    """

    timeout: int = 30

    @staticmethod
    def _parse(domain: str, output: str) -> dict[str, Any]:
        """Extract the known WHOIS fields from *output*.

        Labels are matched case-insensitively anywhere in a line, and the
        value is the text after the line's first colon. For single-valued
        fields the last matching line wins. Name servers are collected in
        order of first appearance, skipping blanks and case-insensitive
        repeats.
        """
        data: dict[str, Any] = {"domain": domain, "raw": output[:2000]}
        field_mappings = {
            "Registrar:": "registrar",
            "Creation Date:": "created",
            "Updated Date:": "updated",
            "Registry Expiry Date:": "expires",
            "Registrant Organization:": "registrant_org",
            "Registrant Country:": "registrant_country",
            "Name Server:": "nameservers",
        }
        patterns = [(label.lower(), field) for label, field in field_mappings.items()]

        nameservers: list[str] = []
        seen: set[str] = set()
        for line in output.split("\n"):
            lowered = line.lower()
            for pattern, field in patterns:
                if pattern not in lowered:
                    continue
                # Every pattern contains ":", so the split always has two parts.
                value = line.split(":", 1)[1].strip()
                if field != "nameservers":
                    data[field] = value
                elif value and value.lower() not in seen:
                    seen.add(value.lower())
                    nameservers.append(value)

        if nameservers:
            data["nameservers"] = nameservers
        return data

    async def lookup(self, domain: str) -> dict[str, Any]:
        """Perform WHOIS lookup for a domain.

        Args:
            domain: Domain to look up.

        Returns:
            On success, a dict with ``domain``, ``raw`` (the first 2000
            characters of the output) and whichever of ``registrar``,
            ``created``, ``updated``, ``expires``, ``registrant_org``,
            ``registrant_country`` and ``nameservers`` were found. On
            failure, a dict with an ``error`` message and the ``domain``.
        """
        # A leading "-" would be parsed by whois as a command-line option.
        if not domain or domain.startswith("-"):
            return {"error": f"Invalid domain: {domain!r}", "domain": domain}

        try:
            proc = await asyncio.create_subprocess_exec(
                "whois", domain,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                stdout, _ = await asyncio.wait_for(
                    proc.communicate(),
                    timeout=self.timeout,
                )
            except asyncio.TimeoutError:
                # wait_for() only cancels communicate(); without an explicit
                # kill the child would keep running after the timeout.
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass  # The child exited on its own in the meantime.
                await proc.wait()
                raise
            # WHOIS servers often send non-UTF-8 bytes; never fail on them.
            return self._parse(domain, stdout.decode(errors="replace"))
        except FileNotFoundError:
            return {"error": "whois command not found", "domain": domain}
        except asyncio.TimeoutError:
            # str(TimeoutError()) is empty, so give the caller a real message.
            return {
                "error": f"WHOIS lookup timed out after {self.timeout}s",
                "domain": domain,
            }
        except Exception as e:
            return {"error": str(e), "domain": domain}