feedback_enrichment_minimal_example.py•3.12 kB
import asyncio
import cognee
from cognee.api.v1.search import SearchType
from cognee.modules.pipelines.tasks.task import Task
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.storage import add_data_points
from cognee.shared.data_models import KnowledgeGraph
from cognee.tasks.feedback.extract_feedback_interactions import extract_feedback_interactions
from cognee.tasks.feedback.generate_improved_answers import generate_improved_answers
from cognee.tasks.feedback.create_enrichments import create_enrichments
from cognee.tasks.feedback.link_enrichments_to_feedback import link_enrichments_to_feedback
CONVERSATION = [
    "Alice: Hey, Bob. Did you talk to Mallory?",
    "Bob: Yeah, I just saw her before coming here.",
    "Alice: Then she told you to bring my documents, right?",
    "Bob: Uh… not exactly. She said you wanted me to bring you donuts. Which sounded kind of odd…",
    "Alice: Ugh, she’s so annoying. Thanks for the donuts anyway!",
]
async def initialize_conversation_and_graph(conversation):
    """Prune data/system, add conversation, cognify."""
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    await cognee.add(conversation)
    await cognee.cognify()
async def run_question_and_submit_feedback(question_text: str) -> bool:
    """Ask question, submit feedback based on correctness, and return correctness flag."""
    result = await cognee.search(
        query_type=SearchType.GRAPH_COMPLETION,
        query_text=question_text,
        save_interaction=True,
    )
    answer_text = str(result).lower()
    mentions_mallory = "mallory" in answer_text
    feedback_text = (
        "Great answers, very helpful!"
        if mentions_mallory
        else "The answer about Bob and donuts was wrong."
    )
    await cognee.search(
        query_type=SearchType.FEEDBACK,
        query_text=feedback_text,
        last_k=1,
    )
    return mentions_mallory
async def run_feedback_enrichment_memify(last_n: int = 5):
    """Execute memify with extraction, answer improvement, enrichment creation, and graph processing tasks."""
    # Instantiate tasks with their own kwargs
    extraction_tasks = [Task(extract_feedback_interactions, last_n=last_n)]
    enrichment_tasks = [
        Task(generate_improved_answers, top_k=20),
        Task(create_enrichments),
        Task(extract_graph_from_data, graph_model=KnowledgeGraph, task_config={"batch_size": 10}),
        Task(add_data_points, task_config={"batch_size": 10}),
        Task(link_enrichments_to_feedback),
    ]
    await cognee.memify(
        extraction_tasks=extraction_tasks,
        enrichment_tasks=enrichment_tasks,
        data=[{}],  # A placeholder to prevent fetching the entire graph
        dataset="feedback_enrichment_minimal",
    )
async def main():
    await initialize_conversation_and_graph(CONVERSATION)
    is_correct = await run_question_and_submit_feedback("Who told Bob to bring the donuts?")
    if not is_correct:
        await run_feedback_enrichment_memify(last_n=5)
if __name__ == "__main__":
    asyncio.run(main())