create_relations
Adds multiple new relations between entities in a knowledge graph, with each relation described in active voice.
Instructions
Create multiple new relations between entities in the knowledge graph. Relations should be in active voice.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| relations | Yes | | |
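
The schema table gives no field-level detail for `relations`. Judging by the attributes the implementation reads (`from_`, `to`, `relationType`), each item is probably an object naming a source entity, a target entity, and a relation type. The sketch below assumes the keys `from`, `to`, and `relationType`; those key names and the `validate_relations` helper are hypothetical and not confirmed by the source.

```python
from typing import Any, Dict, List

# Assumed key names: the source only shows the attributes from_, to, relationType.
REQUIRED_KEYS = ("from", "to", "relationType")


def validate_relations(relations: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Hypothetical helper: require each assumed key to be a non-empty string."""
    cleaned = []
    for index, relation in enumerate(relations):
        for key in REQUIRED_KEYS:
            value = relation.get(key)
            if not isinstance(value, str) or not value.strip():
                raise ValueError(f"relations[{index}] is missing a non-empty '{key}'")
        # Keep only the expected keys, with surrounding whitespace removed.
        cleaned.append({key: relation[key].strip() for key in REQUIRED_KEYS})
    return cleaned


# Example arguments. Relation types are phrased in active voice
# ("works_at" rather than "is_employed_by").
example_args = {
    "relations": [
        {"from": "Alice", "to": "Acme Corp", "relationType": "works_at"},
        {"from": "Acme Corp", "to": "Berlin", "relationType": "operates_in"},
    ]
}

validated = validate_relations(example_args["relations"])
```

Checking the payload before it reaches the database keeps malformed items from surfacing later as a less specific error.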
Implementation Reference
- The actual implementation of the create_relations logic that performs batch insertion into the SQLite database.

      async def create_relations(
          self,
          relations: List[Dict[str, Any]],
          batch_size: int = 1000
      ) -> List[Dict[str, Any]]:
          """Create multiple new relations in batches."""
          created_relations = []

          async with self.pool.get_connection() as conn:
              async with self.pool.transaction(conn):
                  for i in range(0, len(relations), batch_size):
                      batch = relations[i:i + batch_size]
                      relation_objects = [Relation.from_dict(r) for r in batch]

                      # Verify entities exist
                      for relation in relation_objects:
                          cursor = await conn.execute(
                              "SELECT 1 FROM entities WHERE name = ?",
                              (sanitize_input(relation.from_),)
                          )
                          if not await cursor.fetchone():
                              raise EntityNotFoundError(relation.from_)

                          cursor = await conn.execute(
                              "SELECT 1 FROM entities WHERE name = ?",
                              (sanitize_input(relation.to),)
                          )
                          if not await cursor.fetchone():
                              raise EntityNotFoundError(relation.to)

                      # Insert batch
                      await conn.executemany(
                          """
                          INSERT INTO relations (from_entity, to_entity, relation_type)
                          VALUES (?, ?, ?)
                          ON CONFLICT DO NOTHING
                          """,
                          [(r.from_, r.to, r.relationType) for r in relation_objects]
                      )

                      created_relations.extend([r.to_dict() for r in relation_objects])

          return created_relations

- optimized_memory_mcp_server/api/tools.py:28-28 (registration) Tool registration mapping "create_relations" to the dynamic handler.
"create_relations": lambda args: handle_tool("create_relations", args),
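
The batch-insertion logic in the implementation reference can be tried out with a synchronous, standard-library-only sketch. This is not the server's code: the table definitions (including the `UNIQUE` constraint that gives `ON CONFLICT DO NOTHING` something to act on), the input key names, and the `create_relations_sync` function are all assumptions made for illustration, and `LookupError` stands in for `EntityNotFoundError`.

```python
import sqlite3
from typing import Dict, List

# Hypothetical schema: the source shows column names but not the table definitions.
SCHEMA = """
CREATE TABLE entities (name TEXT PRIMARY KEY);
CREATE TABLE relations (
    from_entity TEXT NOT NULL,
    to_entity TEXT NOT NULL,
    relation_type TEXT NOT NULL,
    UNIQUE (from_entity, to_entity, relation_type)
);
"""


def create_relations_sync(
    conn: sqlite3.Connection, relations: List[Dict[str, str]], batch_size: int = 1000
) -> int:
    """Insert relations in batches and return the number of rows actually inserted."""
    inserted = 0
    with conn:  # one transaction: commit on success, roll back on any exception
        for start in range(0, len(relations), batch_size):
            batch = relations[start:start + batch_size]
            # Verify both endpoints exist before inserting the batch.
            for relation in batch:
                for name in (relation["from"], relation["to"]):
                    row = conn.execute(
                        "SELECT 1 FROM entities WHERE name = ?", (name,)
                    ).fetchone()
                    if row is None:
                        raise LookupError(f"entity not found: {name}")
            before = conn.total_changes
            conn.executemany(
                "INSERT INTO relations (from_entity, to_entity, relation_type) "
                "VALUES (?, ?, ?) ON CONFLICT DO NOTHING",
                [(r["from"], r["to"], r["relationType"]) for r in batch],
            )
            # Rows skipped by ON CONFLICT DO NOTHING do not count as changes.
            inserted += conn.total_changes - before
    return inserted


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany("INSERT INTO entities (name) VALUES (?)", [("Alice",), ("Acme Corp",)])
conn.commit()

relation = {"from": "Alice", "to": "Acme Corp", "relationType": "works_at"}
first = create_relations_sync(conn, [relation])
second = create_relations_sync(conn, [relation])  # same relation again: skipped
```

One difference worth noting: the original method returns every submitted relation, including any that `ON CONFLICT DO NOTHING` skipped, whereas this sketch returns a count of rows actually written so that duplicates are visible to the caller.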