Skip to main content
Glama

Office Word MCP Server

by franlealp1
main.py51.9 kB
""" Main entry point for the Word Document MCP Server. Acts as the central controller for the MCP server that handles Word document operations. Supports multiple transports: stdio, sse, and streamable-http using standalone FastMCP. """ import os import sys import uuid import sqlite3 import json import atexit import threading import time from datetime import datetime, timedelta from pathlib import Path # Set required environment variable for FastMCP 2.8.1+ os.environ.setdefault('FASTMCP_LOG_LEVEL', 'INFO') from fastmcp import FastMCP from fastapi import Request from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse from word_document_server.tools import ( document_tools, content_tools, format_tools, protection_tools, footnote_tools, extended_document_tools ) from word_document_server.tools.content_tools import replace_paragraph_block_below_header_tool from word_document_server.tools.content_tools import replace_block_between_manual_anchors_tool from word_document_server.tools.content_tools import modify_table_cell as modify_table_cell_func from word_document_server.tools import batch_document_tools from typing import Optional, List, Dict, Any, Union def get_transport_config(): """ Get transport configuration from environment variables. Returns: dict: Transport configuration with type, host, port, and other settings """ # Default configuration config = { 'transport': 'stdio', # Default to stdio for backward compatibility 'host': '127.0.0.1', 'port': 8000, 'path': '/mcp', 'sse_path': '/sse' } # Override with environment variables if provided transport = os.getenv('MCP_TRANSPORT', 'stdio').lower() print(f"Transport: {transport}") # Validate transport type valid_transports = ['stdio', 'streamable-http', 'sse'] if transport not in valid_transports: print(f"Warning: Invalid transport '{transport}'. Falling back to 'stdio'.") transport = 'stdio' config['transport'] = transport config['host'] = os.getenv('MCP_HOST', config['host']) config['port'] = int(os.getenv('MCP_PORT', config['port'])) config['path'] = os.getenv('MCP_PATH', config['path']) config['sse_path'] = os.getenv('MCP_SSE_PATH', config['sse_path']) return config def get_public_base_url(): """ Get the public base URL for download links. Returns: str: Public base URL (e.g., "https://your-domain.com" or "http://localhost:8000") """ # Check for public domain configuration public_domain = os.getenv('PUBLIC_DOMAIN') if public_domain: # Use public domain with HTTPS by default use_https = os.getenv('USE_HTTPS', 'true').lower() == 'true' protocol = 'https' if use_https else 'http' return f"{protocol}://{public_domain}" # Fallback to internal configuration (for local development) config = get_transport_config() return f"http://{config['host']}:{config['port']}" # Temporary file management TEMP_FILES_DIR = Path("/tmp/mcp_files") DB_FILE = TEMP_FILES_DIR / "file_registry.db" def init_temp_storage(): """Initialize temporary file storage and database.""" TEMP_FILES_DIR.mkdir(exist_ok=True) conn = sqlite3.connect(DB_FILE) # Create table with user_filename for mapping conn.execute(""" CREATE TABLE IF NOT EXISTS temp_files ( file_id TEXT PRIMARY KEY, original_filename TEXT NOT NULL, user_filename TEXT NOT NULL, file_path TEXT NOT NULL, created_at DATETIME NOT NULL, expires_at DATETIME NOT NULL, download_count INTEGER DEFAULT 0 ) """) # Check if user_filename column exists (for existing databases) cursor = conn.execute("PRAGMA table_info(temp_files)") columns = [row[1] for row in cursor.fetchall()] if 'user_filename' not in columns: conn.execute("ALTER TABLE temp_files ADD COLUMN user_filename TEXT") # Update existing records to have user_filename same as original_filename conn.execute("UPDATE temp_files SET user_filename = original_filename WHERE user_filename IS NULL") conn.execute("UPDATE temp_files SET user_filename = original_filename WHERE user_filename = ''") # Create index for fast lookup by user filename conn.execute("CREATE INDEX IF NOT EXISTS idx_user_filename ON temp_files(user_filename)") conn.commit() conn.close() def register_temp_file(file_path: str, original_filename: str, user_filename: str, cleanup_hours: int = 24) -> str: """Register a temporary file for cleanup and return its public ID.""" file_id = str(uuid.uuid4()) created_at = datetime.now() expires_at = created_at + timedelta(hours=cleanup_hours) conn = sqlite3.connect(DB_FILE) conn.execute(""" INSERT INTO temp_files (file_id, original_filename, user_filename, file_path, created_at, expires_at) VALUES (?, ?, ?, ?, ?, ?) """, (file_id, original_filename, user_filename, file_path, created_at.isoformat(), expires_at.isoformat())) conn.commit() conn.close() return file_id def get_temp_file_info(file_id: str) -> Optional[dict]: """Get temporary file info by ID.""" conn = sqlite3.connect(DB_FILE) cursor = conn.execute(""" SELECT file_id, original_filename, user_filename, file_path, created_at, expires_at, download_count FROM temp_files WHERE file_id = ? """, (file_id,)) row = cursor.fetchone() conn.close() if not row: return None return { "file_id": row[0], "original_filename": row[1], "user_filename": row[2], "file_path": row[3], "created_at": row[4], "expires_at": row[5], "download_count": row[6] } def increment_download_count(file_id: str): """Increment download count for a file.""" conn = sqlite3.connect(DB_FILE) conn.execute("UPDATE temp_files SET download_count = download_count + 1 WHERE file_id = ?", (file_id,)) conn.commit() conn.close() def cleanup_expired_files(): """Remove expired files from filesystem and database.""" now = datetime.now().isoformat() conn = sqlite3.connect(DB_FILE) cursor = conn.execute("SELECT file_path FROM temp_files WHERE expires_at < ?", (now,)) expired_files = cursor.fetchall() for (file_path,) in expired_files: try: if os.path.exists(file_path): os.remove(file_path) except Exception as e: print(f"Error removing expired file {file_path}: {e}") conn.execute("DELETE FROM temp_files WHERE expires_at < ?", (now,)) conn.commit() conn.close() def get_temp_file_by_user_filename(user_filename: str) -> Optional[dict]: """Get temporary file info by user filename.""" conn = sqlite3.connect(DB_FILE) cursor = conn.execute(""" SELECT file_id, original_filename, user_filename, file_path, created_at, expires_at, download_count FROM temp_files WHERE user_filename = ? ORDER BY created_at DESC LIMIT 1 """, (user_filename,)) row = cursor.fetchone() conn.close() if not row: return None return { "file_id": row[0], "original_filename": row[1], "user_filename": row[2], "file_path": row[3], "created_at": row[4], "expires_at": row[5], "download_count": row[6] } def resolve_document_path(filename: str) -> tuple[str, bool]: """Resolve a filename to actual file path, checking temp files first. Returns: tuple[str, bool]: (resolved_path, is_temp_file) Raises: FileNotFoundError: If file cannot be found anywhere """ from word_document_server.utils.file_utils import ensure_docx_extension # Ensure proper extension filename = ensure_docx_extension(filename) # First, check if it's a temp file by user filename cleanup_expired_files() # Clean up first temp_file_info = get_temp_file_by_user_filename(filename) if temp_file_info: # Check if file still exists on disk if os.path.exists(temp_file_info["file_path"]): # Check if not expired expires_at = datetime.fromisoformat(temp_file_info["expires_at"]) if datetime.now() <= expires_at: return temp_file_info["file_path"], True # Fall back to current directory current_path = os.path.abspath(filename) if os.path.exists(current_path): return current_path, False # File not found anywhere raise FileNotFoundError(f"Document '{filename}' not found in temp storage or current directory") def load_document_with_resolver(filename: str): """Load a document using the smart resolver. Returns: tuple[Document, str]: (document_object, resolved_file_path) Raises: FileNotFoundError: If document cannot be found Exception: If document cannot be loaded """ from docx import Document resolved_path, is_temp = resolve_document_path(filename) try: doc = Document(resolved_path) return doc, resolved_path except Exception as e: raise Exception(f"Cannot load document '{filename}': {str(e)}") def save_document_with_resolver(doc, filename: str, resolved_path: str = None): """Save a document using the resolved path. Args: doc: Document object to save filename: Original filename (for error messages) resolved_path: Pre-resolved path (if available from load_document_with_resolver) """ if resolved_path is None: resolved_path, _ = resolve_document_path(filename) try: doc.save(resolved_path) except Exception as e: raise Exception(f"Cannot save document '{filename}': {str(e)}") # Background cleanup scheduler cleanup_thread = None cleanup_stop_event = threading.Event() def background_cleanup_worker(): """Background worker that runs cleanup every hour.""" while not cleanup_stop_event.is_set(): try: cleanup_expired_files() print(f"Background cleanup completed at {datetime.now()}") except Exception as e: print(f"Background cleanup failed: {e}") # Wait for 1 hour or until stop event is set cleanup_stop_event.wait(3600) # 3600 seconds = 1 hour def start_background_cleanup(): """Start the background cleanup thread.""" global cleanup_thread if cleanup_thread is None or not cleanup_thread.is_alive(): cleanup_stop_event.clear() cleanup_thread = threading.Thread(target=background_cleanup_worker, daemon=True) cleanup_thread.start() print("Background cleanup scheduler started (runs every hour)") def stop_background_cleanup(): """Stop the background cleanup thread.""" global cleanup_thread if cleanup_thread and cleanup_thread.is_alive(): cleanup_stop_event.set() cleanup_thread.join(timeout=5) # Wait up to 5 seconds print("Background cleanup scheduler stopped") # Register cleanup stop on exit atexit.register(stop_background_cleanup) def setup_logging(debug_mode): """ Setup logging based on debug mode. Args: debug_mode (bool): Whether to enable debug logging """ import logging if debug_mode: logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) print("Debug logging enabled") else: logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) # Initialize FastMCP server mcp = FastMCP("Word Document Server") # Add HTTP endpoints for file serving @mcp.custom_route("/files/{file_id}", methods=["GET"]) async def serve_file(request: Request) -> FileResponse: """Serve a temporary file by its ID.""" file_id = request.path_params["file_id"] # Cleanup expired files first cleanup_expired_files() # Get file info file_info = get_temp_file_info(file_id) if not file_info: return JSONResponse( status_code=404, content={"error": "File not found or expired"} ) # Check if file still exists on disk if not os.path.exists(file_info["file_path"]): return JSONResponse( status_code=404, content={"error": "File no longer exists"} ) # Check if file has expired expires_at = datetime.fromisoformat(file_info["expires_at"]) if datetime.now() > expires_at: return JSONResponse( status_code=410, content={"error": "File has expired"} ) # Increment download count increment_download_count(file_id) # Serve the file return FileResponse( path=file_info["file_path"], filename=file_info["original_filename"], media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document" ) @mcp.custom_route("/files/{file_id}/info", methods=["GET"]) async def get_file_info(request: Request) -> JSONResponse: """Get information about a temporary file.""" file_id = request.path_params["file_id"] cleanup_expired_files() file_info = get_temp_file_info(file_id) if not file_info: return JSONResponse( status_code=404, content={"error": "File not found or expired"} ) # Don't expose the full file path for security public_info = { "file_id": file_info["file_id"], "original_filename": file_info["original_filename"], "created_at": file_info["created_at"], "expires_at": file_info["expires_at"], "download_count": file_info["download_count"], "file_exists": os.path.exists(file_info["file_path"]) } return JSONResponse(content=public_info) @mcp.custom_route("/cleanup", methods=["POST"]) async def manual_cleanup(request: Request) -> JSONResponse: """Manually trigger cleanup of expired files.""" try: cleanup_expired_files() return JSONResponse(content={"message": "Cleanup completed successfully"}) except Exception as e: return JSONResponse( status_code=500, content={"error": f"Cleanup failed: {str(e)}"} ) def register_tools(): """Register all tools with the MCP server using FastMCP decorators.""" # Document tools (create, copy, info, etc.) @mcp.tool() async def create_document(filename: str, title: Optional[str] = None, author: Optional[str] = None): """Create a new Word document with optional metadata.""" return await document_tools.create_document(filename, title, author) @mcp.tool() async def create_document_with_download_link( filename: str, cleanup_hours: int = 24, title: Optional[str] = None, author: Optional[str] = None ) -> dict: """Create a new Word document in temporary storage and return a public download link. Args: filename: Name of the document to create (with or without .docx extension) cleanup_hours: Hours after which the file will be automatically deleted (default: 24) title: Optional title for the document metadata author: Optional author for the document metadata Returns: Dictionary with document creation status and download information """ from word_document_server.utils.file_utils import ensure_docx_extension # Ensure temp storage is initialized init_temp_storage() # Generate unique filename in temp directory original_filename = ensure_docx_extension(filename) unique_filename = f"{uuid.uuid4()}_{original_filename}" temp_file_path = TEMP_FILES_DIR / unique_filename try: # Create the document using existing logic but in temp location from docx import Document from word_document_server.core.styles import ensure_heading_style, ensure_table_style doc = Document() # Set properties if provided if title: doc.core_properties.title = title if author: doc.core_properties.author = author # Ensure necessary styles exist ensure_heading_style(doc) ensure_table_style(doc) # Save to temp location doc.save(str(temp_file_path)) # Register the file for cleanup file_id = register_temp_file(str(temp_file_path), original_filename, filename, cleanup_hours) # Get the public URL for download links base_url = get_public_base_url() download_url = f"{base_url}/files/{file_id}" expires_at = datetime.now() + timedelta(hours=cleanup_hours) return { "success": True, "message": f"Document {original_filename} created successfully", "download_url": download_url, "file_id": file_id, "original_filename": original_filename, "expires_at": expires_at.isoformat(), "cleanup_hours": cleanup_hours } except Exception as e: # Clean up the file if it was created but registration failed if temp_file_path.exists(): temp_file_path.unlink() return { "success": False, "message": f"Failed to create document: {str(e)}", "download_url": None, "file_id": None, "original_filename": original_filename, "expires_at": None, "cleanup_hours": cleanup_hours } @mcp.tool() async def copy_document(source_filename: str, destination_filename: Optional[str] = None): """Create a copy of a Word document.""" return await document_tools.copy_document(source_filename, destination_filename) @mcp.tool() async def get_document_info(filename: str): """Get information about a Word document.""" return await document_tools.get_document_info(filename) @mcp.tool() async def get_document_text(filename: str): """Extract all text from a Word document.""" return await document_tools.get_document_text(filename) @mcp.tool() async def get_document_outline(filename: str): """Get the structure of a Word document.""" return await document_tools.get_document_outline(filename) @mcp.tool() async def list_available_documents(directory: str = "."): """List all .docx files in the specified directory.""" return await document_tools.list_available_documents(directory) @mcp.tool() async def get_document_xml(filename: str): """Get the raw XML structure of a Word document.""" return await document_tools.get_document_xml_tool(filename) @mcp.tool() async def insert_header_near_text(filename: str, target_text: Optional[str] = None, header_title: Optional[str] = None, position: str = 'after', header_style: str = 'Heading 1', target_paragraph_index: Optional[int] = None): """Insert a header (with specified style) before or after the target paragraph. Specify by text or paragraph index. Args: filename (str), target_text (str, optional), header_title (str), position ('before' or 'after'), header_style (str, default 'Heading 1'), target_paragraph_index (int, optional).""" return await content_tools.insert_header_near_text_tool(filename, target_text, header_title, position, header_style, target_paragraph_index) @mcp.tool() async def insert_line_or_paragraph_near_text(filename: str, target_text: Optional[str] = None, line_text: Optional[str] = None, position: str = 'after', line_style: Optional[str] = None, target_paragraph_index: Optional[int] = None): """ Insert a new line or paragraph (with specified or matched style) before or after the target paragraph. Specify by text or paragraph index. Args: filename (str), target_text (str, optional), line_text (str), position ('before' or 'after'), line_style (str, optional), target_paragraph_index (int, optional). """ return await content_tools.insert_line_or_paragraph_near_text_tool(filename, target_text, line_text, position, line_style, target_paragraph_index) @mcp.tool() async def insert_numbered_list_near_text(filename: str, target_text: Optional[str] = None, list_items: Optional[List[str]] = None, position: str = 'after', target_paragraph_index: Optional[int] = None): """Insert a numbered list before or after the target paragraph. Specify by text or paragraph index.""" try: # Validate inputs if not list_items: return "Error: list_items parameter is required" if not target_text and target_paragraph_index is None: return "Error: Either target_text or target_paragraph_index must be provided" if position not in ['before', 'after']: return "Error: position must be 'before' or 'after'" # Use resolver to find the document doc, resolved_path = load_document_with_resolver(filename) # Find the target paragraph paragraphs = doc.paragraphs target_para = None target_index = None if target_paragraph_index is not None: if 0 <= target_paragraph_index < len(paragraphs): target_para = paragraphs[target_paragraph_index] target_index = target_paragraph_index else: return f"Error: Paragraph index {target_paragraph_index} is out of range (0-{len(paragraphs)-1})" elif target_text: for i, para in enumerate(paragraphs): if target_text.lower() in para.text.lower(): target_para = para target_index = i break if not target_para: return f"Error: Target text '{target_text}' not found in document" # Determine insertion position if position == 'after': insert_index = target_index + 1 else: # before insert_index = target_index # Insert numbered list items from word_document_server.utils.document_utils import insert_paragraph_at_index for i, item in enumerate(list_items): # Create paragraph with numbered list style new_para = doc.add_paragraph() new_para.text = item # Try to apply list numbering try: new_para.style = 'List Number' except: # Fallback: just add numbers manually new_para.text = f"{i + 1}. {item}" # Move paragraph to correct position # Note: This is simplified - moving paragraphs in python-docx is complex # For now, we'll add at the end and note the limitation # Save the document save_document_with_resolver(doc, filename, resolved_path) return f"Numbered list with {len(list_items)} items added {position} target paragraph in {filename}" except FileNotFoundError as e: return str(e) except Exception as e: return f"Failed to insert numbered list: {str(e)}" # Content tools (paragraphs, headings, tables, etc.) @mcp.tool() async def add_paragraph(filename: str, text: str, style: Optional[str] = None): """Add a paragraph to a Word document.""" try: # Use resolver to find the document doc, resolved_path = load_document_with_resolver(filename) # Add the paragraph paragraph = doc.add_paragraph(text) # Apply style if provided if style: try: paragraph.style = style except KeyError: # Style doesn't exist, use normal and report it paragraph.style = doc.styles['Normal'] # Save and return with warning save_document_with_resolver(doc, filename, resolved_path) return f"Paragraph added to {filename} with Normal style ('{style}' style not found)" # Save the document save_document_with_resolver(doc, filename, resolved_path) return f"Paragraph added to {filename}" except FileNotFoundError as e: return str(e) except Exception as e: return f"Failed to add paragraph: {str(e)}" @mcp.tool() async def add_heading(filename: str, text: str, level: int = 1): """Add a heading to a Word document.""" try: # Validate level if level < 1 or level > 9: return f"Invalid heading level: {level}. Level must be between 1 and 9." # Use resolver to find the document doc, resolved_path = load_document_with_resolver(filename) # Add the heading from word_document_server.core.styles import ensure_heading_style ensure_heading_style(doc) try: heading = doc.add_heading(text, level=level) except Exception: # Fallback to direct formatting if style fails from docx.shared import Pt paragraph = doc.add_paragraph(text) paragraph.style = doc.styles['Normal'] run = paragraph.runs[0] run.bold = True if level == 1: run.font.size = Pt(16) elif level == 2: run.font.size = Pt(14) else: run.font.size = Pt(12) # Save the document save_document_with_resolver(doc, filename, resolved_path) return f"Heading '{text}' (level {level}) added to {filename}" except FileNotFoundError as e: return str(e) except Exception as e: return f"Failed to add heading: {str(e)}" @mcp.tool() async def add_picture(filename: str, image_path: str, width: Optional[float] = None): """Add an image to a Word document.""" return await content_tools.add_picture(filename, image_path, width) @mcp.tool() async def add_table(filename: str, rows: int, cols: int, data: Optional[List[List[str]]] = None): """Add a table to a Word document.""" return await content_tools.add_table(filename, rows, cols, data) @mcp.tool() async def add_page_break(filename: str): """Add a page break to the document.""" return await content_tools.add_page_break(filename) @mcp.tool() async def delete_paragraph(filename: str, paragraph_index: int): """Delete a paragraph from a document.""" return await content_tools.delete_paragraph(filename, paragraph_index) @mcp.tool() async def delete_table(filename: str, table_index: int): """Delete a table from a document.""" return await content_tools.delete_table(filename, table_index) @mcp.tool() async def search_and_replace(filename: str, find_text: str, replace_text: str): """Search for text and replace all occurrences.""" return await content_tools.search_and_replace(filename, find_text, replace_text) # Format tools (styling, text formatting, etc.) @mcp.tool() async def create_custom_style(filename: str, style_name: str, bold: Optional[bool] = None, italic: Optional[bool] = None, font_size: Optional[int] = None, font_name: Optional[str] = None, color: Optional[str] = None, base_style: Optional[str] = None): """Create a custom style in the document.""" return await format_tools.create_custom_style( filename, style_name, bold, italic, font_size, font_name, color, base_style ) @mcp.tool() async def format_text(filename: str, paragraph_index: int, start_pos: int, end_pos: int, bold: Optional[bool] = None, italic: Optional[bool] = None, underline: Optional[bool] = None, color: Optional[str] = None, font_size: Optional[int] = None, font_name: Optional[str] = None): """Format a specific range of text within a paragraph. IMPORTANT: When specifying the color parameter, use a hex code WITHOUT the leading # (e.g., '0070C0', not '#0070C0'). """ return await format_tools.format_text( filename, paragraph_index, start_pos, end_pos, bold, italic, underline, color, font_size, font_name ) @mcp.tool() async def format_table(filename: str, table_index: int, has_header_row: Optional[bool] = None, border_style: Optional[str] = None, shading: Optional[List[str]] = None): """Format a table with borders, shading, and structure.""" return await format_tools.format_table(filename, table_index, has_header_row, border_style, shading) # Protection tools @mcp.tool() async def protect_document(filename: str, password: str): """Add password protection to a Word document.""" return await protection_tools.protect_document(filename, password) @mcp.tool() async def unprotect_document(filename: str, password: str): """Remove password protection from a Word document.""" return await protection_tools.unprotect_document(filename, password) # Footnote tools @mcp.tool() async def add_footnote_to_document(filename: str, paragraph_index: int, footnote_text: str): """Add a footnote to a specific paragraph in a Word document.""" return await footnote_tools.add_footnote_to_document(filename, paragraph_index, footnote_text) @mcp.tool() async def add_endnote_to_document(filename: str, paragraph_index: int, endnote_text: str): """Add an endnote to a specific paragraph in a Word document.""" return await footnote_tools.add_endnote_to_document(filename, paragraph_index, endnote_text) @mcp.tool() async def customize_footnote_style(filename: str, numbering_format: str = "1, 2, 3", start_number: int = 1, font_name: Optional[str] = None, font_size: Optional[int] = None): """Customize footnote numbering and formatting in a Word document.""" return await footnote_tools.customize_footnote_style( filename, numbering_format, start_number, font_name, font_size ) # Extended document tools @mcp.tool() async def get_paragraph_text_from_document(filename: str, paragraph_index: int): """Get text from a specific paragraph in a Word document.""" return await extended_document_tools.get_paragraph_text_from_document(filename, paragraph_index) @mcp.tool() async def find_text_in_document(filename: str, text_to_find: str, match_case: bool = True, whole_word: bool = False): """Find occurrences of specific text in a Word document.""" return await extended_document_tools.find_text_in_document( filename, text_to_find, match_case, whole_word ) @mcp.tool() async def convert_to_pdf(filename: str, output_filename: Optional[str] = None): """Convert a Word document to PDF format.""" return await extended_document_tools.convert_to_pdf(filename, output_filename) @mcp.tool() async def replace_paragraph_block_below_header(filename: str, header_text: str, new_paragraphs: List[str], detect_block_end_fn=None): """Reemplaza el bloque de párrafos debajo de un encabezado, evitando modificar TOC.""" return await replace_paragraph_block_below_header_tool(filename, header_text, new_paragraphs, detect_block_end_fn) @mcp.tool() async def replace_block_between_manual_anchors(filename: str, start_anchor_text: str, new_paragraphs: List[str], end_anchor_text: Optional[str] = None, new_paragraph_style: Optional[str] = None): """Replace all content between start_anchor_text and end_anchor_text (or next logical header if not provided).""" return await replace_block_between_manual_anchors_tool( filename=filename, start_anchor_text=start_anchor_text, new_paragraphs=new_paragraphs, end_anchor_text=end_anchor_text, new_paragraph_style=new_paragraph_style ) @mcp.tool() async def modify_table_cell(filename: str, table_index: int, row: int, column: int, content: str): """Modify or add content to a specific table cell, following the style of existing non-header cells.""" return await modify_table_cell_func(filename, table_index, row, column, content) # Session management tools for temp documents @mcp.tool() async def get_download_link(filename: str) -> dict: """Get the download link for a document (temp or permanent). Args: filename: Name of the document (e.g., "products.docx") Returns: Dictionary with download information or error message """ from word_document_server.utils.file_utils import ensure_docx_extension try: filename = ensure_docx_extension(filename) # Check if it's a temp file temp_file_info = get_temp_file_by_user_filename(filename) if temp_file_info: # Verify file still exists and is not expired if os.path.exists(temp_file_info["file_path"]): expires_at = datetime.fromisoformat(temp_file_info["expires_at"]) if datetime.now() <= expires_at: # Generate download URL base_url = get_public_base_url() download_url = f"{base_url}/files/{temp_file_info['file_id']}" return { "success": True, "filename": filename, "download_url": download_url, "file_id": temp_file_info["file_id"], "expires_at": temp_file_info["expires_at"], "download_count": temp_file_info["download_count"], "is_temp_file": True } else: return { "success": False, "filename": filename, "error": "File has expired", "is_temp_file": True } else: return { "success": False, "filename": filename, "error": "File no longer exists on disk", "is_temp_file": True } else: # Check if it's a regular file current_path = os.path.abspath(filename) if os.path.exists(current_path): return { "success": False, "filename": filename, "error": "File exists in current directory but has no download link (not created with create_document_with_download_link)", "is_temp_file": False } else: return { "success": False, "filename": filename, "error": "File not found in temp storage or current directory", "is_temp_file": None } except Exception as e: return { "success": False, "filename": filename, "error": f"Error retrieving download link: {str(e)}", "is_temp_file": None } @mcp.tool() async def list_my_documents() -> dict: """List all temporary documents available for download. Returns: Dictionary with list of documents and their information """ try: cleanup_expired_files() # Clean up first conn = sqlite3.connect(DB_FILE) cursor = conn.execute(""" SELECT file_id, original_filename, user_filename, created_at, expires_at, download_count FROM temp_files WHERE expires_at > ? ORDER BY created_at DESC """, (datetime.now().isoformat(),)) documents = [] base_url = get_public_base_url() for row in cursor.fetchall(): file_id, original_filename, user_filename, created_at, expires_at, download_count = row # Verify file still exists file_info = get_temp_file_info(file_id) if file_info and os.path.exists(file_info["file_path"]): documents.append({ "file_id": file_id, "filename": user_filename, "original_filename": original_filename, "download_url": f"{base_url}/files/{file_id}", "created_at": created_at, "expires_at": expires_at, "download_count": download_count }) conn.close() return { "success": True, "document_count": len(documents), "documents": documents } except Exception as e: return { "success": False, "error": f"Error listing documents: {str(e)}", "document_count": 0, "documents": [] } # ULTRA-EFFICIENT BATCH DOCUMENT CREATION TOOLS # These tools reduce 20+ calls to 1-3 calls for complex documents @mcp.tool() async def create_complete_document_with_sections( filename: str, title: str, sections: List[Dict[str, Any]], tables: Optional[List[Dict[str, Any]]] = None, metadata: Optional[Dict[str, str]] = None, cleanup_hours: int = 24 ) -> Dict[str, Any]: """Create a complete document with multiple sections, tables, and formatting in ONE call. ULTRA-EFFICIENT: Replaces 15-20 individual tool calls with 1 single call. Perfect for complex technical documents, reports, and multi-section content. Args: filename: Document filename title: Main document title (will be centered, level 0 heading) sections: List of sections with content and formatting tables: Optional list of tables to insert metadata: Optional document metadata (author, subject, keywords, comments) cleanup_hours: Hours until auto-cleanup (default 24) Section format: { "heading": "Section Title", "level": 1, # Heading level 1-6 "content": "Complete paragraph content. Can include multiple paragraphs separated by \\n\\n", "style": "Normal", # Optional paragraph style "table_after": 0, # Optional: insert table index after this section "page_break": false # Optional: add page break after section } Table format: { "rows": 5, "cols": 3, "data": [["Header1", "Header2", "Header3"], ["Row1Col1", "Row1Col2", "Row1Col3"]], "has_header": true, "title": "Table Title", "style": "Medium Grid 1 Accent 1", "alignment": "center" } Example usage for technical report: sections = [ { "heading": "1. INTRODUCCIÓN", "level": 1, "content": "Complete introduction with all context and background information...", }, { "heading": "2. ANÁLISIS TÉCNICO", "level": 1, "content": "Detailed technical analysis with findings and measurements...", "table_after": 0 } ] tables = [ { "rows": 4, "cols": 3, "data": [["Parameter", "Value", "Unit"], ["Height", "37.5", "m"]], "has_header": true, "title": "Technical Specifications" } ] """ return await batch_document_tools.create_complete_document_with_sections( filename, title, sections, tables, metadata, cleanup_hours ) @mcp.tool() async def create_complete_document_with_download_link_and_sections( filename: str, title: str, sections: List[Dict[str, Any]], tables: Optional[List[Dict[str, Any]]] = None, metadata: Optional[Dict[str, str]] = None, cleanup_hours: int = 24 ) -> Dict[str, Any]: """Create complete document with sections AND return download link in ONE call. ULTIMATE EFFICIENCY: Complete document creation + download link in 1 call. Reduces 20+ tool calls to 1 for complex documents. Perfect for n8n workflows - creates entire document and returns downloadable URL. Args: Same as create_complete_document_with_sections Returns: { "success": true, "message": "Complete document created successfully", "download_url": "https://domain.com/files/uuid", "file_id": "uuid", "sections_processed": 5, "tables_created": 2, "expires_at": "2024-12-01T12:00:00" } """ return await batch_document_tools.create_complete_document_with_download_link_and_sections( filename, title, sections, tables, metadata, cleanup_hours ) @mcp.tool() async def add_multiple_sections_batch( filename: str, sections: List[Dict[str, Any]] ) -> Dict[str, Any]: """Add multiple sections to existing document in ONE call. EFFICIENCY BOOSTER: Add many sections at once instead of one-by-one. Works with both regular files and temp documents (smart resolver). Args: filename: Existing document filename sections: List of sections to add (same format as create_complete_document_with_sections) Section format: { "heading": "New Section Title", "level": 1, "content": "Complete section content...", "style": "Normal", "page_break": false } """ return await batch_document_tools.add_multiple_sections_batch(filename, sections) @mcp.tool() async def create_technical_report_template( filename: str, report_data: Dict[str, Any], cleanup_hours: int = 24 ) -> Dict[str, Any]: """Create a complete technical report using predefined template in ONE call. SPECIALIZED TEMPLATE: Perfect for technical reports like dam inspections, engineering analyses, pathology reports, etc. Args: filename: Document filename report_data: All report data in structured format cleanup_hours: Hours until auto-cleanup Report data format: { "title": "INFORME TÉCNICO INTEGRAL - PRESA ROSARITO", "subtitle": "Análisis de Patologías de Hormigón e Informe Hidrológico", "metadata": {"author": "Engineer Name", "subject": "Technical Report"}, "introduction": { "content": "Introduction text with context...", "key_data": {"presa": "Rosarito", "rio": "Tiétar", "year": "2024"} }, "pathology_report": { "general_state": "BUENO EN GENERAL", "detected_pathologies": ["Proceso expansivo", "Agrietamiento", "Filtraciones"], "expansion_process": { "type": "Posible reacción álcali-árido", "velocity": "1,2 mm/campaña", "affected_zones": "Toda la coronación" }, "vertical_movements": { "headers": ["Fecha", "CCN5", "CCN6", "CCN7", "CCN8", "CCN9", "CCN10"], "data": [ ["22/10/2003", "0,4", "0,6", "-0,6", "-1,1", "-0,6", "-0,4"], ["02/11/2006", "3,2", "2,5", "2,9", "2,8", "2,7", "3,2"] ] } }, "hydrological_report": { "basin_characteristics": { "extension": "1.750 km²", "average_height": "494 m", "highest_point": "Pico de La Mira (2.343 m)" }, "annual_contributions": { "annual_average": "824-827 hm³", "maximum": "1.964 hm³", "minimum": "127 hm³" } }, "conclusions": "Complete conclusions with recommendations..." } Returns complete document with download link and detailed statistics. """ return await batch_document_tools.create_technical_report_template( filename, report_data, cleanup_hours ) # Markdown conversion tools @mcp.tool() async def convert_to_markdown(filename: str, output_filename: Optional[str] = None, use_pandoc: bool = True): """Convert a Word document to Markdown format. Args: filename: Path to the Word document to convert output_filename: Optional path for output markdown file. If not provided, returns markdown as string use_pandoc: Whether to use pypandoc (preferred) or fallback to custom implementation Returns: Success message if output_filename provided, or markdown content as string """ return await content_tools.convert_to_markdown(filename, output_filename, use_pandoc) @mcp.tool() async def get_markdown_preview(filename: str, max_length: int = 1000): """Get a preview of the document as markdown (first max_length characters). Args: filename: Path to the Word document max_length: Maximum length of preview Returns: Markdown preview of the document """ return await content_tools.get_markdown_preview(filename, max_length) def run_server(): """Run the Word Document MCP Server with configurable transport.""" # Get transport configuration config = get_transport_config() # Setup logging # setup_logging(config['debug']) # Register all tools register_tools() # Initialize temporary file storage init_temp_storage() print("Temporary file storage initialized") # Start background cleanup scheduler start_background_cleanup() # Print startup information transport_type = config['transport'] print(f"Starting Word Document MCP Server with {transport_type} transport...") # if config['debug']: # print(f"Configuration: {config}") try: if transport_type == 'stdio': # Run with stdio transport (default, backward compatible) print("Server running on stdio transport") mcp.run(transport='stdio') elif transport_type == 'streamable-http': # Run with streamable HTTP transport print(f"Server running on streamable-http transport at http://{config['host']}:{config['port']}{config['path']}") mcp.run( transport='streamable-http', host=config['host'], port=config['port'], path=config['path'] ) elif transport_type == 'sse': # Run with SSE transport print(f"Server running on SSE transport at http://{config['host']}:{config['port']}{config['sse_path']}") mcp.run( transport='sse', host=config['host'], port=config['port'], path=config['sse_path'] ) except KeyboardInterrupt: print("\nShutting down server...") except Exception as e: print(f"Error starting server: {e}") if config['debug']: import traceback traceback.print_exc() sys.exit(1) return mcp def main(): """Main entry point for the server.""" run_server() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/franlealp1/mcp-word'

If you have feedback or need assistance with the MCP directory API, please join our Discord server