Skip to main content
Glama
orchestrator.py (8.67 kB)
"""Demo orchestrator that plans a team trip by calling tools on an MCP server.

The orchestrator launches ``mcp_server.py`` over stdio and chains three
simulated "agents" (team, food, travel), each of which calls one or more
server tools and parses the JSON text the tool returns.
"""

import asyncio
import json
import os
import sys

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Define the path to the server script
SERVER_SCRIPT = os.path.join(os.path.dirname(__file__), "mcp_server.py")


def _cypher_string_literal(value: str) -> str:
    """Return *value* as a single-quoted Cypher string literal.

    Backslashes and single quotes are backslash-escaped so the value cannot
    terminate the literal early.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _sql_string_literal(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled, which is the standard SQL escape.
    """
    escaped = str(value).replace("'", "''")
    return f"'{escaped}'"


def _load_tool_json(result):
    """Decode the JSON text carried by the first content item of a tool result.

    Raises:
        ValueError: if the result has no content items, or the text is not
            valid JSON (``json.JSONDecodeError`` is a ``ValueError``).
    """
    if not result.content:
        raise ValueError("MCP tool returned no content")
    return json.loads(result.content[0].text)


def _as_rows(payload) -> list:
    """Normalize a decoded tool payload to a list of dict rows.

    A single dict (for example ``{"error": ...}``) becomes a one-row list so
    callers can treat it like any other row; non-dict items are dropped.
    """
    if isinstance(payload, dict):
        return [payload]
    if not isinstance(payload, list):
        return []
    return [row for row in payload if isinstance(row, dict)]


def _format_trip_report(
    location: str, country: str, recommendations: dict, country_info: dict
) -> str:
    """Render the recommendations and country info as a plain-text report.

    Sections whose keys are missing or empty in *recommendations* are skipped.
    """
    parts = [f"=== TRIP PLAN FOR {location.upper()} ===\n\n"]

    # Surface a failed recommendations call instead of emitting a bare header,
    # mirroring how a failed country lookup is reported below.
    if "error" in recommendations:
        parts.append(
            f"Could not fetch trip recommendations: {recommendations['error']}\n\n"
        )

    # Trip Details
    if "trip_details" in recommendations:
        details = recommendations["trip_details"]
        parts.append(
            f"Duration: {details.get('duration_days')} days "
            f"({details.get('start_date')} to {details.get('end_date')})\n"
        )
        parts.append(f"Group Size: {details.get('num_people')} people\n")
        parts.append(
            f"Preferences: {', '.join(details.get('preferences', []))}\n\n"
        )

    # Weather Forecast
    weather = recommendations.get("weather_forecast")
    if weather and "summary" in weather:
        parts.append(f"Weather: {weather['summary']}\n\n")

    # Recommendations
    if recommendations.get("recommendations"):
        parts.append("RECOMMENDATIONS:\n")
        for rec in recommendations["recommendations"]:
            parts.append(f"\n{rec.get('category', 'General')}:\n")
            parts.extend(
                f" • {suggestion}\n" for suggestion in rec.get("suggestions", [])
            )

    # Tips
    if recommendations.get("tips"):
        parts.append("\nTRAVEL TIPS:\n")
        parts.extend(f" • {tip}\n" for tip in recommendations["tips"])

    # Country Info
    parts.append(f"\n--- DESTINATION INFO ({country}) ---\n")
    if "error" not in country_info:
        parts.append(f"Capital: {country_info.get('capital')}\n")
        parts.append(
            f"Currency: {', '.join(country_info.get('currencies', []))}\n"
        )
        parts.append(
            f"Languages: {', '.join(country_info.get('languages', []))}\n"
        )
    else:
        parts.append("Could not fetch country info.\n")

    # External Resources
    if recommendations.get("useful_resources"):
        parts.append("\nUSEFUL LINKS:\n")
        for key, url in recommendations["useful_resources"].items():
            parts.append(f" • {key.replace('_', ' ').title()}: {url}\n")

    return "".join(parts)


class TripPlannerOrchestrator:
    """Coordinates the team, food and travel "agents" over one MCP session."""

    def __init__(self, session: ClientSession):
        # An already-initialized MCP client session used for every tool call.
        self.session = session

    async def run_mission(self, user_query: str):
        """Run the full plan for *user_query* and print the combined result.

        The query is only echoed: manager and location are hard-coded to
        simulate the NLU step an LLM would perform.
        """
        print(f"\n--- Orchestrator received query: '{user_query}' ---")

        # In a real LLM-based app, the LLM would analyze the query and decide
        # which agents to call. Here, we simulate the "Plan" phase of the agent.

        # Step 1: Identify the Manager and Location from the query (Simulated NLU)
        manager_name = "Raj"  # Extracted from "Raj's team"
        location = "Udaipur"  # Extracted from "Udaipur trip"
        print(f"Orchestrator: Extracted Manager='{manager_name}', Location='{location}'")

        # Step 2: Delegate to Team Agent
        print("\nOrchestrator: Delegating to Team Agent to find team members...")
        team_members = await self.team_agent_task(manager_name)
        print(f"Orchestrator: Team members found: {team_members}")

        # Step 3: Delegate to Food Agent
        print("\nOrchestrator: Delegating to Food Agent to get preferences...")
        food_prefs = await self.food_agent_task(team_members)
        print("Orchestrator: Food preferences retrieved.")

        # Step 4: Delegate to Travel Agent
        print("\nOrchestrator: Delegating to Travel Agent for itinerary...")
        itinerary = await self.travel_agent_task(location)
        print("Orchestrator: Itinerary retrieved.")

        # Step 5: Synthesize Final Result
        print("\n--- Final Response ---")
        print(f"Here is the plan for {manager_name}'s team trip to {location}:")

        print("\nTeam Food Preferences:")
        for person, prefs in food_prefs.items():
            print(f" - {person}: {', '.join(prefs)}")

        print(f"\nSuggested Itinerary for {location}:")
        print(itinerary)
        print("----------------------")

    async def team_agent_task(self, manager_name: str) -> list:
        """Return the names of the people managed by *manager_name*.

        Builds a Cypher query and runs it through the ``run_cypher_query``
        tool. Rows without an ``employee_name`` key are skipped; rows that
        carry an ``error`` key are reported on stdout.
        """
        # The Team Agent "generates" a Cypher query instead of calling a specific tool
        print(f" [Team Agent] Generating Cypher query for manager: {manager_name}...")

        # Escape the name so a quote or backslash in it cannot break out of
        # the string literal.
        manager_literal = _cypher_string_literal(manager_name)
        cypher_query = (
            f"MATCH (m:Person {{name: {manager_literal}}})-[:MANAGES]->(e:Person) "
            "RETURN e.name AS employee_name"
        )
        print(f" [Team Agent] Executing: {cypher_query}")

        result = await self.session.call_tool(
            "run_cypher_query", arguments={"query": cypher_query}
        )

        names = []
        for record in _as_rows(_load_tool_json(result)):
            if "employee_name" in record:
                names.append(record["employee_name"])
            elif "error" in record:
                print(f" [Error] {record['error']}")
        return names

    async def food_agent_task(self, team_members: list) -> dict:
        """Return a mapping of person name to a list of "type: detail" strings.

        Builds a SQL query over ``food_preferences`` and runs it through the
        ``run_sql_query`` tool. An empty *team_members* short-circuits to an
        empty dict without calling the tool.
        """
        # The Food Agent "generates" a SQL query
        print(f" [Food Agent] Generating SQL query for team: {team_members}...")

        if not team_members:
            return {}

        # Format list for SQL IN clause: ('Raj', 'Sarah'). The call below only
        # passes a complete query string, so each name is emitted as an escaped
        # SQL literal rather than relying on Python's tuple repr (which
        # double-quotes names that contain an apostrophe).
        names_tuple = "(" + ", ".join(_sql_string_literal(n) for n in team_members) + ")"
        sql_query = (
            "SELECT person_name, preference_type, detail FROM food_preferences "
            f"WHERE person_name IN {names_tuple}"
        )
        print(f" [Food Agent] Executing: {sql_query}")

        result = await self.session.call_tool(
            "run_sql_query", arguments={"query": sql_query}
        )

        # Process data into map
        prefs_map = {}
        for row in _as_rows(_load_tool_json(result)):
            if "error" in row:
                print(f" [Error] {row['error']}")
                continue
            entry = f"{row['preference_type']}: {row['detail']}"
            prefs_map.setdefault(row["person_name"], []).append(entry)
        return prefs_map

    async def travel_agent_task(
        self,
        location: str,
        *,
        num_people: int = 3,
        preferences: str = "food, culture, sightseeing",
    ) -> str:
        """Return a formatted trip report for *location*.

        Calls ``get_trip_recommendations`` and ``get_country_info`` and merges
        both JSON payloads into one text report.

        Args:
            location: Destination passed to the recommendations tool.
            num_people: Group size sent to the tool (demo default: 3).
            preferences: Comma-separated interests sent to the tool.
        """
        # 1. Get Comprehensive Trip Recommendations with enhanced parameters
        print(f" [Travel Agent] Planning trip to {location}...")

        # In a real scenario, these would be extracted from the user query or
        # team data. For this demo, the defaults above act as sample parameters.
        rec_result = await self.session.call_tool(
            "get_trip_recommendations",
            arguments={
                "location": location,
                "num_people": num_people,
                "preferences": preferences,
                # start_date and end_date will auto-default to upcoming weekend
                # budget_per_person can be added if available
            },
        )
        recommendations = _load_tool_json(rec_result)

        # 2. Get Country Info (External API)
        country = "India"  # Would be extracted from location or recommendations
        print(f" [Travel Agent] Fetching country info for: {country}...")
        country_result = await self.session.call_tool(
            "get_country_info", arguments={"country_name": country}
        )
        country_info = _load_tool_json(country_result)

        # 3. Format comprehensive report
        return _format_trip_report(location, country, recommendations, country_info)


async def main():
    """Start the MCP server over stdio, list its tools, and run the demo query."""
    # Set up the connection to the MCP server
    server_params = StdioServerParameters(
        command=sys.executable,
        args=[SERVER_SCRIPT],
        env=None,
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize the session
            await session.initialize()

            # List tools to verify connection
            tools = await session.list_tools()
            print(f"Connected to MCP Server. Available tools: {[t.name for t in tools.tools]}")

            # Run the Orchestrator
            orchestrator = TripPlannerOrchestrator(session)
            await orchestrator.run_mission(
                "Plan a healthy food itinerary for Udaipur trip and list food preferences for people in Raj’s team."
            )


if __name__ == "__main__":
    # Ensure we are in the right directory or add src to path if needed
    # But since we use absolute path for SERVER_SCRIPT, it should be fine.
    asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SUSHRUTH3002/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.