# analyze_document
Parse, score, and decide on collected documents to prepare high-quality, indexable artifacts for RAG.
## Instructions
Parse, score and decide on a collected document.
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| `document_id` | Yes | ID of the collected document to parse, score, and decide on. | — |
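Per the registration entry in the Implementation Reference, the declared input schema requires a single string property, `document_id`. A minimal sketch of checking a call payload against that schema before dispatch (the `validate_arguments` helper is illustrative, not part of kfabric):

```python
# Validate an analyze_document call payload against the tool's declared
# input schema. The schema dict is copied from the registration entry;
# the validator itself is a hypothetical helper, not kfabric code.
from typing import Any

INPUT_SCHEMA = {
    "type": "object",
    "properties": {"document_id": {"type": "string"}},
    "required": ["document_id"],
}

def validate_arguments(arguments: dict[str, Any], schema: dict[str, Any]) -> list[str]:
    """Return a list of validation errors; an empty list means the payload is valid."""
    errors: list[str] = []
    for name in schema.get("required", []):
        if name not in arguments:
            errors.append(f"missing required property: {name}")
    for name, spec in schema.get("properties", {}).items():
        if name in arguments and spec.get("type") == "string" and not isinstance(arguments[name], str):
            errors.append(f"property {name} must be a string")
    return errors

print(validate_arguments({"document_id": "doc-123"}, INPUT_SCHEMA))  # []
print(validate_arguments({}, INPUT_SCHEMA))
```

In production an MCP server would typically enforce this with a full JSON Schema validator; this sketch only covers the two constraints the schema actually declares.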
## Implementation Reference
- kfabric/mcp/registry.py:146-153 (handler): The MCP tool handler for 'analyze_document', which maps the orchestrator result to the tool output structure.
```python
def _analyze_document(
    orchestrator: Orchestrator,
    _db: Session,
    _settings: AppSettings,
    arguments: dict[str, Any],
) -> Any:
    result = orchestrator.analyze_document(arguments["document_id"])
    return {
        "parsed_document_id": result["parsed_document"].id,
        "decision_status": result["decision"].status,
        "global_score": result["score"].global_score,
        "salvaged_fragment_ids": [fragment.id for fragment in result["fragments"]],
    }
```

- kfabric/mcp/registry.py:223-232 (registration): Registration of the 'analyze_document' tool within the MCP registry.
```python
ToolDefinition(
    name="analyze_document",
    title="Analyze Document",
    description="Parse, score and decide on a collected document.",
    version="1.0.0",
    input_schema={
        "type": "object",
        "properties": {"document_id": {"type": "string"}},
        "required": ["document_id"],
    },
    output_schema={"type": "object"},
    security=common_security,
    handler=_analyze_document,
),
```

- kfabric/services/orchestrator.py:138-170 (handler): The core service logic that performs the document analysis (parsing, scoring, and decision-making).
```python
def analyze_document(self, document_id: str) -> dict[str, Any]:
    collected = self.session.get(CollectedDocument, document_id)
    if not collected:
        raise ValueError(f"Collected document {document_id} not found")
    candidate = collected.candidate
    query = candidate.query
    parsed_payload = parse_document(collected.raw_content, collected.content_type, candidate)
    parsed = ParsedDocument(collected_document_id=collected.id, **parsed_payload)
    self.session.add(parsed)
    self.session.flush()
    query_terms = query.expansion_text.split() if query.expansion_text else []
    score_payload = score_document(
        parsed.normalized_text, candidate.domain, query_terms, parsed.headings, self.settings
    )
    score = DocumentScore(parsed_document_id=parsed.id, **score_payload)
    self.session.add(score)
    self.session.flush()
    fragments_payload = salvage_fragments(parsed.normalized_text, query_terms, self.settings)
    saved_fragments: list[SalvagedFragment] = []
    for fragment_payload in fragments_payload:
        fragment = SalvagedFragment(
            parsed_document_id=parsed.id,
            query_id=query.id,
            **fragment_payload,
        )
        self.session.add(fragment)
        saved_fragments.append(fragment)
    self.session.flush()
    decision_status, rejection_reason = self._decide_document(score.global_score, saved_fragments)
    decision = DocumentDecision(
        parsed_document_id=parsed.id,
        status=decision_status,
```

The excerpt ends mid-constructor; the source range (lines 138-170) is truncated here.