Doc/docx-MCP

by MeterLong
Verified
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ MCP Docx Processing Service Provides various operations for docx documents, including querying, adding, modifying, deleting, and font style settings Implemented using the official MCP library """ import os import tempfile import logging import traceback from contextlib import asynccontextmanager from typing import AsyncIterator, Dict, Any, Optional from mcp.server.fastmcp import FastMCP, Context from docx import Document from docx.shared import Pt, RGBColor, Inches, Cm from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_LINE_SPACING, WD_BREAK from docx.enum.style import WD_STYLE_TYPE from docx.oxml.ns import qn from docx.oxml import OxmlElement # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler(os.path.join(tempfile.gettempdir(), "docx_mcp_server.log")), logging.StreamHandler() ] ) logger = logging.getLogger("DocxMCPServer") # Create a state file for restoring state when MCP service restarts CURRENT_DOC_FILE = os.path.join(tempfile.gettempdir(), "docx_mcp_current_doc.txt") class DocxProcessor: """Class for processing Docx documents, implementing various document operations""" def __init__(self): self.documents = {} # Store opened documents self.current_document = None self.current_file_path = None # Try to load current document from state file self._load_current_document() def _load_current_document(self): """Load current document from state file""" if not os.path.exists(CURRENT_DOC_FILE): return False try: with open(CURRENT_DOC_FILE, 'r', encoding='utf-8') as f: file_path = f.read().strip() if file_path and os.path.exists(file_path): try: self.current_file_path = file_path self.current_document = Document(file_path) self.documents[file_path] = self.current_document return True except Exception as e: logger.error(f"Failed to load document at {file_path}: {e}") # Delete invalid state file to prevent future loading attempts try: os.remove(CURRENT_DOC_FILE) logger.info(f"Removed invalid state file pointing to {file_path}") except Exception as e_remove: logger.error(f"Failed to remove state file: {e_remove}") else: # Delete invalid state file if path is empty or file doesn't exist try: os.remove(CURRENT_DOC_FILE) logger.info("Removed invalid state file with non-existent document path") except Exception as e_remove: logger.error(f"Failed to remove state file: {e_remove}") except Exception as e: logger.error(f"Failed to load current document: {e}") # Delete corrupted state file try: os.remove(CURRENT_DOC_FILE) logger.info("Removed corrupted state file") except Exception as e_remove: logger.error(f"Failed to remove state file: {e_remove}") return False def _save_current_document(self): """Save current document path to state file""" if not self.current_file_path: return False try: with open(CURRENT_DOC_FILE, 'w', encoding='utf-8') as f: f.write(self.current_file_path) return True except Exception as e: logger.error(f"Failed to save current document path: {e}") return False def save_state(self): """Save processor state""" # Save current document if self.current_document and self.current_file_path: try: self.current_document.save(self.current_file_path) self._save_current_document() except Exception as e: logger.error(f"Failed to save current document: {e}") def load_state(self): """Load processor state""" self._load_current_document() # ... Keep all original document processing methods ... # Create global processor instance processor = DocxProcessor() @asynccontextmanager async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]: """Manage server lifecycle""" try: # Start server with clean state logger.info("DocxProcessor MCP server starting with clean state...") # Do not attempt to load any previous state yield {"processor": processor} finally: # Save state when server shuts down logger.info("DocxProcessor MCP server shutting down...") if processor.current_document and processor.current_file_path: processor.save_state() else: logger.info("No document open, not saving state") # Create MCP server mcp = FastMCP( name="DocxProcessor", instructions="Word document processing service, providing functions to create, edit, and query documents", lifespan=server_lifespan ) @mcp.tool() def create_document(ctx: Context, file_path: str) -> str: """ Create a new Word document Parameters: - file_path: Document save path """ try: processor.current_document = Document() processor.current_file_path = file_path processor.documents[file_path] = processor.current_document # Save document processor.current_document.save(file_path) return f"Document created successfully: {file_path}" except Exception as e: error_msg = f"Failed to create document: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def open_document(ctx: Context, file_path: str) -> str: """ Open an existing Word document Parameters: - file_path: Path to the document to open """ try: if not os.path.exists(file_path): return f"File does not exist: {file_path}" processor.current_document = Document(file_path) processor.current_file_path = file_path processor.documents[file_path] = processor.current_document return f"Document opened successfully: {file_path}" except Exception as e: error_msg = f"Failed to open document: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def save_document(ctx: Context) -> str: """ Save the currently open Word document to the original file (update the original file) """ try: if not processor.current_document: return "No document is open" if not processor.current_file_path: return "Current document has not been saved before, please use save_as_document to specify a save path" # Save to original file path processor.current_document.save(processor.current_file_path) return f"Document saved successfully to original file: {processor.current_file_path}" except Exception as e: error_msg = f"Failed to save document: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def add_paragraph( ctx: Context, text: str, bold: bool = False, italic: bool = False, underline: bool = False, font_size: Optional[int] = None, font_name: Optional[str] = None, color: Optional[str] = None, alignment: Optional[str] = None ) -> str: """ Add paragraph text to document Parameters: - text: Paragraph text content - bold: Whether to bold - italic: Whether to italicize - underline: Whether to underline - font_size: Font size (points) - font_name: Font name - color: Text color (format: #FF0000) - alignment: Alignment (left, center, right, justify) """ try: if not processor.current_document: return "No document is open" # Add paragraph paragraph = processor.current_document.add_paragraph(text) # Apply additional formatting if paragraph.runs: run = paragraph.runs[0] run.bold = bold run.italic = italic run.underline = underline # Set font size if font_size: run.font.size = Pt(font_size) # Set font name if font_name: run.font.name = font_name # Set East Asian font run._element.rPr.rFonts.set(qn('w:eastAsia'), font_name) # Set font color if color and color.startswith('#') and len(color) == 7: r = int(color[1:3], 16) g = int(color[3:5], 16) b = int(color[5:7], 16) run.font.color.rgb = RGBColor(r, g, b) # Set alignment if alignment: if alignment == "left": paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT elif alignment == "center": paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER elif alignment == "right": paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT elif alignment == "justify": paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY return "Paragraph added" except Exception as e: error_msg = f"Failed to add paragraph: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def add_heading(ctx: Context, text: str, level: int) -> str: """ Add heading to document Parameters: - text: Heading text - level: Heading level (1-9) """ try: if not processor.current_document: return "No document is open" processor.current_document.add_heading(text, level=level) return f"Added level {level} heading" except Exception as e: error_msg = f"Failed to add heading: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def add_table(ctx: Context, rows: int, cols: int, data: Optional[list] = None) -> str: """ Add table to document Parameters: - rows: Number of rows - cols: Number of columns - data: Table data, two-dimensional array """ try: if not processor.current_document: return "No document is open" table = processor.current_document.add_table(rows=rows, cols=cols, style="Table Grid") # Fill table data if data: for i, row_data in enumerate(data): if i < rows: row = table.rows[i] for j, cell_text in enumerate(row_data): if j < cols: row.cells[j].text = str(cell_text) return f"Added {rows}x{cols} table" except Exception as e: error_msg = f"Failed to add table: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def get_document_info(ctx: Context) -> str: """ Get document information, including paragraph count, table count, styles, etc. """ try: if not processor.current_document: return "No document is open" doc = processor.current_document # Get basic document information sections_count = len(doc.sections) paragraphs_count = len(doc.paragraphs) tables_count = len(doc.tables) # Get style list paragraph_styles = [] for style in doc.styles: if style.type == WD_STYLE_TYPE.PARAGRAPH: paragraph_styles.append(style.name) # Build information string info = f"Document path: {processor.current_file_path}\n" info += f"Section count: {sections_count}\n" info += f"Paragraph count: {paragraphs_count}\n" info += f"Table count: {tables_count}\n" info += f"Available paragraph styles: {', '.join(paragraph_styles[:10])}..." return info except Exception as e: error_msg = f"Failed to get document information: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def search_text(ctx: Context, keyword: str) -> str: """ Search for text in the document Parameters: - keyword: Keyword to search for """ try: if not processor.current_document: return "No document is open" doc = processor.current_document results = [] # Search in paragraphs for i, paragraph in enumerate(doc.paragraphs): if keyword in paragraph.text: results.append({ "type": "paragraph", "index": i, "text": paragraph.text }) # Search in tables for t_idx, table in enumerate(doc.tables): for r_idx, row in enumerate(table.rows): for c_idx, cell in enumerate(row.cells): if keyword in cell.text: results.append({ "type": "table cell", "table_index": t_idx, "row": r_idx, "column": c_idx, "text": cell.text }) if not results: return f"Keyword '{keyword}' not found" # Build response response = f"Found {len(results)} occurrences of '{keyword}':\n\n" for idx, result in enumerate(results): response += f"{idx+1}. {result['type']} " if result['type'] == "paragraph": response += f"index {result['index']}: {result['text'][:100]}" if len(result['text']) > 100: response += "..." response += "\n" else: response += f"in table {result['table_index']} at cell ({result['row']},{result['column']}): {result['text'][:100]}" if len(result['text']) > 100: response += "..." response += "\n" return response except Exception as e: error_msg = f"Failed to search text: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def search_and_replace(ctx: Context, keyword: str, replace_with: str, preview_only: bool = False) -> str: """ Search and replace text in the document, providing detailed replacement information and preview options Parameters: - keyword: Keyword to search for - replace_with: Text to replace with - preview_only: Whether to only preview without actually replacing, default is False """ try: if not processor.current_document: return "No document is open" doc = processor.current_document results = [] # Search in paragraphs for i, paragraph in enumerate(doc.paragraphs): if keyword in paragraph.text: # Save original text and replaced text original_text = paragraph.text replaced_text = original_text.replace(keyword, replace_with) results.append({ "type": "paragraph", "index": i, "original": original_text, "replaced": replaced_text, "count": original_text.count(keyword) }) # If not in preview mode, perform replacement if not preview_only: paragraph.text = replaced_text # Search in tables for t_idx, table in enumerate(doc.tables): for r_idx, row in enumerate(table.rows): for c_idx, cell in enumerate(row.cells): if keyword in cell.text: # Save original text and replaced text original_text = cell.text replaced_text = original_text.replace(keyword, replace_with) results.append({ "type": "table cell", "table_index": t_idx, "row": r_idx, "column": c_idx, "original": original_text, "replaced": replaced_text, "count": original_text.count(keyword) }) # If not in preview mode, perform replacement if not preview_only: # Replace all paragraphs in the cell with the replaced text for para in cell.paragraphs: if keyword in para.text: para.text = para.text.replace(keyword, replace_with) if not results: return f"Keyword '{keyword}' not found" # Calculate total replacements total_replacements = sum(item["count"] for item in results) # Build response action_word = "Preview" if preview_only else "Replace" response = f"{action_word} '{keyword}' with '{replace_with}', found {len(results)} locations, {total_replacements} occurrences:\n\n" for idx, result in enumerate(results): response += f"{idx+1}. In {result['type']} " if result['type'] == "paragraph": response += f"index {result['index']} {action_word.lower()}ing {result['count']} times:\n" else: response += f"table {result['table_index']} at cell ({result['row']},{result['column']}) {action_word.lower()}ing {result['count']} times:\n" # Display original and replaced text snippets (context) max_display = 50 if len(result['original']) > max_display * 2: # Find keyword position and display surrounding text start_pos = result['original'].find(keyword) start_pos = max(0, start_pos - max_display) excerpt_original = "..." + result['original'][start_pos:start_pos + max_display * 2] + "..." excerpt_replaced = "..." + result['replaced'][start_pos:start_pos + max_display * 2] + "..." else: excerpt_original = result['original'] excerpt_replaced = result['replaced'] response += f" Original: {excerpt_original}\n" response += f" Replaced: {excerpt_replaced}\n\n" if preview_only: response += "This is a preview of replacements. No actual changes were made. To execute replacements, set preview_only to False." else: response += "Replacements completed successfully." return response except Exception as e: error_msg = f"Search and replace failed: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def find_and_replace(ctx: Context, find_text: str, replace_text: str) -> str: """ Find and replace text in the document Parameters: - find_text: Text to find - replace_text: Text to replace with """ try: if not processor.current_document: return "No document is open" doc = processor.current_document replace_count = 0 # Find and replace in paragraphs for paragraph in doc.paragraphs: if find_text in paragraph.text: paragraph.text = paragraph.text.replace(find_text, replace_text) replace_count += paragraph.text.count(replace_text) # Find and replace in tables for table in doc.tables: for row in table.rows: for cell in row.cells: for paragraph in cell.paragraphs: if find_text in paragraph.text: paragraph.text = paragraph.text.replace(find_text, replace_text) replace_count += paragraph.text.count(replace_text) return f"Replaced '{find_text}' with '{replace_text}', {replace_count} occurrences" except Exception as e: error_msg = f"Find and replace failed: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def merge_table_cells( ctx: Context, table_index: int, start_row: int, start_col: int, end_row: int, end_col: int ) -> str: """ Merge table cells Parameters: - table_index: Table index - start_row: Start row index - start_col: Start column index - end_row: End row index - end_col: End column index """ try: if not processor.current_document: return "No document is open" doc = processor.current_document if not doc.tables: return "No tables in document" if table_index < 0 or table_index >= len(doc.tables): return f"Table index out of range: {table_index}, document has {len(doc.tables)} tables" table = doc.tables[table_index] # Check if row and column indices are valid if start_row < 0 or start_row >= len(table.rows): return f"Start row index out of range: {start_row}, table has {len(table.rows)} rows" if start_col < 0 or start_col >= len(table.columns): return f"Start column index out of range: {start_col}, table has {len(table.columns)} columns" if end_row < start_row or end_row >= len(table.rows): return f"End row index invalid: {end_row}, should be between {start_row} and {len(table.rows)-1}" if end_col < start_col or end_col >= len(table.columns): return f"End column index invalid: {end_col}, should be between {start_col} and {len(table.columns)-1}" # Get start and end cells start_cell = table.cell(start_row, start_col) end_cell = table.cell(end_row, end_col) # Merge cells start_cell.merge(end_cell) return f"Merged cells in table {table_index} from ({start_row},{start_col}) to ({end_row},{end_col})" except Exception as e: error_msg = f"Failed to merge table cells: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def split_table(ctx: Context, table_index: int, row_index: int) -> str: """ Split table into two tables at specified row Parameters: - table_index: Table index - row_index: Split table after this row """ try: if not processor.current_document: return "No document is open" doc = processor.current_document if not doc.tables: return "No tables in document" if table_index < 0 or table_index >= len(doc.tables): return f"Table index out of range: {table_index}, document has {len(doc.tables)} tables" table = doc.tables[table_index] if row_index < 0 or row_index >= len(table.rows) - 1: return f"Row index invalid: {row_index}, should be between 0 and {len(table.rows)-2}" # Use XML operations to split table # Get table element tbl = table._tbl # Calculate split position split_position = row_index + 1 # Create new table element new_tbl = OxmlElement('w:tbl') # Copy table properties for child in tbl.xpath('./w:tblPr')[0].getchildren(): new_tbl.append(child.copy()) # Copy table grid settings for child in tbl.xpath('./w:tblGrid')[0].getchildren(): new_tbl.append(child.copy()) # Move rows to new table rows = tbl.xpath('./w:tr') for i in range(split_position, len(rows)): new_tbl.append(rows[i]) # Insert new table after original table tbl.addnext(new_tbl) return f"Split table {table_index} after row {row_index}" except Exception as e: error_msg = f"Failed to split table: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def add_table_row(ctx: Context, table_index: int, data: Optional[list] = None) -> str: """ Add a row to table Parameters: - table_index: Table index - data: Row data in list format """ try: if not processor.current_document: return "No document is open" doc = processor.current_document if not doc.tables: return "No tables in document" if table_index < 0 or table_index >= len(doc.tables): return f"Table index out of range: {table_index}, document has {len(doc.tables)} tables" table = doc.tables[table_index] # Add new row new_row = table.add_row() # Fill row data if data: for i, cell_text in enumerate(data): if i < len(new_row.cells): new_row.cells[i].text = str(cell_text) return f"Added new row to table {table_index}" except Exception as e: error_msg = f"Failed to add table row: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def delete_table_row(ctx: Context, table_index: int, row_index: int) -> str: """ Delete a row from table Parameters: - table_index: Table index - row_index: Row index to delete """ try: if not processor.current_document: return "No document is open" doc = processor.current_document if not doc.tables: return "No tables in document" if table_index < 0 or table_index >= len(doc.tables): return f"Table index out of range: {table_index}, document has {len(doc.tables)} tables" table = doc.tables[table_index] if row_index < 0 or row_index >= len(table.rows): return f"Row index out of range: {row_index}, table has {len(table.rows)} rows" # Use XML operations to delete row row = table.rows[row_index]._tr row.getparent().remove(row) return f"Deleted row {row_index} from table {table_index}" except Exception as e: error_msg = f"Failed to delete table row: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def edit_table_cell(ctx: Context, table_index: int, row_index: int, col_index: int, text: str) -> str: """ Edit table cell content Parameters: - table_index: Table index - row_index: Row index - col_index: Column index - text: Cell text """ try: if not processor.current_document: return "No document is open" doc = processor.current_document if not doc.tables: return "No tables in document" if table_index < 0 or table_index >= len(doc.tables): return f"Table index out of range: {table_index}, document has {len(doc.tables)} tables" table = doc.tables[table_index] if row_index < 0 or row_index >= len(table.rows): return f"Row index out of range: {row_index}, table has {len(table.rows)} rows" if col_index < 0 or col_index >= len(table.columns): return f"Column index out of range: {col_index}, table has {len(table.columns)} columns" # Modify cell content table.cell(row_index, col_index).text = text return f"Cell ({row_index}, {col_index}) in table {table_index} has been modified" except Exception as e: error_msg = f"Failed to edit table cell: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def add_page_break(ctx: Context) -> str: """ Add page break """ try: if not processor.current_document: return "No document is open" processor.current_document.add_page_break() return "Page break added" except Exception as e: error_msg = f"Failed to add page break: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def set_page_margins( ctx: Context, top: Optional[float] = None, bottom: Optional[float] = None, left: Optional[float] = None, right: Optional[float] = None ) -> str: """ Set page margins Parameters: - top: Top margin (cm) - bottom: Bottom margin (cm) - left: Left margin (cm) - right: Right margin (cm) """ try: if not processor.current_document: return "No document is open" doc = processor.current_document # Get current section (default to use first section) section = doc.sections[0] # Set page margins if top is not None: section.top_margin = Cm(top) if bottom is not None: section.bottom_margin = Cm(bottom) if left is not None: section.left_margin = Cm(left) if right is not None: section.right_margin = Cm(right) return "Page margins set" except Exception as e: error_msg = f"Failed to set page margins: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def delete_paragraph(ctx: Context, paragraph_index: int) -> str: """ Delete specified paragraph from document Parameters: - paragraph_index: Paragraph index to delete """ try: if not processor.current_document: return "No document is open" doc = processor.current_document if paragraph_index < 0 or paragraph_index >= len(doc.paragraphs): return f"Paragraph index out of range: {paragraph_index}, document has {len(doc.paragraphs)} paragraphs" # python-docx does not provide a direct method to delete a paragraph, use XML operations paragraph = doc.paragraphs[paragraph_index] p = paragraph._element p.getparent().remove(p) # Delete paragraph object reference for garbage collection paragraph._p = None paragraph._element = None return f"Paragraph {paragraph_index} deleted" except Exception as e: error_msg = f"Failed to delete paragraph: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def delete_text(ctx: Context, paragraph_index: int, start_pos: int, end_pos: int) -> str: """ Delete specified text from paragraph Parameters: - paragraph_index: Paragraph index - start_pos: Start position (0-based index) - end_pos: End position (not included in the text) """ try: if not processor.current_document: return "No document is open" doc = processor.current_document if paragraph_index < 0 or paragraph_index >= len(doc.paragraphs): return f"Paragraph index out of range: {paragraph_index}, document has {len(doc.paragraphs)} paragraphs" paragraph = doc.paragraphs[paragraph_index] text = paragraph.text if start_pos < 0 or start_pos >= len(text): return f"Start position out of range: {start_pos}, paragraph length is {len(text)}" if end_pos <= start_pos or end_pos > len(text): return f"End position invalid: {end_pos}, should be between {start_pos+1} and {len(text)}" # Build new text (delete specified text) new_text = text[:start_pos] + text[end_pos:] paragraph.text = new_text return f"Deleted text from position {start_pos} to {end_pos} in paragraph {paragraph_index}" except Exception as e: error_msg = f"Failed to delete text: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def save_as_document(ctx: Context, new_file_path: str) -> str: """ Save current document as a new file Parameters: - new_file_path: Path to save the new file """ try: if not processor.current_document: return "No document is open" # Save as new file processor.current_document.save(new_file_path) # Update current file path processor.current_file_path = new_file_path processor.documents[new_file_path] = processor.current_document return f"Document saved as: {new_file_path}" except Exception as e: error_msg = f"Failed to save document: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def create_document_copy(ctx: Context, suffix: str = "-副本") -> str: """ Create a copy of the current document in the directory of the original file Parameters: - suffix: Suffix to add to the original file name, default is "-副本" """ try: if not processor.current_document: return "No document is open" if not processor.current_file_path: return "Current document has not been saved, cannot create a copy" # Parse original file path file_dir = os.path.dirname(processor.current_file_path) file_name = os.path.basename(processor.current_file_path) file_name_without_ext, file_ext = os.path.splitext(file_name) # Create new file name new_file_name = f"{file_name_without_ext}{suffix}{file_ext}" new_file_path = os.path.join(file_dir, new_file_name) # Save as new file processor.current_document.save(new_file_path) return f"Document copy created: {new_file_path}" except Exception as e: error_msg = f"Failed to create document copy: {str(e)}" logger.error(error_msg) return error_msg @mcp.tool() def replace_section(ctx: Context, section_title: str, new_content: list, preserve_title: bool = True) -> str: """ Find specified title in document and replace content under that title, keeping original position, format, and style Parameters: - section_title: Title text to find - new_content: New content list, each element is a paragraph - preserve_title: Whether to keep original title, default is True """ try: if not processor.current_document: return "No document is open" doc = processor.current_document # Find title position title_index = -1 for i, paragraph in enumerate(doc.paragraphs): if section_title in paragraph.text: title_index = i break if title_index == -1: return f"Title not found: '{section_title}'" # Determine end position of that section (next same or higher level title) end_index = len(doc.paragraphs) title_style = doc.paragraphs[title_index].style for i in range(title_index + 1, len(doc.paragraphs)): # If next same level or higher level title found, set as end position if doc.paragraphs[i].style.name.startswith('Heading') and \ (doc.paragraphs[i].style.name <= title_style.name or doc.paragraphs[i].style == title_style): end_index = i break # Save original paragraph style and format information original_styles = [] for i in range(start_delete := (title_index + (1 if preserve_title else 0)), min(end_index, start_delete + len(new_content))): if i < len(doc.paragraphs): para = doc.paragraphs[i] style_info = { 'style': para.style, 'alignment': para.alignment, 'runs': [] } # Save each run format for run in para.runs: run_info = { 'bold': run.bold, 'italic': run.italic, 'underline': run.underline, 'font_size': run.font.size, 'font_name': run.font.name, 'color': run.font.color.rgb if run.font.color.rgb else None } style_info['runs'].append(run_info) original_styles.append(style_info) else: # If original paragraph count is insufficient, use last paragraph style if original_styles: original_styles.append(original_styles[-1]) else: # If no original style, use default style original_styles.append({ 'style': None, 'alignment': None, 'runs': [] }) # If original style count is insufficient, use last style to fill while len(original_styles) < len(new_content): if original_styles: original_styles.append(original_styles[-1]) else: original_styles.append({ 'style': None, 'alignment': None, 'runs': [] }) # Record insert position insert_position = start_delete # Delete from end to avoid index change for i in range(end_index - 1, start_delete - 1, -1): p = doc.paragraphs[i]._element p.getparent().remove(p) # Add new content, apply original format for i, content in enumerate(reversed(new_content)): # Create new paragraph p = doc.add_paragraph() # Apply original paragraph style style_info = original_styles[len(new_content) - i - 1] if style_info['style']: p.style = style_info['style'] if style_info['alignment'] is not None: p.alignment = style_info['alignment'] # Add text and apply format if style_info['runs'] and len(style_info['runs']) > 0: # If multiple runs, try to keep format # Simplified processing: Add entire content to a run, apply format from first run run = p.add_run(content) run_info = style_info['runs'][0] run.bold = run_info['bold'] run.italic = run_info['italic'] run.underline = run_info['underline'] if run_info['font_size']: run.font.size = run_info['font_size'] if run_info['font_name']: run.font.name = run_info['font_name'] # Set Chinese font run._element.rPr.rFonts.set(qn('w:eastAsia'), run_info['font_name']) if run_info['color']: run.font.color.rgb = run_info['color'] else: # If no run information, add text directly p.text = content # Move new paragraph to correct position doc._body._body.insert(insert_position, p._p) # Delete original added paragraph (at end of document) doc._body._body.remove(doc.paragraphs[-1]._p) return f"Replaced content under title '{section_title}', keeping original format and style" except Exception as e: error_msg = f"Failed to replace content: {str(e)}" logger.error(error_msg) traceback.print_exc() # Print detailed error information return error_msg @mcp.tool() def edit_section_by_keyword(ctx: Context, keyword: str, new_content: list, section_range: int = 3) -> str: """ Find paragraphs containing specified keyword and replace them and their surrounding content, keeping original position, format, and style Parameters: - keyword: Keyword to find - new_content: New content list, each element is a paragraph - section_range: Surrounding paragraph range to replace, default is 3 """ try: if not processor.current_document: return "No document is open" doc = processor.current_document # Find keyword position keyword_indices = [] for i, paragraph in enumerate(doc.paragraphs): if keyword in paragraph.text: keyword_indices.append(i) if not keyword_indices: return f"Keyword not found: '{keyword}'" # Use first match keyword_index = keyword_indices[0] # Determine paragraph range to replace start_index = max(0, keyword_index - section_range) end_index = min(len(doc.paragraphs), keyword_index + section_range + 1) # Save original paragraph style and format information original_styles = [] for i in range(start_index, min(end_index, start_index + len(new_content))): if i < len(doc.paragraphs): para = doc.paragraphs[i] style_info = { 'style': para.style, 'alignment': para.alignment, 'runs': [] } # Save each run format for run in para.runs: run_info = { 'bold': run.bold, 'italic': run.italic, 'underline': run.underline, 'font_size': run.font.size, 'font_name': run.font.name, 'color': run.font.color.rgb if run.font.color.rgb else None } style_info['runs'].append(run_info) original_styles.append(style_info) else: # If original paragraph count is insufficient, use last paragraph style if original_styles: original_styles.append(original_styles[-1]) else: # If no original style, use default style original_styles.append({ 'style': None, 'alignment': None, 'runs': [] }) # If original style count is insufficient, use last style to fill while len(original_styles) < len(new_content): if original_styles: original_styles.append(original_styles[-1]) else: original_styles.append({ 'style': None, 'alignment': None, 'runs': [] }) # Record insert position insert_position = start_index # Delete from end to avoid index change for i in range(end_index - 1, start_index - 1, -1): p = doc.paragraphs[i]._element p.getparent().remove(p) # Add new content, apply original format for i, content in enumerate(reversed(new_content)): # Create new paragraph p = doc.add_paragraph() # Apply original paragraph style style_info = original_styles[len(new_content) - i - 1] if style_info['style']: p.style = style_info['style'] if style_info['alignment'] is not None: p.alignment = style_info['alignment'] # Add text and apply format if style_info['runs'] and len(style_info['runs']) > 0: # If multiple runs, try to keep format # Simplified processing: Add entire content to a run, apply format from first run run = p.add_run(content) run_info = style_info['runs'][0] run.bold = run_info['bold'] run.italic = run_info['italic'] run.underline = run_info['underline'] if run_info['font_size']: run.font.size = run_info['font_size'] if run_info['font_name']: run.font.name = run_info['font_name'] # Set Chinese font run._element.rPr.rFonts.set(qn('w:eastAsia'), run_info['font_name']) if run_info['color']: run.font.color.rgb = run_info['color'] else: # If no run information, add text directly p.text = content # Move new paragraph to correct position doc._body._body.insert(insert_position, p._p) # Delete original added paragraph (at end of document) doc._body._body.remove(doc.paragraphs[-1]._p) return f"Replaced paragraphs containing keyword '{keyword}' and their surrounding content, keeping original format and style" except Exception as e: error_msg = f"Failed to replace content: {str(e)}" logger.error(error_msg) traceback.print_exc() # Print detailed error information return error_msg # Add more tools... if __name__ == "__main__": # Always start with a clean state, don't try to load any previous document if os.path.exists(CURRENT_DOC_FILE): try: os.remove(CURRENT_DOC_FILE) logger.info("Removed existing state file for clean startup") except Exception as e: logger.error(f"Failed to remove existing state file: {e}") # Run MCP server mcp.run()