"""
MCP Cybersecurity Server
Servidor MCP que expone herramientas y recursos para análisis de vulnerabilidades CVE,
integrado con la API de CIRCL Vulnerability Lookup.
Componentes:
- 3 Tools: search_cve_by_id, search_cves_by_keyword, summarize_cve_risk
- 2 Resources: playbook_high_risk.md, risk_matrix.json
- 1 Prompt: analyze_asset_vulnerabilities
"""
import asyncio
import logging
import os
import json
from pathlib import Path
from typing import Optional
from mcp.server import Server
from mcp.types import (
Resource,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
Prompt,
PromptArgument,
PromptMessage,
GetPromptResult
)
import mcp.types as types
# Importar nuestras tools
from .tools.cve_search import search_cve_by_id
from .tools.cve_keyword import search_cves_by_keyword
from .tools.cve_risk import summarize_cve_risk
# Configurar logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Inicializar servidor MCP
server = Server("mcp-cybersecurity-server")
# Directorio base del proyecto
BASE_DIR = Path(__file__).parent.parent
RESOURCES_DIR = BASE_DIR / "resources"
# ============================================================================
# TOOLS REGISTRATION
# ============================================================================
@server.list_tools()
async def list_tools() -> list[Tool]:
"""
Lista todas las herramientas disponibles en el servidor.
"""
logger.info("Listing available tools")
return [
Tool(
name="search_cve_by_id",
description=(
"Busca información detallada de una vulnerabilidad CVE por su identificador. "
"Retorna descripción completa, puntuación CVSS, severidad, referencias, "
"fechas de publicación/modificación, y CWEs asociados."
),
inputSchema={
"type": "object",
"properties": {
"cve_id": {
"type": "string",
"description": "Identificador CVE (formato: CVE-YYYY-NNNN, ej. CVE-2024-1234)"
}
},
"required": ["cve_id"]
}
),
Tool(
name="search_cves_by_keyword",
description=(
"Busca vulnerabilidades CVE relacionadas con un producto, vendor o keyword. "
"Retorna una lista resumida de CVEs con IDs, severidad, CVSS y fechas. "
"Útil para descubrir vulnerabilidades de un producto específico."
),
inputSchema={
"type": "object",
"properties": {
"keyword": {
"type": "string",
"description": "Término de búsqueda, típicamente nombre de producto (ej. 'apache', 'wordpress', 'linux')"
},
"vendor": {
"type": "string",
"description": "Nombre del vendor (opcional, por defecto usa keyword)"
},
"limit": {
"type": "integer",
"description": "Número máximo de resultados (default: 10, máximo: 100)",
"minimum": 1,
"maximum": 100,
"default": 10
}
},
"required": ["keyword"]
}
),
Tool(
name="summarize_cve_risk",
description=(
"Genera un análisis de riesgo detallado de un CVE orientado a equipos SOC/Blue Team. "
"Incluye probabilidad de explotación (EPSS), impacto potencial, nivel de urgencia, "
"SLA de respuesta recomendado, y recomendaciones de mitigación específicas. "
"Considera el contexto del activo afectado para personalizar el análisis."
),
inputSchema={
"type": "object",
"properties": {
"cve_id": {
"type": "string",
"description": "Identificador CVE (formato: CVE-YYYY-NNNN)"
},
"context": {
"type": "string",
"description": "Contexto del activo afectado (ej. 'servidor web público', 'base de datos interna', 'API de producción')",
"default": "sistema genérico"
}
},
"required": ["cve_id"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""
Ejecuta una herramienta específica.
"""
logger.info(f"Calling tool: {name} with arguments: {arguments}")
try:
if name == "search_cve_by_id":
cve_id = arguments.get("cve_id", "").strip()
result = await search_cve_by_id(cve_id)
return [TextContent(type="text", text=result)]
elif name == "search_cves_by_keyword":
keyword = arguments.get("keyword", "").strip()
vendor = arguments.get("vendor", None)
limit = arguments.get("limit", 10)
result = await search_cves_by_keyword(keyword, vendor, limit)
return [TextContent(type="text", text=result)]
elif name == "summarize_cve_risk":
cve_id = arguments.get("cve_id", "").strip()
context = arguments.get("context", None)
result = await summarize_cve_risk(cve_id, context)
return [TextContent(type="text", text=result)]
else:
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
logger.error(f"Error executing tool {name}: {e}")
error_message = f"❌ Error ejecutando {name}: {str(e)}"
return [TextContent(type="text", text=error_message)]
# ============================================================================
# RESOURCES REGISTRATION
# ============================================================================
@server.list_resources()
async def list_resources() -> list[Resource]:
"""
Lista todos los recursos disponibles en el servidor.
"""
logger.info("Listing available resources")
return [
Resource(
uri="playbook://high_risk",
name="Playbook de Vulnerabilidades Críticas",
mimeType="text/markdown",
description=(
"Procedimiento operativo estándar para gestión de vulnerabilidades críticas (CVSS >= 9.0). "
"Incluye 4 fases: Detección y Validación, Mitigación Temporal, Parcheo y Remediación, "
"y Cierre. Define SLA de 4 horas, acciones específicas por fase, criterios de escalación "
"y contactos clave."
)
),
Resource(
uri="matrix://risk",
name="Matriz de Riesgos de Vulnerabilidades",
mimeType="application/json",
description=(
"Matriz de priorización de riesgos basada en CVSS, EPSS y contexto del activo. "
"Define 5 niveles de riesgo (CRITICAL, HIGH, MEDIUM, LOW, INFORMATIONAL) con "
"criterios específicos, SLAs, acciones recomendadas y procedimientos de escalación. "
"Incluye definiciones de criticidad de activos, sensibilidad de datos, y árboles de decisión."
)
)
]
@server.read_resource()
async def read_resource(uri: str) -> str:
"""
Lee el contenido de un recurso específico.
"""
logger.info(f"Reading resource: {uri}")
try:
if uri == "playbook://high_risk":
playbook_path = RESOURCES_DIR / "playbook_high_risk.md"
if not playbook_path.exists():
raise FileNotFoundError(f"Playbook not found at {playbook_path}")
with open(playbook_path, "r", encoding="utf-8") as f:
content = f.read()
logger.info("Playbook loaded successfully")
return content
elif uri == "matrix://risk":
matrix_path = RESOURCES_DIR / "risk_matrix.json"
if not matrix_path.exists():
raise FileNotFoundError(f"Risk matrix not found at {matrix_path}")
with open(matrix_path, "r", encoding="utf-8") as f:
content = f.read()
logger.info("Risk matrix loaded successfully")
return content
else:
raise ValueError(f"Unknown resource URI: {uri}")
except Exception as e:
logger.error(f"Error reading resource {uri}: {e}")
raise
# ============================================================================
# PROMPTS REGISTRATION
# ============================================================================
@server.list_prompts()
async def list_prompts() -> list[Prompt]:
"""
Lista todos los prompts preconfigurados disponibles.
"""
logger.info("Listing available prompts")
return [
Prompt(
name="analyze_asset_vulnerabilities",
description=(
"Análisis completo de vulnerabilidades de un activo específico. "
"Este prompt coordina múltiples tools y resources para generar un reporte "
"ejecutivo que incluye: resumen de riesgo global, tabla priorizada de CVEs, "
"timeline de remediación basado en SLAs, y recomendaciones específicas del playbook. "
"Ideal para análisis periódicos de seguridad o respuesta a incidentes."
),
arguments=[
PromptArgument(
name="asset_name",
description="Nombre identificativo del activo (ej. 'Servidor Web de Facturación', 'API de Pagos')",
required=True
),
PromptArgument(
name="asset_type",
description="Tipo de activo: web_server, database, api, network_device, application, etc.",
required=True
),
PromptArgument(
name="cve_list",
description="Lista de CVE IDs separados por coma (ej. 'CVE-2024-1234,CVE-2024-5678')",
required=True
),
PromptArgument(
name="internet_facing",
description="¿El activo es accesible desde Internet? Valores: 'true' o 'false' (default: 'false')",
required=False
),
PromptArgument(
name="criticality",
description="Criticidad del activo: 'high', 'medium', o 'low' (default: 'medium')",
required=False
)
]
)
]
@server.get_prompt()
async def get_prompt(name: str, arguments: dict) -> GetPromptResult:
"""
Obtiene un prompt preconfigurado con sus instrucciones.
"""
logger.info(f"Getting prompt: {name} with arguments: {arguments}")
if name == "analyze_asset_vulnerabilities":
# Extraer argumentos
asset_name = arguments.get("asset_name", "Activo Desconocido")
asset_type = arguments.get("asset_type", "desconocido")
cve_list_str = arguments.get("cve_list", "")
internet_facing_str = arguments.get("internet_facing", "false")
criticality = arguments.get("criticality", "medium")
# Parsear lista de CVEs
cve_list = [cve.strip() for cve in cve_list_str.split(",") if cve.strip()]
# Parsear internet_facing
internet_facing = internet_facing_str.lower() in ["true", "yes", "1", "sí"]
# Construir contexto del activo
if internet_facing:
asset_context = f"{asset_name} (tipo: {asset_type}, expuesto a Internet, criticidad: {criticality})"
else:
asset_context = f"{asset_name} (tipo: {asset_type}, interno, criticidad: {criticality})"
# Construir prompt detallado
prompt_text = f"""# Análisis de Vulnerabilidades: {asset_name}
Realiza un análisis completo de seguridad del activo "{asset_name}".
## Contexto del Activo
- **Nombre**: {asset_name}
- **Tipo**: {asset_type}
- **Exposición**: {"🌐 Internet-facing (PÚBLICO)" if internet_facing else "🔒 Interno (RED PRIVADA)"}
- **Criticidad**: {criticality.upper()}
## CVEs a Analizar
{", ".join(cve_list)}
---
## Instrucciones Detalladas
### Paso 1: Recopilación de Datos (Para CADA CVE)
Para cada CVE en la lista:
1. **Usar tool `search_cve_by_id`** con el CVE ID:
- Obtener descripción completa
- CVSS score y severidad
- Fechas de publicación/modificación
- CWEs asociados
2. **Usar tool `summarize_cve_risk`** con:
- CVE ID
- Contexto: "{asset_context}"
Esto proporcionará:
- Análisis de riesgo orientado a SOC
- EPSS score (probabilidad de explotación)
- Prioridad (P1-P4)
- SLA de respuesta
- Recomendaciones específicas
### Paso 2: Consultar Matriz de Riesgos
**Leer el resource `matrix://risk`** para obtener:
- Criterios de clasificación de riesgo
- SLAs por nivel de prioridad
- Acciones recomendadas por nivel
- Criterios de escalación
- Definiciones de criticidad de activos
Usar esta información para:
- Validar la clasificación automática de cada CVE
- Ajustar prioridad si el contexto del activo lo requiere (ej. internet-facing + high criticality puede elevar prioridad)
- Determinar procedimientos de escalación necesarios
### Paso 3: Generar Reporte Ejecutivo
Crea un reporte estructurado en markdown con las siguientes secciones:
#### 3.1 Resumen Ejecutivo
- Estado general del activo (CRÍTICO / ALTO RIESGO / MODERADO / BAJO)
- Número total de vulnerabilidades por severidad
- Tabla resumen:
| Severidad | Cantidad | SLA Máximo |
|-----------|----------|-----------|
| 🔴 Crítica | X | 4 horas |
| 🟠 Alta | X | 48 horas |
| 🟡 Media | X | 1 semana |
| 🟢 Baja | X | 1 mes |
#### 3.2 Análisis Detallado de Vulnerabilidades
Para cada CVE, en orden de prioridad (P1 primero):
**CVE-XXXX-YYYY [Prioridad]**
- CVSS Score: X.X (Severidad)
- EPSS Score: X.XX% (interpretación)
- Resumen: [breve descripción]
- Impacto en este activo: [específico al contexto]
- SLA: [tiempo máximo de respuesta]
#### 3.3 Timeline de Remediación
Basándote en los SLAs de cada CVE, crea un timeline:
**Acciones Inmediatas (0-4 horas)** - SI HAY CVEs P1:
- Lista de CVEs críticos
- Acciones de mitigación temporal requeridas
**Acciones a Corto Plazo (4-48 horas)** - SI HAY CVEs P2:
- Lista de CVEs alta prioridad
- Plan de parcheo
**Backlog de Parcheo (1-4 semanas)**:
- CVEs P3 y P4
#### 3.4 Recomendaciones Específicas
**Si hay al menos un CVE P1 (CRÍTICO):**
- **Leer el resource `playbook://high_risk`**
- Resumir las 4 fases principales del playbook
- Indicar acciones críticas de las primeras 2 horas
- Listar contactos de escalación necesarios
**Para todos los casos:**
- Controles compensatorios recomendados
- Consideraciones de monitorización
- Puntos de validación post-remediación
#### 3.5 Escalación
Basándote en la matriz de riesgos:
- Si hay CVEs P1: Escalación a CISO + CTO inmediata
- Si hay CVEs P2: Escalación a Security Team Lead
- Frecuencia de comunicación recomendada
---
## Formato de Salida
- Usa markdown profesional con headers claros
- Incluye emojis para severidad (🔴🟠🟡🟢) para fácil lectura visual
- Tablas para datos tabulares
- Listas con checkboxes [ ] para acciones
- Secciones colapsables si el reporte es muy largo
## Consideraciones Importantes
1. **Contexto es clave**: El hecho de que el activo sea {internet_facing and "internet-facing" or "interno"} y de criticidad {criticality} debe influir en el análisis
2. **SLAs son MÁXIMOS**: Recomendar actuar antes si es posible
3. **Priorización**: Ordenar siempre por prioridad (P1 > P2 > P3 > P4), no por CVSS únicamente
4. **Accionable**: El reporte debe ser práctico y accionable para un SOC analyst
5. **Completo pero conciso**: Balancear detalle técnico con claridad ejecutiva
---
**Genera ahora el reporte completo siguiendo estas instrucciones.**
"""
return GetPromptResult(
description=f"Análisis de vulnerabilidades para {asset_name}",
messages=[
PromptMessage(
role="user",
content=TextContent(type="text", text=prompt_text)
)
]
)
else:
raise ValueError(f"Unknown prompt: {name}")
# ============================================================================
# MAIN - Entry Point
# ============================================================================
async def main():
"""
Punto de entrada principal del servidor MCP.
"""
logger.info("Starting MCP Cybersecurity Server")
logger.info(f"Resources directory: {RESOURCES_DIR}")
# Verificar que los recursos existen
playbook_path = RESOURCES_DIR / "playbook_high_risk.md"
matrix_path = RESOURCES_DIR / "risk_matrix.json"
if not playbook_path.exists():
logger.warning(f"Playbook not found at {playbook_path}")
else:
logger.info(f"Playbook loaded from {playbook_path}")
if not matrix_path.exists():
logger.warning(f"Risk matrix not found at {matrix_path}")
else:
logger.info(f"Risk matrix loaded from {matrix_path}")
# Importar stdio transport
from mcp.server.stdio import stdio_server
# Ejecutar servidor
async with stdio_server() as (read_stream, write_stream):
logger.info("Server running on stdio transport")
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
logger.info("=" * 80)
logger.info("MCP CYBERSECURITY SERVER")
logger.info("Version: 1.0.0")
logger.info("Author: José María Teba")
logger.info("=" * 80)
asyncio.run(main())