Skip to main content
Glama
edge_inserter.py3.69 kB
"""Shared edge insertion utilities with bulk + fallback strategy.""" from dataclasses import dataclass from typing import List, Dict, Any import logging try: import kuzu except ImportError: raise ImportError("kuzu package not available. Install with: pip install kuzu") logger = logging.getLogger(__name__) @dataclass class EdgeInsertionResult: """Result of edge insertion operation.""" inserted_count: int failed_count: int = 0 class EdgeInserter: """Unified edge insertion logic with configurable behavior.""" @staticmethod def insert_edges( conn: kuzu.Connection, edges_data: List[Dict[str, Any]], *, serialize_metadata: bool = False, verbose_logging: bool = False ) -> EdgeInsertionResult: """ Insert edges with automatic bulk→individual fallback. Args: conn: KuzuDB connection edges_data: Edge dictionaries serialize_metadata: Convert dict metadata to JSON strings (for COPY FROM) verbose_logging: Log detailed edge information Returns: EdgeInsertionResult with counts """ import pandas as pd import json if not edges_data: logger.debug("No edges to insert") return EdgeInsertionResult(inserted_count=0) # Optional: JSON serialization if serialize_metadata: for edge_data in edges_data: if 'metadata' in edge_data and isinstance(edge_data['metadata'], dict): edge_data['metadata'] = json.dumps(edge_data['metadata']) elif 'metadata' not in edge_data or edge_data['metadata'] is None: edge_data['metadata'] = '{}' # Optional: Verbose logging if verbose_logging: for ed in edges_data: if ed.get('type') == 'INHERITS': logger.info( f"Inserting INHERITS edge: " f"subject={ed['subject_frame_id'][:16]} -> " f"object={ed['object_frame_id'][:16]}" ) # Attempt bulk insert try: df = pd.DataFrame(edges_data) if verbose_logging: logger.info(f"Attempting to insert {len(edges_data)} edges via COPY Edge") logger.info(f"DataFrame columns: {list(df.columns)}") logger.info(f"DataFrame shape: {df.shape}") conn.execute("COPY Edge FROM $df", {'df': df}) logger.debug(f"Bulk inserted {len(edges_data)} edges") return EdgeInsertionResult(inserted_count=len(edges_data)) except Exception as e: logger.error(f"Bulk edge insertion failed: {e}") return EdgeInserter._insert_individually(conn, edges_data) @staticmethod def _insert_individually( conn: kuzu.Connection, edges_data: List[Dict[str, Any]] ) -> EdgeInsertionResult: """Fallback: insert edges one by one.""" import pandas as pd successful = 0 failed = 0 for edge_data in edges_data: try: df = pd.DataFrame([edge_data]) conn.execute("COPY Edge FROM $df", {'df': df}) successful += 1 except Exception as e: logger.error( f"Failed to insert edge {edge_data.get('type', 'UNKNOWN')}: {e}" ) failed += 1 logger.info(f"Individual edge insertion: {successful} successful, {failed} failed") return EdgeInsertionResult(inserted_count=successful, failed_count=failed)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/y3i12/nabu_nisaba'

If you have feedback or need assistance with the MCP directory API, please join our Discord server