es__parse_aeat_response
Parse AEAT XML responses from VERI*FACTU or SII into structured JSON, extracting status, verification code, and error details.
Instructions
Analiza y normaliza una respuesta XML de la AEAT (VERI*FACTU o SII) a JSON estructurado. Extrae EstadoEnvio, CSV (código seguro de verificación) y detalle de errores.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| xml | Yes | Respuesta XML de la AEAT en crudo. | |
| response_type | No | Tipo de respuesta a analizar (por defecto: 'verifactu'). | verifactu |
Implementation Reference
- AEATResponseType StrEnum — defines valid response types 'verifactu' and 'sii' used as input schema for the tool.
class AEATResponseType(StrEnum): """AEAT XML response type for es__parse_aeat_response.""" verifactu = "verifactu" sii = "sii" - TOOL_ES_PARSE_AEAT_RESPONSE — MCP Tool definition with name 'es__parse_aeat_response', description, and inputSchema (xml string required, response_type enum optional).
# MCP tool descriptor for es__parse_aeat_response. The description and the
# schema descriptions are user-facing Spanish strings and are kept verbatim.
TOOL_ES_PARSE_AEAT_RESPONSE = types.Tool(
    name="es__parse_aeat_response",
    description=(
        "Analiza y normaliza una respuesta XML de la AEAT (VERI*FACTU o SII) "
        "a JSON estructurado. Extrae EstadoEnvio, CSV (código seguro de "
        "verificación) y detalle de errores."
    ),
    inputSchema={
        "type": "object",
        "properties": {
            # Raw XML payload; the only mandatory argument.
            "xml": {
                "type": "string",
                "description": "Respuesta XML de la AEAT en crudo.",
            },
            # Optional selector for the response layout.
            "response_type": {
                "type": "string",
                "enum": ["verifactu", "sii"],
                "description": "Tipo de respuesta a analizar (por defecto: 'verifactu').",
                "default": "verifactu",
            },
        },
        "required": ["xml"],
    },
)

# - handle_es_parse_aeat_response — async handler that parses AEAT XML responses (VERI*FACTU or SII) using lxml, extracting EstadoEnvio, CSV, error details, and line-level responses into structured JSON.
async def handle_es_parse_aeat_response(
    arguments: dict[str, Any],
) -> list[types.TextContent]:
    """Parse an AEAT XML response (VERI*FACTU or SII) into structured JSON.

    Args:
        arguments: Tool arguments. ``xml`` (required) holds the raw AEAT
            response as ``str`` or ``bytes``. ``response_type`` (optional,
            defaults to ``"verifactu"``) must be a valid
            ``AEATResponseType`` value.

    Returns:
        ``ok(result)`` on success. ``result`` always carries
        ``response_type``, ``raw_root_tag``, ``estado_envio``, ``csv``,
        ``tiempo_espera_envio``, ``success`` and ``accepted_with_errors``.
        SII responses add ``num_total``, ``num_correctos`` and
        ``num_errores``. ``respuestas_linea`` is present only when the
        document contains ``RespuestaLinea`` elements. Missing or empty
        elements are reported as ``None``.

        Validation failures, malformed XML and any other exception are
        caught and reported through ``err(...)``.
    """
    try:
        xml_str = arguments.get("xml", "")
        if not xml_str:
            return err("xml is required", "MISSING_PARAM")

        response_type_str = arguments.get("response_type", "verifactu")
        try:
            response_type = AEATResponseType(response_type_str)
        except ValueError:
            return err(
                f"Invalid response_type: {response_type_str!r}. Must be 'verifactu' or 'sii'."
            )

        xml_bytes = xml_str.encode() if isinstance(xml_str, str) else xml_str

        # The payload is caller-supplied, so parse it defensively:
        # resolve_entities=False keeps DTD-declared entities from being
        # expanded (XXE / entity-expansion payloads) and no_network=True
        # forbids network lookups for related files while parsing.
        parser = etree.XMLParser(resolve_entities=False, no_network=True)
        try:
            root = etree.fromstring(xml_bytes, parser)
        except etree.XMLSyntaxError as exc:
            return err(f"XML inválido: {exc}", "XML_PARSE_ERROR")

        def _text(elem: etree._Element | None) -> str | None:
            """Return the stripped text of *elem*, or None if absent/empty."""
            if elem is None:
                return None
            return (elem.text or "").strip() or None

        def _find(parent: etree._Element, *local_names: str) -> etree._Element | None:
            """Return the first descendant of *parent* matching any local name.

            Elements are matched with local-name(), so the namespace prefix
            used in the response does not matter. Names are tried in order.
            """
            for name in local_names:
                matches = parent.xpath(f".//*[local-name()='{name}']")
                if matches:
                    return matches[0]
            return None

        # Envelope-level fields are looked up the same way for both layouts.
        result: dict[str, Any] = {
            "response_type": response_type.value,
            "raw_root_tag": root.tag,
            "estado_envio": _text(_find(root, "EstadoEnvio")),
            "csv": _text(_find(root, "CSV")),
            "tiempo_espera_envio": _text(_find(root, "TiempoEsperaEnvio")),
        }

        line_elems = root.xpath(".//*[local-name()='RespuestaLinea']")

        if response_type == AEATResponseType.verifactu:
            # VERI*FACTU response fields (RespuestaRegFactuSistemaFacturacion)
            if line_elems:
                respuestas: list[dict[str, Any]] = []
                for reg in line_elems:
                    linea: dict[str, Any] = {
                        "estado_registro": _text(_find(reg, "EstadoRegistro")),
                        "codigo_error_registro": _text(_find(reg, "CodigoErrorRegistro")),
                        "descripcion_error_registro": _text(
                            _find(reg, "DescripcionErrorRegistro")
                        ),
                    }
                    # Invoice identification is attached only when the line has one.
                    idf = _find(reg, "IDFactura")
                    if idf is not None:
                        linea["id_factura"] = {
                            "emisor_nif": _text(_find(idf, "IDEmisorFactura")),
                            "num_serie": _text(_find(idf, "NumSerieFactura")),
                            "fecha": _text(_find(idf, "FechaExpedicionFactura")),
                        }
                    respuestas.append(linea)
                result["respuestas_linea"] = respuestas
        else:
            # SII response fields (RespuestaSuministroLR)
            result["num_total"] = _text(_find(root, "NumTotal"))
            result["num_correctos"] = _text(_find(root, "NumCorrectos"))
            result["num_errores"] = _text(_find(root, "NumErrores"))
            if line_elems:
                # NOTE: SII lines use shorter key names than VERI*FACTU lines;
                # kept as-is so existing consumers are not broken.
                result["respuestas_linea"] = [
                    {
                        "estado_registro": _text(_find(linea_elem, "EstadoRegistro")),
                        "codigo_error": _text(_find(linea_elem, "CodigoErrorRegistro")),
                        "descripcion_error": _text(
                            _find(linea_elem, "DescripcionErrorRegistro")
                        ),
                    }
                    for linea_elem in line_elems
                ]

        # Derive overall flags.
        estado = result.get("estado_envio")
        result["success"] = estado == "Correcto"
        # NOTE(review): "AceptadoConErrores" appears to be a record-level
        # (EstadoRegistro) value rather than an EstadoEnvio value, so checking
        # only the envelope would leave this flag permanently False. The
        # envelope comparison is kept for backward compatibility and the
        # parsed lines are checked as well. Confirm against the AEAT XSDs.
        result["accepted_with_errors"] = estado == "AceptadoConErrores" or any(
            linea.get("estado_registro") == "AceptadoConErrores"
            for linea in result.get("respuestas_linea", [])
        )

        return ok(result)
    except Exception as exc:
        # Top-level tool boundary: log the traceback and return a tool error
        # instead of propagating the exception.
        logger.exception("es__parse_aeat_response failed")
        return err(str(exc))


# - mcp_facturacion_electronica_es/server.py:102-106 (registration) TOOL_ES_PARSE_AEAT_RESPONSE registered in _ALL_TOOLS list for tool discovery.
# Utilities TOOL_ES_DETECT_REGIONAL_REGIME, TOOL_ES_GET_COMPLIANCE_STATUS, TOOL_ES_PARSE_AEAT_RESPONSE, ] - mcp_facturacion_electronica_es/server.py:134-137 (registration) 'es__parse_aeat_response' mapped to handle_es_parse_aeat_response in _TOOL_HANDLERS dict.
"es__detect_regional_regime": handle_es_detect_regional_regime, "es__get_compliance_status": handle_es_get_compliance_status, "es__parse_aeat_response": handle_es_parse_aeat_response, }