
Tavily Web Search MCP Server

by UpendraNath
langgraph_Agent.py • 11.4 kB
""" Non-Blocking MCP Agent with LangGraph - Thread-Based Wrapper Solution This module provides a working integration of async MCP operations with LangGraph agents by using thread-based wrappers to avoid blocking conditions. """ import asyncio import os import threading import json from typing import Dict, Any, List, Optional from contextlib import AsyncExitStack from queue import Queue from dotenv import load_dotenv # MCP and LangChain imports from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from langchain.agents import create_agent from langchain_openai import ChatOpenAI from langchain_core.tools import BaseTool from langchain_core.documents import Document # Document processing imports from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_openai import OpenAIEmbeddings from langchain_qdrant import QdrantVectorStore from qdrant_client import QdrantClient from qdrant_client.http.models import Distance, VectorParams load_dotenv() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") class AsyncMCPWrapper: """ Thread-based wrapper for MCP operations Runs async MCP operations in a separate thread with its own event loop This avoids blocking conditions when integrating with LangGraph """ def __init__(self): self.session = None self.thread = None self.loop = None self.queue = Queue() self._stop_event = threading.Event() def start(self): """Start the MCP session in a background thread""" self.thread = threading.Thread(target=self._run_event_loop, daemon=False) self.thread.start() # Wait for thread to be ready self.queue.get() def _run_event_loop(self): """Run the event loop in the background thread""" self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) # Setup MCP and signal ready self.loop.run_until_complete(self._setup_mcp()) self.queue.put("ready") # Keep the loop alive until stop is called while not self._stop_event.is_set(): self.loop.run_until_complete(asyncio.sleep(0.1)) async def _setup_mcp(self): """Setup MCP client in the background thread's event loop""" print("πŸ”§ Setting up MCP in background thread...") server_params = StdioServerParameters( command="python", args=["server.py"], cwd=os.getcwd() ) exit_stack = AsyncExitStack() stdio_transport = await exit_stack.enter_async_context(stdio_client(server_params)) stdio, write = stdio_transport self.session = await exit_stack.enter_async_context(ClientSession(stdio, write)) await self.session.initialize() print("βœ… MCP session initialized in background thread") # Keep the exit stack alive self._exit_stack = exit_stack def call_tool(self, tool_name: str, tool_args: Dict[str, Any]) -> str: """ Synchronously call an MCP tool by scheduling it in the background thread """ result_queue = Queue() def run_tool(): try: # Schedule the coroutine in the background loop future = asyncio.run_coroutine_threadsafe( self.session.call_tool(tool_name, tool_args), self.loop ) result = future.result(timeout=30) if hasattr(result, 'content') and result.content: result_queue.put(str(result.content[0].text)) else: result_queue.put(str(result)) except Exception as e: result_queue.put(f"Error: {str(e)}") # Run in a thread to avoid blocking thread = threading.Thread(target=run_tool) thread.start() thread.join(timeout=35) if result_queue.empty(): return "Error: Tool call timed out" return result_queue.get() def stop(self): """Stop the background thread""" self._stop_event.set() if self.thread: self.thread.join(timeout=5) print("βœ… MCP wrapper stopped") # Global MCP wrapper 
mcp_wrapper = AsyncMCPWrapper() def get_mcp_tools() -> List[BaseTool]: """Get MCP tools with non-blocking wrapper""" print("πŸ”§ Loading MCP tools...") # Get tools from the session future = asyncio.run_coroutine_threadsafe( mcp_wrapper.session.list_tools(), mcp_wrapper.loop ) tools_result = future.result(timeout=10) print(f"βœ… Found {len(tools_result.tools)} MCP tools") tools = [] for tool_meta in tools_result.tools: print(f" - {tool_meta.name}: {tool_meta.description}") class MCPTool(BaseTool): name: str = tool_meta.name description: str = tool_meta.description def _run(self, **kwargs: Any) -> str: """Non-blocking tool call using the wrapper""" return mcp_wrapper.call_tool(self.name, kwargs) tools.append(MCPTool()) return tools def create_agent_with_tools(tools: List[BaseTool]): """Create a LangChain agent with MCP tools""" print("πŸ€– Creating agent...") llm = ChatOpenAI( model="gpt-4o", temperature=0, api_key=OPENAI_API_KEY ) system_prompt = """You are a helpful AI assistant. You have access to these tools: - roll_dice: Roll dice with notation like "2d6" - web_search: Search the web for information - organize_and_categorize: Organize bookmarks - load_bookmark_data: Load bookmark and history data IMPORTANT: Always use tools when the user asks for something they can help with!""" agent = create_agent( model=llm, tools=tools, system_prompt=system_prompt ) print("βœ… Agent created successfully") return agent def setup_document_processing(): """Setup document processing with organized bookmark data""" print("πŸ“„ Setting up document processing...") organized_file = os.path.join("organized", "complete_organized.json") if not os.path.exists(organized_file): print(f"⚠️ Organized data file not found: {organized_file}") return None, None try: with open(organized_file, 'r', encoding='utf-8') as f: organized_data = json.load(f) # Convert bookmark data to documents documents = [] for category, items in organized_data.items(): if isinstance(items, list): for item in items: content = f"Title: {item.get('title', '')}\n" content += f"URL: {item.get('url', '')}\n" content += f"Category: {category}\n" content += f"Keywords: {', '.join(item.get('keywords', []))}\n" doc = Document( page_content=content, metadata={ 'url': item.get('url', ''), 'title': item.get('title', ''), 'category': category, 'source': item.get('source', ''), 'keywords': item.get('keywords', []) } ) documents.append(doc) print(f"πŸ“š Created {len(documents)} documents") if not documents: return None, None # Setup text splitter text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=100 ) texts = text_splitter.split_documents(documents) # Setup embedding embedding_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY) # Setup Qdrant vector store qdrant_client = QdrantClient(":memory:") qdrant_client.create_collection( collection_name="documents", vectors_config=VectorParams( size=len(embedding_model.embed_query("Hello")), distance=Distance.COSINE ), ) vector_store = QdrantVectorStore( client=qdrant_client, collection_name="documents", embedding=embedding_model, ) vector_store.add_documents(texts) retriever = vector_store.as_retriever( search_type="similarity", search_kwargs={"k": 3} ) print("βœ… Document processing setup complete") return retriever, vector_store except Exception as e: print(f"❌ Error setting up document processing: {e}") return None, None def test_agent(agent): """Test the agent with various queries""" print("\nπŸ§ͺ Testing agent with non-blocking tool calls...\n") test_queries = [ "Roll 2d6 dice", 
"Search for best practices in Python", "Organize my bookmarks", ] for query in test_queries: print(f"\nπŸ“ Query: {query}") print("-" * 60) try: # Use correct input format for LangGraph agents result = agent.invoke({ "messages": [{"role": "user", "content": query}] }) # Extract final response if 'messages' in result and result['messages']: last_msg = result['messages'][-1] response = last_msg.content if hasattr(last_msg, 'content') else str(last_msg) # Check if tools were used tool_used = any( hasattr(msg, 'tool_calls') and msg.tool_calls for msg in result['messages'] ) print(f"βœ… Response: {response[:300]}...") print(f"πŸ”§ Tools used: {'Yes βœ“' if tool_used else 'No'}") print(f"πŸ“Š Total messages: {len(result['messages'])}") else: print(f"❌ No response received") except Exception as e: print(f"❌ Error: {e}") import traceback traceback.print_exc() print("-" * 60) def main(): """Main function""" print("πŸš€ Starting Non-Blocking MCP Agent with LangGraph\n") print("=" * 60) try: # Start MCP wrapper (runs in background thread) mcp_wrapper.start() # Get tools tools = get_mcp_tools() # Create agent agent = create_agent_with_tools(tools) # Setup document processing retriever, vector_store = setup_document_processing() # Test agent test_agent(agent) print("\n" + "=" * 60) print("βœ… All tests completed successfully!") except Exception as e: print(f"❌ Error: {e}") import traceback traceback.print_exc() finally: # Cleanup mcp_wrapper.stop() print("🏁 Done!") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/UpendraNath/MCP'
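The endpoint returns the server's metadata as JSON. For a quick look at the payload from Python, a standard-library-only sketch like the following works; the exact response fields are not documented on this page, so inspect the printed output rather than assuming field names.

# Fetch and pretty-print this server's metadata from the Glama MCP API.
import json
from urllib.request import urlopen

with urlopen("https://glama.ai/api/mcp/v1/servers/UpendraNath/MCP") as resp:
    server = json.load(resp)

print(json.dumps(server, indent=2))  # inspect the full payload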

If you have feedback or need assistance with the MCP directory API, please join our Discord server.