"""Shodan API client for infrastructure intelligence."""
from dataclasses import dataclass, field
from typing import Optional, Any
from urllib.parse import quote

import httpx
@dataclass
class ShodanClient:
    """Async client for a small subset of the Shodan REST API.

    The HTTP client is created lazily by :meth:`start`, which every lookup
    method calls, so an instance can be used directly.  Call :meth:`close`
    when finished, or use the instance as an async context manager::

        async with ShodanClient(api_key="...") as shodan:
            info = await shodan.host_lookup("8.8.8.8")

    Request and parsing failures are returned as ``{"error": "..."}`` dicts
    instead of being raised.

    Attributes:
        api_key: Shodan API key; while unset every lookup returns an error dict.
        base_url: Root URL the request paths are resolved against.
        _client: Underlying ``httpx.AsyncClient``; ``None`` until started.
    """

    # Kept out of the generated __repr__ so the secret does not end up in
    # logs or tracebacks that print the client object.
    api_key: Optional[str] = field(default=None, repr=False)
    base_url: str = "https://api.shodan.io"
    _client: Optional["httpx.AsyncClient"] = None

    async def __aenter__(self) -> "ShodanClient":
        """Start the HTTP client and return this instance."""
        await self.start()
        return self

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        """Close the HTTP client when leaving the ``async with`` block."""
        await self.close()

    async def start(self):
        """Initialize the HTTP client if it does not exist yet."""
        if not self._client:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                timeout=30.0,
            )

    async def close(self):
        """Close the HTTP client, if any, and forget it."""
        if self._client:
            await self._client.aclose()
            self._client = None

    async def _get_json(self, path: str, **params: Any) -> Any:
        """GET *path* with the API key plus *params* and return the decoded JSON.

        Exceptions from the request, from ``raise_for_status()`` and from JSON
        decoding propagate; the public methods turn them into error dicts.
        """
        await self.start()
        resp = await self._client.get(path, params={"key": self.api_key, **params})
        resp.raise_for_status()
        return resp.json()

    @staticmethod
    def _error(exc: Exception) -> dict[str, Any]:
        """Build the error dict returned to callers for *exc*."""
        if isinstance(exc, httpx.HTTPStatusError):
            return {"error": f"Shodan API error: {exc.response.status_code}"}
        # An exception may carry an empty message; fall back to its class
        # name so the reported error is never blank.
        return {"error": str(exc) or type(exc).__name__}

    async def host_lookup(self, ip: str) -> dict[str, Any]:
        """
        Get Shodan data for an IP address.

        Args:
            ip: IP address to look up

        Returns:
            Host info including ports, services and vulns.  A 404 response
            yields ``{"ip": ip, "found": False, "message": ...}``; any other
            failure yields ``{"error": ...}``.
        """
        if not self.api_key:
            return {"error": "Shodan API key not configured"}

        # Percent-encode the caller-supplied value so "/", "?" or "#" cannot
        # redirect the request to another endpoint or alter the query string.
        # ":" is left intact for IPv6 literals.
        path = f"/shodan/host/{quote(str(ip), safe=':')}"
        try:
            data = await self._get_json(path)
            return {
                "ip": ip,
                "hostnames": data.get("hostnames", []),
                "country": data.get("country_name"),
                "city": data.get("city"),
                "org": data.get("org"),
                "asn": data.get("asn"),
                "isp": data.get("isp"),
                "ports": data.get("ports", []),
                "vulns": data.get("vulns", []),
                "last_update": data.get("last_update"),
                "services": [
                    {
                        "port": svc.get("port"),
                        "protocol": svc.get("transport"),
                        "product": svc.get("product"),
                        "version": svc.get("version"),
                        # `or` also covers an explicit JSON null, which
                        # .get()'s default would not replace.
                        "banner": (svc.get("data") or "")[:500],
                    }
                    for svc in (data.get("data") or [])
                ],
            }
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == 404:
                return {"ip": ip, "found": False, "message": "No data available"}
            return self._error(exc)
        except Exception as exc:
            return self._error(exc)

    async def domain_lookup(self, domain: str) -> dict[str, Any]:
        """
        Get Shodan DNS data for a domain.

        Args:
            domain: Domain to look up

        Returns:
            Dict with ``domain``, ``subdomains``, ``records`` and ``total``
            (the number of subdomains), or ``{"error": ...}`` on failure.
        """
        if not self.api_key:
            return {"error": "Shodan API key not configured"}

        # Encode everything outside the unreserved set so the value stays a
        # single path segment.
        path = f"/dns/domain/{quote(str(domain), safe='')}"
        try:
            data = await self._get_json(path)
            # Normalise a missing or null list once so len() cannot fail.
            subdomains = data.get("subdomains") or []
            return {
                "domain": domain,
                "subdomains": subdomains,
                "records": data.get("data", []),
                "total": len(subdomains),
            }
        except Exception as exc:
            return self._error(exc)

    async def search(self, query: str, limit: int = 10) -> dict[str, Any]:
        """
        Search Shodan for hosts.

        Args:
            query: Shodan search query
            limit: Max results; sent as the ``limit`` query parameter and
                also applied locally.  Negative values are treated as 0.

        Returns:
            Dict with ``query``, ``total`` and a list of ``matches``, or
            ``{"error": ...}`` on failure.
        """
        if not self.api_key:
            return {"error": "Shodan API key not configured"}

        # A negative bound would make the slice below drop items from the end
        # instead of capping the result size.
        limit = max(limit, 0)
        try:
            data = await self._get_json(
                "/shodan/host/search", query=query, limit=limit
            )
            return {
                "query": query,
                "total": data.get("total", 0),
                "matches": [
                    {
                        "ip": m.get("ip_str"),
                        "port": m.get("port"),
                        "org": m.get("org"),
                        "product": m.get("product"),
                        # "location" may be absent or null.
                        "country": (m.get("location") or {}).get("country_name"),
                    }
                    for m in (data.get("matches") or [])[:limit]
                ],
            }
        except Exception as exc:
            return self._error(exc)