# langgraph_agent.py
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain_core.documents import Document
from langchain_core.prompts.chat import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from dotenv import load_dotenv
import os
import re
import asyncio
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
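# Optional sanity check (a minimal sketch, not in the original flow): OpenAIEmbeddings and
# ChatOpenAI below both read this key from the environment, so fail fast if it is missing.
if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY is not set - add it to your .env file before running this script.")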
### AgentState
class AgentState(TypedDict):
    question: str
    context: list[Document] | None
    sentiment: str | None
    audio_file: str | None
    response: str | None
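# One writer per key: classify_sentiment_node fills "sentiment", retrieve_documents_node fills
# "context", generate_response_node fills "response", and generate_audio_node fills "audio_file".
# "question" is supplied by the caller when the graph is invoked.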
### Data Collectors - Load FAQ PDF
file_path = "C:/Users/Inés/AIE8-MCP-Session/Data/240807-faqs-corporate-sustainability-reporting_en.pdf"
loader = PyMuPDFLoader(file_path)
documents = loader.load()
print(f"Loaded {len(documents)} documents from {file_path}")
# Preprocess the documents
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
texts = text_splitter.split_documents(documents)
# Embedding model and vector store
embedding_model = OpenAIEmbeddings()
# Qdrant vectorstore
qdrant_client = QdrantClient(":memory:")
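# ":memory:" starts an ephemeral, in-process Qdrant instance, so the index is rebuilt on every run.
# The collection's vector size below is discovered by embedding a throwaway string once.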
qdrant_client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=len(embedding_model.embed_query("Hello")), distance=Distance.COSINE),
)
vector_store = QdrantVectorStore(
    client=qdrant_client,
    collection_name="documents",
    embedding=embedding_model,
)
# Add documents to the vector store
_ = vector_store.add_documents(texts)
# Create retriever
retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 3})
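# The retriever runs a plain similarity search against the collection and returns the three
# most similar chunks (cosine distance, per the collection config above).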
### RAG ChatTemplate for Generator
TEMPLATE = """
#CONTEXT:
{context}
#QUERY:
{question}
Use only the provided context to answer the user query. If you do not know the answer, or it is not contained in the provided context, respond with "I don't know".
Keep the answer concise and helpful.
"""
chat_prompt = ChatPromptTemplate.from_messages([
    ("human", TEMPLATE)
])
# Generator LLM
llm_generator = ChatOpenAI(model="gpt-4o", temperature=0) # type: ignore
# Generator LCEL chain
generator_chain = chat_prompt | llm_generator | StrOutputParser()
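# LCEL flow: a dict with "question" and "context" -> chat_prompt -> gpt-4o -> plain string output.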
### Setup MCP Client and Tools at Module Level
print("Setting up MCP client and loading tools...")
# We need to run async code at module level, so we'll use a helper
async def setup_mcp():
    """Setup MCP client and return agent with tools"""
    client = MultiServerMCPClient({
        "miscellaneous": {
            "command": "python",
            "args": ["C:/Users/Inés/AIE8-MCP-Session/server.py"],
            "transport": "stdio",
        },
        "sentiment": {
            "command": "python",
            "args": ["C:/Users/Inés/AIE8-MCP-Session/custom_server.py"],
            "transport": "stdio",
        }
    })
    # Get tools from MCP servers
    tools = await client.get_tools()
    # Create agent that can EXECUTE tools (not just bind them)
    agent = create_agent("openai:gpt-4o", tools)
    return agent, tools
# Initialize agent at module level
agent_executor, mcp_tools = asyncio.run(setup_mcp())
generator_with_tools = llm_generator.bind_tools(mcp_tools) # For LCEL chain
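# Note: bind_tools only attaches the tool schemas to the model for LCEL use; the graph below
# routes all tool use through agent_executor, which actually executes the MCP tools.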
print(f"MCP tools loaded and agent created!")
### Node Functions
async def classify_sentiment_node(state: AgentState) -> dict:
    """Classify sentiment using MCP sentiment_classification tool"""
    question = state["question"]
    print(f"\n=== Classifying sentiment for: '{question}' ===")
    # Use agent_executor which actually EXECUTES MCP tools
    response = await agent_executor.ainvoke({
        "messages": [HumanMessage(content=f"Use sentiment_classification tool to classify the sentiment of this text: '{question}'")]
    })
    print(f"Agent response: {response}")
    # Extract sentiment from agent response
    sentiment = None
    if isinstance(response, dict) and "messages" in response:
        for msg in response["messages"]:
            if hasattr(msg, 'content'):
                content = str(msg.content)
                content_upper = content.upper()
                if "POSITIVE" in content_upper:
                    sentiment = "POSITIVE"
                    break
                elif "NEGATIVE" in content_upper:
                    sentiment = "NEGATIVE"
                    break
                elif "NEUTRAL" in content_upper:
                    sentiment = "NEUTRAL"
                    break
    print(f"Detected sentiment: {sentiment}")
    return {"sentiment": sentiment}
async def retrieve_documents_node(state: AgentState) -> dict:
    """Retrieve relevant FAQ documents from vector store"""
    question = state["question"]
    print(f"\n=== Retrieving documents for: '{question}' ===")
    # Use retriever to get relevant documents
    retrieved_docs = await retriever.ainvoke(question)
    print(f"Retrieved {len(retrieved_docs)} documents")
    return {"context": retrieved_docs}
async def generate_response_node(state: AgentState) -> dict:
    """Generate coherent answer using LCEL generator chain based on retrieved documents"""
    question = state["question"]
    context = state.get("context", [])
    print("\n=== Generating response using LCEL chain ===")
    # Format context for the prompt
    context_str = "\n\n".join([doc.page_content for doc in context]) if context else "No context available"
    # Use LCEL chain to generate response
    response = await generator_chain.ainvoke({
        'question': question,
        'context': context_str
    })
    print(f"Generated response: {response[:100]}...")
    return {'response': response}
async def generate_audio_node(state: AgentState) -> dict:
    """Generate audio using MCP text_to_speech tool"""
    print("\n=== Generating audio ===")
    # Determine what to say based on sentiment
    sentiment = state.get("sentiment")
    if sentiment and "NEGATIVE" in sentiment.upper():
        # For negative sentiment, use the generated response from the LCEL chain
        text_to_speak = state.get("response") or "I understand your concern."
    else:
        # Acknowledge positive/neutral feedback
        text_to_speak = f"Thank you for your feedback about: {state['question']}"
    print(f"Text to convert: {text_to_speak[:100]}...")
    # Call MCP text_to_speech tool via agent_executor
    response = await agent_executor.ainvoke({
        "messages": [HumanMessage(content=f"Use text_to_speech tool to convert this text into speech: '{text_to_speak}'")]
    })
    # Extract audio filename from agent response messages
    # (assumes the tool reports a file named like speech_<8 hex chars>.mp3)
    audio_filename = None
    print(f"Audio response: {response}")
    if isinstance(response, dict) and "messages" in response:
        for msg in response["messages"]:
            if hasattr(msg, 'content'):
                content = str(msg.content)
                if "speech_" in content:
                    match = re.search(r'speech_[a-f0-9]{8}\.mp3', content)
                    if match:
                        audio_filename = match.group(0)
                        print(f"Found audio filename: {audio_filename}")
                        break
    print(f"Audio generated: {audio_filename}")
    return {
        "audio_file": audio_filename
    }
### Conditional Router
def should_retrieve_documents(state: AgentState) -> str:
    """Route based on sentiment - retrieve docs only for negative sentiment"""
    sentiment = state.get("sentiment")
    if sentiment and "NEGATIVE" in sentiment.upper():
        print("→ Routing to document retrieval (negative sentiment)")
        return "retrieve"
    else:
        print("→ Skipping document retrieval (positive/neutral sentiment)")
        return "skip"
### Build LangGraph Workflow
async def main():
    """Main function - builds and runs the graph"""
    # MCP tools are already set up at module level!
    # Build the StateGraph
    builder = StateGraph(AgentState)
    # Add nodes
    builder.add_node("classify_sentiment", classify_sentiment_node)
    builder.add_node("retrieve_documents", retrieve_documents_node)
    builder.add_node("generate_response", generate_response_node)
    builder.add_node("generate_audio", generate_audio_node)
    # Add edges
    builder.add_edge(START, "classify_sentiment")
    # Conditional routing based on sentiment
    builder.add_conditional_edges(
        "classify_sentiment",
        should_retrieve_documents,
        {
            "retrieve": "retrieve_documents",
            "skip": "generate_audio"  # Skip directly to audio for positive
        }
    )
    # After retrieving documents, generate response
    builder.add_edge("retrieve_documents", "generate_response")
    # After generating response, create audio
    builder.add_edge("generate_response", "generate_audio")
    # End after audio generation
    builder.add_edge("generate_audio", END)
    # Compile the graph
    app = builder.compile()
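    # Optional debugging aid (a sketch, assuming langgraph's graph-drawing helpers are available):
    # print(app.get_graph().draw_mermaid())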
    # Test with negative sentiment question
    print("\n" + "="*60)
    print("TEST 1: Negative Sentiment (should retrieve → generate → audio)")
    print("="*60)
    result1 = await app.ainvoke({
        "question": "I'm frustrated with corporate sustainability reporting requirements",
        "context": None,
        "sentiment": None,
        "audio_file": None,
        "response": None
    })
    print("\n=== RESULTS ===")
    print(f"Question: {result1['question']}")
    print(f"Sentiment: {result1.get('sentiment')}")
    response_text = result1.get('response') or 'N/A'
    if response_text != 'N/A':
        print(f"Response: {response_text[:200]}...")
    else:
        print(f"Response: {response_text}")
    print(f"Audio File: {result1.get('audio_file')}")
    # Test with positive sentiment question
    print("\n" + "="*60)
    print("TEST 2: Positive Sentiment (should skip retrieval)")
    print("="*60)
    result2 = await app.ainvoke({
        "question": "I love how easy the sustainability reporting process is!",
        "context": None,
        "sentiment": None,
        "audio_file": None,
        "response": None
    })
    print("\n=== RESULTS ===")
    print(f"Question: {result2['question']}")
    print(f"Sentiment: {result2.get('sentiment')}")
    print(f"Response: {result2.get('response') or 'N/A'}")  # response stays None when retrieval is skipped
    print(f"Audio File: {result2.get('audio_file')}")
if __name__ == "__main__":
    asyncio.run(main())