# Copyright © 2025 Dr.-Ing. Paul Wilhelm <paul@wilhelm.dev>
# This file is part of Archive Agent. See LICENSE for details.
from logging import Logger
import re
from concurrent.futures import ProcessPoolExecutor
import time
from typing import List, Tuple, Optional, Callable
from dataclasses import dataclass
import spacy
from spacy.language import Language
from spacy.tokens import Doc
import bisect
from archive_agent.ai.AiManager import AiManager
from archive_agent.ai.AiManagerFactory import AiManagerFactory
from archive_agent.ai.chunk.AiChunk import ChunkSchema
from archive_agent.data.DocumentContent import DocumentContent, ReferenceList
from archive_agent.util.format import format_file
from archive_agent.util.text_util import splitlines_exact
from archive_agent.core.ProgressManager import ProgressInfo
SentenceRange = Tuple[int, int]
SentenceBoundary = Tuple[str, int, int] # (text, start_char, end_char)
@dataclass
class SentenceWithRange:
text: str
reference_range: SentenceRange
@dataclass
class ChunkWithRange:
text: str
reference_range: SentenceRange
def _normalize_inline_whitespace(text: str) -> str:
"""
Normalize inline whitespace inside a string:
- Replace tabs and non-breaking spaces with normal space
- Collapse multiple spaces into one
- Strip leading and trailing whitespace
:param text: Single paragraph string
:return: Cleaned inline text
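    Example:
        >>> _normalize_inline_whitespace("  Foo   bar  ")
        'Foo bar'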
"""
text = text.replace("\t", " ").replace("\xa0", " ")
text = re.sub(r" {2,}", " ", text)
return text.strip()
def _extract_paragraph_sentences_with_reference_ranges(
paragraph_doc_content: DocumentContent,
sentence_boundaries: List[SentenceBoundary],
) -> List[SentenceWithRange]:
"""
Map pre-computed sentence boundaries to reference ranges.
:param paragraph_doc_content: Document content (single paragraph).
:param sentence_boundaries: Sentence boundaries from spaCy subprocess.
    :return: List of SentenceWithRange for the paragraph.
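    Example (illustrative): for lines ["Foo bar.", "Baz."] with per-line references
    [3, 4], the lines join to "Foo bar. Baz." with line start offsets [0, 9]; a
    sentence spanning chars 9..13 maps to line index 1 and reference range (4, 4).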
"""
paragraph_lines = paragraph_doc_content.lines
paragraph_references = paragraph_doc_content.get_per_line_references()
normalized_lines = [_normalize_inline_whitespace(line) for line in paragraph_lines]
line_starts: Optional[List[int]] = None
if paragraph_references:
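        # Character offset of each line within the joined paragraph text, assuming
        # normalized lines are joined with a single space (hence the +1); see
        # _extract_sentences_with_reference_ranges for the matching join.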
line_starts = [0]
for norm_line in normalized_lines[:-1]:
line_starts.append(line_starts[-1] + len(norm_line) + 1)
block_sentences: List[SentenceWithRange] = []
for sent_text, start_char, end_char in sentence_boundaries:
sentence_text = sent_text.strip()
if sentence_text and paragraph_references and line_starts:
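            # Locate the lines containing the sentence's first and last characters;
            # end_char is exclusive, hence end_char - 1.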
start_line_idx = bisect.bisect_right(line_starts, start_char) - 1
start_line_idx = min(start_line_idx, len(paragraph_lines) - 1)
end_line_idx = bisect.bisect_right(line_starts, end_char - 1) - 1
end_line_idx = min(end_line_idx, len(paragraph_lines) - 1)
sentence_refs = paragraph_references[start_line_idx: end_line_idx + 1]
min_ref = min(sentence_refs) if sentence_refs else 0
max_ref = max(sentence_refs) if sentence_refs else 0
else:
min_ref = max_ref = 0
block_sentences.append(SentenceWithRange(sentence_text, (min_ref, max_ref)))
return block_sentences
@Language.component("_spacy_markdown_sentence_fixer")
def _spacy_markdown_sentence_fixer(doc: Doc) -> Doc:
"""
Adjust sentence segmentation to better handle Markdown syntax.
Prevents sentence breaks after Markdown-specific tokens like headers, list items, and code blocks.
"""
skip_next = False
for i, token in enumerate(doc[:-1]):
if skip_next:
doc[i + 1].is_sent_start = False
skip_next = False
continue
        # Inline code or code fence
        if token.text in {"`", "```"}:
            doc[i + 1].is_sent_start = False
            if token.text == "```":
                # Code fence: also suppress a sentence break after the following token
                skip_next = True
            continue
# Markdown headers
if token.text.startswith("#"):
doc[i + 1].is_sent_start = False
continue
# Bullet list
if token.text in {"-", "*"} and (i == 0 or doc[i - 1].text == "\n"):
doc[i + 1].is_sent_start = False
continue
return doc
_nlp_cache: Optional[Language] = None
def _get_nlp() -> Language:
"""
Get NLP model for sentence splitting (cached).
:return: spaCy language model.
"""
global _nlp_cache
if _nlp_cache is not None:
return _nlp_cache
# Use blank English pipeline - only tokenizer + sentencizer needed.
nlp = spacy.blank("en")
nlp.max_length = 100_000_000
nlp.add_pipe("sentencizer")
nlp.add_pipe("_spacy_markdown_sentence_fixer", after="sentencizer")
_nlp_cache = nlp
return nlp
def _spacy_tokenize_paragraphs(para_texts: List[str]) -> List[List[SentenceBoundary]]:
"""
Run spaCy tokenization in a subprocess with its own GIL.
The Cython tokenizer holds the GIL without releasing it, starving the printer
thread and all other workers. Running in a subprocess isolates the GIL contention.
:param para_texts: Paragraph texts to tokenize.
:return: Sentence boundaries per paragraph: [(text, start_char, end_char), ...].
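    Example (illustrative; exact offsets depend on the tokenizer):
        ["Foo. Bar."] -> [[("Foo.", 0, 4), ("Bar.", 5, 9)]]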
"""
nlp = _get_nlp()
return [
[(sent.text, sent.start_char, sent.end_char) for sent in nlp(text).sents]
for text in para_texts
]
_spacy_executor: Optional[ProcessPoolExecutor] = None
def _get_spacy_executor() -> ProcessPoolExecutor:
"""
Get or create the subprocess executor for spaCy tokenization.
:return: ProcessPoolExecutor with a single worker subprocess.
"""
global _spacy_executor
if _spacy_executor is None:
_spacy_executor = ProcessPoolExecutor(max_workers=1)
return _spacy_executor
def _extract_paragraphs(doc_content: DocumentContent) -> List[DocumentContent]:
"""
Extract paragraphs.
- Respects empty lines.
- Turns Markdown list items into separate paragraphs to let NLP (spaCy) work properly.
- Turns Markdown headings into separate paragraphs.
:param doc_content: Document content.
:return: List document content (one per paragraph).
"""
per_line_references = doc_content.get_per_line_references()
para_blocks: List[DocumentContent] = []
current_paragraph_lines: List[str] = []
current_reference_list: ReferenceList = []
has_references = bool(per_line_references)
for line_index, line_text in enumerate(doc_content.lines):
next_paragraph = False
discard_line = False
# Empty line starts new paragraph
if not line_text:
next_paragraph = True
discard_line = True
# Markdown item starts new paragraph
if line_text.startswith("- "):
next_paragraph = True
# Markdown heading starts new paragraph
if line_text.startswith("#"):
next_paragraph = True
# Push current paragraph and start next
if next_paragraph:
if current_paragraph_lines:
para_blocks.append(DocumentContent.from_lines(lines=current_paragraph_lines, lines_per_line=current_reference_list))
current_paragraph_lines = []
current_reference_list = []
# Discard line
if discard_line:
continue
# Append text line with page or line number reference
current_paragraph_lines.append(line_text)
if has_references:
current_reference_list.append(per_line_references[line_index])
# If heading, push as single-line paragraph
if line_text.startswith("#"):
para_blocks.append(DocumentContent.from_lines(lines=current_paragraph_lines, lines_per_line=current_reference_list))
current_paragraph_lines = []
current_reference_list = []
# Push last paragraph if non-empty
if current_paragraph_lines:
para_blocks.append(DocumentContent.from_lines(lines=current_paragraph_lines, lines_per_line=current_reference_list))
return para_blocks
def _extract_sentences_with_reference_ranges(
paragraphs_doc_content: List[DocumentContent],
) -> List[SentenceWithRange]:
# Prepare paragraph texts for batch tokenization in subprocess
para_texts: List[str] = []
for paragraph_doc_content in paragraphs_doc_content:
normalized_lines = [_normalize_inline_whitespace(line) for line in paragraph_doc_content.lines]
para_texts.append(" ".join(normalized_lines))
# Run spaCy in subprocess (own GIL - doesn't starve main process)
future = _get_spacy_executor().submit(_spacy_tokenize_paragraphs, para_texts)
all_sentence_boundaries: List[List[SentenceBoundary]] = future.result()
# Map sentence boundaries to reference ranges (main process, GIL-friendly)
sentences_with_reference_ranges: List[SentenceWithRange] = []
for paragraph_index, (paragraph_doc_content, sentence_boundaries) in enumerate(
zip(paragraphs_doc_content, all_sentence_boundaries)
):
if paragraph_index > 0:
# Insert paragraph delimiter: empty sentence with zero range.
# (Unit tests are sensitive to this logic)
sentences_with_reference_ranges.append(SentenceWithRange("", (0, 0)))
paragraph_sentences_with_reference_ranges = _extract_paragraph_sentences_with_reference_ranges(
paragraph_doc_content=paragraph_doc_content,
sentence_boundaries=sentence_boundaries,
)
sentences_with_reference_ranges.extend(paragraph_sentences_with_reference_ranges)
return sentences_with_reference_ranges
def get_sentences_with_reference_ranges(doc_content: DocumentContent) -> List[SentenceWithRange]:
"""
Use preprocessing and NLP (spaCy) to split text into sentences, keeping track of reference ranges.
Processes text with paragraph and sentence handling, inferring ranges from provided references (lines or pages).
spaCy runs in a subprocess to avoid GIL starvation of the printer thread and other workers.
:param doc_content: Document content.
:return: List of SentenceWithRange (text and range pairs).
"""
return _extract_sentences_with_reference_ranges(
paragraphs_doc_content=_extract_paragraphs(doc_content),
)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
def _group_blocks_of_sentences(sentences: List[str], sentences_per_block: int) -> List[List[str]]:
"""
Group sentences into blocks of a specified size.
:param sentences: List of all sentences.
:param sentences_per_block: Number of sentences per block.
:return: List of sentence blocks.
"""
return [sentences[i:i + sentences_per_block] for i in range(0, len(sentences), sentences_per_block)]
def _chunk_start_to_ranges(start_lines: List[int], total_lines: int) -> List[SentenceRange]:
"""
Convert a list of chunk start lines into ranges of sentence indices.
:param start_lines: List of starting line numbers for chunks.
:param total_lines: Total number of lines in the block.
:return: List of (start, end) ranges for each chunk.
"""
extended = start_lines + [total_lines + 1]
return list(zip(extended[:-1], extended[1:]))
def _extract_chunks_and_carry(sentences: List[str], ranges: List[SentenceRange]) -> Tuple[List[str], str | None]:
"""
Extract chunks and any carry-over text from sentences based on ranges.
:param sentences: List of sentences in the block.
:param ranges: List of (start, end) ranges for chunks.
:return: Tuple of (list of chunk texts, carry-over text or None).
"""
if not ranges:
return [], None
if len(ranges) == 1:
start, end = ranges[0]
return [], "\n".join(sentences[start - 1:end - 1])
*main, last = ranges
chunks = ["\n".join(sentences[start - 1:end - 1]) for start, end in main]
carry = "\n".join(sentences[last[0] - 1:last[1] - 1])
return chunks, carry
def _aggregate_ranges(reference_ranges: List[SentenceRange]) -> SentenceRange:
"""
Aggregate min and max references from a list of ranges, ignoring zeros.
:param reference_ranges: List of (min, max) reference ranges.
:return: Aggregated (min, max) reference.
"""
valid_mins = [min_r for min_r, _ in reference_ranges if min_r > 0]
valid_maxs = [max_r for _, max_r in reference_ranges if max_r > 0]
r_min = min(valid_mins) if valid_mins else 0
r_max = max(valid_maxs) if valid_maxs else 0
return r_min, r_max
def _format_chunk(_file_path: str, header: str, body: str) -> str:
"""
Format chunk.
:param _file_path: File path.
:param header: Chunk header.
:param body: Chunk body.
:return: Formatted chunk text.
"""
return "\n".join([
# NOTE: Removed in v11.0.0. Use `tools/qdrant-remove-paths-from-context-headers.py` to remove this from pre-v11.0.0.
# f"# {format_file(file_path)}",
# Context header
f"# {header}",
f"",
# Chunk body
body,
])
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
def get_chunks_with_reference_ranges(
ai_factory: AiManagerFactory,
sentences_with_references: List[SentenceWithRange],
chunk_callback: Callable[[AiManager, List[str]], ChunkSchema],
chunk_lines_block: int,
file_path: str,
progress_info: ProgressInfo,
logger: Logger,
verbose: bool = True,
) -> List[ChunkWithRange]:
"""
Chunkify a list of sentences into AI-determined chunks, carrying over leftover sentences where needed,
and annotate each chunk with the corresponding (min, max) line reference range.
:param ai_factory: AI manager factory.
:param sentences_with_references: List of sentences with their reference ranges.
:param chunk_callback: Chunk callback.
:param chunk_lines_block: Number of sentences per block to be chunked.
:param file_path: Path to the originating file (used for logging and labeling).
:param progress_info: Progress tracking information
:param logger: Logger.
:param verbose: Enable to show additional information.
:return: List of ChunkWithRange objects containing the formatted chunk and its reference range.
"""
sentences = [s.text for s in sentences_with_references]
sentence_reference_ranges = [s.reference_range for s in sentences_with_references]
if len(sentence_reference_ranges) != len(sentences):
raise ValueError(
f"Reference range mismatch: "
f"{len(sentence_reference_ranges)} ranges, "
f"{len(sentences)} sentences "
f"for {format_file(file_path)}"
)
blocks_of_sentences = _group_blocks_of_sentences(sentences, chunk_lines_block)
logger.info(
f"Chunking ({len(sentences)}) sentences in ({len(blocks_of_sentences)}) blocks "
f"for {format_file(file_path)}"
)
# Set progress total based on total sentences (more meaningful than block count)
progress_info.progress_manager.set_total(progress_info.parent_key, len(sentences_with_references))
chunks_with_ranges: List[ChunkWithRange] = []
carry: Optional[str] = None
carry_reference_ranges: Optional[List[SentenceRange]] = None
last_carry_header: Optional[str] = None
failed_blocks = 0
chunk_t0 = time.monotonic()
idx = 0
for block_index, block_of_sentences in enumerate(blocks_of_sentences):
block_len = len(block_of_sentences)
block_sentence_reference_ranges = sentence_reference_ranges[idx: idx + block_len]
idx += block_len
if verbose:
logger.info(f"Chunking block ({block_index + 1}) / ({len(blocks_of_sentences)}) of {format_file(file_path)}")
if carry:
assert carry_reference_ranges is not None
carry_lines = splitlines_exact(carry)
current_block_line_count: int = len(carry_lines) + len(block_of_sentences)
if verbose:
logger.info(f"Carrying over ({len(carry_lines)}) lines; current block has ({current_block_line_count}) lines")
block_of_sentences = carry_lines + block_of_sentences
block_sentence_reference_ranges = carry_reference_ranges + block_sentence_reference_ranges
range_start = block_sentence_reference_ranges[0] if block_sentence_reference_ranges else (0, 0)
range_stop = block_sentence_reference_ranges[-1] if block_sentence_reference_ranges else (0, 0)
logger.debug(f"Chunking block {block_index + 1}: {len(block_of_sentences)} sentences, range {range_start} to {range_stop}")
try:
ai = ai_factory.get_ai()
chunk_result: ChunkSchema = chunk_callback(ai, block_of_sentences)
except Exception as e:
failed_blocks += 1
logger.critical(
f"BLOCK SKIPPED: Chunking failed for block ({block_index + 1}) / ({len(blocks_of_sentences)}) "
f"of {format_file(file_path)} — ({len(block_of_sentences)}) sentences lost: {e}"
)
carry = None
carry_reference_ranges = None
last_carry_header = None
progress_info.progress_manager.update_task(progress_info.parent_key, advance=block_len)
continue
chunk_start_lines = chunk_result.get_chunk_start_lines()
chunk_headers = chunk_result.get_chunk_headers()
ranges = _chunk_start_to_ranges(start_lines=chunk_start_lines, total_lines=len(block_of_sentences))
block_chunks, carry = _extract_chunks_and_carry(block_of_sentences, ranges)
for i, (r_start, r_end) in enumerate(ranges[:-1] if carry else ranges):
r_reference_ranges = block_sentence_reference_ranges[r_start - 1:r_end - 1]
r_range = _aggregate_ranges(r_reference_ranges)
if r_range == (0, 0):
# AI returned `chunk_start_sentences` selecting empty lines only
logger.warning(f"Chunk {len(chunks_with_ranges) + 1}: Discarding empty chunk")
continue
# DEBUG
# logger.info(f"Chunk {len(chunks_with_ranges) + 1}: sentences {r_start}:{r_end}, range {r_range}")
body = "\n".join(block_of_sentences[r_start - 1:r_end - 1])
chunk_text = _format_chunk(
_file_path=file_path,
header=chunk_headers[i],
body=body,
)
chunks_with_ranges.append(ChunkWithRange(chunk_text, r_range))
if carry:
carry_reference_ranges = block_sentence_reference_ranges[ranges[-1][0] - 1:ranges[-1][1] - 1]
last_carry_header = chunk_headers[-1]
# Update progress after processing each block (advance by number of sentences processed)
progress_info.progress_manager.update_task(progress_info.parent_key, advance=block_len)
block_elapsed = time.monotonic() - chunk_t0
logger.info(
f"Chunked block ({block_index + 1}) / ({len(blocks_of_sentences)}) "
f"({len(chunks_with_ranges)} chunks so far, {block_elapsed:.1f}s elapsed)"
)
if failed_blocks > 0:
logger.critical(
f"INCOMPLETE INGESTION: ({failed_blocks}) of ({len(blocks_of_sentences)}) blocks failed chunking "
f"for {format_file(file_path)} — re-run with a different model or smaller block size"
)
if carry:
assert last_carry_header is not None, "Internal error: carry exists but no header was recorded"
assert carry_reference_ranges is not None
carry_range = _aggregate_ranges(carry_reference_ranges)
if carry_range == (0, 0):
logger.warning(f"Chunk {len(chunks_with_ranges) + 1}: Discarding empty chunk (final)")
return chunks_with_ranges
carry_lines = splitlines_exact(carry)
final_chunk_line_count: int = len(carry_lines)
if verbose:
logger.info(f"Appending final carry of ({final_chunk_line_count}) lines; final chunk has ({final_chunk_line_count}) lines")
formatted_carry = _format_chunk(
_file_path=file_path,
header=last_carry_header,
body=carry,
)
chunks_with_ranges.append(ChunkWithRange(formatted_carry, carry_range))
return chunks_with_ranges