# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
CLI commands for document ingestion.
"""
import asyncio
import logging
import sys
import time
from pathlib import Path
import click
from ..ingestion import get_loader_for_file, is_supported_file, SUPPORTED_FORMATS
from ..models.memory import Memory
from ..utils import _process_and_store_chunk
logger = logging.getLogger(__name__)
def add_ingestion_commands(cli_group):
"""Add ingestion commands to a Click CLI group."""
cli_group.add_command(ingest_document)
cli_group.add_command(ingest_directory)
cli_group.add_command(list_formats)
@click.command()
@click.argument('file_path', type=click.Path(exists=True, path_type=Path))
@click.option('--tags', '-t', multiple=True, help='Tags to apply to all memories (can be used multiple times)')
@click.option('--chunk-size', '-c', default=1000, help='Target size for text chunks in characters')
@click.option('--chunk-overlap', '-o', default=200, help='Characters to overlap between chunks')
@click.option('--memory-type', '-m', default='document', help='Type label for created memories')
@click.option('--storage-backend', '-s', default='sqlite_vec',
type=click.Choice(['sqlite_vec', 'sqlite-vec', 'cloudflare', 'hybrid']), help='Storage backend to use')
@click.option('--verbose', '-v', is_flag=True, help='Enable verbose output')
def ingest_document(file_path: Path, tags: tuple, chunk_size: int, chunk_overlap: int,
memory_type: str, storage_backend: str, verbose: bool):
"""
Ingest a single document file into the memory database.
Supports multiple formats including PDF, text, and Markdown files.
The document will be parsed, chunked intelligently, and stored as multiple memories.
Examples:
memory ingest-document manual.pdf --tags documentation,manual
memory ingest-document README.md --chunk-size 500 --verbose
memory ingest-document data.txt --memory-type reference --tags important
"""
if verbose:
logging.basicConfig(level=logging.INFO)
click.echo(f"π Processing document: {file_path}")
async def run_ingestion():
from .utils import get_storage
try:
# Initialize storage
storage = await get_storage(storage_backend)
# Get appropriate document loader
loader = get_loader_for_file(file_path)
if loader is None:
click.echo(f"β Error: Unsupported file format: {file_path.suffix}", err=True)
return False
# Configure loader
loader.chunk_size = chunk_size
loader.chunk_overlap = chunk_overlap
if verbose:
click.echo(f"π§ Using loader: {loader.__class__.__name__}")
click.echo(f"βοΈ Chunk size: {chunk_size}, Overlap: {chunk_overlap}")
start_time = time.time()
chunks_processed = 0
chunks_stored = 0
errors = []
# Extract and store chunks
with click.progressbar(length=0, label='Processing chunks') as bar:
async for chunk in loader.extract_chunks(file_path):
chunks_processed += 1
bar.length = chunks_processed
bar.update(1)
try:
# Combine CLI tags with chunk metadata tags
all_tags = list(tags)
if chunk.metadata.get('tags'):
# Handle tags from chunk metadata (can be string or list)
chunk_tags = chunk.metadata['tags']
if isinstance(chunk_tags, str):
# Split comma-separated string into list
chunk_tags = [tag.strip() for tag in chunk_tags.split(',') if tag.strip()]
all_tags.extend(chunk_tags)
# Create memory object
memory = Memory(
content=chunk.content,
content_hash=generate_content_hash(chunk.content, chunk.metadata),
tags=list(set(all_tags)), # Remove duplicates
memory_type=memory_type,
metadata=chunk.metadata
)
# Store the memory
success, error = await storage.store(memory)
if success:
chunks_stored += 1
else:
errors.append(f"Chunk {chunk.chunk_index}: {error}")
if verbose:
click.echo(f"β οΈ Error storing chunk {chunk.chunk_index}: {error}")
except Exception as e:
errors.append(f"Chunk {chunk.chunk_index}: {str(e)}")
if verbose:
click.echo(f"β οΈ Exception in chunk {chunk.chunk_index}: {str(e)}")
processing_time = time.time() - start_time
success_rate = (chunks_stored / chunks_processed * 100) if chunks_processed > 0 else 0
# Display results
click.echo(f"\nβ
Document ingestion completed: {file_path.name}")
click.echo(f"π Chunks processed: {chunks_processed}")
click.echo(f"πΎ Chunks stored: {chunks_stored}")
click.echo(f"β‘ Success rate: {success_rate:.1f}%")
click.echo(f"β±οΈ Processing time: {processing_time:.2f} seconds")
if errors:
click.echo(f"β οΈ Errors encountered: {len(errors)}")
if verbose:
for error in errors[:5]: # Show first 5 errors
click.echo(f" - {error}")
if len(errors) > 5:
click.echo(f" ... and {len(errors) - 5} more errors")
return chunks_stored > 0
except Exception as e:
click.echo(f"β Error ingesting document: {str(e)}", err=True)
if verbose:
import traceback
click.echo(traceback.format_exc(), err=True)
return False
finally:
if 'storage' in locals():
await storage.close()
success = asyncio.run(run_ingestion())
sys.exit(0 if success else 1)
@click.command()
@click.argument('directory_path', type=click.Path(exists=True, file_okay=False, path_type=Path))
@click.option('--tags', '-t', multiple=True, help='Tags to apply to all memories (can be used multiple times)')
@click.option('--recursive', '-r', is_flag=True, default=True, help='Process subdirectories recursively')
@click.option('--extensions', '-e', multiple=True, help='File extensions to process (default: all supported)')
@click.option('--chunk-size', '-c', default=1000, help='Target size for text chunks in characters')
@click.option('--max-files', default=100, help='Maximum number of files to process')
@click.option('--storage-backend', '-s', default='sqlite_vec',
type=click.Choice(['sqlite_vec', 'sqlite-vec', 'cloudflare', 'hybrid']), help='Storage backend to use')
@click.option('--verbose', '-v', is_flag=True, help='Enable verbose output')
@click.option('--dry-run', is_flag=True, help='Show what would be processed without storing')
def ingest_directory(directory_path: Path, tags: tuple, recursive: bool, extensions: tuple,
chunk_size: int, max_files: int, storage_backend: str, verbose: bool, dry_run: bool):
"""
Batch ingest all supported documents from a directory.
Recursively processes all supported file types in the directory,
creating memories with consistent tagging and metadata.
Examples:
memory ingest-directory ./docs --tags knowledge-base --recursive
memory ingest-directory ./manuals --extensions pdf,md --max-files 50
memory ingest-directory ./content --dry-run --verbose
"""
if verbose:
logging.basicConfig(level=logging.INFO)
click.echo(f"π Processing directory: {directory_path}")
async def run_batch_ingestion():
from .utils import get_storage
try:
# Initialize storage (unless dry run)
storage = None if dry_run else await get_storage(storage_backend)
# Determine file extensions to process
if extensions:
file_extensions = [ext.lstrip('.') for ext in extensions]
else:
file_extensions = list(SUPPORTED_FORMATS.keys())
if verbose:
click.echo(f"π Looking for extensions: {', '.join(file_extensions)}")
click.echo(f"π Max files: {max_files}, Recursive: {recursive}")
# Find all supported files
all_files = []
for ext in file_extensions:
ext_pattern = f"*.{ext.lstrip('.')}"
if recursive:
files = list(directory_path.rglob(ext_pattern))
else:
files = list(directory_path.glob(ext_pattern))
all_files.extend(files)
# Remove duplicates and filter supported files
unique_files = []
seen = set()
for file_path in all_files:
if file_path not in seen and is_supported_file(file_path):
unique_files.append(file_path)
seen.add(file_path)
# Limit number of files
files_to_process = unique_files[:max_files]
if not files_to_process:
click.echo(f"β No supported files found in directory: {directory_path}")
return False
click.echo(f"π Found {len(files_to_process)} files to process")
if dry_run:
click.echo("π DRY RUN - Files that would be processed:")
for file_path in files_to_process:
click.echo(f" π {file_path}")
return True
start_time = time.time()
total_chunks_processed = 0
total_chunks_stored = 0
files_processed = 0
files_failed = 0
all_errors = []
# Process each file with progress bar
with click.progressbar(files_to_process, label='Processing files') as files_bar:
for file_path in files_bar:
try:
if verbose:
click.echo(f"\nπ Processing: {file_path.name}")
# Get appropriate document loader
loader = get_loader_for_file(file_path)
if loader is None:
all_errors.append(f"{file_path.name}: Unsupported format")
files_failed += 1
continue
# Configure loader
loader.chunk_size = chunk_size
file_chunks_processed = 0
file_chunks_stored = 0
# Extract and store chunks from this file
async for chunk in loader.extract_chunks(file_path):
file_chunks_processed += 1
total_chunks_processed += 1
# Process and store the chunk
success, error = await _process_and_store_chunk(
chunk,
storage,
file_path.name,
base_tags=list(tags),
context_tags={
"source_dir": directory_path.name,
"file_type": file_path.suffix.lstrip('.')
}
)
if success:
file_chunks_stored += 1
total_chunks_stored += 1
else:
all_errors.append(error)
if file_chunks_stored > 0:
files_processed += 1
if verbose:
click.echo(f" β
{file_chunks_stored}/{file_chunks_processed} chunks stored")
else:
files_failed += 1
if verbose:
click.echo(f" β No chunks stored")
except Exception as e:
files_failed += 1
all_errors.append(f"{file_path.name}: {str(e)}")
if verbose:
click.echo(f" β Error: {str(e)}")
processing_time = time.time() - start_time
success_rate = (total_chunks_stored / total_chunks_processed * 100) if total_chunks_processed > 0 else 0
# Display results
click.echo(f"\nβ
Directory ingestion completed: {directory_path.name}")
click.echo(f"π Files processed: {files_processed}/{len(files_to_process)}")
click.echo(f"π Total chunks processed: {total_chunks_processed}")
click.echo(f"πΎ Total chunks stored: {total_chunks_stored}")
click.echo(f"β‘ Success rate: {success_rate:.1f}%")
click.echo(f"β±οΈ Processing time: {processing_time:.2f} seconds")
if files_failed > 0:
click.echo(f"β Files failed: {files_failed}")
if all_errors:
click.echo(f"β οΈ Total errors: {len(all_errors)}")
if verbose:
error_limit = 10
for error in all_errors[:error_limit]:
click.echo(f" - {error}")
if len(all_errors) > error_limit:
click.echo(f" ... and {len(all_errors) - error_limit} more errors")
return total_chunks_stored > 0
except Exception as e:
click.echo(f"β Error in batch ingestion: {str(e)}", err=True)
if verbose:
import traceback
click.echo(traceback.format_exc(), err=True)
return False
finally:
if storage:
await storage.close()
success = asyncio.run(run_batch_ingestion())
sys.exit(0 if success else 1)
@click.command()
def list_formats() -> None:
"""
List all supported document formats for ingestion.
Shows file extensions and descriptions of supported document types.
"""
click.echo("π Supported document formats for ingestion:\n")
for ext, description in SUPPORTED_FORMATS.items():
click.echo(f" π .{ext:<8} - {description}")
click.echo(f"\n⨠Total: {len(SUPPORTED_FORMATS)} supported formats")
click.echo("\nExamples:")
click.echo(" memory ingest-document manual.pdf")
click.echo(" memory ingest-directory ./docs --extensions pdf,md")