Skip to main content
Glama

cognee-mcp

product_recommendation.py5.72 kB
import os import json import asyncio from neo4j import exceptions from cognee import prune # from cognee import visualize_graph from cognee.infrastructure.databases.graph import get_graph_engine from cognee.low_level import setup, DataPoint from cognee.pipelines import run_tasks, Task from cognee.tasks.storage import add_data_points class Products(DataPoint): name: str = "Products" products_aggregator_node = Products() class Product(DataPoint): id: str name: str type: str price: float colors: list[str] is_type: Products = products_aggregator_node class Preferences(DataPoint): name: str = "Preferences" preferences_aggregator_node = Preferences() class Preference(DataPoint): id: str name: str value: str is_type: Preferences = preferences_aggregator_node class Customers(DataPoint): name: str = "Customers" customers_aggregator_node = Customers() class Customer(DataPoint): id: str name: str has_preference: list[Preference] purchased: list[Product] liked: list[Product] is_type: Customers = customers_aggregator_node def ingest_files(): customers_file_path = os.path.join(os.path.dirname(__file__), "customers.json") customers = json.loads(open(customers_file_path, "r").read()) customers_data_points = {} products_data_points = {} preferences_data_points = {} for customer in customers: new_customer = Customer( id=customer["id"], name=customer["name"], liked=[], purchased=[], has_preference=[], ) customers_data_points[customer["name"]] = new_customer for product in customer["products"]: if product["id"] not in products_data_points: products_data_points[product["id"]] = Product( id=product["id"], type=product["type"], name=product["name"], price=product["price"], colors=product["colors"], ) new_product = products_data_points[product["id"]] if product["action"] == "purchased": new_customer.purchased.append(new_product) elif product["action"] == "liked": new_customer.liked.append(new_product) for preference in customer["preferences"]: if preference["id"] not in preferences_data_points: preferences_data_points[preference["id"]] = Preference( id=preference["id"], name=preference["name"], value=preference["value"], ) new_preference = preferences_data_points[preference["id"]] new_customer.has_preference.append(new_preference) return customers_data_points.values() async def main(): await prune.prune_data() await prune.prune_system(metadata=True) await setup() pipeline = run_tasks([Task(ingest_files), Task(add_data_points)]) async for status in pipeline: print(status) graph_engine = await get_graph_engine() products_results = await graph_engine.query( """ // Step 1: Use new customers's preferences from input UNWIND $preferences AS pref_input // Step 2: Find other customers who have these preferences MATCH (other_customer:Customer)-[:has_preference]->(preference:Preference) WHERE preference.value = pref_input WITH other_customer, count(preference) AS similarity_score // Step 3: Limit to the top-N most similar customers ORDER BY similarity_score DESC LIMIT 5 // Step 4: Get products that these similar customers have purchased MATCH (other_customer)-[:purchased]->(product:Product) // Step 5: Rank products based on frequency RETURN product, count(*) AS recommendation_score ORDER BY recommendation_score DESC LIMIT 10 """, { "preferences": ["White", "Navy Blue", "Regular Sneakers"], }, ) print("Top 10 recommended products:") for result in products_results: print(f"{result['product']['id']}: {result['product']['name']}") try: await graph_engine.query( """ // Match the customer and their stored shoe size preference MATCH (customer:Customer {id: $customer_id}) OPTIONAL MATCH (customer)-[:has_preference]->(preference:Preference {name: 'ShoeSize'}) // Assume the new shoe size is passed as a parameter $new_size WITH customer, preference, $new_size AS new_size // If a stored preference exists and it does not match the new value, // raise an error using APOC's utility procedure. CALL apoc.util.validate( preference IS NOT NULL AND preference.value <> new_size, "Conflicting shoe size preference: existing size is " + preference.value + " and new size is " + new_size, [] ) // If no conflict, continue with the update or further processing // ... RETURN customer """, { "customer_id": "customer_1", "new_size": "42", }, ) except exceptions.ClientError as error: print(f"Anomaly detected: {str(error.message)}") # # Or use our simple graph preview # graph_file_path = str( # os.path.join(os.path.dirname(__file__), ".artifacts/graph_visualization.html") # ) # await visualize_graph(graph_file_path) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/topoteretes/cognee'

If you have feedback or need assistance with the MCP directory API, please join our Discord server