"""
Text chunking utilities for semantic search.
This module provides lightweight text processing and chunking capabilities
without heavy ML dependencies, replacing the transformers-based approach.
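
Typical usage (an illustrative sketch; `item` stands for a Zotero item dict and
`config` for a chunking configuration accepted by validate_chunking_config):

    text = extract_searchable_text(item)
    chunks = create_document_chunks(item, config)
    fingerprint = calculate_content_hash(text)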
"""
import re
import hashlib
from typing import List, Dict, Any, Optional
import logging
logger = logging.getLogger(__name__)
def calculate_content_hash(content: str) -> str:
"""
Calculate SHA-256 hash for content change detection.
Args:
content: Text content to hash
Returns:
Hexadecimal hash string
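
    Example (illustrative; any string input behaves the same way):
        >>> digest = calculate_content_hash("hello")
        >>> len(digest)
        64
        >>> digest == calculate_content_hash("hello")
        True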
"""
return hashlib.sha256(content.encode('utf-8')).hexdigest()
def simple_text_chunker(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
"""
Split text into overlapping chunks without ML dependencies.
Args:
text: Input text to chunk
chunk_size: Target size of each chunk in characters
overlap: Number of characters to overlap between chunks
Returns:
List of text chunks
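
    Example (illustrative; asserts only size and count properties of the result):
        >>> chunks = simple_text_chunker("word " * 500, chunk_size=200, overlap=20)
        >>> len(chunks) > 1
        True
        >>> all(len(c) <= 200 for c in chunks)
        True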
"""
# Ensure proper types
try:
chunk_size = int(chunk_size)
overlap = int(overlap)
except (ValueError, TypeError):
logger.warning(f"Invalid chunk parameters, using defaults")
chunk_size = 1000
overlap = 100
if not text or chunk_size <= 0:
return []
if len(text) <= chunk_size:
return [text]
chunks = []
start = 0
while start < len(text):
# Calculate end position
end = start + chunk_size
# If this is not the last chunk, try to break at word boundary
if end < len(text):
# Look for last space within the chunk
last_space = text.rfind(' ', start, end)
if last_space > start:
end = last_space
chunk = text[start:end].strip()
if chunk: # Only add non-empty chunks
chunks.append(chunk)
# Calculate next start position with overlap
if end >= len(text):
break
        new_start = end - overlap
        if new_start <= start:
            new_start = end  # Avoid infinite loop when the overlap would prevent progress
        start = new_start
return chunks
def smart_chunk_by_sentences(text: str, target_size: int = 1000, min_size: int = 100) -> List[str]:
"""
Chunk text by sentence boundaries with size targets.
Strategy:
1. Split by sentences using regex
2. Combine sentences until target size reached
3. Ensure minimum chunk sizes
Args:
text: Input text to chunk
target_size: Target size of each chunk in characters
min_size: Minimum size for a chunk
Returns:
List of text chunks
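
    Example (illustrative; small sizes chosen so each sentence becomes its own chunk):
        >>> smart_chunk_by_sentences("First sentence here. Second sentence here. Third sentence here.", target_size=40, min_size=10)
        ['First sentence here.', 'Second sentence here.', 'Third sentence here.']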
"""
# Ensure proper types
try:
target_size = int(target_size)
min_size = int(min_size)
except (ValueError, TypeError):
logger.warning(f"Invalid sentence chunking parameters, using defaults")
target_size = 1000
min_size = 100
if not text or target_size <= 0:
return []
if len(text) <= target_size:
return [text]
# Split into sentences using regex
# This regex looks for sentence endings followed by whitespace and capital letters
sentence_pattern = r'(?<=[.!?])\s+(?=[A-Z])'
sentences = re.split(sentence_pattern, text)
if not sentences:
return [text]
chunks = []
current_chunk = ""
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
# Check if adding this sentence would exceed target size
potential_chunk = current_chunk + (" " if current_chunk else "") + sentence
if len(potential_chunk) <= target_size:
current_chunk = potential_chunk
        else:
            # Current chunk is full, start a new one
            if current_chunk:
                if len(current_chunk) >= min_size:
                    chunks.append(current_chunk)
                else:
                    # Carry an undersized fragment into the next sentence instead of dropping it
                    sentence = current_chunk + " " + sentence
# Handle very long sentences that exceed target size
if len(sentence) > target_size:
# Split long sentence into smaller chunks
long_chunks = simple_text_chunker(sentence, target_size, 50)
if long_chunks:
if len(long_chunks) > 1:
chunks.extend(long_chunks[:-1]) # Add all but last
current_chunk = long_chunks[-1]
else:
current_chunk = long_chunks[0]
else:
current_chunk = sentence[:target_size] # Fallback truncation
else:
current_chunk = sentence
# Add remaining chunk if it meets minimum size
if current_chunk and len(current_chunk) >= min_size:
chunks.append(current_chunk)
elif current_chunk and chunks:
# Merge small remainder with last chunk
chunks[-1] = chunks[-1] + " " + current_chunk
return chunks
def extract_searchable_text(item: Dict[str, Any]) -> str:
"""
Extract and combine searchable text from a Zotero item.
Combines:
- Title
- Abstract
- Authors
- Publication details
- Tags
- Notes (if present)
Args:
item: Zotero item dictionary
Returns:
Combined searchable text
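
    Example (illustrative; a minimal stand-in item, not a complete Zotero record):
        >>> item = {"data": {"title": "Deep Learning", "abstractNote": "A survey.", "tags": [{"tag": "ML"}]}}
        >>> extract_searchable_text(item)
        'Deep Learning A survey. ML'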
"""
data = item.get("data", {})
text_parts = []
# Title
if title := data.get("title", "").strip():
text_parts.append(title)
# Authors/Creators
creators = data.get("creators", [])
if creators:
from .utils import format_creators
creators_text = format_creators(creators)
if creators_text and creators_text != "No authors listed":
text_parts.append(creators_text)
# Abstract
if abstract := data.get("abstractNote", "").strip():
text_parts.append(abstract)
# Publication details
publication_parts = []
if pub_title := data.get("publicationTitle", "").strip():
publication_parts.append(pub_title)
if journal := data.get("journalAbbreviation", "").strip():
publication_parts.append(journal)
if volume := data.get("volume", "").strip():
publication_parts.append(f"Volume {volume}")
if issue := data.get("issue", "").strip():
publication_parts.append(f"Issue {issue}")
if pages := data.get("pages", "").strip():
publication_parts.append(f"Pages {pages}")
if date := data.get("date", "").strip():
publication_parts.append(date)
if publication_parts:
text_parts.append(" ".join(publication_parts))
# Tags
tags = data.get("tags", [])
if tags:
tag_text = " ".join([tag.get("tag", "") for tag in tags if tag.get("tag", "").strip()])
if tag_text:
text_parts.append(tag_text)
# Notes (if present in data)
if note := data.get("note", "").strip():
# Clean HTML from notes
note_clean = re.sub(r'<[^>]+>', '', note)
note_clean = re.sub(r'\s+', ' ', note_clean).strip()
if note_clean:
text_parts.append(note_clean)
# DOI and URLs
if doi := data.get("DOI", "").strip():
text_parts.append(f"DOI: {doi}")
if url := data.get("url", "").strip():
text_parts.append(f"URL: {url}")
# Extra field (may contain citation keys or other metadata)
if extra := data.get("extra", "").strip():
# Parse extra field for useful information
extra_lines = [line.strip() for line in extra.split('\n') if line.strip()]
extra_text = " ".join(extra_lines)
if extra_text:
text_parts.append(extra_text)
return " ".join(filter(None, text_parts))
def create_document_chunks(item: Dict[str, Any], config: Dict[str, Any]) -> List[str]:
"""
Create searchable text chunks from a Zotero item (metadata only).
Args:
item: Zotero item dictionary
config: Chunking configuration with keys:
- chunk_size: Target chunk size (default 1000)
- overlap: Chunk overlap (default 100)
- min_chunk_size: Minimum chunk size (default 100)
- max_chunks_per_item: Maximum chunks per item (default 10)
- chunking_strategy: 'simple' or 'sentences' (default 'sentences')
Returns:
List of text chunks for embedding
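
    Example (illustrative; the key "ABC123" and the metadata values are made up):
        >>> abstract = "This abstract is long enough to produce at least one chunk. " * 3
        >>> item = {"key": "ABC123", "data": {"title": "A Study of Text Chunking", "abstractNote": abstract}}
        >>> len(create_document_chunks(item, {"chunk_size": 1000, "overlap": 100}))
        1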
"""
# Extract searchable text
full_text = extract_searchable_text(item)
if not full_text.strip():
logger.warning(f"No searchable text found for item {item.get('key', 'unknown')}")
return []
# Validate and normalize chunking config to ensure proper types
validated_config = validate_chunking_config(config)
# Get configuration from validated config (now guaranteed to be proper types)
chunk_size = validated_config["chunk_size"]
overlap = validated_config["overlap"]
min_chunk_size = validated_config["min_chunk_size"]
max_chunks = validated_config["max_chunks_per_item"]
strategy = validated_config["chunking_strategy"]
# Choose chunking strategy
if strategy == "simple":
chunks = simple_text_chunker(full_text, chunk_size, overlap)
else: # sentences strategy
chunks = smart_chunk_by_sentences(full_text, chunk_size, min_chunk_size)
# Filter out very small chunks and limit total number
valid_chunks = [
chunk for chunk in chunks
if len(chunk.strip()) >= min_chunk_size
]
# Limit number of chunks per item
if len(valid_chunks) > max_chunks:
logger.info(f"Limiting chunks for item {item.get('key', 'unknown')}: "
f"{len(valid_chunks)} -> {max_chunks}")
valid_chunks = valid_chunks[:max_chunks]
logger.debug(f"Created {len(valid_chunks)} chunks for item {item.get('key', 'unknown')}")
return valid_chunks
def create_fulltext_chunks(full_text: str, config: Dict[str, Any],
item_key: str = "unknown") -> List[str]:
"""
Create text chunks from full-text content (for large documents).
    This function is optimized for processing large full-text content; for
    documents too large to chunk entirely in memory, use
    create_streaming_fulltext_chunks instead.
Args:
full_text: Full text content to chunk
config: Chunking configuration
item_key: Item key for logging purposes
Returns:
List of text chunks for embedding
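
    Example (illustrative; asserts only size and count properties of the result):
        >>> cfg = {"chunk_size": 500, "overlap": 50, "chunking_strategy": "simple"}
        >>> chunks = create_fulltext_chunks("Lorem ipsum dolor sit amet. " * 200, cfg, item_key="DEMO")
        >>> len(chunks) > 1
        True
        >>> all(len(c) <= 1000 for c in chunks)
        True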
"""
if not full_text or not full_text.strip():
logger.warning(f"No full-text content for item {item_key}")
return []
# Validate and normalize chunking config
validated_config = validate_chunking_config(config)
# Use larger chunk size for full-text content (configurable)
chunk_size = validated_config.get("fulltext_chunk_size",
validated_config["chunk_size"] * 2)
overlap = validated_config.get("fulltext_overlap",
validated_config["overlap"])
min_chunk_size = validated_config["min_chunk_size"]
strategy = validated_config["chunking_strategy"]
# Remove max_chunks limitation for full-text (we want to process everything)
logger.info(f"Processing full-text content for {item_key}: "
f"{len(full_text):,} characters")
# Preprocess text to improve chunking
processed_text = preprocess_text_for_embedding(full_text)
# Choose chunking strategy
if strategy == "simple":
chunks = simple_text_chunker(processed_text, chunk_size, overlap)
else: # sentences strategy
chunks = smart_chunk_by_sentences(processed_text, chunk_size, min_chunk_size)
# Filter out very small chunks but don't limit total number
valid_chunks = [
chunk for chunk in chunks
if len(chunk.strip()) >= min_chunk_size
]
logger.info(f"Created {len(valid_chunks)} full-text chunks for item {item_key}")
return valid_chunks
async def create_streaming_fulltext_chunks(full_text: str, config: Dict[str, Any],
chunk_callback = None, item_key: str = "unknown") -> int:
"""
Create text chunks from full-text content using streaming approach.
This function processes large texts without loading all chunks into memory,
calling a callback function for each chunk as it's created.
Args:
full_text: Full text content to chunk
config: Chunking configuration
chunk_callback: Callback function called with each chunk (chunk_text, chunk_index)
item_key: Item key for logging purposes
Returns:
Total number of chunks processed
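
    Example (an illustrative sketch, to be run inside an async function;
    `store_chunk`, `big_text`, `config` and the item key are hypothetical placeholders):

        async def store_chunk(chunk_text: str, chunk_index: int) -> None:
            print(f"chunk {chunk_index}: {len(chunk_text)} chars")

        total = await create_streaming_fulltext_chunks(
            big_text, config, chunk_callback=store_chunk, item_key="ABCD1234")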
"""
if not full_text or not full_text.strip():
logger.warning(f"No full-text content for item {item_key}")
return 0
if not chunk_callback:
logger.error("No chunk callback provided for streaming chunks")
return 0
# Validate and normalize chunking config
validated_config = validate_chunking_config(config)
chunk_size = validated_config.get("fulltext_chunk_size",
validated_config["chunk_size"] * 2)
overlap = validated_config.get("fulltext_overlap",
validated_config["overlap"])
min_chunk_size = validated_config["min_chunk_size"]
logger.info(f"Streaming full-text processing for {item_key}: "
f"{len(full_text):,} characters")
# Preprocess text to improve chunking
processed_text = preprocess_text_for_embedding(full_text)
# Use streaming approach for very large texts
    if len(processed_text) > 1_000_000:  # ~1M character threshold
return await _stream_large_text_chunks(processed_text, chunk_size, overlap,
min_chunk_size, chunk_callback, item_key)
else:
# For smaller texts, use regular chunking
chunks = create_fulltext_chunks(processed_text, validated_config, item_key)
chunk_count = 0
for i, chunk in enumerate(chunks):
await chunk_callback(chunk, i) # i starts at 0 for each attachment
chunk_count += 1
return chunk_count
async def _stream_large_text_chunks(text: str, chunk_size: int, overlap: int,
min_chunk_size: int, chunk_callback, item_key: str) -> int:
"""
Stream process very large text files to avoid memory issues.
This function processes text in a streaming fashion, never loading
more than a few chunks into memory at once.
"""
chunk_count = 0 # Always start at 0 for each attachment
text_pos = 0
text_len = len(text)
overlap_buffer = ""
max_chunks = 10000 # Safety limit to prevent infinite loops
logger.info(f"Starting streaming chunk processing for {item_key} ({text_len:,} chars) - chunk indices will start at 0")
while text_pos < text_len and chunk_count < max_chunks:
# Calculate chunk boundaries
chunk_start = max(0, text_pos - len(overlap_buffer))
chunk_end = min(text_len, text_pos + chunk_size)
# Extract chunk text
chunk_text = text[chunk_start:chunk_end]
# Add overlap buffer if we have one
if overlap_buffer and text_pos > 0:
chunk_text = overlap_buffer + chunk_text[len(overlap_buffer):]
# Find sentence boundary for better chunking (only if not at end)
actual_chunk = chunk_text
next_pos = chunk_end
if chunk_end < text_len and len(chunk_text) >= chunk_size:
# Look for sentence ending near the target chunk size
sentence_end = chunk_text.rfind('.', chunk_size - 200, chunk_size + 200)
if sentence_end > chunk_size - 500 and sentence_end > 0:
actual_chunk = chunk_text[:sentence_end + 1]
next_pos = chunk_start + sentence_end + 1
else:
# No good sentence break, use word boundary
word_end = chunk_text.rfind(' ', chunk_size - 100, chunk_size + 100)
if word_end > chunk_size - 200 and word_end > 0:
actual_chunk = chunk_text[:word_end]
next_pos = chunk_start + word_end + 1
# Prepare overlap for next chunk
if len(actual_chunk) >= overlap:
overlap_buffer = actual_chunk[-overlap:]
else:
overlap_buffer = ""
# Only process chunks that meet minimum size
if len(actual_chunk.strip()) >= min_chunk_size:
await chunk_callback(actual_chunk.strip(), chunk_count)
chunk_count += 1
if chunk_count % 100 == 0:
logger.debug(f"Processed {chunk_count} chunks for {item_key} (pos: {text_pos:,}/{text_len:,})")
# Move to next position
if next_pos <= text_pos:
# Prevent infinite loop - force advance if position didn't move
text_pos = min(text_pos + chunk_size, text_len)
logger.warning(f"Position did not advance for {item_key}, forcing advance to {text_pos}")
else:
text_pos = next_pos
if chunk_count >= max_chunks:
logger.warning(f"Hit safety limit of {max_chunks} chunks for {item_key}")
logger.info(f"Completed streaming processing: {chunk_count} chunks for {item_key}")
return chunk_count
def preprocess_text_for_embedding(text: str) -> str:
"""
Preprocess text before embedding generation.
This function cleans and normalizes text to improve embedding quality
without using heavy NLP libraries.
Args:
text: Input text to preprocess
Returns:
Cleaned and normalized text
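
    Example (illustrative):
        >>> preprocess_text_for_embedding("Hello   world!!!  How are  you?")
        'Hello world! How are you?'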
"""
if not text:
return ""
# Remove excessive whitespace
text = re.sub(r'\s+', ' ', text)
# Remove or normalize special characters that don't add semantic value
# Keep punctuation that affects meaning
text = re.sub(r'[^\w\s\.,!?;:()\'"\/\-]', ' ', text)
# Normalize quotes
    text = re.sub(r'[\u201c\u201d]', '"', text)
text = re.sub(r'[\u2018\u2019]', "'", text)
# Remove multiple consecutive punctuation
text = re.sub(r'([.!?]){2,}', r'\1', text)
# Clean up spaces around punctuation
text = re.sub(r'\s+([.!?,:;])', r'\1', text)
text = re.sub(r'([.!?])\s*([.!?])', r'\1 \2', text)
# Final cleanup
text = re.sub(r'\s+', ' ', text).strip()
return text
def get_text_stats(text: str) -> Dict[str, Any]:
"""
Get basic statistics about text content.
Args:
text: Input text
Returns:
Dictionary with text statistics
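
    Example (illustrative; counts are approximations by design):
        >>> stats = get_text_stats("One sentence here. Another one!")
        >>> stats["sentences"], stats["words"]
        (2, 5)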
"""
if not text:
return {
"length": 0,
"words": 0,
"sentences": 0,
"paragraphs": 0
}
# Character count
length = len(text)
# Word count (simple word boundary splitting)
words = len(re.findall(r'\b\w+\b', text))
# Sentence count (approximate)
sentences = len(re.findall(r'[.!?]+', text))
# Paragraph count (double newlines)
paragraphs = len(re.split(r'\n\s*\n', text))
return {
"length": length,
"words": words,
"sentences": sentences,
"paragraphs": paragraphs
}
def validate_chunking_config(config: Dict[str, Any]) -> Dict[str, Any]:
"""
Validate and normalize chunking configuration.
Args:
config: Input configuration dictionary
Returns:
Validated and normalized configuration
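
    Example (illustrative; shows type coercion and defaulting):
        >>> cfg = validate_chunking_config({"chunk_size": "2000", "overlap": 250})
        >>> cfg["chunk_size"], cfg["overlap"], cfg["chunking_strategy"]
        (2000, 250, 'sentences')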
"""
validated = {}
# Helper function to safely convert to int with better error handling
def safe_int(value, default: int, name: str) -> int:
try:
# Handle None or empty values
if value is None or value == "":
logger.debug(f"Empty {name}, using default {default}")
return default
# Handle already int values
if isinstance(value, int):
return value
# Handle string conversion
if isinstance(value, str):
# Strip whitespace and handle empty strings
cleaned = value.strip()
if not cleaned:
logger.debug(f"Empty string for {name}, using default {default}")
return default
result = int(cleaned)
return result
# Handle float conversion (truncate to int)
if isinstance(value, float):
return int(value)
# Try generic conversion
result = int(value)
return result
except (ValueError, TypeError) as e:
logger.warning(f"Invalid {name} '{value}' (type: {type(value).__name__}), using default {default}: {e}")
return default
except Exception as e:
logger.error(f"Unexpected error converting {name} '{value}', using default {default}: {e}")
return default
# Convert all values to integers first with extra safety to avoid type comparison issues
chunk_size = safe_int(config.get("chunk_size", 1000), 1000, "chunk_size")
overlap = safe_int(config.get("overlap", 100), 100, "overlap")
min_chunk_size = safe_int(config.get("min_chunk_size", 100), 100, "min_chunk_size")
max_chunks = safe_int(config.get("max_chunks_per_item", 10), 10, "max_chunks_per_item")
# Double-check all values are integers (extra safety)
chunk_size = int(chunk_size)
overlap = int(overlap)
min_chunk_size = int(min_chunk_size)
max_chunks = int(max_chunks)
# Now validate ranges with all values guaranteed to be integers
if chunk_size < 100:
logger.warning(f"chunk_size {chunk_size} too small, using 1000")
chunk_size = 1000
elif chunk_size > 5000:
logger.warning(f"Large chunk_size {chunk_size}, using 5000")
chunk_size = 5000
validated["chunk_size"] = chunk_size
# Full-text chunk size (usually larger than metadata chunks)
fulltext_chunk_size = safe_int(config.get("fulltext_chunk_size", chunk_size * 2), chunk_size * 2, "fulltext_chunk_size")
fulltext_chunk_size = int(fulltext_chunk_size) # Extra safety
if fulltext_chunk_size < chunk_size:
logger.warning(f"fulltext_chunk_size {fulltext_chunk_size} smaller than chunk_size, using {chunk_size * 2}")
fulltext_chunk_size = chunk_size * 2
elif fulltext_chunk_size > 10000:
logger.warning(f"Large fulltext_chunk_size {fulltext_chunk_size}, using 10000")
fulltext_chunk_size = 10000
validated["fulltext_chunk_size"] = fulltext_chunk_size
# Validate overlap
if overlap < 0:
logger.warning(f"Invalid overlap {overlap}, using 100")
overlap = 100
elif overlap >= chunk_size:
logger.warning(f"Overlap {overlap} >= chunk_size {chunk_size}, using {chunk_size // 4}")
overlap = chunk_size // 4
validated["overlap"] = overlap
# Full-text overlap (proportional to fulltext chunk size)
fulltext_overlap = safe_int(config.get("fulltext_overlap", overlap), overlap, "fulltext_overlap")
fulltext_overlap = int(fulltext_overlap) # Extra safety
if fulltext_overlap < 0:
logger.warning(f"Invalid fulltext_overlap {fulltext_overlap}, using {overlap}")
fulltext_overlap = overlap
elif fulltext_overlap >= fulltext_chunk_size:
logger.warning(f"fulltext_overlap {fulltext_overlap} >= fulltext_chunk_size {fulltext_chunk_size}, using {fulltext_chunk_size // 4}")
fulltext_overlap = fulltext_chunk_size // 4
validated["fulltext_overlap"] = fulltext_overlap
# Validate minimum chunk size
if min_chunk_size < 50:
logger.warning(f"min_chunk_size {min_chunk_size} too small, using 100")
min_chunk_size = 100
elif min_chunk_size >= chunk_size:
logger.warning(f"min_chunk_size {min_chunk_size} >= chunk_size {chunk_size}, using {chunk_size // 2}")
min_chunk_size = chunk_size // 2
validated["min_chunk_size"] = min_chunk_size
# Validate maximum chunks per item
if max_chunks < 1:
logger.warning(f"max_chunks_per_item {max_chunks} too small, using 10")
max_chunks = 10
elif max_chunks > 50:
logger.warning(f"Large max_chunks_per_item {max_chunks}, using 50")
max_chunks = 50
validated["max_chunks_per_item"] = max_chunks
# Chunking strategy
strategy = config.get("chunking_strategy", "sentences")
if strategy not in ["simple", "sentences"]:
logger.warning(f"Invalid chunking_strategy {strategy}, using 'sentences'")
strategy = "sentences"
validated["chunking_strategy"] = strategy
return validated