sanitize_docx
Sanitize DOCX files by removing dangerous metadata, track changes, and author info. Produces an audit report of removed content. Use before sharing documents externally.
Instructions
Sanitizes a DOCX file by stripping dangerous metadata (rsids, author names, template paths, DMS metadata, hidden text, orphaned content) and producing an audit report of everything removed. Use this before sending documents to external parties. Supports three modes: full scrub (for signing/closing), keep-markup (preserves your track changes and open comments), or baseline (recomputes your delta against the original document).
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| file_path | Yes | Absolute path to the DOCX file to sanitize. | |
| output_path | No | Output path for the sanitized file. Defaults to <stem>_sanitized.docx. | |
| keep_markup | No | Keep existing track changes and open comments. Strips resolved comments and all metadata. Use this when sending a redline to counterparty. | |
| baseline_path | No | Path to the original/baseline document. When provided, the tool recomputes your changes as a clean delta against this baseline. Use when Track Changes was off, or to collapse multiple rounds of markup into a single clean redline. | |
| author | No | Replace all author names on track changes and comments with this value. Used with keep_markup or baseline_path. | |
| accept_all | No | Accept all unresolved track changes (full sanitize mode only). Required if the document contains unresolved changes. The report will list every change that was auto-accepted. | |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- MCP tool handler for sanitize_docx. This is the async entry point decorated with @tool. It validates inputs, wraps the heavy synchronous sanitize operation in asyncio.to_thread to avoid blocking the event loop, and returns structured results including report_text.
async def sanitize_docx(
    file_path: Annotated[str, "Absolute path to the DOCX file to sanitize."],
    ctx: Context,
    output_path: Annotated[
        Optional[str],
        "Output path for the sanitized file. Defaults to <stem>_sanitized.docx.",
    ] = None,
    keep_markup: Annotated[
        bool,
        "Keep existing track changes and open comments. Strips resolved comments and all metadata. "
        "Use this when sending a redline to counterparty.",
    ] = False,
    baseline_path: Annotated[
        Optional[str],
        "Path to the original/baseline document. When provided, the tool recomputes your changes "
        "as a clean delta against this baseline. Use when Track Changes was off, or to collapse "
        "multiple rounds of markup into a single clean redline.",
    ] = None,
    author: Annotated[
        Optional[str],
        "Replace all author names on track changes and comments with this value. "
        "Used with keep_markup or baseline_path.",
    ] = None,
    accept_all: Annotated[
        bool,
        "Accept all unresolved track changes (full sanitize mode only). "
        "Required if the document contains unresolved changes. "
        "The report will list every change that was auto-accepted.",
    ] = False,
) -> dict:
    """MCP entry point for sanitize_docx.

    Validates that the input (and optional baseline) file exists, runs the
    synchronous sanitizer in a worker thread so the event loop is not
    blocked, and returns a structured result dict including report_text.

    Raises:
        ToolError: On missing files, a blocked sanitization (SanitizeError
            from the core), or any unexpected failure.
    """
    start_time = time.perf_counter()
    # Imported lazily inside the handler so the sanitize machinery is only
    # loaded when the tool is actually invoked.
    from adeu.sanitize.core import SanitizeError
    from adeu.sanitize.core import sanitize_docx as _sanitize

    await ctx.info(
        f"Sanitizing document: {Path(file_path).name}",
        extra={
            "file_path": file_path,
            "keep_markup": keep_markup,
            "baseline_path": baseline_path,
            "author": author,
            "accept_all": accept_all,
        },
    )

    # Verify input file exists before handing off to the worker thread.
    p = Path(file_path)
    if not p.exists():
        raise ToolError(f"File not found: {file_path}")
    if baseline_path and not Path(baseline_path).exists():
        raise ToolError(f"Baseline file not found: {baseline_path}")

    try:
        # Wrap the heavy synchronous sanitize operation in a thread to avoid
        # blocking the MCP event loop.
        result = await asyncio.to_thread(
            _sanitize,
            file_path,
            output_path=output_path,
            keep_markup=keep_markup,
            baseline_path=baseline_path,
            author=author,
            accept_all=accept_all,
        )
        await ctx.info(
            "Sanitization complete",
            extra={
                "output_path": result.output_path,
                "status": result.status,
                "tracked_changes_found": result.tracked_changes_found,
                "comments_removed": result.comments_removed,
            },
        )
        res = {
            "output_path": result.output_path,
            "status": result.status,
            "tracked_changes_found": result.tracked_changes_found,
            "tracked_changes_accepted": result.tracked_changes_accepted,
            "comments_removed": result.comments_removed,
            "comments_kept": result.comments_kept,
            "metadata_stripped": result.metadata_stripped,
            "warnings": result.warnings,
            "report_text": result.report_text,
        }
        return add_timing_if_debug(start_time, res)
    except SanitizeError as e:
        # A blocked sanitization (e.g. unresolved changes without accept_all)
        # is an expected outcome — log as warning, not error.
        await ctx.warning(
            "Sanitization blocked",
            extra={"reason": str(e)},
        )
        raise ToolError(str(e)) from e
    except Exception as e:
        await ctx.error(
            "Sanitization failed",
            extra={"error": str(e)},
        )
        raise ToolError(f"Error sanitizing document: {str(e)}") from e

- src/adeu/sanitize/core.py:55-162 (handler)Core sanitize_docx function (the actual implementation).
Determines mode (full/keep-markup/baseline), delegates to private helpers (_sanitize_full, _sanitize_keep_markup, _sanitize_baseline), applies common transforms (rsid, hidden text, metadata scrubbing, etc.), and returns a SanitizeResult.
def sanitize_docx( input_path: str, output_path: Optional[str] = None, *, keep_markup: bool = False, baseline_path: Optional[str] = None, author: Optional[str] = None, accept_all: bool = False, ) -> SanitizeResult: """ Sanitize a DOCX file. Args: input_path: Path to the input DOCX file. output_path: Path for the output file. Defaults to <stem>_sanitized.docx. keep_markup: Keep existing track changes and open comments. baseline_path: Path to baseline document for delta recomputation. author: Replace all author names with this value. accept_all: Accept unresolved track changes (full sanitize only). Returns: SanitizeResult with status, counts, and report text. Raises: SanitizeError: If blocked (unresolved changes without --accept-all). FileNotFoundError: If input or baseline file not found. """ input_p = Path(input_path) if not input_p.exists(): raise FileNotFoundError(f"Input file not found: {input_path}") if baseline_path and not Path(baseline_path).exists(): raise FileNotFoundError(f"Baseline file not found: {baseline_path}") # Determine mode if baseline_path: mode = SanitizeMode.BASELINE elif keep_markup: mode = SanitizeMode.KEEP_MARKUP else: mode = SanitizeMode.FULL # Default output path if not output_path: output_path = str(input_p.parent / f"{input_p.stem}_sanitized{input_p.suffix}") report = SanitizeReport( filename=input_p.name, mode=mode.value, author=author, ) # Load document with open(input_p, "rb") as f: doc = Document(BytesIO(f.read())) # --- Mode-specific logic --- if mode == SanitizeMode.FULL: _sanitize_full(doc, report, accept_all=accept_all) elif mode == SanitizeMode.KEEP_MARKUP: _sanitize_keep_markup(doc, report, author=author) elif mode == SanitizeMode.BASELINE: assert baseline_path is not None _sanitize_baseline(doc, input_path, baseline_path, report, author=author) if report.status == "blocked": report_text = report.render() raise SanitizeError(report_text) # --- Common transforms (applied in all modes) --- _apply_common_transforms(doc, report) # 
--- Author replacement and date normalization --- if mode in (SanitizeMode.KEEP_MARKUP, SanitizeMode.BASELINE): if author: report.add_transform_lines(transforms.replace_comment_authors(doc, author)) report.add_transform_lines(transforms.replace_change_authors(doc, author)) # Always normalize change dates on outbound docs — prevents counterparty # from inferring when edits were made report.add_transform_lines(transforms.normalize_change_dates(doc)) # --- Save --- output = BytesIO() doc.save(output) output.seek(0) with open(output_path, "wb") as f: f.write(output.getvalue()) # Finalize report if report.warnings: report.status = "clean_with_warnings" report_text = report.render() report.status = report.status # ensure set logger.info("Sanitization complete", output_path=output_path, status=report.status) return SanitizeResult( output_path=output_path, status=report.status, tracked_changes_found=report.tracked_changes_found, tracked_changes_accepted=report.tracked_changes_accepted, comments_removed=report.comments_removed, comments_kept=report.comments_kept, metadata_stripped=[line for line in report.metadata_lines], warnings=report.warnings, report_text=report_text, ) - src/adeu/sanitize/core.py:28-52 (schema)SanitizeMode enum and SanitizeResult/SanitizeError data classes defining the return schema and error types for the sanitize_docx operation.
class SanitizeMode(enum.Enum): FULL = "full" KEEP_MARKUP = "keep-markup" BASELINE = "baseline" @dataclass class SanitizeResult: """Structured result returned by sanitize_docx.""" output_path: str status: str # "clean", "clean_with_warnings", "blocked" tracked_changes_found: int = 0 tracked_changes_accepted: int = 0 comments_removed: int = 0 comments_kept: int = 0 metadata_stripped: list[str] = field(default_factory=list) warnings: list[str] = field(default_factory=list) report_text: str = "" class SanitizeError(Exception): """Raised when sanitization is blocked (e.g., unresolved track changes).""" pass - src/adeu/mcp_components/tools/sanitize.py:13-22 (registration)Registration of sanitize_docx as an MCP tool via the @tool decorator with description, annotations (destructiveHint), and parameter schema defined via type annotations.
@tool( description=( "Sanitizes a DOCX file by stripping dangerous metadata (rsids, author names, " "template paths, DMS metadata, hidden text, orphaned content) and producing " "an audit report of everything removed. Use this before sending documents to " "external parties. Supports three modes: full scrub (for signing/closing), " "keep-markup (preserves your track changes and open comments), or baseline " "(recomputes your delta against the original document)." ), annotations={"destructiveHint": True}, - All document transform helper functions used by sanitize — stripping rsid, para IDs, proof errors, hidden text, track changes, comments, metadata, custom XML, image alt text, and author/date normalization.
""" DOCX sanitization transforms. Each transform mutates a python-docx Document (or its underlying lxml tree) in place and returns a list of human-readable report lines describing what was changed. """ import datetime from typing import Optional import structlog from docx.document import Document as DocumentObject from docx.oxml.ns import qn from docx.text.paragraph import Paragraph from lxml import etree from adeu.redline.comments import CommentsManager from adeu.utils.docx import ( get_paragraph_prefix, iter_block_items, iter_document_parts, normalize_docx, ) logger = structlog.get_logger(__name__) # Namespaces used across transforms W14_NS = "http://schemas.microsoft.com/office/word/2010/wordml" W15_NS = "http://schemas.microsoft.com/office/word/2012/wordml" W16CID_NS = "http://schemas.microsoft.com/office/word/2016/wordml/cid" DC_NS = "http://purl.org/dc/elements/1.1/" CP_NS = "http://schemas.openxmlformats.org/package/2006/metadata/core-properties" DCTERMS_NS = "http://purl.org/dc/terms/" EXTENDED_NS = "http://schemas.openxmlformats.org/officeDocument/2006/extended-properties" # rsid attribute names (all variations) RSID_ATTRS = [ qn("w:rsidR"), qn("w:rsidRPr"), qn("w:rsidRDefault"), qn("w:rsidP"), qn("w:rsidDel"), qn("w:rsidSect"), qn("w:rsidTr"), ] # w14 attributes to strip W14_ATTRS = [ f"{{{W14_NS}}}paraId", f"{{{W14_NS}}}textId", ] def strip_rsid(doc: DocumentObject) -> list[str]: """Remove all rsid* attributes from every element in the document.""" count = 0 for el in doc.element.iter(): for attr in RSID_ATTRS: if attr in el.attrib: del el.attrib[attr] count += 1 # Also strip the rsids element itself (lists all session IDs) for rsids_el in doc.element.findall(f".//{qn('w:rsids')}"): rsids_el.getparent().remove(rsids_el) count += 1 if count: return [f"rsid attributes: {count} removed"] return [] def strip_para_ids(doc: DocumentObject) -> list[str]: """Remove w14:paraId and w14:textId from all elements.""" count = 0 for el in doc.element.iter(): for attr 
in W14_ATTRS: if attr in el.attrib: del el.attrib[attr] count += 1 if count: return [f"Paragraph/text IDs: {count} removed"] return [] def strip_proof_errors(doc: DocumentObject) -> list[str]: """Remove w:proofErr spellcheck markers.""" elements = doc.element.xpath("//w:proofErr") count = len(elements) for el in elements: el.getparent().remove(el) if count: return [f"Spell check markers: {count} removed"] return [] def strip_empty_properties(doc: DocumentObject) -> list[str]: """Remove empty w:rPr and w:pPr elements (no children).""" count = 0 for tag in [qn("w:rPr"), qn("w:pPr")]: for el in doc.element.iter(tag): if len(el) == 0 and el.text is None: parent = el.getparent() if parent is not None: parent.remove(el) count += 1 if count: return [f"Empty property elements: {count} removed"] return [] def strip_hidden_text(doc: DocumentObject) -> list[str]: """Remove runs that have w:vanish or w:webHidden in their rPr.""" count = 0 for rpr in doc.element.iter(qn("w:rPr")): vanish = rpr.find(qn("w:vanish")) web_hidden = rpr.find(qn("w:webHidden")) if vanish is not None or web_hidden is not None: run = rpr.getparent() if run is not None and run.tag == qn("w:r"): run_parent = run.getparent() if run_parent is not None: run_parent.remove(run) count += 1 if count: return [f"Hidden text runs: {count} removed"] return [] def coalesce_runs(doc: DocumentObject) -> list[str]: """Coalesce adjacent runs with identical formatting (delegates to existing normalize_docx).""" # normalize_docx already handles proofErr + run coalescing, # but since we call strip_proof_errors separately, we only need coalescing. # The existing function is safe — it respects w:ins/w:del boundaries. 
normalize_docx(doc) return [] # --------------------------------------------------------------------------- # Track change transforms # --------------------------------------------------------------------------- def count_tracked_changes(doc: DocumentObject) -> tuple[int, int, int]: """Returns (insertion_count, deletion_count, format_count) of unresolved track changes.""" ins_count = len(doc.element.findall(f".//{qn('w:ins')}")) del_count = len(doc.element.findall(f".//{qn('w:del')}")) fmt_count = ( len(doc.element.findall(f".//{qn('w:rPrChange')}")) + len(doc.element.findall(f".//{qn('w:pPrChange')}")) + len(doc.element.findall(f".//{qn('w:sectPrChange')}")) ) return ins_count, del_count, fmt_count def get_track_change_authors(doc: DocumentObject) -> set[str]: """Return a set of all unique authors found in unresolved track changes.""" authors = set() for tag in [qn("w:ins"), qn("w:del"), qn("w:rPrChange"), qn("w:pPrChange"), qn("w:sectPrChange")]: for el in doc.element.iter(tag): author = el.get(qn("w:author")) if author: authors.add(author) return authors def accept_all_tracked_changes(doc: DocumentObject) -> list[str]: """ Accept all tracked changes: unwrap w:ins (keep content), remove w:del (discard content). Also strips w:rPrChange elements. Returns report lines listing each change that was accepted. 
""" lines = [] ins_els = doc.element.findall(f".//{qn('w:ins')}") del_els = doc.element.findall(f".//{qn('w:del')}") fmt_els = ( doc.element.findall(f".//{qn('w:rPrChange')}") + doc.element.findall(f".//{qn('w:pPrChange')}") + doc.element.findall(f".//{qn('w:sectPrChange')}") ) # Collect textual info before mutating (we skip empty strings for the detailed list to reduce noise) for ins in ins_els: text = _get_element_text(ins) if text.strip(): lines.append(f' Accepted insertion: "{_truncate(text, 60)}"') for d in del_els: text = _get_element_text(d) if text.strip(): lines.append(f' Accepted deletion of: "{_truncate(text, 60)}"') # Now mutate: unwrap insertions for ins in ins_els: parent = ins.getparent() if parent is None: continue index = parent.index(ins) for child in list(ins): parent.insert(index, child) index += 1 parent.remove(ins) # Remove deletions for d in del_els: if d.getparent() is not None: d.getparent().remove(d) # Remove rPrChange (format change tracking) for rpc in doc.element.findall(f".//{qn('w:rPrChange')}"): if rpc.getparent() is not None: rpc.getparent().remove(rpc) # Remove pPrChange (paragraph format change tracking) for ppc in doc.element.findall(f".//{qn('w:pPrChange')}"): if ppc.getparent() is not None: ppc.getparent().remove(ppc) # Remove sectPrChange (section format change tracking) for spc in doc.element.findall(f".//{qn('w:sectPrChange')}"): if spc.getparent() is not None: spc.getparent().remove(spc) total = len(ins_els) + len(del_els) + len(fmt_els) if total: header = [ f"Tracked changes auto-accepted: {total} ({len(ins_els)}" + f" insertions, {len(del_els)} deletions, {len(fmt_els)} formatting)" ] return header + lines return [] # --------------------------------------------------------------------------- # Comment transforms # --------------------------------------------------------------------------- def get_comments_summary(doc: DocumentObject) -> dict: """ Returns a summary of comments in the document. 
Keys: 'total', 'open', 'resolved', 'comments' (list of dicts with id, author, text, resolved, heading). """ cm = CommentsManager(doc) data = cm.extract_comments_data() comments = [] open_count = 0 resolved_count = 0 for c_id, info in data.items(): is_resolved = info.get("resolved", False) if is_resolved: resolved_count += 1 else: open_count += 1 comments.append( { "id": c_id, "author": info.get("author", "Unknown"), "text": info.get("text", ""), "date": info.get("date", ""), "resolved": is_resolved, } ) return { "total": len(comments), "open": open_count, "resolved": resolved_count, "comments": comments, } def _eject_comment_parts(doc: DocumentObject): """Completely eject all comment XML parts from the DOCX package.""" pkg = doc.part.package comment_partnames = set() for part in pkg.parts: if str(part.partname).startswith("/word/comments"): comment_partnames.add(part.partname) if not comment_partnames: return # Sever from package root rels root_rels_to_remove = [ rId for rId, rel in pkg.rels.items() if not rel.is_external and getattr(rel.target_part, "partname", None) in comment_partnames ] for rId in root_rels_to_remove: del pkg.rels[rId] # Sever from all other parts for part in pkg.parts: part_rels_to_remove = [ rId for rId, rel in part.rels.items() if not rel.is_external and getattr(rel.target_part, "partname", None) in comment_partnames ] for rId in part_rels_to_remove: del part.rels[rId] # Remove from package parts list if hasattr(pkg, "_parts") and isinstance(pkg._parts, list): pkg._parts = [p for p in pkg._parts if p.partname not in comment_partnames] def remove_all_comments(doc: DocumentObject) -> list[str]: """Remove all comments from the document.""" cm = CommentsManager(doc) data = cm.extract_comments_data() if not data: return [] lines = [] for info in data.values(): status = "[Resolved]" if info.get("resolved") else "[Open]" lines.append(f' {status} "{_truncate(info.get("text", ""), 60)}" ({info.get("author", "Unknown")})') # Delete all comments for 
c_id in list(data.keys()): cm.delete_comment(c_id) # Strip any remaining comment range markers from body for tag in ["w:commentRangeStart", "w:commentRangeEnd", "w:commentReference"]: for el in doc.element.findall(f".//{qn(tag)}"): if el.getparent() is not None: el.getparent().remove(el) resolved_count = len([c for c in data.values() if c.get("resolved")]) open_count = len([c for c in data.values() if not c.get("resolved")]) header = [f"Comments removed: {len(data)} ({resolved_count} resolved, {open_count} open)"] return header + lines def remove_resolved_comments(doc: DocumentObject) -> list[str]: """Remove only resolved comments. Keep open ones.""" cm = CommentsManager(doc) data = cm.extract_comments_data() if not data: return [] resolved = {c_id: info for c_id, info in data.items() if info.get("resolved")} if not resolved: return [] lines = [] for c_id, info in resolved.items(): lines.append(f' [Resolved] "{_truncate(info.get("text", ""), 60)}" ({info.get("author", "Unknown")})') cm.delete_comment(c_id) # Clean up orphaned range markers for deleted comments remaining_ids = {c_id for c_id in data if c_id not in resolved} for tag in ["w:commentRangeStart", "w:commentRangeEnd", "w:commentReference"]: for el in doc.element.findall(f".//{qn(tag)}"): el_id = el.get(qn("w:id")) if el_id and el_id not in remaining_ids: if el.getparent() is not None: el.getparent().remove(el) # To be mathematically secure against orphaned resolved comments if cm.extended_part: for child in list(cm.extended_part.element): if child.get(qn("w15:done")) in ("1", "true", "on"): cm.extended_part.element.remove(child) return [f"Resolved comments stripped: {len(resolved)}"] + lines def replace_comment_authors(doc: DocumentObject, new_author: str) -> list[str]: """Replace author names on all comments with a single identity.""" cm = CommentsManager(doc) if not cm.comments_part: return [] original_authors = set() for c in cm.comments_part.element.findall(qn("w:comment")): author = 
c.get(qn("w:author")) if author: original_authors.add(author) c.set(qn("w:author"), new_author) initials = c.get(qn("w:initials")) if initials: new_initials = "".join(p[0] for p in new_author.split() if p).upper() c.set(qn("w:initials"), new_initials) if original_authors: return [f'Comment authors replaced: {", ".join(sorted(original_authors))} → "{new_author}"'] return [] def replace_change_authors(doc: DocumentObject, new_author: str) -> list[str]: """Replace author names on all track changes with a single identity.""" original_authors = set() for tag in [qn("w:ins"), qn("w:del"), qn("w:rPrChange"), qn("w:pPrChange")]: for el in doc.element.iter(tag): author = el.get(qn("w:author")) if author: original_authors.add(author) el.set(qn("w:author"), new_author) if original_authors: return [f'Track change authors replaced: {", ".join(sorted(original_authors))} → "{new_author}"'] return [] def normalize_change_dates(doc: DocumentObject) -> list[str]: """Normalize w:date on track changes to a single timestamp. Prevents counterparty from inferring when edits were made (e.g. "they edited clause 8 at 2am — they're under pressure"). 
""" count = 0 fixed_date = "2025-01-01T00:00:00Z" for tag in [qn("w:ins"), qn("w:del"), qn("w:rPrChange"), qn("w:pPrChange")]: for el in doc.element.iter(tag): if el.get(qn("w:date")): el.set(qn("w:date"), fixed_date) count += 1 if count: return [f"Track change timestamps: {count} normalized"] return [] # --------------------------------------------------------------------------- # Document property transforms # --------------------------------------------------------------------------- def scrub_doc_properties(doc: DocumentObject) -> list[str]: """Scrub author/timestamp/volatile metadata from docProps.""" lines = [] core = doc.core_properties if core.author: lines.append(f"Author: {core.author}") core.author = "" if core.last_modified_by: lines.append(f"Last modified by: {core.last_modified_by}") core.last_modified_by = "" if core.title: # Title is often intentional, but can leak. Report it, don't strip. pass if core.revision and core.revision > 1: lines.append(f"Revision count: {core.revision} → 1") core.revision = 1 # Strip volatile app properties via raw XML # python-docx doesn't expose all app.xml fields, so we go to the package app_part = _find_part(doc, "/docProps/app.xml") if app_part is not None: app_el = _parse_part_xml(app_part) if app_el is not None: # Integer fields must be set to "0" (empty string causes schema validation failure / unreadable content) int_fields = ["TotalTime", "Words", "Characters", "Paragraphs", "Lines", "CharactersWithSpaces"] for tag_local in int_fields: tag = f"{{{EXTENDED_NS}}}{tag_local}" for el in app_el.findall(f".//{tag}"): if el.text and el.text != "0": if tag_local == "TotalTime": lines.append(f"Total editing time: {el.text} minutes") el.text = "0" # String fields can be safely emptied str_fields = ["Template", "Manager", "Company"] for tag_local in str_fields: tag = f"{{{EXTENDED_NS}}}{tag_local}" for el in app_el.findall(f".//{tag}"): if el.text: if tag_local == "Template": lines.append(f"Template: {el.text}") elif 
tag_local == "Company": lines.append(f"Company: {el.text}") elif tag_local == "Manager": lines.append(f"Manager: {el.text}") el.text = "" _write_part_xml(app_part, app_el) if lines: return ["Metadata scrubbed:"] + [f" {line}" for line in lines] return [] def scrub_timestamps(doc: DocumentObject) -> list[str]: """Normalize timestamps in core properties using the python-docx API.""" modified_any = False epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) core = doc.core_properties if core.created and core.created != epoch: core.created = epoch modified_any = True if core.modified and core.modified != epoch: core.modified = epoch modified_any = True if core.last_printed and core.last_printed != epoch: core.last_printed = epoch modified_any = True if modified_any: return ["Timestamps normalized to epoch"] return [] # --------------------------------------------------------------------------- # Package-level transforms # --------------------------------------------------------------------------- def strip_custom_xml(doc: DocumentObject) -> list[str]: """Remove customXml parts from the package.""" pkg = doc.part.package # 1. Identify all Custom XML parts custom_parts = [] for part in pkg.parts: partname = str(part.partname) if "/customXml" in partname: custom_parts.append(part) if not custom_parts: return [] custom_partnames = {p.partname for p in custom_parts} # 2. Sever relationships from package root root_rels_to_remove = [ rId for rId, rel in pkg.rels.items() if not rel.is_external and getattr(rel.target_part, "partname", None) in custom_partnames ] for rId in root_rels_to_remove: del pkg.rels[rId] # 3. Sever relationships from all other parts for part in pkg.parts: part_rels_to_remove = [ rId for rId, rel in part.rels.items() if not rel.is_external and getattr(rel.target_part, "partname", None) in custom_partnames ] for rId in part_rels_to_remove: del part.rels[rId] # 4. 
Remove from package's internal parts list to prevent serialization if hasattr(pkg, "_parts") and isinstance(pkg._parts, list): pkg._parts = [p for p in pkg._parts if p.partname not in custom_partnames] # 5. Scrub dangling data bindings from the document body to prevent Unreadable Content for sdt_pr in doc.element.findall(f".//{qn('w:sdtPr')}"): for binding in sdt_pr.findall(f".//{qn('w:dataBinding')}"): sdt_pr.remove(binding) return [f"Custom XML parts: {len(custom_parts)} removed"] def audit_hyperlinks(doc: DocumentObject, internal_patterns: Optional[list[str]] = None) -> list[str]: """Find hyperlinks targeting internal URLs. Returns warnings (does not modify).""" if internal_patterns is None: internal_patterns = [ "sharepoint.com", "onedrive.com", ".internal", "intranet", "localhost", "10.", "192.168.", "172.16.", ] warnings = [] for rel in doc.part.rels.values(): if not rel.is_external: continue url = str(rel.target_ref) for pattern in internal_patterns: if pattern.lower() in url.lower(): warnings.append(f"Hyperlink targets internal URL: {_truncate(url, 80)}") break return warnings def strip_image_alt_text(doc: DocumentObject) -> list[str]: """Remove auto-generated alt text (descr attributes) from images.""" WP_NS = "http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing" count = 0 for el in doc.element.iter(f"{{{WP_NS}}}docPr"): descr = el.get("descr") if descr: # Heuristic: keep if it looks intentional (long, not a filename) looks_like_filename = "." 
in descr and len(descr) < 60 is_short_auto = len(descr) < 10 if looks_like_filename or is_short_auto: del el.attrib["descr"] count += 1 if count: return [f"Image alt text: {count} auto-generated descriptions removed"] return [] # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _get_element_text(el) -> str: """Extract visible text from a w:ins or w:del element.""" texts = [] for t in el.iter(qn("w:t")): if t.text: texts.append(t.text) for t in el.iter(qn("w:delText")): if t.text: texts.append(t.text) return "".join(texts) def _truncate(text: str, max_len: int = 60) -> str: text = text.replace("\n", " ").strip() if len(text) <= max_len: return text return text[: max_len - 3] + "..." def _find_part(doc: DocumentObject, partname_suffix: str): """Find a part in the OPC package by partname suffix.""" for part in doc.part.package.parts: if str(part.partname).endswith(partname_suffix): return part return None def _parse_part_xml(part) -> Optional[etree._Element]: """Parse a part's blob as XML.""" try: return etree.fromstring(part.blob) except Exception: return None def _write_part_xml(part, element: etree._Element): """Write modified XML back to a part's blob.""" part._blob = etree.tostring(element, xml_declaration=True, encoding="UTF-8", standalone=True) def get_nearest_heading(doc: DocumentObject, target_paragraph_index: int) -> str: """Find the nearest heading above a given paragraph index.""" best_heading = None para_idx = 0 for part in iter_document_parts(doc): for item in iter_block_items(part): if isinstance(item, Paragraph): prefix = get_paragraph_prefix(item) if prefix: heading_text = item.text.strip() if heading_text: best_heading = heading_text if para_idx >= target_paragraph_index: return f"§ {best_heading}" if best_heading else f"¶{target_paragraph_index}" para_idx += 1 return f"§ {best_heading}" if best_heading else 
f"¶{target_paragraph_index}"