"""Censys Platform API client."""
import httpx
from typing import Optional, Any
from dataclasses import dataclass
@dataclass
class CensysClient:
"""Client for Censys Platform API v3."""
api_token: Optional[str] = None
base_url: str = "https://api.platform.censys.io/v3"
_client: Optional[httpx.AsyncClient] = None
async def start(self):
"""Initialize HTTP client."""
if not self._client:
headers = {}
if self.api_token:
headers["Authorization"] = f"Bearer {self.api_token}"
self._client = httpx.AsyncClient(
base_url=self.base_url,
headers=headers,
timeout=30.0,
)
async def close(self):
"""Close HTTP client."""
if self._client:
await self._client.aclose()
self._client = None
async def host_lookup(self, ip: str) -> dict[str, Any]:
"""
Get detailed information about a host.
Args:
ip: IP address to look up
Returns:
Host details including services, location, ASN
"""
if not self.api_token:
return {"error": "Censys API token not configured"}
await self.start()
try:
resp = await self._client.get(f"/hosts/{ip}")
resp.raise_for_status()
data = resp.json()
host = data.get("result", {})
return {
"ip": ip,
"asn": host.get("autonomous_system", {}).get("asn"),
"asn_name": host.get("autonomous_system", {}).get("name"),
"country": host.get("location", {}).get("country"),
"city": host.get("location", {}).get("city"),
"services": [
{
"port": svc.get("port"),
"protocol": svc.get("transport_protocol"),
"service": svc.get("service_name"),
"software": svc.get("software", []),
"banner": svc.get("banner", "")[:500] if svc.get("banner") else None,
}
for svc in host.get("services", [])
],
"last_seen": host.get("last_updated_at"),
"labels": host.get("labels", []),
}
except httpx.HTTPStatusError as e:
return {"error": f"Censys API error: {e.response.status_code}"}
except Exception as e:
return {"error": str(e)}
async def search(self, query: str, limit: int = 25) -> dict[str, Any]:
"""
Search Censys for hosts matching a query.
Args:
query: Censys query language search string
limit: Max results (up to 100)
Returns:
List of matching hosts
"""
if not self.api_token:
return {"error": "Censys API token not configured"}
await self.start()
try:
resp = await self._client.get(
"/hosts/search",
params={"q": query, "per_page": min(limit, 100)},
)
resp.raise_for_status()
data = resp.json()
hits = data.get("result", {}).get("hits", [])
return {
"query": query,
"total": data.get("result", {}).get("total", 0),
"hosts": [
{
"ip": h.get("ip"),
"asn": h.get("autonomous_system", {}).get("asn"),
"asn_name": h.get("autonomous_system", {}).get("name"),
"country": h.get("location", {}).get("country"),
"services": [
{"port": s.get("port"), "service": s.get("service_name")}
for s in h.get("services", [])
],
}
for h in hits
],
}
except httpx.HTTPStatusError as e:
return {"error": f"Censys API error: {e.response.status_code}"}
except Exception as e:
return {"error": str(e)}
async def cert_search(self, query: str, limit: int = 25) -> dict[str, Any]:
"""
Search for certificates.
Args:
query: Certificate search query
limit: Max results
Returns:
Matching certificates
"""
if not self.api_token:
return {"error": "Censys API token not configured"}
await self.start()
try:
# Search hosts with cert query
resp = await self._client.get(
"/hosts/search",
params={"q": query, "per_page": min(limit, 100)},
)
resp.raise_for_status()
data = resp.json()
return {
"query": query,
"total": data.get("result", {}).get("total", 0),
"results": data.get("result", {}).get("hits", []),
}
except Exception as e:
return {"error": str(e)}