Skip to main content
Glama

Quarkdown MCP Server

by Lillard01
batch.py22.7 kB
"""Batch conversion tool for Quarkdown MCP server. This module provides the convert_batch tool that processes multiple Quarkdown documents in batch mode for efficient bulk operations. """ import asyncio import os import tempfile from pathlib import Path from typing import Any, Dict, List, Optional from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource from .base import BaseTool class ConvertBatchTool(BaseTool): """Tool for batch processing multiple Quarkdown documents. This tool allows users to convert multiple documents at once, apply consistent formatting, and generate reports on the conversion process. """ def get_tool_definition(self) -> Tool: """Get the MCP tool definition for batch conversion. Returns: Tool definition object for MCP registration """ return Tool( name="convert_batch", description="Convert multiple Quarkdown documents in batch mode with consistent settings", inputSchema={ "type": "object", "properties": { "documents": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string", "description": "Document name or identifier" }, "content": { "type": "string", "description": "Quarkdown source content" }, "output_name": { "type": "string", "description": "Custom output filename (optional)" } }, "required": ["name", "content"] }, "description": "List of documents to process" }, "output_format": { "type": "string", "enum": ["html", "pdf", "latex", "markdown", "docx"], "default": "html", "description": "Output format for all documents" }, "output_directory": { "type": "string", "description": "Directory to save converted files (optional, uses temp dir if not specified)" }, "template": { "type": "string", "description": "Template to apply to all documents (optional)" }, "parallel_processing": { "type": "boolean", "default": True, "description": "Enable parallel processing for faster conversion" }, "max_workers": { "type": "integer", "default": 4, "minimum": 1, "maximum": 16, "description": "Maximum number of parallel workers" }, "continue_on_error": { "type": "boolean", "default": True, "description": "Continue processing other documents if one fails" }, "generate_index": { "type": "boolean", "default": False, "description": "Generate an index file listing all converted documents" }, "common_variables": { "type": "object", "description": "Common variables to apply to all documents" } }, "required": ["documents"] } ) async def execute(self, arguments: Dict[str, Any]) -> List[TextContent | ImageContent | EmbeddedResource]: """Execute the batch conversion. Args: arguments: Tool execution arguments containing documents and conversion options Returns: List containing the batch conversion results and summary """ try: # Validate required arguments self._validate_required_args(arguments, ["documents"]) documents = arguments["documents"] output_format = arguments.get("output_format", "html") output_directory = arguments.get("output_directory") template = arguments.get("template") parallel_processing = arguments.get("parallel_processing", True) max_workers = arguments.get("max_workers", 4) continue_on_error = arguments.get("continue_on_error", True) generate_index = arguments.get("generate_index", False) common_variables = arguments.get("common_variables", {}) # Validate documents if not documents or len(documents) == 0: return [self._create_error_content("No documents provided for batch conversion")] # Setup output directory if output_directory: output_dir = Path(output_directory) output_dir.mkdir(parents=True, exist_ok=True) else: output_dir = Path(tempfile.mkdtemp(prefix="quarkdown_batch_")) # Initialize conversion tracking conversion_results = [] successful_conversions = [] failed_conversions = [] # Start batch conversion start_time = asyncio.get_event_loop().time() if parallel_processing and len(documents) > 1: # Parallel processing conversion_results = await self._process_documents_parallel( documents, output_dir, output_format, template, common_variables, max_workers, continue_on_error ) else: # Sequential processing conversion_results = await self._process_documents_sequential( documents, output_dir, output_format, template, common_variables, continue_on_error ) end_time = asyncio.get_event_loop().time() total_time = end_time - start_time # Categorize results for result in conversion_results: if result["success"]: successful_conversions.append(result) else: failed_conversions.append(result) # Generate index file if requested index_file = None if generate_index and successful_conversions: index_file = await self._generate_index_file( successful_conversions, output_dir, output_format ) # Prepare response response_parts = [] # Add summary summary = self._generate_conversion_summary( len(documents), len(successful_conversions), len(failed_conversions), total_time ) response_parts.append(self._create_text_content(f"**Batch Conversion Summary**\n{summary}")) # Add successful conversions details if successful_conversions: success_details = self._format_successful_conversions(successful_conversions) response_parts.append(self._create_success_content( f"**Successful Conversions ({len(successful_conversions)})**\n{success_details}" )) # Add failed conversions details if failed_conversions: failure_details = self._format_failed_conversions(failed_conversions) response_parts.append(self._create_error_content( f"**Failed Conversions ({len(failed_conversions)})**\n{failure_details}" )) # Add output directory information response_parts.append(self._create_text_content( f"**Output Directory**: {output_dir}\n" f"**Total Files Created**: {len(successful_conversions) + (1 if index_file else 0)}" )) # Add index file information if index_file: response_parts.append(self._create_text_content( f"**Index File**: {index_file}\n" "Open this file to navigate between all converted documents." )) # Add performance statistics perf_stats = self._generate_performance_stats( conversion_results, total_time, parallel_processing ) response_parts.append(self._create_text_content( f"**Performance Statistics**\n{perf_stats}" )) return response_parts except Exception as e: return [self._create_error_content(f"Batch conversion error: {str(e)}")] async def _process_documents_parallel( self, documents: List[Dict], output_dir: Path, output_format: str, template: Optional[str], common_variables: Dict, max_workers: int, continue_on_error: bool ) -> List[Dict[str, Any]]: """Process documents in parallel. Args: documents: List of documents to process output_dir: Output directory output_format: Target format template: Template to use common_variables: Common variables max_workers: Maximum parallel workers continue_on_error: Whether to continue on errors Returns: List of conversion results """ semaphore = asyncio.Semaphore(max_workers) async def process_single_document(doc: Dict) -> Dict[str, Any]: async with semaphore: return await self._convert_single_document( doc, output_dir, output_format, template, common_variables ) # Create tasks for all documents tasks = [process_single_document(doc) for doc in documents] if continue_on_error: # Use gather with return_exceptions to continue on errors results = await asyncio.gather(*tasks, return_exceptions=True) # Convert exceptions to error results processed_results = [] for i, result in enumerate(results): if isinstance(result, Exception): processed_results.append({ "name": documents[i]["name"], "success": False, "error": str(result), "output_file": None }) else: processed_results.append(result) return processed_results else: # Use gather without return_exceptions to fail fast return await asyncio.gather(*tasks) async def _process_documents_sequential( self, documents: List[Dict], output_dir: Path, output_format: str, template: Optional[str], common_variables: Dict, continue_on_error: bool ) -> List[Dict[str, Any]]: """Process documents sequentially. Args: documents: List of documents to process output_dir: Output directory output_format: Target format template: Template to use common_variables: Common variables continue_on_error: Whether to continue on errors Returns: List of conversion results """ results = [] for doc in documents: try: result = await self._convert_single_document( doc, output_dir, output_format, template, common_variables ) results.append(result) except Exception as e: error_result = { "name": doc["name"], "success": False, "error": str(e), "output_file": None } results.append(error_result) if not continue_on_error: break return results async def _convert_single_document( self, document: Dict, output_dir: Path, output_format: str, template: Optional[str], common_variables: Dict ) -> Dict[str, Any]: """Convert a single document. Args: document: Document to convert output_dir: Output directory output_format: Target format template: Template to use common_variables: Common variables Returns: Conversion result dictionary """ try: doc_name = document["name"] doc_content = document["content"] output_name = document.get("output_name", f"{doc_name}.{output_format}") # Apply common variables to content if common_variables: doc_content = self._apply_variables(doc_content, common_variables) # Convert the document converted_content = await self.wrapper.compile_document( source_content=doc_content, output_format=output_format, template=template ) # Save to output file output_file = output_dir / output_name with open(output_file, 'w', encoding='utf-8') as f: f.write(converted_content) return { "name": doc_name, "success": True, "output_file": str(output_file), "size": len(converted_content), "error": None } except Exception as e: return { "name": document["name"], "success": False, "output_file": None, "size": 0, "error": str(e) } def _apply_variables(self, content: str, variables: Dict) -> str: """Apply variables to document content. Args: content: Document content variables: Variables to apply Returns: Content with variables applied """ for key, value in variables.items(): placeholder = f"${{{key}}}" content = content.replace(placeholder, str(value)) return content async def _generate_index_file( self, successful_conversions: List[Dict], output_dir: Path, output_format: str ) -> str: """Generate an index file for all converted documents. Args: successful_conversions: List of successful conversion results output_dir: Output directory output_format: Output format Returns: Path to the generated index file """ index_content = self._create_index_content(successful_conversions, output_format) if output_format == "html": index_file = output_dir / "index.html" else: index_file = output_dir / "index.md" with open(index_file, 'w', encoding='utf-8') as f: f.write(index_content) return str(index_file) def _create_index_content(self, conversions: List[Dict], output_format: str) -> str: """Create content for the index file. Args: conversions: List of conversion results output_format: Output format Returns: Index file content """ if output_format == "html": return self._create_html_index(conversions) else: return self._create_markdown_index(conversions) def _create_html_index(self, conversions: List[Dict]) -> str: """Create HTML index content. Args: conversions: List of conversion results Returns: HTML index content """ html_content = [ "<!DOCTYPE html>", "<html lang='en'>", "<head>", " <meta charset='UTF-8'>", " <meta name='viewport' content='width=device-width, initial-scale=1.0'>", " <title>Quarkdown Batch Conversion Index</title>", " <style>", " body { font-family: Arial, sans-serif; margin: 40px; }", " h1 { color: #333; }", " .document-list { list-style-type: none; padding: 0; }", " .document-item { margin: 10px 0; padding: 10px; border: 1px solid #ddd; border-radius: 5px; }", " .document-link { text-decoration: none; color: #0066cc; font-weight: bold; }", " .document-link:hover { text-decoration: underline; }", " .document-size { color: #666; font-size: 0.9em; }", " </style>", "</head>", "<body>", " <h1>Quarkdown Batch Conversion Index</h1>", f" <p>Generated on {asyncio.get_event_loop().time()}</p>", " <ul class='document-list'>" ] for conv in conversions: filename = os.path.basename(conv["output_file"]) size_kb = conv["size"] / 1024 html_content.extend([ " <li class='document-item'>", f" <a href='{filename}' class='document-link'>{conv['name']}</a>", f" <div class='document-size'>Size: {size_kb:.1f} KB</div>", " </li>" ]) html_content.extend([ " </ul>", "</body>", "</html>" ]) return "\n".join(html_content) def _create_markdown_index(self, conversions: List[Dict]) -> str: """Create Markdown index content. Args: conversions: List of conversion results Returns: Markdown index content """ md_content = [ "# Quarkdown Batch Conversion Index", "", f"Generated on {asyncio.get_event_loop().time()}", "", "## Converted Documents", "" ] for conv in conversions: filename = os.path.basename(conv["output_file"]) size_kb = conv["size"] / 1024 md_content.append(f"- [{conv['name']}]({filename}) ({size_kb:.1f} KB)") return "\n".join(md_content) def _generate_conversion_summary( self, total: int, successful: int, failed: int, time_taken: float ) -> str: """Generate conversion summary. Args: total: Total documents successful: Successful conversions failed: Failed conversions time_taken: Time taken in seconds Returns: Formatted summary string """ success_rate = (successful / total * 100) if total > 0 else 0 summary = [ f"- **Total Documents**: {total}", f"- **Successful**: {successful}", f"- **Failed**: {failed}", f"- **Success Rate**: {success_rate:.1f}%", f"- **Time Taken**: {time_taken:.2f} seconds", f"- **Average Time per Document**: {time_taken/total:.2f} seconds" if total > 0 else "- **Average Time**: N/A" ] return "\n".join(summary) def _format_successful_conversions(self, conversions: List[Dict]) -> str: """Format successful conversion details. Args: conversions: List of successful conversions Returns: Formatted details string """ details = [] for conv in conversions: size_kb = conv["size"] / 1024 details.append(f"✅ **{conv['name']}** → {conv['output_file']} ({size_kb:.1f} KB)") return "\n".join(details) def _format_failed_conversions(self, conversions: List[Dict]) -> str: """Format failed conversion details. Args: conversions: List of failed conversions Returns: Formatted details string """ details = [] for conv in conversions: details.append(f"❌ **{conv['name']}**: {conv['error']}") return "\n".join(details) def _generate_performance_stats( self, results: List[Dict], total_time: float, parallel: bool ) -> str: """Generate performance statistics. Args: results: Conversion results total_time: Total processing time parallel: Whether parallel processing was used Returns: Formatted performance statistics """ total_size = sum(r.get("size", 0) for r in results if r.get("success")) total_size_mb = total_size / (1024 * 1024) stats = [ f"- **Processing Mode**: {'Parallel' if parallel else 'Sequential'}", f"- **Total Output Size**: {total_size_mb:.2f} MB", f"- **Processing Speed**: {total_size_mb/total_time:.2f} MB/s" if total_time > 0 else "- **Processing Speed**: N/A", f"- **Documents per Second**: {len(results)/total_time:.2f}" if total_time > 0 else "- **Documents per Second**: N/A" ] return "\n".join(stats)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Lillard01/quarkdown-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server