"""
Utilidades para formatear respuestas de la API CIRCL en formatos legibles.
Este módulo convierte los datos crudos de la API en formatos markdown
optimizados para su consumo por LLMs y usuarios finales.
"""
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime
def _extract_cve_data(cve_data: Dict[str, Any]) -> Tuple[str, str, float, str]:
"""
Extrae datos básicos del CVE independientemente del formato.
Returns:
Tuple de (cve_id, summary, cvss_score, severity)
"""
is_cve_5_format = "cveMetadata" in cve_data
if is_cve_5_format:
cve_id = cve_data.get("cveMetadata", {}).get("cveId", "N/A")
descriptions = cve_data.get("containers", {}).get("cna", {}).get("descriptions", [])
summary = descriptions[0].get("value", "No disponible") if descriptions else "No disponible"
cvss_score = 0.0
severity = "N/A"
metrics = cve_data.get("containers", {}).get("cna", {}).get("metrics", [])
for metric in metrics:
if "cvssV3_1" in metric or "cvssV3_0" in metric:
cvss_key = "cvssV3_1" if "cvssV3_1" in metric else "cvssV3_0"
cvss_data = metric[cvss_key]
cvss_score = cvss_data.get("baseScore", 0.0)
severity = cvss_data.get("baseSeverity", "N/A")
break
else:
cve_id = cve_data.get("id", "N/A")
summary = cve_data.get("summary", "No disponible")
cvss_score = cve_data.get("cvss", 0.0) if cve_data.get("cvss") else 0.0
# Determinar severidad
if isinstance(cvss_score, (int, float)):
if cvss_score >= 9.0:
severity = "CRITICAL"
elif cvss_score >= 7.0:
severity = "HIGH"
elif cvss_score >= 4.0:
severity = "MEDIUM"
else:
severity = "LOW"
else:
severity = "N/A"
return cve_id, summary, cvss_score, severity
def format_cve_details(cve_data: Dict[str, Any]) -> str:
"""
Formatea los detalles completos de un CVE en markdown.
Args:
cve_data: Datos del CVE de la API CIRCL (soporta formato CVE 5.0)
Returns:
String formateado en markdown
"""
# Detectar formato CVE 5.0 vs formato antiguo
is_cve_5_format = "cveMetadata" in cve_data
if is_cve_5_format:
# Formato CVE 5.0
cve_id = cve_data.get("cveMetadata", {}).get("cveId", "N/A")
# Descripción
descriptions = cve_data.get("containers", {}).get("cna", {}).get("descriptions", [])
summary = descriptions[0].get("value", "No disponible") if descriptions else "No disponible"
# CVSS Score y Severidad
cvss_score = "N/A"
severity = "N/A"
vector_string = "N/A"
metrics = cve_data.get("containers", {}).get("cna", {}).get("metrics", [])
for metric in metrics:
if "cvssV3_1" in metric:
cvss_data = metric["cvssV3_1"]
cvss_score = cvss_data.get("baseScore", "N/A")
severity = cvss_data.get("baseSeverity", "N/A")
vector_string = cvss_data.get("vectorString", "N/A")
break
elif "cvssV3_0" in metric:
cvss_data = metric["cvssV3_0"]
cvss_score = cvss_data.get("baseScore", "N/A")
severity = cvss_data.get("baseSeverity", "N/A")
vector_string = cvss_data.get("vectorString", "N/A")
break
# Fechas
published = cve_data.get("cveMetadata", {}).get("datePublished", "N/A")
modified = cve_data.get("cveMetadata", {}).get("dateUpdated", "N/A")
# Referencias
references = cve_data.get("containers", {}).get("cna", {}).get("references", [])
ref_list = "\n".join([f"- [{ref.get('name', ref.get('url', 'Link'))}]({ref.get('url', '#')})"
for ref in references[:10]]) if references else "- No disponible"
# CWEs
problem_types = cve_data.get("containers", {}).get("cna", {}).get("problemTypes", [])
cwes = []
for pt in problem_types:
for desc in pt.get("descriptions", []):
if "cweId" in desc:
cwes.append(desc["cweId"])
cwe_list = ", ".join(cwes) if cwes else "No disponible"
# Productos afectados
affected = cve_data.get("containers", {}).get("cna", {}).get("affected", [])
config_count = len(affected)
else:
# Formato antiguo
cve_id = cve_data.get("id", "N/A")
summary = cve_data.get("summary", "No disponible")
# CVSS Score y Severidad
cvss_score = "N/A"
severity = "N/A"
vector_string = "N/A"
if "cvss" in cve_data and cve_data["cvss"]:
cvss_score = cve_data["cvss"]
# Determinar severidad basada en CVSS v3
if isinstance(cvss_score, (int, float)):
if cvss_score >= 9.0:
severity = "CRITICAL"
elif cvss_score >= 7.0:
severity = "HIGH"
elif cvss_score >= 4.0:
severity = "MEDIUM"
else:
severity = "LOW"
# Fechas
published = cve_data.get("Published", "N/A")
modified = cve_data.get("Modified", "N/A")
# Referencias
references = cve_data.get("references", [])
ref_list = "\n".join([f"- {ref}" for ref in references[:5]]) if references else "- No disponible"
# CWEs
cwes = cve_data.get("cwe", [])
cwe_list = ", ".join(cwes) if cwes else "No disponible"
# Configuraciones vulnerables
vuln_configs = cve_data.get("vulnerable_configuration", [])
config_count = len(vuln_configs) if vuln_configs else 0
# Template de salida
cvss_section = f"- **CVSS Score**: {cvss_score}\n- **Severidad**: {severity}"
if is_cve_5_format and vector_string != "N/A":
cvss_section += f"\n- **Vector CVSS**: `{vector_string}`"
formatted = f"""# {cve_id}
## Información General
{cvss_section}
- **Publicado**: {published}
- **Última modificación**: {modified}
## Descripción
{summary}
## Debilidades Comunes (CWE)
{cwe_list}
## Referencias
{ref_list}
## Productos/Configuraciones Afectadas
- Total de componentes afectados: {config_count}
---
*Datos obtenidos de CIRCL Vulnerability Lookup API*
"""
return formatted
def format_cve_list(cves: List[Dict[str, Any]], title: str = "Vulnerabilidades Encontradas") -> str:
"""
Formatea una lista de CVEs como tabla markdown.
Args:
cves: Lista de CVEs
title: Título de la lista
Returns:
String formateado en markdown
"""
if not cves:
return f"## {title}\n\nNo se encontraron vulnerabilidades."
# Encabezado de la tabla
table = f"""## {title}
Total de resultados: {len(cves)}
| CVE ID | Severidad | CVSS | Fecha | Resumen |
|--------|-----------|------|-------|---------|
"""
# Filas de la tabla
for cve in cves:
cve_id = cve.get("id", "N/A")
summary = cve.get("summary", "")
# Truncar resumen a 100 caracteres
if len(summary) > 100:
summary = summary[:97] + "..."
# CVSS
cvss_score = cve.get("cvss", "N/A")
severity = "N/A"
if isinstance(cvss_score, (int, float)):
if cvss_score >= 9.0:
severity = "🔴 CRITICAL"
elif cvss_score >= 7.0:
severity = "🟠 HIGH"
elif cvss_score >= 4.0:
severity = "🟡 MEDIUM"
else:
severity = "🟢 LOW"
# Fecha
published = cve.get("Published", "N/A")
if published != "N/A":
try:
# Intentar formatear la fecha
date_obj = datetime.fromisoformat(published.replace("Z", "+00:00"))
published = date_obj.strftime("%Y-%m-%d")
except:
pass
table += f"| {cve_id} | {severity} | {cvss_score} | {published} | {summary} |\n"
return table
def format_risk_summary(
cve_data: Dict[str, Any],
epss_data: Optional[Dict[str, Any]],
context: str = "sistema genérico"
) -> str:
"""
Genera un resumen de riesgo orientado a equipos SOC/Blue Team.
Args:
cve_data: Datos del CVE (soporta formato CVE 5.0)
epss_data: Datos EPSS (puede ser None)
context: Contexto del activo afectado
Returns:
String formateado en markdown con análisis de riesgo
"""
# Usar la función auxiliar para extraer datos
cve_id, summary, cvss_score, severity = _extract_cve_data(cve_data)
# Determinar nivel de urgencia
if isinstance(cvss_score, (int, float)):
if cvss_score >= 9.0:
urgency = "🔴 **CRÍTICA - ACCIÓN INMEDIATA REQUERIDA**"
priority = "P1"
sla = "4 horas"
elif cvss_score >= 7.0:
urgency = "🟠 **ALTA - Acción requerida en 48h**"
priority = "P2"
sla = "48 horas"
elif cvss_score >= 4.0:
urgency = "🟡 **MEDIA - Incluir en próximo ciclo de parcheo**"
priority = "P3"
sla = "1 semana"
else:
urgency = "🟢 **BAJA - Monitorear**"
priority = "P4"
sla = "1 mes"
else:
urgency = "⚪ **DESCONOCIDA**"
priority = "P3"
sla = "Evaluar"
# EPSS Score
epss_section = ""
if epss_data and "epss" in epss_data:
epss_score = epss_data["epss"]
percentile = epss_data.get("percentile", "N/A")
epss_interpretation = ""
if isinstance(epss_score, (int, float)):
if epss_score >= 0.5:
epss_interpretation = "**MUY ALTA** - Exploit en la wild es muy probable"
elif epss_score >= 0.3:
epss_interpretation = "**ALTA** - Exploit probable en próximos 30 días"
elif epss_score >= 0.1:
epss_interpretation = "**MODERADA** - Posibilidad de exploit existe"
else:
epss_interpretation = "**BAJA** - Exploit improbable a corto plazo"
epss_section = f"""
## Probabilidad de Explotación (EPSS)
- **EPSS Score**: {epss_score:.4f} ({epss_score*100:.2f}%)
- **Percentil**: {percentile}
- **Interpretación**: {epss_interpretation}
"""
else:
epss_section = """
## Probabilidad de Explotación (EPSS)
- **EPSS Score**: No disponible
- Sin datos EPSS para esta vulnerabilidad
"""
# CWEs para contexto adicional
cwes = cve_data.get("cwe", [])
cwe_context = ""
if cwes:
cwe_context = f"\n**Categorías de debilidad**: {', '.join(cwes)}"
# Recomendaciones basadas en urgencia
if priority == "P1":
recommendations = """
### Acciones Inmediatas (0-4 horas)
1. ✅ **Verificar exposición**: Identificar todos los activos afectados
2. ✅ **Mitigación temporal**: Implementar controles compensatorios (WAF, IDS, ACLs)
3. ✅ **Notificar escalación**: CISO + CTO deben ser informados
4. ✅ **Planificar ventana de emergencia**: Preparar parcheo inmediato
### Seguimiento (4-24 horas)
1. Aplicar parche en entorno de staging
2. Validar compatibilidad
3. Despliegue en producción con plan de rollback
4. Validación post-parcheo completa
"""
elif priority == "P2":
recommendations = """
### Acciones Recomendadas (0-48 horas)
1. ✅ **Evaluar exposición**: Inventariar activos afectados
2. ✅ **Análisis de riesgo**: Determinar si exploit público está disponible
3. ✅ **Planificar parcheo**: Incluir en próxima ventana de mantenimiento
4. ✅ **Monitorización**: Configurar alertas específicas
### Seguimiento
1. Testing en pre-producción
2. Despliegue coordinado en producción
3. Documentar en sistema de gestión de vulnerabilidades
"""
else:
recommendations = """
### Acciones Recomendadas
1. ✅ **Documentar**: Registrar en backlog de seguridad
2. ✅ **Planificar**: Incluir en próximo ciclo de parcheo mensual
3. ✅ **Evaluar**: Revisar si activos críticos están afectados
### Seguimiento
1. Incluir en próxima ventana de mantenimiento regular
2. Validar que la vulnerabilidad no escala en prioridad
"""
formatted = f"""# Análisis de Riesgo: {cve_id}
## Resumen Ejecutivo
{urgency}
**Contexto**: {context}
**Prioridad**: {priority}
**SLA de Respuesta**: {sla}
## Descripción de la Vulnerabilidad
{summary}{cwe_context}
## Puntuación de Impacto
- **CVSS Score**: {cvss_score} ({urgency.split()[1] if '**' in urgency else 'N/A'})
{epss_section}
## Evaluación de Riesgo Global
Considerando el contexto de "{context}", esta vulnerabilidad representa un riesgo **{priority}**.
{recommendations}
---
*Análisis generado por MCP Cybersecurity Server*
*Datos: CIRCL Vulnerability Lookup*
"""
return formatted
def format_asset_vulnerability_report(
asset_name: str,
asset_type: str,
cve_analyses: List[Dict[str, Any]],
risk_matrix: Dict[str, Any]
) -> str:
"""
Genera un reporte completo de vulnerabilidades para un activo.
Args:
asset_name: Nombre del activo
asset_type: Tipo de activo
cve_analyses: Lista de análisis de CVEs
risk_matrix: Matriz de riesgos
Returns:
Reporte formateado en markdown
"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Contar vulnerabilidades por severidad
critical_count = sum(1 for cve in cve_analyses if cve.get("priority") == "P1")
high_count = sum(1 for cve in cve_analyses if cve.get("priority") == "P2")
medium_count = sum(1 for cve in cve_analyses if cve.get("priority") == "P3")
low_count = sum(1 for cve in cve_analyses if cve.get("priority") == "P4")
report = f"""# Reporte de Vulnerabilidades: {asset_name}
**Tipo de Activo**: {asset_type}
**Fecha de Análisis**: {timestamp}
**Total de Vulnerabilidades**: {len(cve_analyses)}
## Resumen Ejecutivo
| Severidad | Cantidad | SLA |
|-----------|----------|-----|
| 🔴 Crítica | {critical_count} | 4 horas |
| 🟠 Alta | {high_count} | 48 horas |
| 🟡 Media | {medium_count} | 1 semana |
| 🟢 Baja | {low_count} | 1 mes |
## Estado General del Activo
"""
# Determinar estado general
if critical_count > 0:
report += "**🔴 CRÍTICO** - Requiere atención inmediata\n\n"
elif high_count > 0:
report += "**🟠 ALTO RIESGO** - Acción requerida en 48h\n\n"
elif medium_count > 0:
report += "**🟡 RIESGO MODERADO** - Incluir en próximo ciclo de parcheo\n\n"
else:
report += "**🟢 BAJO RIESGO** - Monitoreo continuo\n\n"
# Detalles de vulnerabilidades priorizadas
report += "## Vulnerabilidades Priorizadas\n\n"
for cve in sorted(cve_analyses, key=lambda x: x.get("priority", "P4")):
cve_id = cve.get("cve_id", "N/A")
priority = cve.get("priority", "P4")
cvss = cve.get("cvss_score", "N/A")
summary = cve.get("summary", "")[:100] + "..."
report += f"### {cve_id} [{priority}]\n"
report += f"- **CVSS**: {cvss}\n"
report += f"- **Resumen**: {summary}\n\n"
return report