Skip to main content
Glama

Tavily Web Search MCP Server

by inesaranab
langgraph_agent.py (11.1 kB)
"""LangGraph workflow combining MCP tools with a FAQ-backed RAG chain.

The graph classifies the sentiment of a question through an MCP-backed agent.
For negative sentiment it retrieves FAQ chunks and generates an answer with an
LCEL chain; in every case it finishes by asking the agent to turn a reply into
speech.

NOTE: importing this module loads the PDF, builds the in-memory vector store
and starts the MCP client, because that setup runs at module level.
"""

import asyncio
import os
import re

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain_core.documents import Document
from langchain_core.prompts.chat import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from dotenv import load_dotenv

load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")


### AgentState
class AgentState(TypedDict):
    """State passed between the graph nodes."""

    # The user's input text; set by the caller and read by every node.
    question: str
    # Documents returned by the retriever (written by retrieve_documents_node).
    context: list[Document] | None
    # "POSITIVE" / "NEGATIVE" / "NEUTRAL", or None when nothing was detected.
    sentiment: str | None
    # File name matched in the agent's text_to_speech reply, or None.
    audio_file: str | None
    # Answer produced by the LCEL generator chain, or None when skipped.
    response: str | None


### Data Collectors - Load FAQ PDF
file_path = "C:/Users/Inés/AIE8-MCP-Session/Data/240807-faqs-corporate-sustainability-reporting_en.pdf"
loader = PyMuPDFLoader(file_path)
documents = loader.load()
print(f"Loaded {len(documents)} documents from {file_path}")

# Preprocess the documents
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
texts = text_splitter.split_documents(documents)

# Embedding model and vector store
embedding_model = OpenAIEmbeddings()

# Qdrant vectorstore
qdrant_client = QdrantClient(":memory:")
qdrant_client.create_collection(
    collection_name="documents",
    # The vector size is taken from the length of a sample embedding rather
    # than being hard-coded.
    vectors_config=VectorParams(
        size=len(embedding_model.embed_query("Hello")),
        distance=Distance.COSINE,
    ),
)
vector_store = QdrantVectorStore(
    client=qdrant_client,
    collection_name="documents",
    embedding=embedding_model,
)

# Add documents to the vector store
_ = vector_store.add_documents(texts)

# Create retriever
retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 3})

### RAG ChatTemplate for Generator
TEMPLATE = """
#CONTEXT:
{context}

#QUERY:
{question}

Use the provided context to answer the provided user query. Only use the provided context to answer the query. If you do not know the answer, or it's not contained in the provided context, respond with "I don't know". Keep the answer concise and helpful.
"""

chat_prompt = ChatPromptTemplate.from_messages([
    ("human", TEMPLATE)
])

# Generator LLM
llm_generator = ChatOpenAI(model="gpt-4o", temperature=0)  # type: ignore

# Generator LCEL chain
generator_chain = chat_prompt | llm_generator | StrOutputParser()

### Setup MCP Client and Tools at Module Level
print("Setting up MCP client and loading tools...")


# We need to run async code at module level, so we'll use a helper
async def setup_mcp():
    """Create the MCP client and return ``(agent, tools)``.

    Two stdio MCP servers are configured ("miscellaneous" and "sentiment");
    their tools are fetched and handed to an agent that can execute them.
    """
    client = MultiServerMCPClient({
        "miscellaneous": {
            "command": "python",
            "args": ["C:/Users/Inés/AIE8-MCP-Session/server.py"],
            "transport": "stdio",
        },
        "sentiment": {
            "command": "python",
            "args": ["C:/Users/Inés/AIE8-MCP-Session/custom_server.py"],
            "transport": "stdio",
        }
    })

    # Get tools from MCP servers
    tools = await client.get_tools()

    # Create agent that can EXECUTE tools (not just bind them)
    agent = create_agent("openai:gpt-4o", tools)
    return agent, tools


# Initialize agent at module level
agent_executor, mcp_tools = asyncio.run(setup_mcp())
generator_with_tools = llm_generator.bind_tools(mcp_tools)  # For LCEL chain
print("MCP tools loaded and agent created!")


### Agent response helpers

# Labels are checked in this order within a single message.
_SENTIMENT_LABELS = ("POSITIVE", "NEGATIVE", "NEUTRAL")

# File name pattern searched for in the agent's text_to_speech reply.
_AUDIO_FILENAME_RE = re.compile(r'speech_[a-f0-9]{8}\.mp3')


def _agent_reply_texts(response) -> list[str]:
    """Return the text of every non-human message in an agent response.

    Human messages are skipped on purpose: the prompts sent to the agent embed
    the user's own text, so scanning them would let words in the question
    (e.g. "positive") be mistaken for the tool's output.

    Returns an empty list when ``response`` is not a dict with a "messages"
    entry.
    """
    if not isinstance(response, dict) or "messages" not in response:
        return []
    return [
        str(msg.content)
        for msg in response["messages"]
        if hasattr(msg, "content") and not isinstance(msg, HumanMessage)
    ]


def _extract_sentiment(response) -> str | None:
    """Return the first sentiment label found in the agent's replies, if any."""
    for text in _agent_reply_texts(response):
        upper_text = text.upper()
        for label in _SENTIMENT_LABELS:
            if label in upper_text:
                return label
    return None


def _extract_audio_filename(response) -> str | None:
    """Return the first ``speech_<8 hex>.mp3`` name in the agent's replies, if any."""
    for text in _agent_reply_texts(response):
        match = _AUDIO_FILENAME_RE.search(text)
        if match:
            return match.group(0)
    return None


### Node Functions
async def classify_sentiment_node(state: AgentState) -> dict:
    """Classify sentiment using MCP sentiment_classification tool.

    Returns:
        ``{"sentiment": label}`` where ``label`` is "POSITIVE", "NEGATIVE",
        "NEUTRAL", or None when no label appears in the agent's replies.
    """
    question = state["question"]
    print(f"\n=== Classifying sentiment for: '{question}' ===")

    # Use agent_executor which actually EXECUTES MCP tools
    response = await agent_executor.ainvoke({
        "messages": [HumanMessage(content=f"Use sentiment_classification tool to classify the sentiment of this text: '{question}'")]
    })
    print(f"Agent response: {response}")

    # Extract sentiment from agent response (the prompt echo is ignored)
    sentiment = _extract_sentiment(response)
    print(f"Detected sentiment: {sentiment}")
    return {"sentiment": sentiment}


async def retrieve_documents_node(state: AgentState) -> dict:
    """Retrieve relevant FAQ documents from vector store.

    Returns:
        ``{"context": docs}`` with the documents returned by the retriever.
    """
    question = state["question"]
    print(f"\n=== Retrieving documents for: '{question}' ===")

    # Use retriever to get relevant documents
    retrieved_docs = await retriever.ainvoke(question)
    print(f"Retrieved {len(retrieved_docs)} documents")
    return {"context": retrieved_docs}


async def generate_response_node(state: AgentState) -> dict:
    """Generate coherent answer using LCEL generator chain based on retrieved documents.

    Returns:
        ``{"response": text}`` with the chain's string output.
    """
    question = state["question"]
    # "context" may be present but None, so normalise to an empty list.
    context = state.get("context") or []
    print("\n=== Generating response using LCEL chain ===")

    # Format context for the prompt
    context_str = "\n\n".join(doc.page_content for doc in context) if context else "No context available"

    # Use LCEL chain to generate response
    response = await generator_chain.ainvoke({
        'question': question,
        'context': context_str
    })
    print(f"Generated response: {response[:100]}...")
    return {'response': response}


async def generate_audio_node(state: AgentState) -> dict:
    """Generate audio using MCP text_to_speech tool.

    Returns:
        ``{"audio_file": name}`` where ``name`` is the file name matched in
        the agent's replies, or None when no match is found.
    """
    print("\n=== Generating audio ===")

    # Determine what to say based on sentiment
    sentiment = state.get("sentiment")
    if sentiment and "NEGATIVE" in sentiment.upper():
        # For negative sentiment, use the generated response from the LCEL chain
        text_to_speak = state.get("response") or "I understand your concern."
    else:
        # Acknowledge positive/neutral feedback
        text_to_speak = f"Thank you for your feedback about: {state['question']}"
    print(f"Text to convert: {text_to_speak[:100]}...")

    # Call MCP text_to_speech tool via agent_executor
    response = await agent_executor.ainvoke({
        "messages": [HumanMessage(content=f"Use text_to_speech tool to convert this text into speech: '{text_to_speak}'")]
    })
    print(f"Audio response: {response}")

    # Extract audio filename from agent response messages
    audio_filename = _extract_audio_filename(response)
    if audio_filename is not None:
        print(f"Found audio filename: {audio_filename}")
    print(f"Audio generated: {audio_filename}")
    return {
        "audio_file": audio_filename
    }


### Conditional Router
def should_retrieve_documents(state: AgentState) -> str:
    """Route based on sentiment - retrieve docs only for negative sentiment.

    Returns:
        "retrieve" when the stored sentiment contains "NEGATIVE"
        (case-insensitive), otherwise "skip".
    """
    sentiment = state.get("sentiment")
    if sentiment and "NEGATIVE" in sentiment.upper():
        print("→ Routing to document retrieval (negative sentiment)")
        return "retrieve"
    print("→ Skipping document retrieval (positive/neutral sentiment)")
    return "skip"


def _new_state(question: str) -> AgentState:
    """Return an initial graph state holding only the question."""
    return {
        "question": question,
        "context": None,
        "sentiment": None,
        "audio_file": None,
        "response": None
    }


### Build LangGraph Workflow
async def main():
    """Main function - builds and runs the graph"""
    # MCP tools are already set up at module level!

    # Build the StateGraph
    builder = StateGraph(AgentState)

    # Add nodes
    builder.add_node("classify_sentiment", classify_sentiment_node)
    builder.add_node("retrieve_documents", retrieve_documents_node)
    builder.add_node("generate_response", generate_response_node)
    builder.add_node("generate_audio", generate_audio_node)

    # Add edges
    builder.add_edge(START, "classify_sentiment")

    # Conditional routing based on sentiment
    builder.add_conditional_edges(
        "classify_sentiment",
        should_retrieve_documents,
        {
            "retrieve": "retrieve_documents",
            "skip": "generate_audio"  # Skip directly to audio for positive
        }
    )

    # After retrieving documents, generate response
    builder.add_edge("retrieve_documents", "generate_response")

    # After generating response, create audio
    builder.add_edge("generate_response", "generate_audio")

    # End after audio generation
    builder.add_edge("generate_audio", END)

    # Compile the graph
    app = builder.compile()

    # Test with negative sentiment question
    print("\n" + "="*60)
    print("TEST 1: Negative Sentiment (should retrieve → generate → audio)")
    print("="*60)
    result1 = await app.ainvoke(
        _new_state("I'm frustrated with corporate sustainability reporting requirements")
    )
    print("\n=== RESULTS ===")
    print(f"Question: {result1['question']}")
    print(f"Sentiment: {result1.get('sentiment')}")
    response_text = result1.get('response') or 'N/A'
    if response_text != 'N/A':
        print(f"Response: {response_text[:200]}...")
    else:
        print(f"Response: {response_text}")
    print(f"Audio File: {result1.get('audio_file')}")

    # Test with positive sentiment question
    print("\n" + "="*60)
    print("TEST 2: Positive Sentiment (should skip retrieval)")
    print("="*60)
    result2 = await app.ainvoke(
        _new_state("I love how easy the sustainability reporting process is!")
    )
    print("\n=== RESULTS ===")
    print(f"Question: {result2['question']}")
    print(f"Sentiment: {result2.get('sentiment')}")
    # The "response" key is present with value None on the skip path, so a
    # .get() default would never apply; use `or` to get the fallback.
    print(f"Response: {result2.get('response') or 'N/A'}")
    print(f"Audio File: {result2.get('audio_file')}")


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/inesaranab/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.