Skip to main content
Glama
build-knowledge-graph-aws.py5.95 kB
#!/usr/bin/env python3 """ Build knowledge graph on AWS using iris-vector-rag v0.5.4 GraphRAG pipeline This script uses the iris-vector-rag abstraction layer instead of raw SQL: - ConfigurationManager for settings - IRISVectorStore for vector operations - Entity extraction pipelines - Schema management for table creation """ import os import sys from pathlib import Path # Set environment variables for iris-vector-rag os.environ['VECTOR_DIMENSION'] = '1024' os.environ['IRIS_HOST'] = '3.84.250.46' os.environ['IRIS_PORT'] = '1972' os.environ['IRIS_NAMESPACE'] = '%SYS' os.environ['IRIS_USER'] = '_SYSTEM' os.environ['IRIS_PASSWORD'] = 'SYS' try: from iris_vector_rag.config.manager import ConfigurationManager from iris_vector_rag.storage.vector_store_iris import IRISVectorStore from iris_vector_rag.core.connection import ConnectionManager from iris_vector_rag.core.models import Document from iris_vector_rag.pipelines import GraphRAGPipeline print("✅ iris-vector-rag imports successful") except ImportError as e: print(f"❌ Failed to import iris-vector-rag: {e}") print("Run: pip install iris-vector-rag") sys.exit(1) import intersystems_iris.dbapi._DBAPI as iris import json def main(): """Build knowledge graph using iris-vector-rag pipeline.""" print("="*70) print("AWS GraphRAG Knowledge Graph Build") print("Using iris-vector-rag v0.5.4 Pipeline") print("="*70) # Use AWS config config_path = Path(__file__).parent.parent.parent / "config" / "fhir_graphrag_config.aws.yaml" if not config_path.exists(): print(f"❌ Config file not found: {config_path}") return 1 print(f"\n→ Loading configuration from {config_path}") config = ConfigurationManager(config_path=str(config_path)) print(f"✅ Configuration loaded") print(f" Host: {config.get('database:iris:host')}") print(f" Namespace: {config.get('database:iris:namespace')}") print(f" Vector dimension: {config.get('storage:vector_dimension')}") # Initialize vector store print(f"\n→ Initializing IRISVectorStore...") vector_store = IRISVectorStore(config_manager=config) print(f"✅ Vector store initialized") print(f" Table: {vector_store.table_name}") print(f" Dimension: {vector_store.vector_dimension}") # Load FHIR documents from AWS print(f"\n→ Loading FHIR documents from AWS IRIS...") conn = iris.connect( hostname='3.84.250.46', port=1972, namespace='%SYS', username='_SYSTEM', password='SYS' ) cursor = conn.cursor() cursor.execute(""" SELECT FHIRResourceId, ResourceString FROM SQLUser.FHIRDocuments ORDER BY ID """) rows = cursor.fetchall() documents = [] for fhir_id, resource_string in rows: try: fhir_json = json.loads(resource_string) # Extract hex-encoded clinical note text = "" if 'content' in fhir_json and len(fhir_json['content']) > 0: content = fhir_json['content'][0] if 'attachment' in content and 'data' in content['attachment']: try: decoded_bytes = bytes.fromhex(content['attachment']['data']) text = decoded_bytes.decode('utf-8') except Exception as e: print(f"⚠️ Failed to decode document {fhir_id}: {e}") continue if text: # Create Document object for iris-vector-rag doc = Document( page_content=text, id=str(fhir_id), metadata={ 'resource_type': 'DocumentReference', 'fhir_id': fhir_id } ) documents.append(doc) except Exception as e: print(f"⚠️ Error processing document {fhir_id}: {e}") continue cursor.close() conn.close() print(f"✅ Loaded {len(documents)} FHIR documents") if len(documents) == 0: print("❌ No documents to process") return 1 # Show sample document print(f"\n→ Sample document:") print(f" ID: {documents[0].id}") print(f" Content preview: {documents[0].page_content[:100]}...") # Initialize GraphRAG pipeline print(f"\n→ Initializing GraphRAG pipeline...") try: pipeline = GraphRAGPipeline( config_manager=config, vector_store=vector_store ) print(f"✅ GraphRAG pipeline initialized") except Exception as e: print(f"❌ Failed to initialize pipeline: {e}") import traceback traceback.print_exc() return 1 # Ingest documents into knowledge graph print(f"\n→ Ingesting {len(documents)} documents into knowledge graph...") try: pipeline.ingest(documents) print(f"✅ Documents ingested successfully") except Exception as e: print(f"❌ Failed to ingest documents: {e}") import traceback traceback.print_exc() return 1 # Test query print(f"\n→ Testing knowledge graph query...") try: results = pipeline.query("chest pain", top_k=3) print(f"✅ Query successful!") print(f" Found {len(results)} results") for i, result in enumerate(results[:3], 1): print(f" {i}. {result.get('content', '')[:80]}...") except Exception as e: print(f"⚠️ Query failed: {e}") # Non-fatal - knowledge graph may still be built print(f"\n{'='*70}") print(f"✅ GraphRAG Knowledge Graph Complete!") print(f"{'='*70}") print(f"Documents processed: {len(documents)}") print(f"Using iris-vector-rag v0.5.4 GraphRAG pipeline") return 0 if __name__ == "__main__": sys.exit(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/isc-tdyar/medical-graphrag-assistant'

If you have feedback or need assistance with the MCP directory API, please join our Discord server