import os
from typing import Annotated, List, TypedDict  # noqa: UP035
import requests
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages
from ollama import Client
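# A LangGraph agent wiring a local Eternity memory server to Ollama's hosted
# API. The graph is a straight pipeline: recall (search memory) -> generate
# (LLM completion) -> memorize (store the turn). Assumes `pip install
# langgraph langchain-core ollama requests`, an Eternity instance on
# localhost:8000, and an OLLAMA_API_KEY environment variable.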
# --- Eternity Client (Integration Layer) ---
ETERNITY_URL = "http://localhost:8000"
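# The helpers below assume Eternity's HTTP API: GET /search?q=<query> returns
# a JSON list of {"content": ..., "distance": ...} objects, and POST /add
# accepts the form fields "content" and "tags".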
def eternity_search(query: str) -> str:
"""Searches the Eternity long-term memory for relevant context."""
try:
        response = requests.get("{}/search".format(ETERNITY_URL),
                                params={"q": query}, timeout=10)
if response.status_code == 200:
results = response.json()
if not results:
return "No relevant memories found."
# Format results for the LLM
context = "Found the following relevant memories:\n"
for r in results:
                context += "- [{dist}] {snippet}...\n".format(
                    dist=round(r['distance'], 2), snippet=r['content'][:200])
return context
else:
return "Error searching memory: {}".format(response.text)
except Exception as e:
return "Failed to connect to Eternity: {}".format(e)
def eternity_add(content: str, tags: str = "langgraph_agent") -> str:
"""Stores a new memory into Eternity."""
try:
        response = requests.post("{}/add".format(ETERNITY_URL),
                                 data={"content": content, "tags": tags},
                                 timeout=10)
if response.status_code == 200:
return "Successfully stored memory."
else:
return "Error storing memory: {}".format(response.text)
except Exception as e:
return "Failed to connect to Eternity: {}".format(e)
# --- LangGraph Agent Definition ---
class AgentState(TypedDict):
# Use add_messages to append messages instead of overwriting
messages: Annotated[List[BaseMessage], add_messages] # noqa: UP006
context: str
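    # `context` holds the memory snippets retrieved for the current turn; as a
    # plain field (no reducer) it is overwritten on each update, not appended.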
# 1. Node: Recall (Search Memory)
def recall_memory(state: AgentState):
"""Reflects on the last user message and searches memory."""
if not state["messages"]:
return {"context": ""}
last_message = state["messages"][-1]
if isinstance(last_message, HumanMessage):
query = last_message.content
print("🧠 Recalling memory for: {}".format(query))
retrieved_context = eternity_search(query)
return {"context": retrieved_context}
return {"context": ""}
# 2. Node: Generate (LLM Completion via Ollama)
def generate_response(state: AgentState):
"""Generates a response using the retrieved context."""
# Simple prompt injection of the context
system_prompt = (
"You are an AI assistant with long-term memory named Eternity.\n"
"Use the provided [MEMORY CONTEXT] to answer the user's question personally.\n" # noqa: E501
"If the context is not relevant, answer usually.\n"
"After answering, you can decide to save important details to memory." # noqa: E501
)
context = state.get("context", "")
# Construct messages for Ollama API
ollama_messages = [{'role': 'system',
'content': system_prompt + "\n\n[MEMORY CONTEXT]:\n{}".format(context)}]
for msg in state["messages"]:
role = "user" if isinstance(msg, HumanMessage) else "assistant"
ollama_messages.append({'role': role, 'content': msg.content})
# Prepare Client
api_key = os.environ.get('OLLAMA_API_KEY')
if not api_key:
return {"messages":
[AIMessage(content="⚠️ OLLAMA_API_KEY "
"environment variable not found.")]}
client = Client(
host="https://ollama.com",
headers={'Authorization': 'Bearer ' + api_key}
)
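    # This targets Ollama's hosted API (https://ollama.com) with the
    # gpt-oss:120b model. To use a local Ollama daemon instead, point the
    # client at host='http://localhost:11434' with no API key and a locally
    # pulled model (an untested variation of this example).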
print("🤖 Generating response via Ollama...")
full_response = ""
try:
# Stream response to be fancy, but accumulate for state
for part in client.chat(model='gpt-oss:120b',
messages=ollama_messages, stream=True):
content = part['message']['content']
print(content, end='', flush=True)
full_response += content
print() # Newline after stream
except Exception as e:
return {"messages":
[AIMessage(content="Error calling Ollama: {}".format(e))]}
return {"messages": [AIMessage(content=full_response)]}
# 3. Node: Memorize (Save important info)
def memorize_interaction(state: AgentState):
"""Saves the interaction to long-term memory."""
if len(state["messages"]) < 2:
return {}
last_human = state["messages"][-2]
last_ai = state["messages"][-1]
# In a real agent, we might use an LLM to decide WHAT to save (summary).
# For this example, we save the interaction pair.
memory_content = "User: {}\nAI: {}".format(last_human.content, last_ai.content) # noqa: E501, UP032
print("Saving interaction to Eternity...")
result = eternity_add(memory_content, tags="interaction,chat")
print("Result: {}".format(result))
return {}
# --- Workflow Construction ---
workflow = StateGraph(AgentState)
workflow.add_node("recall", recall_memory)
workflow.add_node("generate", generate_response)
workflow.add_node("memorize", memorize_interaction)
workflow.set_entry_point("recall")
workflow.add_edge("recall", "generate")
workflow.add_edge("generate", "memorize")
workflow.add_edge("memorize", END)
app_graph = workflow.compile()
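# compile() returns a runnable graph: invoke() executes one full pass through
# recall -> generate -> memorize and returns the final AgentState.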
# --- Main Execution Example ---
if __name__ == "__main__":
print("--- Starting LangGraph + Eternity Agent ---")
# 1. First turn: User introduces themselves
print("\n--- Turn 1 ---")
inputs = {"messages":
[HumanMessage(content="Hi, my name is JR "
"and I'm working on a Python project.")]}
final_state = app_graph.invoke(inputs)
# No need to print again as we streamed it
# 2. Second turn: Recall
print("\n--- Turn 2 (Testing Recall) ---")
inputs_2 = {"messages":
[HumanMessage(content="Do you remember what my name is?")]}
final_state_2 = app_graph.invoke(inputs_2)
# No need to print, streamed above
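    # Expected behavior: turn 1's memorize node stores the introduction in
    # Eternity, so turn 2's recall node should surface it and the model can
    # answer with the user's name (assuming the Eternity server is running).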