vector_db_client.py•21.4 kB
"""
IRIS Vector Database Client
Provides a Python wrapper for InterSystems IRIS vector database operations.
Supports connection management, vector insertion, and similarity search using
IRIS native VECTOR(DOUBLE, n) type with COSINE similarity.
Usage:
from vector_db_client import IRISVectorDBClient
client = IRISVectorDBClient(
host="localhost",
port=1972,
namespace="DEMO",
username="_SYSTEM",
password="ISCDEMO"
)
# Insert vectors
client.insert_vector(
resource_id="doc-123",
patient_id="patient-456",
document_type="Progress Note",
text_content="Patient presents with...",
embedding=[0.1, 0.2, ...], # 1024-dim vector
embedding_model="nvidia/nv-embedqa-e5-v5"
)
# Search similar vectors
results = client.search_similar(
query_vector=[0.1, 0.2, ...],
top_k=10
)
Dependencies:
pip install intersystems-iris
"""
import os
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
import logging
try:
import iris
except ImportError:
raise ImportError(
"intersystems-iris package not found. "
"Install with: pip install intersystems-iris"
)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class IRISVectorDBClient:
"""
Client for InterSystems IRIS vector database operations.
Handles connection management, table creation, vector insertion,
and similarity search using IRIS native VECTOR type.
"""
def __init__(
self,
host: str = "localhost",
port: int = 1972,
namespace: str = "DEMO",
username: str = "_SYSTEM",
password: str = "ISCDEMO",
vector_dimension: int = 1024
):
"""
Initialize IRIS database client.
Args:
host: IRIS server hostname
port: IRIS SQL port (default: 1972)
namespace: IRIS namespace (default: DEMO)
username: Database username
password: Database password
vector_dimension: Vector embedding dimension (default: 1024)
"""
self.host = host
self.port = port
self.namespace = namespace
self.username = username
self.password = password
self.vector_dimension = vector_dimension
self.connection = None
self.cursor = None
logger.info(f"Initialized IRIS client: {host}:{port}/{namespace}")
def connect(self) -> None:
"""
Establish connection to IRIS database.
Raises:
Exception: If connection fails
"""
try:
connection_string = (
f"{self.host}:{self.port}/{self.namespace}"
)
self.connection = iris.connect(
connection_string,
self.username,
self.password
)
self.cursor = self.connection.cursor()
logger.info(f"✓ Connected to IRIS: {connection_string}")
except Exception as e:
logger.error(f"✗ Failed to connect to IRIS: {e}")
raise
def disconnect(self) -> None:
"""Close database connection."""
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
logger.info("✓ Disconnected from IRIS")
def __enter__(self):
"""Context manager entry."""
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.disconnect()
def create_clinical_note_vectors_table(
self,
table_name: str = "ClinicalNoteVectors",
drop_if_exists: bool = False
) -> None:
"""
Create the ClinicalNoteVectors table with VECTOR column.
Args:
table_name: Name of the table to create
drop_if_exists: Whether to drop existing table
Schema:
- ResourceID VARCHAR(255) PRIMARY KEY
- PatientID VARCHAR(255) NOT NULL
- DocumentType VARCHAR(255) NOT NULL
- TextContent VARCHAR(10000)
- SourceBundle VARCHAR(500)
- Embedding VECTOR(DOUBLE, {vector_dimension}) NOT NULL
- EmbeddingModel VARCHAR(100) NOT NULL
- CreatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
- UpdatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
"""
if not self.connection:
self.connect()
full_table_name = f"{self.namespace}.{table_name}"
try:
# Drop table if requested
if drop_if_exists:
drop_sql = f"DROP TABLE IF EXISTS {full_table_name}"
self.cursor.execute(drop_sql)
logger.info(f"✓ Dropped existing table: {full_table_name}")
# Create table with VECTOR column
create_sql = f"""
CREATE TABLE {full_table_name} (
ResourceID VARCHAR(255) PRIMARY KEY,
PatientID VARCHAR(255) NOT NULL,
DocumentType VARCHAR(255) NOT NULL,
TextContent VARCHAR(10000),
SourceBundle VARCHAR(500),
Embedding VECTOR(DOUBLE, {self.vector_dimension}) NOT NULL,
EmbeddingModel VARCHAR(100) NOT NULL,
CreatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UpdatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
self.cursor.execute(create_sql)
self.connection.commit()
logger.info(f"✓ Created table: {full_table_name}")
logger.info(f" Vector dimension: {self.vector_dimension}")
logger.info(f" Similarity metric: COSINE")
except Exception as e:
logger.error(f"✗ Failed to create table: {e}")
raise
def insert_vector(
self,
resource_id: str,
patient_id: str,
document_type: str,
text_content: str,
embedding: List[float],
embedding_model: str,
source_bundle: Optional[str] = None,
table_name: str = "ClinicalNoteVectors"
) -> None:
"""
Insert a single vector into the database.
Args:
resource_id: Unique resource identifier
patient_id: Patient identifier
document_type: Type of clinical document
text_content: Clinical note text
embedding: Vector embedding (list of floats)
embedding_model: Model used to generate embedding
source_bundle: Optional source FHIR bundle reference
table_name: Target table name
Raises:
ValueError: If vector dimension doesn't match
Exception: If insertion fails
"""
if not self.connection:
self.connect()
# Validate vector dimension
if len(embedding) != self.vector_dimension:
raise ValueError(
f"Vector dimension mismatch: expected {self.vector_dimension}, "
f"got {len(embedding)}"
)
full_table_name = f"{self.namespace}.{table_name}"
# Convert vector to VECTOR literal format
# IRIS expects: TO_VECTOR('[0.1,0.2,0.3,...]', DOUBLE)
vector_str = "[" + ",".join(map(str, embedding)) + "]"
try:
insert_sql = f"""
INSERT INTO {full_table_name} (
ResourceID,
PatientID,
DocumentType,
TextContent,
SourceBundle,
Embedding,
EmbeddingModel
) VALUES (?, ?, ?, ?, ?, TO_VECTOR(?, DOUBLE), ?)
"""
self.cursor.execute(
insert_sql,
(
resource_id,
patient_id,
document_type,
text_content,
source_bundle,
vector_str,
embedding_model
)
)
self.connection.commit()
except Exception as e:
logger.error(f"✗ Failed to insert vector for {resource_id}: {e}")
raise
def insert_vectors_batch(
self,
vectors: List[Dict[str, Any]],
table_name: str = "ClinicalNoteVectors"
) -> Tuple[int, int]:
"""
Insert multiple vectors in a batch.
Args:
vectors: List of vector dictionaries with keys:
- resource_id
- patient_id
- document_type
- text_content
- embedding
- embedding_model
- source_bundle (optional)
table_name: Target table name
Returns:
Tuple of (successful_count, failed_count)
"""
if not self.connection:
self.connect()
success_count = 0
failed_count = 0
for vector_data in vectors:
try:
self.insert_vector(
resource_id=vector_data["resource_id"],
patient_id=vector_data["patient_id"],
document_type=vector_data["document_type"],
text_content=vector_data["text_content"],
embedding=vector_data["embedding"],
embedding_model=vector_data["embedding_model"],
source_bundle=vector_data.get("source_bundle"),
table_name=table_name
)
success_count += 1
except Exception as e:
logger.warning(f"Failed to insert {vector_data.get('resource_id')}: {e}")
failed_count += 1
logger.info(f"✓ Batch insert: {success_count} successful, {failed_count} failed")
return success_count, failed_count
def search_similar(
self,
query_vector: List[float],
top_k: int = 10,
patient_id: Optional[str] = None,
document_type: Optional[str] = None,
table_name: str = "ClinicalNoteVectors"
) -> List[Dict[str, Any]]:
"""
Search for similar vectors using COSINE similarity.
Args:
query_vector: Query embedding vector
top_k: Number of results to return
patient_id: Optional filter by patient ID
document_type: Optional filter by document type
table_name: Table to search
Returns:
List of dictionaries containing:
- resource_id
- patient_id
- document_type
- text_content
- similarity (0-1, higher is more similar)
Raises:
ValueError: If vector dimension doesn't match
"""
if not self.connection:
self.connect()
# Validate vector dimension
if len(query_vector) != self.vector_dimension:
raise ValueError(
f"Query vector dimension mismatch: expected {self.vector_dimension}, "
f"got {len(query_vector)}"
)
full_table_name = f"{self.namespace}.{table_name}"
# Convert query vector to VECTOR literal
vector_str = "[" + ",".join(map(str, query_vector)) + "]"
# Build query with optional filters
where_clauses = []
params = []
if patient_id:
where_clauses.append("PatientID = ?")
params.append(patient_id)
if document_type:
where_clauses.append("DocumentType = ?")
params.append(document_type)
where_sql = ""
if where_clauses:
where_sql = "WHERE " + " AND ".join(where_clauses)
# VECTOR_COSINE returns similarity (0-1, higher is better)
search_sql = f"""
SELECT TOP {top_k}
ResourceID,
PatientID,
DocumentType,
TextContent,
SourceBundle,
VECTOR_COSINE(Embedding, TO_VECTOR('{vector_str}', DOUBLE)) AS Similarity
FROM {full_table_name}
{where_sql}
ORDER BY Similarity DESC
"""
try:
if params:
self.cursor.execute(search_sql, params)
else:
self.cursor.execute(search_sql)
results = []
for row in self.cursor.fetchall():
results.append({
"resource_id": row[0],
"patient_id": row[1],
"document_type": row[2],
"text_content": row[3],
"source_bundle": row[4],
"similarity": float(row[5])
})
logger.info(f"✓ Found {len(results)} similar vectors (top {top_k})")
return results
except Exception as e:
logger.error(f"✗ Vector search failed: {e}")
raise
def count_vectors(self, table_name: str = "ClinicalNoteVectors") -> int:
"""
Count total vectors in table.
Args:
table_name: Table to count
Returns:
Number of vectors
"""
if not self.connection:
self.connect()
full_table_name = f"{self.namespace}.{table_name}"
count_sql = f"SELECT COUNT(*) FROM {full_table_name}"
try:
self.cursor.execute(count_sql)
count = self.cursor.fetchone()[0]
return int(count)
except Exception as e:
logger.error(f"✗ Count failed: {e}")
raise
def get_vector_stats(
self,
table_name: str = "ClinicalNoteVectors"
) -> Dict[str, Any]:
"""
Get statistics about vectors in table.
Args:
table_name: Table to analyze
Returns:
Dictionary with statistics:
- total_vectors
- unique_patients
- unique_document_types
- document_type_counts
"""
if not self.connection:
self.connect()
full_table_name = f"{self.namespace}.{table_name}"
try:
# Total vectors
total = self.count_vectors(table_name)
# Unique patients
self.cursor.execute(
f"SELECT COUNT(DISTINCT PatientID) FROM {full_table_name}"
)
unique_patients = int(self.cursor.fetchone()[0])
# Document type breakdown
self.cursor.execute(f"""
SELECT DocumentType, COUNT(*) as count
FROM {full_table_name}
GROUP BY DocumentType
ORDER BY count DESC
""")
doc_type_counts = {}
for row in self.cursor.fetchall():
doc_type_counts[row[0]] = int(row[1])
stats = {
"total_vectors": total,
"unique_patients": unique_patients,
"unique_document_types": len(doc_type_counts),
"document_type_counts": doc_type_counts
}
logger.info(f"✓ Vector stats: {total} vectors, {unique_patients} patients")
return stats
except Exception as e:
logger.error(f"✗ Stats query failed: {e}")
raise
def insert_image_vector(
self,
image_id: str,
patient_id: str,
study_type: str,
image_path: str,
embedding: List[float],
related_report_id: Optional[str] = None,
table_name: str = "MedicalImageVectors"
) -> None:
"""
Insert an image vector into the database.
Args:
image_id: Unique image identifier
patient_id: Patient identifier
study_type: Type of medical imaging study (e.g., 'Chest X-Ray', 'CT Scan')
image_path: Path to the image file
embedding: Vector embedding (list of floats)
related_report_id: Optional reference to related clinical note
table_name: Target table name
Raises:
ValueError: If vector dimension doesn't match
Exception: If insertion fails
"""
if not self.connection:
self.connect()
# Validate vector dimension
if len(embedding) != self.vector_dimension:
raise ValueError(
f"Vector dimension mismatch: expected {self.vector_dimension}, "
f"got {len(embedding)}"
)
full_table_name = f"{self.namespace}.{table_name}"
# Convert vector to VECTOR literal format
vector_str = "[" + ",".join(map(str, embedding)) + "]"
try:
insert_sql = f"""
INSERT INTO {full_table_name} (
ImageID,
PatientID,
StudyType,
ImagePath,
Embedding,
RelatedReportID
) VALUES (?, ?, ?, ?, TO_VECTOR(?, DOUBLE), ?)
"""
self.cursor.execute(
insert_sql,
(
image_id,
patient_id,
study_type,
image_path,
vector_str,
related_report_id
)
)
self.connection.commit()
except Exception as e:
logger.error(f"✗ Failed to insert image vector for {image_id}: {e}")
raise
def search_similar_images(
self,
query_vector: List[float],
top_k: int = 10,
patient_id: Optional[str] = None,
study_type: Optional[str] = None,
table_name: str = "MedicalImageVectors"
) -> List[Dict[str, Any]]:
"""
Search for similar images using COSINE similarity.
Args:
query_vector: Query embedding vector
top_k: Number of results to return
patient_id: Optional filter by patient ID
study_type: Optional filter by study type
table_name: Table to search
Returns:
List of dictionaries containing:
- image_id
- patient_id
- study_type
- image_path
- related_report_id
- similarity (0-1, higher is more similar)
Raises:
ValueError: If vector dimension doesn't match
"""
if not self.connection:
self.connect()
# Validate vector dimension
if len(query_vector) != self.vector_dimension:
raise ValueError(
f"Query vector dimension mismatch: expected {self.vector_dimension}, "
f"got {len(query_vector)}"
)
full_table_name = f"{self.namespace}.{table_name}"
# Convert query vector to VECTOR literal
vector_str = "[" + ",".join(map(str, query_vector)) + "]"
# Build query with optional filters
where_clauses = []
params = []
if patient_id:
where_clauses.append("PatientID = ?")
params.append(patient_id)
if study_type:
where_clauses.append("StudyType = ?")
params.append(study_type)
where_sql = ""
if where_clauses:
where_sql = "WHERE " + " AND ".join(where_clauses)
# VECTOR_COSINE returns similarity (0-1, higher is better)
search_sql = f"""
SELECT TOP {top_k}
ImageID,
PatientID,
StudyType,
ImagePath,
RelatedReportID,
VECTOR_COSINE(Embedding, TO_VECTOR('{vector_str}', DOUBLE)) AS Similarity
FROM {full_table_name}
{where_sql}
ORDER BY Similarity DESC
"""
try:
if params:
self.cursor.execute(search_sql, params)
else:
self.cursor.execute(search_sql)
results = []
for row in self.cursor.fetchall():
results.append({
"image_id": row[0],
"patient_id": row[1],
"study_type": row[2],
"image_path": row[3],
"related_report_id": row[4],
"similarity": float(row[5])
})
logger.info(f"✓ Found {len(results)} similar images (top {top_k})")
return results
except Exception as e:
logger.error(f"✗ Image vector search failed: {e}")
raise
# Example usage
if __name__ == "__main__":
# Example: Connect and query
client = IRISVectorDBClient()
with client:
# Create table
# client.create_clinical_note_vectors_table(drop_if_exists=True)
# Get stats
stats = client.get_vector_stats()
print(f"\nVector Database Stats:")
print(f" Total vectors: {stats['total_vectors']:,}")
print(f" Unique patients: {stats['unique_patients']:,}")
print(f" Document types: {stats['unique_document_types']}")
for doc_type, count in stats['document_type_counts'].items():
print(f" - {doc_type}: {count:,}")