# Strong references to in-flight background sync tasks. The event loop keeps
# only weak references to tasks, so a task whose handle is discarded may be
# garbage-collected before it finishes. Tasks remove themselves when done.
_BACKGROUND_SYNC_TASKS = set()


async def add_document(
    new_file_path: str,
    project_id: str,
    feature_id: Optional[str],
    doc_type: Optional[str],
    file_path: Optional[str],
    root_dir: str,
    graphiti_client: GraphitiClient,
    sync_mode: bool = False
) -> SyncResult:
    """
    Compare a new file with its ROOT original and sync the differences to Graphiti.

    @REQ: REQ-graphiti-chunk-mcp
    @BP: BP-graphiti-chunk-mcp
    @TASK: TASK-006-AddDocument

    This is the main entry point of the document comparison and syncing
    pipeline. It performs the following steps:

    0. Validates ``doc_type`` against ``VALID_RBT_TYPES``
    1. Reads the new file content (for TASK documents with a numeric
       ``file_path``, the full task identifier is taken from the document
       frontmatter when available)
    2. Uses PathResolver to locate the ROOT original file
    3. Reads and chunks the ROOT original file; if the resolver reports that
       it does not exist, the document is treated as entirely new
    4. Chunks the new file
    5. Compares chunks to identify changes
    6. Updates the ROOT file from the new file (this happens BEFORE the
       Graphiti sync, in both modes)
    7. Synchronizes changes to Graphiti, either awaited or in the background
    8. Returns the comparison result

    Args:
        new_file_path: Absolute path to the new/modified file
        project_id: Project identifier (e.g., 'knowledge-smith')
        feature_id: Feature identifier (e.g., 'graphiti-chunk-mcp'), optional for general docs
        doc_type: Document type (e.g., 'REQ', 'BP', 'TASK', or None for general docs)
        file_path: TASK identifier or relative path for general documents
        root_dir: Root directory for document resolution
        graphiti_client: GraphitiClient instance for Graphiti operations.
            Only passed to the sync when ``sync_mode`` is True; the
            background path is started without it.
        sync_mode: If True, wait for Graphiti sync to complete (for testing).
            If False (default), schedule Graphiti sync in background and return immediately.

    Returns:
        SyncResult containing statistics about added, updated, deleted, and unchanged chunks.
        Note: In background mode (sync_mode=False), the sync is scheduled but may not be
        complete when this function returns.

    Raises:
        FileNotFoundError: If the new file does not exist, or if the resolver
            raises FileNotFoundError while locating the ROOT file. A ROOT file
            that resolves but does not exist is NOT an error.
        ValueError: If invalid parameters provided
        RuntimeError: If any other error occurs (wraps the original exception)

    Examples:
        >>> # Sync TASK document
        >>> async with GraphitiClient(...) as client:
        ...     result = await add_document(
        ...         new_file_path="/path/to/TASK-006-AddDocument.md",
        ...         project_id="knowledge-smith",
        ...         feature_id="graphiti-chunk-mcp",
        ...         doc_type="TASK",
        ...         file_path="006",
        ...         root_dir="/path/to/root",
        ...         graphiti_client=client
        ...     )
        ...     print(f"Added: {len(result.added)}, Updated: {len(result.updated)}")
    """
    logger.info(f"Starting add_document: new_file={new_file_path}, "
                f"project_id={project_id}, feature_id={feature_id}, "
                f"doc_type={doc_type}, file_path={file_path}")
    try:
        # Step 0: Validate parameters
        logger.debug("Step 0: Validating parameters")
        # None is allowed and means "general document".
        if doc_type is not None and doc_type not in VALID_RBT_TYPES:
            raise ValueError(
                f"Invalid doc_type: '{doc_type}'. "
                f"Must be one of {sorted(VALID_RBT_TYPES)} for RBT documents, or None for general documents.\n"
                f"For general documents (e.g., todos, guides), set doc_type=None and use file_path "
                f"(e.g., file_path='todos/TODO-001.md')."
            )

        # Step 1: Validate and read new file
        logger.debug("Step 1: Reading new file")
        if not os.path.exists(new_file_path):
            raise FileNotFoundError(f"New file does not exist: {new_file_path}")
        with open(new_file_path, 'r', encoding='utf-8') as f:
            new_content = f.read()
        logger.info(f"Read {len(new_content)} characters from new file")

        # Step 1.5: For TASK documents, extract full task identifier from frontmatter if needed.
        # A file_path that is purely numeric once "TASK-" and hyphens are
        # stripped (e.g., "001", "TASK-001") triggers the frontmatter lookup.
        if doc_type == "TASK" and file_path:
            if file_path.replace("TASK-", "").replace("-", "").isdigit():
                logger.debug("Step 1.5: Extracting full TASK identifier from frontmatter")
                doc_id = _extract_document_id_from_frontmatter(new_content)
                if doc_id and doc_id.startswith("TASK-"):
                    # Drop only the leading prefix (e.g., "TASK-001-AddDocument" → "001-AddDocument")
                    task_full_id = doc_id.replace("TASK-", "", 1)
                    logger.info(f"Extracted full TASK identifier from frontmatter: {task_full_id}")
                    file_path = task_full_id
                else:
                    logger.warning(f"Could not extract TASK ID from frontmatter, using original file_path: {file_path}")

        # Step 2: Use PathResolver to locate ROOT original file
        logger.debug("Step 2: Resolving ROOT original file")
        resolver = PathResolver(root_dir=root_dir)
        try:
            old_path_info = resolver.resolve(
                project_id=project_id,
                feature_id=feature_id,
                doc_type=doc_type,
                file_path=file_path
            )
        except FileNotFoundError as e:
            # Re-raise with the lookup parameters so the caller can see what was searched for.
            raise FileNotFoundError(
                f"Could not find ROOT original file\n"
                f"Please check parameters:\n"
                f"  project_id: {project_id}\n"
                f"  feature_id: {feature_id}\n"
                f"  doc_type: {doc_type}\n"
                f"  file_path: {file_path}\n"
                f"Original error: {e}"
            ) from e
        logger.info(f"Resolved ROOT file path: {old_path_info.file_path} (exists={old_path_info.file_exists})")

        # Caller-supplied values win; fall back to what the resolver reported.
        # Both documents are chunked with the same metadata.
        effective_feature_id = feature_id or old_path_info.feature_id
        effective_doc_type = doc_type or old_path_info.doc_type

        # Step 3: Read ROOT original file content (if exists)
        if not old_path_info.file_exists:
            # ROOT file does not exist - treat as new document
            logger.info(f"ROOT file does not exist, treating as new document: {old_path_info.file_path}")
            old_chunks = []
        else:
            # ROOT file exists - read and chunk it
            logger.debug("Step 3: Reading ROOT original file")
            with open(old_path_info.file_path, 'r', encoding='utf-8') as f:
                old_content = f.read()
            logger.info(f"Read {len(old_content)} characters from ROOT file")
            # Chunk ROOT file
            logger.debug("Step 3b: Chunking ROOT file")
            old_chunks = _chunk_document(
                document_content=old_content,
                project_id=project_id,
                feature_id=effective_feature_id,
                doc_type=effective_doc_type
            )
            logger.info(f"Created {len(old_chunks)} chunks from ROOT file")

        # Step 4: Chunk new file
        logger.debug("Step 4: Chunking new file")
        new_chunks = _chunk_document(
            document_content=new_content,
            project_id=project_id,
            feature_id=effective_feature_id,
            doc_type=effective_doc_type
        )
        logger.info(f"Created {len(new_chunks)} chunks from new file")

        # Step 5: Compare chunks
        logger.debug("Step 5: Comparing chunks")
        comparator = ChunkComparator()
        sync_result = comparator.compare(old_chunks=old_chunks, new_chunks=new_chunks)
        logger.info(f"Comparison result: {len(sync_result.added)} added, "
                    f"{len(sync_result.updated)} updated, {len(sync_result.deleted)} deleted, "
                    f"{sync_result.unchanged} unchanged")

        # Step 6: Update ROOT file immediately after comparison.
        # NOTE(review): this runs before the Graphiti sync, so a failed sync
        # leaves ROOT already updated — confirm this ordering is intended.
        logger.debug("Step 6: Updating ROOT file")
        _update_root_file(new_file_path, old_path_info.file_path)
        logger.info(f"Updated ROOT file: {old_path_info.file_path}")

        # Step 7: Synchronize changes to Graphiti
        if sync_mode:
            # Synchronous mode: Wait for Graphiti sync to complete
            logger.debug("Step 7: Synchronizing changes to Graphiti (sync mode)")
            await _sync_chunks_to_graphiti(
                graphiti_client=graphiti_client,
                new_chunks=new_chunks,
                sync_result=sync_result,
                project_id=project_id
            )
            logger.info("Successfully synchronized all changes to Graphiti (sync mode)")
        else:
            # Asynchronous mode: Schedule background synchronization
            logger.debug("Step 7: Scheduling background synchronization to Graphiti")
            background_task = asyncio.create_task(
                _sync_chunks_to_graphiti_background(
                    new_chunks=new_chunks,
                    sync_result=sync_result,
                    project_id=project_id
                )
            )
            # Keep the task alive until it finishes, then release it.
            _BACKGROUND_SYNC_TASKS.add(background_task)
            background_task.add_done_callback(_BACKGROUND_SYNC_TASKS.discard)
            logger.info("Background synchronization task scheduled")

        # Step 8: Return sync results
        logger.info(f"add_document completed successfully. Total chunks: {sync_result.total_chunks}")
        return sync_result
    except (FileNotFoundError, ValueError):
        # Documented caller-facing errors: propagate unchanged.
        raise
    except Exception as e:
        # Anything else is wrapped so callers only need to handle RuntimeError.
        logger.error(f"Unexpected error in add_document: {e}", exc_info=True)
        raise RuntimeError(f"Failed to add document: {e}") from e