Skip to main content
Glama
converter.py (11.9 kB)
"""Pandoc conversion service.""" import asyncio import base64 import builtins import tempfile from dataclasses import dataclass, field from pathlib import Path from typing import Any from src.config import get_settings from src.core.exceptions import ConversionError, FormatNotSupportedError, TimeoutError from src.core.formats import ( FormatList, FormatManager, get_content_type, get_format_manager, is_binary_format, ) @dataclass class ConversionOptions: """Options for document conversion.""" standalone: bool = True table_of_contents: bool = False number_sections: bool = False wrap: str = "auto" # auto, none, preserve columns: int = 80 extra_args: list[str] = field(default_factory=list) @dataclass class ConversionResult: """Result of a conversion operation.""" content: bytes content_type: str is_binary: bool from_format: str to_format: str def to_dict(self) -> dict[str, Any]: """Convert to dictionary for API responses.""" if self.is_binary: content_value = base64.b64encode(self.content).decode("utf-8") else: content_value = self.content.decode("utf-8") return { "content": content_value, "content_type": self.content_type, "is_binary": self.is_binary, "from_format": self.from_format, "to_format": self.to_format, } class ConversionService: """Service for converting documents using Pandoc.""" def __init__(self, format_manager: FormatManager | None = None) -> None: self.format_manager = format_manager or get_format_manager() self.settings = get_settings() async def convert_text( self, content: str, from_format: str, to_format: str, options: ConversionOptions | None = None, ) -> ConversionResult: """Convert text content from one format to another. 
Args: content: The text content to convert from_format: Source format (e.g., 'markdown') to_format: Target format (e.g., 'html') options: Optional conversion options Returns: ConversionResult with the converted content """ options = options or ConversionOptions() # Validate formats await self._validate_formats(from_format, to_format) # Build pandoc command cmd = self._build_command(from_format, to_format, options) # Execute conversion output = await self._execute_pandoc( cmd, input_data=content.encode("utf-8"), from_format=from_format, to_format=to_format, ) return ConversionResult( content=output, content_type=get_content_type(to_format), is_binary=is_binary_format(to_format), from_format=from_format, to_format=to_format, ) async def convert_file( self, file_bytes: bytes, filename: str, to_format: str, from_format: str | None = None, options: ConversionOptions | None = None, ) -> ConversionResult: """Convert a file from one format to another. Args: file_bytes: The file content as bytes filename: Original filename (used to detect format) to_format: Target format from_format: Source format (optional, detected from extension if not provided) options: Optional conversion options Returns: ConversionResult with the converted content """ options = options or ConversionOptions() # Detect source format from filename if not provided if from_format is None: from_format = self._detect_format(filename) # Validate formats await self._validate_formats(from_format, to_format) # Use temporary files for binary formats with tempfile.TemporaryDirectory() as tmpdir: input_path = Path(tmpdir) / filename output_ext = self._get_extension(to_format) output_path = Path(tmpdir) / f"output.{output_ext}" # Write input file input_path.write_bytes(file_bytes) # Build command with file paths cmd = self._build_command( from_format, to_format, options, input_file=str(input_path), output_file=str(output_path), ) # Execute conversion await self._execute_pandoc( cmd, from_format=from_format, 
to_format=to_format, ) # Read output if output_path.exists(): output = output_path.read_bytes() else: raise ConversionError( "Output file was not created", from_format=from_format, to_format=to_format, ) return ConversionResult( content=output, content_type=get_content_type(to_format), is_binary=is_binary_format(to_format), from_format=from_format, to_format=to_format, ) async def convert_file_base64( self, file_base64: str, filename: str, to_format: str, from_format: str | None = None, options: ConversionOptions | None = None, ) -> ConversionResult: """Convert a base64-encoded file. Args: file_base64: Base64-encoded file content filename: Original filename to_format: Target format from_format: Source format (optional) options: Conversion options Returns: ConversionResult with converted content """ try: file_bytes = base64.b64decode(file_base64) except Exception as e: raise ConversionError(f"Invalid base64 encoding: {e}") from e return await self.convert_file( file_bytes=file_bytes, filename=filename, to_format=to_format, from_format=from_format, options=options, ) async def get_formats(self) -> FormatList: """Get supported input and output formats.""" return await self.format_manager.get_formats() async def get_pandoc_version(self) -> str: """Get Pandoc version string.""" try: proc = await asyncio.create_subprocess_exec( "pandoc", "--version", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, _ = await proc.communicate() if proc.returncode == 0: # First line contains version first_line = stdout.decode().strip().split("\n")[0] return first_line return "unknown" except FileNotFoundError: return "not installed" async def _validate_formats(self, from_format: str, to_format: str) -> None: """Validate that formats are supported.""" formats = await self.format_manager.get_formats() if from_format.lower() not in formats.input: raise FormatNotSupportedError( from_format, format_type="input", supported_formats=formats.input[:10], # Show first 10 ) if 
to_format.lower() not in formats.output: raise FormatNotSupportedError( to_format, format_type="output", supported_formats=formats.output[:10], ) def _build_command( self, from_format: str, to_format: str, options: ConversionOptions, input_file: str | None = None, output_file: str | None = None, ) -> list[str]: """Build pandoc command line arguments.""" cmd = ["pandoc", "-f", from_format, "-t", to_format] if options.standalone: cmd.append("--standalone") if options.table_of_contents: cmd.append("--toc") if options.number_sections: cmd.append("--number-sections") if options.wrap != "auto": cmd.extend(["--wrap", options.wrap]) cmd.extend(["--columns", str(options.columns)]) if options.extra_args: cmd.extend(options.extra_args) if input_file: cmd.append(input_file) if output_file: cmd.extend(["-o", output_file]) return cmd async def _execute_pandoc( self, cmd: list[str], input_data: bytes | None = None, from_format: str = "", to_format: str = "", ) -> bytes: """Execute pandoc command and return output.""" try: proc = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE if input_data else None, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for( proc.communicate(input=input_data), timeout=self.settings.conversion_timeout, ) except builtins.TimeoutError as e: proc.kill() await proc.wait() raise TimeoutError(self.settings.conversion_timeout) from e if proc.returncode != 0: error_msg = stderr.decode("utf-8", errors="replace").strip() raise ConversionError( f"Pandoc conversion failed: {error_msg}", from_format=from_format, to_format=to_format, pandoc_error=error_msg, ) return stdout except FileNotFoundError as e: raise ConversionError( "Pandoc is not installed or not found in PATH", from_format=from_format, to_format=to_format, ) from e def _detect_format(self, filename: str) -> str: """Detect format from filename extension.""" ext_to_format: dict[str, str] = { ".md": "markdown", ".markdown": 
"markdown", ".html": "html", ".htm": "html", ".tex": "latex", ".latex": "latex", ".docx": "docx", ".odt": "odt", ".epub": "epub", ".rst": "rst", ".txt": "plain", ".json": "json", ".xml": "docbook", ".org": "org", ".rtf": "rtf", ".ipynb": "ipynb", } suffix = Path(filename).suffix.lower() format_name = ext_to_format.get(suffix) if format_name is None: raise FormatNotSupportedError( suffix, format_type="input (file extension)", ) return format_name def _get_extension(self, format_name: str) -> str: """Get file extension for a format.""" format_to_ext: dict[str, str] = { "markdown": "md", "gfm": "md", "commonmark": "md", "html": "html", "html5": "html", "latex": "tex", "pdf": "pdf", "docx": "docx", "odt": "odt", "epub": "epub", "pptx": "pptx", "rst": "rst", "plain": "txt", "json": "json", "native": "hs", "org": "org", "rtf": "rtf", "asciidoc": "adoc", "man": "1", } return format_to_ext.get(format_name.lower(), format_name)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Fu-Jie/MCP-OPENAPI-Pandoc'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.