Open Census MCP Server

by brockwebb
test_chunking_fix.py (8.18 kB)
#!/usr/bin/env python3
"""Direct inline test of the fixed chunking logic."""

import hashlib
import re
from pathlib import Path
from typing import Dict, List


def create_chunk_metadata(text: str, file_path: Path, chunk_num: int) -> Dict:
    """Create metadata for a text chunk."""
    content_hash = hashlib.md5(text.encode()).hexdigest()[:8]
    file_hash = hashlib.md5(str(file_path).encode()).hexdigest()[:6]
    chunk_id = f"{file_path.stem}_{file_hash}_{chunk_num}_{content_hash}"

    return {
        'id': chunk_id,
        'text': text,
        'metadata': {
            'source': str(file_path),
            'filename': file_path.name,
            'chunk_number': chunk_num,
            'file_type': file_path.suffix[1:] if file_path.suffix else 'unknown'
        }
    }


def create_chunks(content: str, file_path: Path) -> List[Dict]:
    """Proven chunking logic with aggressive multi-level fallback."""
    chunks = []

    # Normalize whitespace but preserve blank lines: the paragraph split
    # below relies on '\n\n', so collapsing *all* whitespace would merge
    # every paragraph into one.
    content = re.sub(r'[ \t]+', ' ', content)
    content = re.sub(r'\n\s*\n', '\n\n', content).strip()
    if len(content) < 100:
        return chunks

    # Proven settings from build-kb.py
    chunk_size = 800       # target size
    max_chunk_size = 1200  # hard limit to stay under token limits
    overlap = 150          # overlap for context preservation

    # Multi-level splitting: paragraphs -> sentences -> words -> characters
    paragraphs = content.split('\n\n')
    current_chunk = ""
    chunk_num = 0

    for paragraph in paragraphs:
        paragraph = paragraph.strip()
        if not paragraph:
            continue

        # If the paragraph itself is too large, split by sentences
        if len(paragraph) > max_chunk_size:
            sentences = re.split(r'[.!?]+', paragraph)
            for sentence in sentences:
                sentence = sentence.strip()
                if not sentence:
                    continue

                # If a sentence is still too large, split by words (aggressive fallback)
                if len(sentence) > max_chunk_size:
                    words = sentence.split()
                    current_sentence = ""
                    for word in words:
                        # Character-level fallback: a single "word" longer than the
                        # hard limit (e.g. unbroken text with no spaces) is sliced
                        # directly so no chunk can exceed max_chunk_size.
                        while len(word) > max_chunk_size:
                            if len(current_sentence.strip()) > 100:
                                chunks.append(create_chunk_metadata(
                                    current_sentence.strip(), file_path, chunk_num))
                                chunk_num += 1
                                current_sentence = ""
                            chunks.append(create_chunk_metadata(
                                word[:max_chunk_size], file_path, chunk_num))
                            chunk_num += 1
                            word = word[max_chunk_size:]
                        if not word:
                            continue

                        if len(current_sentence + " " + word) > max_chunk_size and current_sentence:
                            # Store the current word-level chunk
                            if len(current_sentence.strip()) > 100:
                                chunks.append(create_chunk_metadata(
                                    current_sentence.strip(), file_path, chunk_num))
                                chunk_num += 1
                            # Start the new chunk with the last 20 words as overlap
                            overlap_words = current_sentence.split()[-20:]
                            current_sentence = " ".join(overlap_words) + " " + word
                        else:
                            current_sentence += (" " + word) if current_sentence else word

                    # Fold the final word-level chunk back into the running chunk
                    if len(current_sentence.strip()) > 100:
                        if len(current_chunk + " " + current_sentence) <= max_chunk_size:
                            current_chunk += (" " + current_sentence) if current_chunk else current_sentence
                        else:
                            # Store the current chunk first
                            if len(current_chunk.strip()) > 100:
                                chunks.append(create_chunk_metadata(
                                    current_chunk.strip(), file_path, chunk_num))
                                chunk_num += 1
                            current_chunk = current_sentence
                else:
                    # Normal sentence: try to add it to the current chunk
                    if len(current_chunk + " " + sentence) > chunk_size and current_chunk:
                        if len(current_chunk.strip()) > 100:
                            chunks.append(create_chunk_metadata(
                                current_chunk.strip(), file_path, chunk_num))
                            chunk_num += 1
                        # Start the new chunk with trailing overlap for context
                        overlap_text = current_chunk[-overlap:] if len(current_chunk) > overlap else current_chunk
                        current_chunk = overlap_text + " " + sentence
                    else:
                        current_chunk += (" " + sentence) if current_chunk else sentence
        else:
            # Normal paragraph processing
            if len(current_chunk + " " + paragraph) > chunk_size and current_chunk:
                if len(current_chunk.strip()) > 100:
                    chunks.append(create_chunk_metadata(
                        current_chunk.strip(), file_path, chunk_num))
                    chunk_num += 1
                # Start the new chunk with trailing overlap for context
                overlap_text = current_chunk[-overlap:] if len(current_chunk) > overlap else current_chunk
                current_chunk = overlap_text + " " + paragraph
            else:
                current_chunk += (" " + paragraph) if current_chunk else paragraph

    # Add the final chunk
    if len(current_chunk.strip()) > 100:
        if len(current_chunk) > max_chunk_size:
            # Split the final chunk in half repeatedly until it fits
            words = current_chunk.split()
            while len(" ".join(words)) > max_chunk_size and len(words) > 10:
                chunk_words = words[:len(words) // 2]
                chunks.append(create_chunk_metadata(
                    " ".join(chunk_words), file_path, chunk_num))
                chunk_num += 1
                words = words[len(words) // 2:]
            if words:
                chunks.append(create_chunk_metadata(
                    " ".join(words), file_path, chunk_num))
        else:
            chunks.append(create_chunk_metadata(
                current_chunk.strip(), file_path, chunk_num))

    return chunks


def test_chunking():
    """Test the chunking with problematic text sizes."""

    # Test 1: massive single paragraph (common PDF issue), ~20,000 chars
    massive_text = "This is a sentence that appears in a PDF with no paragraph breaks. " * 300

    chunks = create_chunks(massive_text, Path("massive.pdf"))

    print("=== CHUNKING TEST RESULTS ===")
    print(f"Input: {len(massive_text):,} characters")
    print(f"Output: {len(chunks)} chunks")

    max_size = 0
    over_limit = []
    for i, chunk in enumerate(chunks):
        size = len(chunk['text'])
        max_size = max(max_size, size)
        if size > 1200:
            over_limit.append((i, size))
        print(f"  Chunk {i}: {size:,} chars")

    print(f"\nMax chunk size: {max_size:,} chars")
    print(f"Chunks over 1200 limit: {len(over_limit)}")
    for chunk_id, size in over_limit:
        print(f"  Chunk {chunk_id}: {size:,} chars (OVER LIMIT!)")

    # Test 2: no-break text (worst case), ~14,500 chars with no spaces
    no_break_text = "verylongwordwithnobreaksatall" * 500

    chunks2 = create_chunks(no_break_text, Path("nobreaks.txt"))

    print("\n=== NO-BREAKS TEST ===")
    print(f"Input: {len(no_break_text):,} characters (no spaces)")
    print(f"Output: {len(chunks2)} chunks")

    max_size2 = 0
    for i, chunk in enumerate(chunks2):
        size = len(chunk['text'])
        max_size2 = max(max_size2, size)
        print(f"  Chunk {i}: {size:,} chars")

    print(f"Max chunk size: {max_size2:,} chars")

    # Overall validation
    success = max(max_size, max_size2) <= 1200
    print(f"\n🎯 CHUNKING FIX: {'✅ SUCCESS' if success else '❌ FAILED'}")
    print(f"All chunks under 1200 chars: {success}")

    return success


if __name__ == "__main__":
    test_chunking()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brockwebb/open-census-mcp-server'
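
The same lookup works from Python with just the standard library. A minimal sketch, assuming the endpoint returns a JSON document (the response schema is not shown on this page):

import json
import urllib.request

# Endpoint taken from the curl example above; the response is assumed to be JSON.
URL = "https://glama.ai/api/mcp/v1/servers/brockwebb/open-census-mcp-server"

with urllib.request.urlopen(URL) as resp:
    server_info = json.load(resp)

# Pretty-print whatever the API returns.
print(json.dumps(server_info, indent=2))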

If you have feedback or need assistance with the MCP directory API, please join our Discord server.