# azure_openai_neo4j.py
"""
Copyright 2025, Zep Software, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from logging import INFO
from dotenv import load_dotenv
from openai import AsyncOpenAI
from graphiti_core import Graphiti
from graphiti_core.embedder.azure_openai import AzureOpenAIEmbedderClient
from graphiti_core.llm_client.azure_openai_client import AzureOpenAILLMClient
from graphiti_core.llm_client.config import LLMConfig
from graphiti_core.nodes import EpisodeType
#################################################
# CONFIGURATION
#################################################
# Logging setup plus the environment-driven
# settings used to reach the Neo4j database and
# the Azure OpenAI resource
#################################################

# Emit INFO-and-above records with a timestamp, logger name and level
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=INFO,
)
logger = logging.getLogger(__name__)

# Must run before the environment lookups below so that values loaded by
# python-dotenv are visible to them
load_dotenv()

# Neo4j connection parameters (defaults target a local instance;
# make sure Neo4j Desktop is running with a local DBMS started)
neo4j_uri = os.getenv('NEO4J_URI', 'bolt://localhost:7687')
neo4j_user = os.getenv('NEO4J_USER', 'neo4j')
neo4j_password = os.getenv('NEO4J_PASSWORD', 'password')

# Azure OpenAI connection parameters; endpoint and key have no default
azure_endpoint = os.getenv('AZURE_OPENAI_ENDPOINT')
azure_api_key = os.getenv('AZURE_OPENAI_API_KEY')
azure_deployment = os.getenv('AZURE_OPENAI_DEPLOYMENT', 'gpt-4.1')
azure_embedding_deployment = os.getenv(
    'AZURE_OPENAI_EMBEDDING_DEPLOYMENT',
    'text-embedding-3-small',
)

# Fail fast when either mandatory Azure setting is missing or empty
if not (azure_endpoint and azure_api_key):
    raise ValueError('AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set')
def _print_search_results(results):
    """Print the UUID, fact and (when set) validity window of each result."""
    for result in results:
        print(f'UUID: {result.uuid}')
        print(f'Fact: {result.fact}')
        # The validity timestamps may be absent or empty; print only real values
        valid_at = getattr(result, 'valid_at', None)
        if valid_at:
            print(f'Valid from: {valid_at}')
        invalid_at = getattr(result, 'invalid_at', None)
        if invalid_at:
            print(f'Valid until: {invalid_at}')
        print('---')


async def main():
    """Run the Graphiti + Azure OpenAI + Neo4j example end to end.

    Builds the Azure OpenAI backed LLM and embedder clients, connects
    Graphiti to Neo4j, adds three sample episodes (two text, one JSON),
    runs a search, then repeats the search reranked around the source node
    of the top result. The Graphiti connection is closed in all cases.
    """
    #################################################
    # INITIALIZATION
    #################################################
    # Connect to Neo4j and Azure OpenAI through the
    # Graphiti client
    #################################################
    # NOTE(review): no explicit index/constraint setup is performed here;
    # confirm whether the installed graphiti_core version handles that when
    # the client is constructed.

    # Initialize Azure OpenAI client.
    # Strip any trailing slash so an endpoint such as
    # 'https://example.openai.azure.com/' does not yield '//openai/v1/'.
    endpoint = azure_endpoint.rstrip('/')
    azure_client = AsyncOpenAI(
        base_url=f'{endpoint}/openai/v1/',
        api_key=azure_api_key,
    )

    # Create LLM and Embedder clients (same deployment for both model sizes)
    llm_client = AzureOpenAILLMClient(
        azure_client=azure_client,
        config=LLMConfig(model=azure_deployment, small_model=azure_deployment),
    )
    embedder_client = AzureOpenAIEmbedderClient(
        azure_client=azure_client, model=azure_embedding_deployment
    )

    # Initialize Graphiti with Neo4j connection and Azure OpenAI clients
    graphiti = Graphiti(
        neo4j_uri,
        neo4j_user,
        neo4j_password,
        llm_client=llm_client,
        embedder=embedder_client,
    )

    try:
        #################################################
        # ADDING EPISODES
        #################################################
        # Episodes are the primary units of information
        # in Graphiti. They can be text or structured JSON
        # and are automatically processed to extract entities
        # and relationships.
        #################################################

        # Episodes list containing both text and JSON episodes
        episodes = [
            {
                'content': 'Kamala Harris is the Attorney General of California. She was previously '
                'the district attorney for San Francisco.',
                'type': EpisodeType.text,
                'description': 'podcast transcript',
            },
            {
                'content': 'As AG, Harris was in office from January 3, 2011 – January 3, 2017',
                'type': EpisodeType.text,
                'description': 'podcast transcript',
            },
            {
                'content': {
                    'name': 'Gavin Newsom',
                    'position': 'Governor',
                    'state': 'California',
                    'previous_role': 'Lieutenant Governor',
                    'previous_location': 'San Francisco',
                },
                'type': EpisodeType.json,
                'description': 'podcast metadata',
            },
        ]

        # Add episodes to the graph
        for i, episode in enumerate(episodes):
            content = episode['content']
            await graphiti.add_episode(
                name=f'California Politics {i}',
                # Structured (dict) content is serialized to a JSON string
                episode_body=content if isinstance(content, str) else json.dumps(content),
                source=episode['type'],
                source_description=episode['description'],
                reference_time=datetime.now(timezone.utc),
            )
            print(f'Added episode: California Politics {i} ({episode["type"].value})')

        #################################################
        # BASIC SEARCH
        #################################################
        # The simplest way to retrieve relationships (edges)
        # from Graphiti is using the search method, which
        # performs a hybrid search combining semantic
        # similarity and BM25 text retrieval.
        #################################################
        print("\nSearching for: 'Who was the California Attorney General?'")
        results = await graphiti.search('Who was the California Attorney General?')

        # Print search results
        print('\nSearch Results:')
        _print_search_results(results)

        #################################################
        # CENTER NODE SEARCH
        #################################################
        # For more contextually relevant results, you can
        # use a center node to rerank search results based
        # on their graph distance to a specific node
        #################################################
        if results:
            # Use the source node of the top result as the center node
            center_node_uuid = results[0].source_node_uuid
            print('\nReranking search results based on graph distance:')
            print(f'Using center node UUID: {center_node_uuid}')
            reranked_results = await graphiti.search(
                'Who was the California Attorney General?',
                center_node_uuid=center_node_uuid,
            )

            # Print reranked search results
            print('\nReranked Search Results:')
            _print_search_results(reranked_results)
        else:
            print('No results found in the initial search to use as center node.')
    finally:
        #################################################
        # CLEANUP
        #################################################
        # Always close the connection to Neo4j when
        # finished to properly release resources
        #################################################
        await graphiti.close()
        print('\nConnection closed')
# Script entry point: drive the async example to completion on a fresh
# event loop when this file is executed directly (not when imported).
if __name__ == '__main__':
    example_run = main()
    asyncio.run(example_run)