Skip to main content
Glama
database.py11.8 kB
#!/usr/bin/env python3 """ Database layer for NSCCN - manages SQLite database with entities, edges, and skeletons. """ import sqlite3 import json import logging from pathlib import Path from typing import Optional, List, Dict, Any, Tuple import numpy as np logger = logging.getLogger(__name__) class NSCCNDatabase: """Database manager for NSCCN code graph and cache.""" def __init__(self, db_path: str = "nsccn.db"): """Initialize database connection and create tables if needed.""" self.db_path = Path(db_path) self.conn: Optional[sqlite3.Connection] = None self._initialize() def _initialize(self): """Initialize database connection and create schema.""" self.conn = sqlite3.connect(str(self.db_path), check_same_thread=False) self.conn.row_factory = sqlite3.Row self._create_schema() logger.info(f"Database initialized: {self.db_path}") def _create_schema(self): """Create database schema with entities, edges, and skeletons tables.""" cursor = self.conn.cursor() # Entities table (graph nodes) cursor.execute(""" CREATE TABLE IF NOT EXISTS entities ( id TEXT PRIMARY KEY, type TEXT NOT NULL, file_path TEXT NOT NULL, name TEXT NOT NULL, start_line INTEGER, end_line INTEGER, signature TEXT, docstring TEXT, embedding BLOB, last_updated REAL ) """) # Edges table (causal relationships) cursor.execute(""" CREATE TABLE IF NOT EXISTS edges ( source_id TEXT NOT NULL, relation TEXT NOT NULL, target_id TEXT NOT NULL, context TEXT, PRIMARY KEY (source_id, relation, target_id), FOREIGN KEY(source_id) REFERENCES entities(id), FOREIGN KEY(target_id) REFERENCES entities(id) ) """) # Create indexes for efficient querying cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_edges_source ON edges(source_id) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_edges_target ON edges(target_id) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_entities_file ON entities(file_path) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_entities_type ON entities(type) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_edges_relation ON edges(relation) """) # Skeletons cache table cursor.execute(""" CREATE TABLE IF NOT EXISTS skeletons ( file_path TEXT PRIMARY KEY, content TEXT, last_modified REAL ) """) self.conn.commit() logger.debug("Database schema created") def upsert_entity(self, entity: Dict[str, Any]) -> None: """Insert or update an entity in the database.""" cursor = self.conn.cursor() # Convert embedding to bytes if present embedding_blob = None if entity.get('embedding') is not None: embedding_blob = entity['embedding'].tobytes() cursor.execute(""" INSERT OR REPLACE INTO entities (id, type, file_path, name, start_line, end_line, signature, docstring, embedding, last_updated) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( entity['id'], entity['type'], entity['file_path'], entity['name'], entity.get('start_line'), entity.get('end_line'), entity.get('signature'), entity.get('docstring'), embedding_blob, entity.get('last_updated') )) self.conn.commit() def upsert_entities_batch(self, entities: List[Dict[str, Any]]) -> None: """Batch insert or update entities.""" cursor = self.conn.cursor() data = [] for entity in entities: embedding_blob = None if entity.get('embedding') is not None: embedding_blob = entity['embedding'].tobytes() data.append(( entity['id'], entity['type'], entity['file_path'], entity['name'], entity.get('start_line'), entity.get('end_line'), entity.get('signature'), entity.get('docstring'), embedding_blob, entity.get('last_updated') )) cursor.executemany(""" INSERT OR REPLACE INTO entities (id, type, file_path, name, start_line, end_line, signature, docstring, embedding, last_updated) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, data) self.conn.commit() def get_entity(self, entity_id: str) -> Optional[Dict[str, Any]]: """Retrieve an entity by ID.""" cursor = self.conn.cursor() cursor.execute("SELECT * FROM entities WHERE id = ?", (entity_id,)) row = cursor.fetchone() if row is None: return None entity = dict(row) # Convert embedding from bytes to numpy array if entity['embedding']: entity['embedding'] = np.frombuffer(entity['embedding'], dtype=np.float32) return entity def get_entities_by_file(self, file_path: str) -> List[Dict[str, Any]]: """Get all entities for a specific file.""" cursor = self.conn.cursor() cursor.execute("SELECT * FROM entities WHERE file_path = ?", (file_path,)) rows = cursor.fetchall() entities = [] for row in rows: entity = dict(row) if entity['embedding']: entity['embedding'] = np.frombuffer(entity['embedding'], dtype=np.float32) entities.append(entity) return entities def delete_entities_by_file(self, file_path: str) -> None: """Delete all entities for a specific file.""" cursor = self.conn.cursor() cursor.execute("DELETE FROM entities WHERE file_path = ?", (file_path,)) self.conn.commit() def upsert_edge(self, source_id: str, relation: str, target_id: str, context: Optional[str] = None) -> None: """Insert or update an edge in the database.""" cursor = self.conn.cursor() cursor.execute(""" INSERT OR REPLACE INTO edges (source_id, relation, target_id, context) VALUES (?, ?, ?, ?) """, (source_id, relation, target_id, context)) self.conn.commit() def upsert_edges_batch(self, edges: List[Tuple[str, str, str, Optional[str]]]) -> None: """Batch insert or update edges.""" cursor = self.conn.cursor() cursor.executemany(""" INSERT OR REPLACE INTO edges (source_id, relation, target_id, context) VALUES (?, ?, ?, ?) """, edges) self.conn.commit() def get_edges_by_source(self, source_id: str, relation: Optional[str] = None) -> List[Dict[str, Any]]: """Get all edges originating from a source entity.""" cursor = self.conn.cursor() if relation: cursor.execute( "SELECT * FROM edges WHERE source_id = ? AND relation = ?", (source_id, relation) ) else: cursor.execute("SELECT * FROM edges WHERE source_id = ?", (source_id,)) return [dict(row) for row in cursor.fetchall()] def get_edges_by_target(self, target_id: str, relation: Optional[str] = None) -> List[Dict[str, Any]]: """Get all edges pointing to a target entity.""" cursor = self.conn.cursor() if relation: cursor.execute( "SELECT * FROM edges WHERE target_id = ? AND relation = ?", (target_id, relation) ) else: cursor.execute("SELECT * FROM edges WHERE target_id = ?", (target_id,)) return [dict(row) for row in cursor.fetchall()] def delete_edges_by_source(self, source_id: str) -> None: """Delete all edges originating from a source entity.""" cursor = self.conn.cursor() cursor.execute("DELETE FROM edges WHERE source_id = ?", (source_id,)) self.conn.commit() def upsert_skeleton(self, file_path: str, content: str, last_modified: float) -> None: """Insert or update a skeleton in the cache.""" cursor = self.conn.cursor() cursor.execute(""" INSERT OR REPLACE INTO skeletons (file_path, content, last_modified) VALUES (?, ?, ?) """, (file_path, content, last_modified)) self.conn.commit() def get_skeleton(self, file_path: str) -> Optional[Dict[str, Any]]: """Retrieve a skeleton from the cache.""" cursor = self.conn.cursor() cursor.execute("SELECT * FROM skeletons WHERE file_path = ?", (file_path,)) row = cursor.fetchone() if row is None: return None return dict(row) def delete_skeleton(self, file_path: str) -> None: """Delete a skeleton from the cache.""" cursor = self.conn.cursor() cursor.execute("DELETE FROM skeletons WHERE file_path = ?", (file_path,)) self.conn.commit() def search_entities_by_embedding(self, query_embedding: np.ndarray, limit: int = 10) -> List[Dict[str, Any]]: """ Search entities by embedding similarity (cosine similarity). Returns entities sorted by similarity score. """ cursor = self.conn.cursor() cursor.execute("SELECT id, embedding FROM entities WHERE embedding IS NOT NULL") rows = cursor.fetchall() results = [] query_norm = np.linalg.norm(query_embedding) for row in rows: entity_id = row['id'] embedding_bytes = row['embedding'] entity_embedding = np.frombuffer(embedding_bytes, dtype=np.float32) # Cosine similarity entity_norm = np.linalg.norm(entity_embedding) if entity_norm > 0 and query_norm > 0: similarity = np.dot(query_embedding, entity_embedding) / (query_norm * entity_norm) results.append({'id': entity_id, 'score': float(similarity)}) # Sort by similarity (descending) and limit results.sort(key=lambda x: x['score'], reverse=True) # Get full entity details for top results top_results = [] for result in results[:limit]: entity = self.get_entity(result['id']) if entity: entity['score'] = result['score'] top_results.append(entity) return top_results def get_all_entities(self) -> List[Dict[str, Any]]: """Get all entities from the database.""" cursor = self.conn.cursor() cursor.execute("SELECT * FROM entities") rows = cursor.fetchall() entities = [] for row in rows: entity = dict(row) if entity['embedding']: entity['embedding'] = np.frombuffer(entity['embedding'], dtype=np.float32) entities.append(entity) return entities def close(self): """Close the database connection.""" if self.conn: self.conn.close() logger.info("Database connection closed")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/itstanner5216/EliteMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server