convert_edifact_to_bo4e
Convert EDIFACT messages to BO4E format for German energy market data exchange. The tool passes the EDIFACT message to transformer.bee and returns the resulting BO4E transaction as JSON.
Instructions
Convert an EDIFACT message to its BO4E equivalent
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
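| edifact | Yes | The EDIFACT message to convert, as a string | |
| edifact_format_version | No | EDIFACT format version of the message | Current format version |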
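As an illustration of the schema above, the following sketch builds the JSON-RPC `tools/call` request an MCP client would send for this tool. The helper name `build_tool_call` and the placeholder message are hypothetical and not part of the server; the accepted string values for `edifact_format_version` are not listed here, so the sketch simply omits the argument unless one is supplied.

```python
import json
from typing import Any, Optional


def build_tool_call(
    edifact: str,
    edifact_format_version: Optional[str] = None,
    request_id: int = 1,
) -> dict[str, Any]:
    """Build an MCP tools/call request for convert_edifact_to_bo4e (hypothetical helper)."""
    # Only the required argument is always present.
    arguments: dict[str, Any] = {"edifact": edifact}
    # Leave the optional argument out so the server falls back to its default.
    if edifact_format_version is not None:
        arguments["edifact_format_version"] = edifact_format_version
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "convert_edifact_to_bo4e", "arguments": arguments},
    }


# Placeholder only: a real call needs a complete EDIFACT message here.
request = build_tool_call(edifact="<EDIFACT message goes here>")
print(json.dumps(request, indent=2))
```

In practice an MCP client library would normally assemble this request; the sketch only shows which arguments end up in the payload.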
Implementation Reference
- src/transformerbeemcp/server.py:75-102 (handler): The handler function, decorated with @mcp.tool, that implements the convert_edifact_to_bo4e tool. It takes the TransformerBeeClient from the context to perform the conversion, logs and re-raises errors, and supports only a single Marktnachricht containing a single transaction.

      @mcp.tool(description="Convert an EDIFACT message to its BO4E equivalent")
      async def convert_edifact_to_bo4e(
          ctx: Context,  # type:ignore[type-arg] # no idea what the second type arg is
          edifact: str,
          edifact_format_version: EdifactFormatVersion | None = None,
      ) -> dict[str, Any]:
          """Tool that uses initialized resources"""
          _logger.debug("Context: %s", str(ctx.request_context.lifespan_context))
          client: TransformerBeeClient = ctx.request_context.lifespan_context.transformerbeeclient
          if not edifact_format_version:
              edifact_format_version = get_current_edifact_format_version()
          try:
              marktnachrichten = await client.convert_to_bo4e(edifact=edifact, edifact_format_version=edifact_format_version)
          except ClientResponseError as cre:
              _logger.warning("transformer.bee rejected the request %s: %s", cre.request_info, cre.message)
              _logger.exception(cre)
              raise
          except Exception:
              _logger.exception("Error while converting EDIFACT to BO4E")
              raise
          if len(marktnachrichten) > 1:
              raise NotImplementedError(f"More than 1 Marktnachricht (got {len(marktnachrichten)}) not support yet")
          marktnachricht = marktnachrichten[0]
          await ctx.info(f"Successfully converted Marktnachricht with UNH {marktnachricht.unh} to BO4E")
          if len(marktnachricht.transaktionen) > 1:
              raise NotImplementedError(f"More than 1 transaction (got {len(marktnachricht.transaktionen)}) not support yet")
          transaktion = marktnachricht.transaktionen[0]
          return transaktion.model_dump(mode="json")
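The single-Marktnachricht, single-transaction restriction in the handler can be looked at in isolation. The sketch below is a minimal illustration, not the server's code: the `Marktnachricht` and `Transaktion` dataclasses are hypothetical stand-ins for the models returned by the client, and `pick_single_transaction` is an invented helper name.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Transaktion:
    """Hypothetical stand-in for the real transaction model."""
    payload: dict[str, Any] = field(default_factory=dict)


@dataclass
class Marktnachricht:
    """Hypothetical stand-in for the real Marktnachricht model."""
    unh: str
    transaktionen: list[Transaktion] = field(default_factory=list)


def pick_single_transaction(marktnachrichten: list[Marktnachricht]) -> Transaktion:
    """Return the only transaction of the only message, rejecting anything larger."""
    if len(marktnachrichten) > 1:
        raise NotImplementedError(
            f"More than 1 Marktnachricht (got {len(marktnachrichten)}) is not supported"
        )
    # As in the handler, an empty list is not checked and raises IndexError here.
    marktnachricht = marktnachrichten[0]
    if len(marktnachricht.transaktionen) > 1:
        raise NotImplementedError(
            f"More than 1 transaction (got {len(marktnachricht.transaktionen)}) is not supported"
        )
    return marktnachricht.transaktionen[0]


message = Marktnachricht(unh="1", transaktionen=[Transaktion({"key": "value"})])
print(pick_single_transaction([message]).payload)
```

A caller holding an EDIFACT interchange with several messages or transactions would therefore need to split it before invoking the tool, since anything beyond one of each raises `NotImplementedError`.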