"""
Cliente HTTP para la API de CIRCL Vulnerability Lookup.
Este módulo proporciona una interfaz para interactuar con la API de CIRCL,
que ofrece información sobre vulnerabilidades (CVEs), puntuaciones EPSS,
y otros datos de seguridad.
API Documentation: https://vulnerability.circl.lu/api/
"""
import os
import logging
from typing import Optional, Dict, List, Any
import httpx
from pydantic import BaseModel, Field
# Configuración de logging
logger = logging.getLogger(__name__)
class CVEData(BaseModel):
"""Modelo para datos de CVE"""
id: str
summary: Optional[str] = None
cvss: Optional[float] = None
cvss_vector: Optional[str] = Field(None, alias="cvss-vector")
Published: Optional[str] = None
Modified: Optional[str] = None
references: Optional[List[str]] = None
vulnerable_configuration: Optional[List[str]] = None
cwe: Optional[List[str]] = None
class Config:
populate_by_name = True
class EPSSData(BaseModel):
"""Modelo para datos EPSS (Exploit Prediction Scoring System)"""
cve: str
epss: Optional[float] = None
percentile: Optional[float] = None
date: Optional[str] = None
class CIRCLClient:
"""Cliente para la API de CIRCL Vulnerability Lookup"""
def __init__(self, base_url: Optional[str] = None, timeout: int = 30):
"""
Inicializa el cliente CIRCL.
Args:
base_url: URL base de la API (por defecto usa variable de entorno)
timeout: Timeout en segundos para las peticiones HTTP
"""
self.base_url = base_url or os.getenv(
"CIRCL_API_BASE_URL",
"https://vulnerability.circl.lu/api"
)
self.timeout = timeout
self.client = httpx.AsyncClient(
base_url=self.base_url,
timeout=self.timeout,
follow_redirects=True
)
logger.info(f"CIRCL Client initialized with base URL: {self.base_url}")
async def close(self):
"""Cierra el cliente HTTP"""
await self.client.aclose()
async def get_vulnerability(self, cve_id: str) -> Dict[str, Any]:
"""
Obtiene información detallada de una vulnerabilidad por su ID.
Args:
cve_id: Identificador CVE (ej. "CVE-2024-1234")
Returns:
Diccionario con los datos de la vulnerabilidad
Raises:
httpx.HTTPStatusError: Si la petición falla
ValueError: Si el CVE ID es inválido
"""
if not cve_id or not cve_id.startswith("CVE-"):
raise ValueError(f"CVE ID inválido: {cve_id}")
logger.info(f"Fetching vulnerability data for {cve_id}")
try:
response = await self.client.get(f"/vulnerability/{cve_id}")
response.raise_for_status()
data = response.json()
logger.info(f"Successfully fetched data for {cve_id}")
return data
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
logger.warning(f"CVE not found: {cve_id}")
raise ValueError(f"CVE {cve_id} no encontrado en la base de datos de CIRCL")
else:
logger.error(f"HTTP error fetching {cve_id}: {e}")
raise
except httpx.TimeoutException:
logger.error(f"Timeout fetching {cve_id}")
raise TimeoutError(f"Timeout al consultar {cve_id}")
except Exception as e:
logger.error(f"Unexpected error fetching {cve_id}: {e}")
raise
async def search_by_vendor_product(
self,
vendor: str,
product: str,
limit: int = 10
) -> List[Dict[str, Any]]:
"""
Busca vulnerabilidades por vendor y producto.
Args:
vendor: Nombre del vendor (ej. "apache")
product: Nombre del producto (ej. "http_server")
limit: Número máximo de resultados
Returns:
Lista de vulnerabilidades encontradas
"""
logger.info(f"Searching vulnerabilities for {vendor}/{product}")
try:
response = await self.client.get(
f"/vulnerability/search/{vendor}/{product}"
)
response.raise_for_status()
data = response.json()
# La API devuelve un diccionario con CVE IDs como claves
results = []
for cve_id, cve_data in list(data.items())[:limit]:
cve_data["id"] = cve_id
results.append(cve_data)
logger.info(f"Found {len(results)} vulnerabilities for {vendor}/{product}")
return results
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error searching {vendor}/{product}: {e}")
if e.response.status_code == 404:
return []
raise
except Exception as e:
logger.error(f"Error searching {vendor}/{product}: {e}")
raise
async def get_epss(self, cve_id: str) -> Optional[Dict[str, Any]]:
"""
Obtiene el EPSS score de una vulnerabilidad.
EPSS (Exploit Prediction Scoring System) predice la probabilidad
de que una vulnerabilidad sea explotada en los próximos 30 días.
Args:
cve_id: Identificador CVE
Returns:
Diccionario con datos EPSS o None si no está disponible
"""
logger.info(f"Fetching EPSS score for {cve_id}")
try:
response = await self.client.get(f"/epss/{cve_id}")
response.raise_for_status()
data = response.json()
logger.info(f"Successfully fetched EPSS for {cve_id}")
return data
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
logger.warning(f"EPSS score not available for {cve_id}")
return None
logger.error(f"HTTP error fetching EPSS for {cve_id}: {e}")
raise
except Exception as e:
logger.error(f"Error fetching EPSS for {cve_id}: {e}")
return None
async def get_recent_vulnerabilities(
self,
source: str = "cve",
limit: int = 10
) -> List[Dict[str, Any]]:
"""
Obtiene las vulnerabilidades más recientes de una fuente.
Args:
source: Fuente de datos (cve, ghsa, pysec, etc.)
limit: Número de vulnerabilidades a retornar
Returns:
Lista de vulnerabilidades recientes
"""
logger.info(f"Fetching {limit} recent vulnerabilities from {source}")
try:
response = await self.client.get(
f"/vulnerability/last/{source}/{limit}"
)
response.raise_for_status()
data = response.json()
logger.info(f"Fetched {len(data)} recent vulnerabilities")
return data
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error fetching recent vulnerabilities: {e}")
raise
except Exception as e:
logger.error(f"Error fetching recent vulnerabilities: {e}")
raise
# Instancia singleton del cliente (opcional)
_client_instance: Optional[CIRCLClient] = None
def get_client() -> CIRCLClient:
"""
Obtiene la instancia singleton del cliente CIRCL.
Returns:
Instancia de CIRCLClient
"""
global _client_instance
if _client_instance is None:
_client_instance = CIRCLClient()
return _client_instance