"""Processing configuration models for embedding batching, AST caching, and chunking."""
from __future__ import annotations
import os
from pydantic import BaseModel, Field, field_validator, model_validator
def _get_default_parallel_workers() -> int:
"""Get the default number of parallel workers based on CPU count.
Returns a reasonable default: min(CPU count, 8) to avoid excessive overhead.
Falls back to 4 if CPU count cannot be determined.
"""
import os
try:
cpu_count = os.cpu_count()
if cpu_count is None:
return 4
# Cap at 8 to avoid excessive thread overhead
return min(cpu_count, 8)
except (NotImplementedError, OSError):
return 4
class EmbeddingBatchConfig(BaseModel):
    """Embedding batch processing configuration.

    Controls batch sizing, parallelism, rate limiting, and retry behavior
    for the embedding pipeline. Instances are immutable (frozen).
    """

    model_config = {"frozen": True}

    batch_size: int = Field(
        default=100,
        ge=1,
        le=500,
        description="Number of texts to embed per batch. "
        "Local models can handle larger batches (100-200), API providers should use smaller (20-50).",
    )
    concurrency: int = Field(
        default=4,
        ge=1,
        le=16,
        description="Number of batches to process in parallel. "
        "Higher values speed up embedding but increase memory/API usage.",
    )
    rate_limit_rpm: int | None = Field(
        default=None,
        description="Requests per minute limit for API providers. "
        "If set, embedding will be throttled to respect this limit. "
        "Set to None for local providers or when using default API limits.",
    )
    retry_max_attempts: int = Field(
        default=3,
        ge=1,
        le=10,
        description="Maximum retry attempts for failed batches.",
    )
    retry_base_delay: float = Field(
        default=1.0,
        ge=0.1,
        le=10.0,
        description="Base delay in seconds between retry attempts (exponential backoff).",
    )

    # NOTE: a previous ``validate_batch_size`` validator re-checked ``v < 1``;
    # it was dead code — Field(ge=1) rejects such values during core
    # validation, before any after-mode field_validator runs (Pydantic v2).

    @field_validator("concurrency")
    @classmethod
    def validate_concurrency(cls, v: int) -> int:
        """Silently clamp concurrency to min(16, 2 * CPU count).

        Unlike the Field bounds (which raise on out-of-range input), this
        clamps so a config written on a larger machine still loads on a
        smaller one.
        """
        cpu_count = os.cpu_count() or 4  # cpu_count() may return None
        max_concurrency = min(16, cpu_count * 2)
        return min(v, max_concurrency)
class ASTCacheConfig(BaseModel):
    """AST cache configuration for tree-sitter parser.

    Caches parsed ASTs to speed up incremental indexing by avoiding
    re-parsing of unchanged files. Instances are immutable (frozen).
    """

    model_config = {"frozen": True}

    # Master switch for the cache as a whole.
    enabled: bool = Field(
        default=True,
        description="Enable AST caching for incremental indexing",
    )
    # Size bound: least-recently-used entries are evicted past this count.
    max_entries: int = Field(
        default=1000,
        description="Maximum number of cached ASTs before LRU eviction",
        ge=100,
        le=10000,
    )
    # Time bound: entries expire after this many seconds (1 minute .. 24 hours).
    ttl_seconds: int = Field(
        default=3600,
        description="Cache TTL in seconds (default: 1 hour)",
        ge=60,
        le=86400,
    )
class ChunkingConfig(BaseModel):
    """Chunking configuration.

    Controls chunk sizing, overlap, batching, class splitting, and parsing
    parallelism. Instances are immutable (frozen).
    """

    model_config = {"frozen": True}

    # ge bounds below reject nonsensical values (e.g. negative overlap or a
    # zero batch size) that previously passed validation.
    max_chunk_tokens: int = Field(
        default=512, ge=1, description="Max tokens per chunk"
    )
    overlap_tokens: int = Field(
        default=50, ge=0, description="Overlap between chunks"
    )
    batch_size: int = Field(
        default=500,
        ge=1,
        description="Number of chunks to process in each batch for memory efficiency",
    )
    class_split_threshold: int = Field(
        default=100,
        ge=1,
        description="Line count threshold above which classes are split into summary + method chunks",
    )
    parallel_workers: int = Field(
        default_factory=_get_default_parallel_workers,
        ge=1,
        le=32,
        description="Number of parallel workers for file parsing. "
        "Defaults to min(CPU count, 8). Higher values speed up indexing on multi-core systems.",
    )

    @field_validator("parallel_workers")
    @classmethod
    def validate_parallel_workers(cls, v: int) -> int:
        """Silently clamp parallel_workers to the machine's CPU count.

        The ``ge=1`` Field bound already rejects values < 1 before this
        after-mode validator runs, so no lower-bound re-check is needed.
        Clamping (rather than raising) lets a config written on a larger
        machine load on a smaller one.
        """
        cpu_count = os.cpu_count() or 4  # cpu_count() may return None
        return min(v, cpu_count)

    @model_validator(mode="after")
    def validate_overlap_less_than_max(self) -> "ChunkingConfig":
        """Validate overlap_tokens is strictly less than max_chunk_tokens.

        Raises:
            ValueError: if overlap_tokens >= max_chunk_tokens, which would
                make each successive chunk advance by zero or fewer tokens.
        """
        if self.overlap_tokens >= self.max_chunk_tokens:
            raise ValueError(
                f"overlap_tokens ({self.overlap_tokens}) must be less than "
                f"max_chunk_tokens ({self.max_chunk_tokens})"
            )
        return self