langgraph_Agent.pyβ’11.4 kB
"""
Non-Blocking MCP Agent with LangGraph - Thread-Based Wrapper Solution
This module provides a working integration of async MCP operations with LangGraph agents
by using thread-based wrappers to avoid blocking conditions.
"""
import asyncio
import os
import threading
import json
from typing import Dict, Any, List, Optional
from contextlib import AsyncExitStack
from queue import Queue
from dotenv import load_dotenv
# MCP and LangChain imports
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import BaseTool
from langchain_core.documents import Document
# Document processing imports
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
class AsyncMCPWrapper:
"""
Thread-based wrapper for MCP operations
Runs async MCP operations in a separate thread with its own event loop
This avoids blocking conditions when integrating with LangGraph
"""
def __init__(self):
self.session = None
self.thread = None
self.loop = None
self.queue = Queue()
self._stop_event = threading.Event()
def start(self):
"""Start the MCP session in a background thread"""
self.thread = threading.Thread(target=self._run_event_loop, daemon=False)
self.thread.start()
# Wait for thread to be ready
self.queue.get()
def _run_event_loop(self):
"""Run the event loop in the background thread"""
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
# Setup MCP and signal ready
self.loop.run_until_complete(self._setup_mcp())
self.queue.put("ready")
# Keep the loop alive until stop is called
while not self._stop_event.is_set():
self.loop.run_until_complete(asyncio.sleep(0.1))
async def _setup_mcp(self):
"""Setup MCP client in the background thread's event loop"""
print("π§ Setting up MCP in background thread...")
server_params = StdioServerParameters(
command="python",
args=["server.py"],
cwd=os.getcwd()
)
exit_stack = AsyncExitStack()
stdio_transport = await exit_stack.enter_async_context(stdio_client(server_params))
stdio, write = stdio_transport
self.session = await exit_stack.enter_async_context(ClientSession(stdio, write))
await self.session.initialize()
print("β
MCP session initialized in background thread")
# Keep the exit stack alive
self._exit_stack = exit_stack
def call_tool(self, tool_name: str, tool_args: Dict[str, Any]) -> str:
"""
Synchronously call an MCP tool by scheduling it in the background thread
"""
result_queue = Queue()
def run_tool():
try:
# Schedule the coroutine in the background loop
future = asyncio.run_coroutine_threadsafe(
self.session.call_tool(tool_name, tool_args),
self.loop
)
result = future.result(timeout=30)
if hasattr(result, 'content') and result.content:
result_queue.put(str(result.content[0].text))
else:
result_queue.put(str(result))
except Exception as e:
result_queue.put(f"Error: {str(e)}")
# Run in a thread to avoid blocking
thread = threading.Thread(target=run_tool)
thread.start()
thread.join(timeout=35)
if result_queue.empty():
return "Error: Tool call timed out"
return result_queue.get()
def stop(self):
"""Stop the background thread"""
self._stop_event.set()
if self.thread:
self.thread.join(timeout=5)
print("β
MCP wrapper stopped")
# Global MCP wrapper
mcp_wrapper = AsyncMCPWrapper()
def get_mcp_tools() -> List[BaseTool]:
"""Get MCP tools with non-blocking wrapper"""
print("π§ Loading MCP tools...")
# Get tools from the session
future = asyncio.run_coroutine_threadsafe(
mcp_wrapper.session.list_tools(),
mcp_wrapper.loop
)
tools_result = future.result(timeout=10)
print(f"β
Found {len(tools_result.tools)} MCP tools")
tools = []
for tool_meta in tools_result.tools:
print(f" - {tool_meta.name}: {tool_meta.description}")
class MCPTool(BaseTool):
name: str = tool_meta.name
description: str = tool_meta.description
def _run(self, **kwargs: Any) -> str:
"""Non-blocking tool call using the wrapper"""
return mcp_wrapper.call_tool(self.name, kwargs)
tools.append(MCPTool())
return tools
def create_agent_with_tools(tools: List[BaseTool]):
"""Create a LangChain agent with MCP tools"""
print("π€ Creating agent...")
llm = ChatOpenAI(
model="gpt-4o",
temperature=0,
api_key=OPENAI_API_KEY
)
system_prompt = """You are a helpful AI assistant. You have access to these tools:
- roll_dice: Roll dice with notation like "2d6"
- web_search: Search the web for information
- organize_and_categorize: Organize bookmarks
- load_bookmark_data: Load bookmark and history data
IMPORTANT: Always use tools when the user asks for something they can help with!"""
agent = create_agent(
model=llm,
tools=tools,
system_prompt=system_prompt
)
print("β
Agent created successfully")
return agent
def setup_document_processing():
"""Setup document processing with organized bookmark data"""
print("π Setting up document processing...")
organized_file = os.path.join("organized", "complete_organized.json")
if not os.path.exists(organized_file):
print(f"β οΈ Organized data file not found: {organized_file}")
return None, None
try:
with open(organized_file, 'r', encoding='utf-8') as f:
organized_data = json.load(f)
# Convert bookmark data to documents
documents = []
for category, items in organized_data.items():
if isinstance(items, list):
for item in items:
content = f"Title: {item.get('title', '')}\n"
content += f"URL: {item.get('url', '')}\n"
content += f"Category: {category}\n"
content += f"Keywords: {', '.join(item.get('keywords', []))}\n"
doc = Document(
page_content=content,
metadata={
'url': item.get('url', ''),
'title': item.get('title', ''),
'category': category,
'source': item.get('source', ''),
'keywords': item.get('keywords', [])
}
)
documents.append(doc)
print(f"π Created {len(documents)} documents")
if not documents:
return None, None
# Setup text splitter
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=100
)
texts = text_splitter.split_documents(documents)
# Setup embedding
embedding_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
# Setup Qdrant vector store
qdrant_client = QdrantClient(":memory:")
qdrant_client.create_collection(
collection_name="documents",
vectors_config=VectorParams(
size=len(embedding_model.embed_query("Hello")),
distance=Distance.COSINE
),
)
vector_store = QdrantVectorStore(
client=qdrant_client,
collection_name="documents",
embedding=embedding_model,
)
vector_store.add_documents(texts)
retriever = vector_store.as_retriever(
search_type="similarity",
search_kwargs={"k": 3}
)
print("β
Document processing setup complete")
return retriever, vector_store
except Exception as e:
print(f"β Error setting up document processing: {e}")
return None, None
def test_agent(agent):
"""Test the agent with various queries"""
print("\nπ§ͺ Testing agent with non-blocking tool calls...\n")
test_queries = [
"Roll 2d6 dice",
"Search for best practices in Python",
"Organize my bookmarks",
]
for query in test_queries:
print(f"\nπ Query: {query}")
print("-" * 60)
try:
# Use correct input format for LangGraph agents
result = agent.invoke({
"messages": [{"role": "user", "content": query}]
})
# Extract final response
if 'messages' in result and result['messages']:
last_msg = result['messages'][-1]
response = last_msg.content if hasattr(last_msg, 'content') else str(last_msg)
# Check if tools were used
tool_used = any(
hasattr(msg, 'tool_calls') and msg.tool_calls
for msg in result['messages']
)
print(f"β
Response: {response[:300]}...")
print(f"π§ Tools used: {'Yes β' if tool_used else 'No'}")
print(f"π Total messages: {len(result['messages'])}")
else:
print(f"β No response received")
except Exception as e:
print(f"β Error: {e}")
import traceback
traceback.print_exc()
print("-" * 60)
def main():
"""Main function"""
print("π Starting Non-Blocking MCP Agent with LangGraph\n")
print("=" * 60)
try:
# Start MCP wrapper (runs in background thread)
mcp_wrapper.start()
# Get tools
tools = get_mcp_tools()
# Create agent
agent = create_agent_with_tools(tools)
# Setup document processing
retriever, vector_store = setup_document_processing()
# Test agent
test_agent(agent)
print("\n" + "=" * 60)
print("β
All tests completed successfully!")
except Exception as e:
print(f"β Error: {e}")
import traceback
traceback.print_exc()
finally:
# Cleanup
mcp_wrapper.stop()
print("π Done!")
if __name__ == "__main__":
main()