import asyncio
import sys
import os
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# Define the path to the server script
SERVER_SCRIPT = os.path.join(os.path.dirname(__file__), "mcp_server.py")
class TripPlannerOrchestrator:
def __init__(self, session: ClientSession):
self.session = session
async def run_mission(self, user_query: str):
print(f"\n--- Orchestrator received query: '{user_query}' ---")
# In a real LLM-based app, the LLM would analyze the query and decide which agents to call.
# Here, we simulate the "Plan" phase of the agent.
# Step 1: Identify the Manager and Location from the query (Simulated NLU)
manager_name = "Raj" # Extracted from "Raj's team"
location = "Udaipur" # Extracted from "Udaipur trip"
print(f"Orchestrator: Extracted Manager='{manager_name}', Location='{location}'")
# Step 2: Delegate to Team Agent
print("\nOrchestrator: Delegating to Team Agent to find team members...")
team_members = await self.team_agent_task(manager_name)
print(f"Orchestrator: Team members found: {team_members}")
# Step 3: Delegate to Food Agent
print("\nOrchestrator: Delegating to Food Agent to get preferences...")
food_prefs = await self.food_agent_task(team_members)
print("Orchestrator: Food preferences retrieved.")
# Step 4: Delegate to Travel Agent
print("\nOrchestrator: Delegating to Travel Agent for itinerary...")
itinerary = await self.travel_agent_task(location)
print("Orchestrator: Itinerary retrieved.")
# Step 5: Synthesize Final Result
print("\n--- Final Response ---")
print(f"Here is the plan for {manager_name}'s team trip to {location}:")
print("\nTeam Food Preferences:")
for person, prefs in food_prefs.items():
print(f" - {person}: {', '.join(prefs)}")
print(f"\nSuggested Itinerary for {location}:")
print(itinerary)
print("----------------------")
async def team_agent_task(self, manager_name: str):
# The Team Agent now "generates" a Cypher query instead of calling a specific tool
print(f" [Team Agent] Generating Cypher query for manager: {manager_name}...")
cypher_query = f"MATCH (m:Person {{name: '{manager_name}'}})-[:MANAGES]->(e:Person) RETURN e.name AS employee_name"
print(f" [Team Agent] Executing: {cypher_query}")
result = await self.session.call_tool("run_cypher_query", arguments={"query": cypher_query})
# Parse result (list of dicts)
import json
data = json.loads(result.content[0].text)
# Extract names
names = [record['employee_name'] for record in data if 'employee_name' in record]
return names
async def food_agent_task(self, team_members: list):
# The Food Agent "generates" a SQL query
print(f" [Food Agent] Generating SQL query for team: {team_members}...")
if not team_members:
return {}
# Format list for SQL IN clause: ('Raj', 'Sarah')
names_tuple = str(tuple(team_members)).replace(",)", ")")
sql_query = f"SELECT person_name, preference_type, detail FROM food_preferences WHERE person_name IN {names_tuple}"
print(f" [Food Agent] Executing: {sql_query}")
result = await self.session.call_tool("run_sql_query", arguments={"query": sql_query})
import json
data = json.loads(result.content[0].text)
# Process data into map
prefs_map = {}
for row in data:
if "error" in row:
print(f" [Error] {row['error']}")
continue
name = row['person_name']
if name not in prefs_map:
prefs_map[name] = []
prefs_map[name].append(f"{row['preference_type']}: {row['detail']}")
return prefs_map
async def travel_agent_task(self, location: str):
# 1. Get Comprehensive Trip Recommendations with enhanced parameters
print(f" [Travel Agent] Planning trip to {location}...")
# In a real scenario, these would be extracted from the user query or team data
# For this demo, we'll use sample parameters
rec_result = await self.session.call_tool(
"get_trip_recommendations",
arguments={
"location": location,
"num_people": 3, # Based on team size (would be dynamic)
"preferences": "food, culture, sightseeing", # Based on team interests
# start_date and end_date will auto-default to upcoming weekend
# budget_per_person can be added if available
}
)
import json
recommendations = json.loads(rec_result.content[0].text)
# 2. Get Country Info (External API)
country = "India" # Would be extracted from location or recommendations
print(f" [Travel Agent] Fetching country info for: {country}...")
country_result = await self.session.call_tool("get_country_info", arguments={"country_name": country})
country_info = json.loads(country_result.content[0].text)
# 3. Format comprehensive report
full_report = f"=== TRIP PLAN FOR {location.upper()} ===\n\n"
# Trip Details
if "trip_details" in recommendations:
details = recommendations["trip_details"]
full_report += f"Duration: {details.get('duration_days')} days ({details.get('start_date')} to {details.get('end_date')})\n"
full_report += f"Group Size: {details.get('num_people')} people\n"
full_report += f"Preferences: {', '.join(details.get('preferences', []))}\n\n"
# Weather Forecast
if recommendations.get("weather_forecast"):
weather = recommendations["weather_forecast"]
if "summary" in weather:
full_report += f"Weather: {weather['summary']}\n\n"
# Recommendations
if recommendations.get("recommendations"):
full_report += "RECOMMENDATIONS:\n"
for rec in recommendations["recommendations"]:
full_report += f"\n{rec.get('category', 'General')}:\n"
for suggestion in rec.get('suggestions', []):
full_report += f" • {suggestion}\n"
# Tips
if recommendations.get("tips"):
full_report += f"\nTRAVEL TIPS:\n"
for tip in recommendations["tips"]:
full_report += f" • {tip}\n"
# Country Info
full_report += f"\n--- DESTINATION INFO ({country}) ---\n"
if "error" not in country_info:
full_report += f"Capital: {country_info.get('capital')}\n"
full_report += f"Currency: {', '.join(country_info.get('currencies', []))}\n"
full_report += f"Languages: {', '.join(country_info.get('languages', []))}\n"
else:
full_report += "Could not fetch country info.\n"
# External Resources
if recommendations.get("useful_resources"):
full_report += f"\nUSEFUL LINKS:\n"
resources = recommendations["useful_resources"]
for key, url in resources.items():
full_report += f" • {key.replace('_', ' ').title()}: {url}\n"
return full_report
async def main():
# Set up the connection to the MCP server
server_params = StdioServerParameters(
command=sys.executable,
args=[SERVER_SCRIPT],
env=None
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# Initialize the session
await session.initialize()
# List tools to verify connection
tools = await session.list_tools()
print(f"Connected to MCP Server. Available tools: {[t.name for t in tools.tools]}")
# Run the Orchestrator
orchestrator = TripPlannerOrchestrator(session)
await orchestrator.run_mission("Plan a healthy food itinerary for Udaipur trip and list food preferences for people in Raj’s team.")
if __name__ == "__main__":
# Ensure we are in the right directory or add src to path if needed
# But since we use absolute path for SERVER_SCRIPT, it should be fine.
asyncio.run(main())