import asyncio
import sys
import os
import json
from dotenv import load_dotenv
import google.generativeai as genai
from google.ai.generativelanguage_v1beta.types import content
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# Load environment variables (python-dotenv: values from a local .env file, if one
# exists). GOOGLE_API_KEY is later read from the environment in Agent.__init__.
load_dotenv()
# Configuration
# Path of the MCP server script, resolved relative to this file's directory; main()
# hands it to the stdio MCP client as the script to run with the current interpreter.
SERVER_SCRIPT = os.path.join(os.path.dirname(__file__), "mcp_server.py")
# Gemini model name used for every Agent's GenerativeModel.
MODEL_NAME = "gemini-2.0-flash"
def transform_schema_to_gemini(schema):
    """Recursively convert a JSON Schema dict into a Gemini-compatible schema dict.

    Only the keys Gemini's Schema understands are kept ('type', 'description',
    'properties', 'items', 'required', 'enum', 'format', 'nullable'); everything
    else ('title', 'default', ...) is dropped. String type names are mapped to
    ``content.Type`` members; unknown names become ``TYPE_UNSPECIFIED``.

    Optional parameters are preserved instead of losing their type:

    * ``anyOf`` / ``oneOf`` with exactly one non-null variant (the usual encoding
      of ``Optional[X]``) is collapsed into that variant, with ``nullable`` set
      when a ``{"type": "null"}`` variant was present. Unions with several
      non-null variants cannot be expressed and are dropped, as before.
    * A list-valued ``type`` such as ``["string", "null"]`` is reduced to its
      first non-null entry, with ``nullable`` set when ``"null"`` was listed.

    Keys present on the schema itself always win over keys merged from a union
    variant.

    Args:
        schema: A JSON Schema fragment. Non-dict values are returned unchanged.

    Returns:
        A new dict containing only Gemini-supported keys (or ``schema`` itself
        when it is not a dict).
    """
    if not isinstance(schema, dict):
        return schema

    # Fields that Gemini Schema supports
    supported_fields = {'type', 'description', 'properties', 'items', 'required', 'enum', 'format', 'nullable'}

    def to_gemini_type(type_name):
        # Built lazily so schemas without a string 'type' never touch `content`.
        type_map = {
            'string': content.Type.STRING,
            'number': content.Type.NUMBER,
            'integer': content.Type.INTEGER,
            'boolean': content.Type.BOOLEAN,
            'array': content.Type.ARRAY,
            'object': content.Type.OBJECT
        }
        return type_map.get(type_name.lower(), content.Type.TYPE_UNSPECIFIED)

    new_schema = {}

    # Collapse Optional-style unions first so the schema's own keys can override.
    for union_key in ('anyOf', 'oneOf'):
        variants = schema.get(union_key)
        if isinstance(variants, list) and variants:
            non_null = [
                variant for variant in variants
                if not (isinstance(variant, dict) and variant.get('type') == 'null')
            ]
            if len(non_null) == 1 and isinstance(non_null[0], dict):
                new_schema.update(transform_schema_to_gemini(non_null[0]))
                if len(non_null) != len(variants):
                    new_schema['nullable'] = True
            break

    for k, v in schema.items():
        # Skip unsupported fields like 'title', 'default', etc.
        if k not in supported_fields:
            continue
        if k == 'type':
            if isinstance(v, (list, tuple)):
                # JSON Schema allows a list of types; Gemini accepts exactly one.
                type_names = [name for name in v if name != 'null']
                if len(type_names) != len(v):
                    # An explicit 'nullable' key on the schema still takes precedence.
                    new_schema.setdefault('nullable', True)
                v = type_names[0] if type_names else 'null'
            if isinstance(v, str):
                new_schema[k] = to_gemini_type(v)
            else:
                # Already a non-string value (e.g. a content.Type member): keep it.
                new_schema[k] = v
        elif k == 'properties':
            new_schema[k] = {pk: transform_schema_to_gemini(pv) for pk, pv in v.items()}
        elif k == 'items':
            new_schema[k] = transform_schema_to_gemini(v)
        else:
            new_schema[k] = v
    return new_schema
def mcp_tool_to_gemini(tool):
    """Build a Gemini FunctionDeclaration from an MCP tool description.

    Args:
        tool: An MCP tool object exposing ``name``, ``description`` and
            ``inputSchema`` (a JSON Schema dict).

    Returns:
        A ``content.FunctionDeclaration`` for the tool. ``parameters`` is omitted
        when the tool takes no arguments.
    """
    schema = transform_schema_to_gemini(tool.inputSchema)
    # Gemini rejects an OBJECT parameter schema whose 'properties' is empty, so a
    # zero-argument tool must be declared without any parameters at all.
    if isinstance(schema, dict) and not schema.get('properties'):
        schema = None
    return content.FunctionDeclaration(
        name=tool.name,
        # A missing description is normalised to an empty string.
        description=tool.description or "",
        parameters=schema
    )
class Agent:
    """A Gemini chat session that can call MCP tools and/or local async tools.

    Function calling is handled manually: ``chat`` executes every function call
    the model requests, sends the results back, and repeats until the model
    answers with plain text.
    """

    def __init__(self, name, system_instruction, mcp_session=None, mcp_tools=None, local_tools=None):
        """Configure the Gemini model, its tool declarations and a chat session.

        Args:
            name: Label used in console log lines.
            system_instruction: System prompt given to the model.
            mcp_session: MCP client session used to execute MCP tools, or None.
            mcp_tools: MCP tool objects to expose to the model.
            local_tools: Mapping ``name -> {'func', 'description', 'schema'}``
                where ``func`` is an async callable invoked with the model's
                arguments as keyword arguments.

        Exits the process with status 1 when GOOGLE_API_KEY is not set.
        """
        self.name = name
        self.mcp_session = mcp_session
        self.local_tools = local_tools or {}  # Map name -> tool definition dict

        # Prepare tools for Gemini
        gemini_tools = []
        if mcp_tools:
            gemini_tools.extend(mcp_tool_to_gemini(t) for t in mcp_tools)
        for tool_name, tool_def in self.local_tools.items():
            gemini_tools.append(content.FunctionDeclaration(
                name=tool_name,
                description=tool_def['description'],
                parameters=tool_def['schema']
            ))

        api_key = os.getenv("GOOGLE_API_KEY")
        if not api_key:
            print("Error: GOOGLE_API_KEY not found.")
            sys.exit(1)
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel(
            model_name=MODEL_NAME,
            tools=[content.Tool(function_declarations=gemini_tools)] if gemini_tools else None,
            system_instruction=system_instruction
        )
        # Automatic function calling is disabled because chat() dispatches calls itself.
        self.chat_session = self.model.start_chat(enable_automatic_function_calling=False)

    @staticmethod
    def _to_plain(value):
        """Recursively convert mapping/sequence-like values into plain dicts and lists."""
        # Duck-typed so it also covers the proto container types the SDK returns
        # for nested arguments, which are not JSON-serialisable as-is.
        if hasattr(value, "items"):
            return {key: Agent._to_plain(item) for key, item in value.items()}
        if isinstance(value, (str, bytes)) or not hasattr(value, "__iter__"):
            return value
        return [Agent._to_plain(item) for item in value]

    @staticmethod
    def _mcp_result_text(result):
        """Join the text of every text-bearing item in an MCP tool result."""
        # Items without a 'text' attribute (e.g. binary content) are skipped
        # rather than raising AttributeError.
        texts = [
            item.text
            for item in (result.content or [])
            if getattr(item, "text", None) is not None
        ]
        if not texts:
            return "No content returned."
        return "\n".join(texts)

    @staticmethod
    def _response_text(response):
        """Return the model's text, or an error string when it produced none."""
        try:
            return response.text
        except ValueError as exc:
            # The SDK's .text accessor raises ValueError when the candidate has
            # no text parts (for example a blocked or empty response).
            return f"Error: Model returned no text response ({exc})"

    async def _run_tool(self, tool_name, tool_args):
        """Execute one tool call and return its output (or an error) as a string."""
        try:
            # Local tools take precedence (these may be async calls to other agents).
            if tool_name in self.local_tools:
                result = await self.local_tools[tool_name]['func'](**tool_args)
                return str(result)
            # Anything else is forwarded to the MCP server when one is attached.
            if self.mcp_session:
                result = await self.mcp_session.call_tool(tool_name, arguments=tool_args)
                return self._mcp_result_text(result)
            return f"Error: Tool {tool_name} not found."
        except Exception as e:
            # Report failures to the model instead of aborting the conversation.
            return f"Error executing {tool_name}: {str(e)}"

    async def chat(self, user_input):
        """Send ``user_input`` to the model and return its final text answer.

        Every function call found in the model's reply is executed and the
        results are sent back in a single follow-up message; this repeats until
        the reply contains no function calls.

        Returns:
            The model's text, or a string starting with "Error:" when the model
            returned no candidates or no text.
        """
        response = await self.chat_session.send_message_async(user_input)
        while True:
            if not response.candidates:
                return "Error: No response from model."

            # Check for function calls in ANY part of the response
            function_calls = [
                part.function_call
                for part in response.candidates[0].content.parts
                if part.function_call
            ]
            if not function_calls:
                return self._response_text(response)

            function_responses = []
            for fc in function_calls:
                tool_name = fc.name
                tool_args = self._to_plain(fc.args or {})
                print(f" [{self.name} -> Tool] {tool_name}({tool_args})")
                tool_output = await self._run_tool(tool_name, tool_args)
                print(f" [{self.name} <- Result] {tool_output[:200]}...")
                function_responses.append(content.Part(
                    function_response=content.FunctionResponse(
                        name=tool_name,
                        response={"result": tool_output}
                    )
                ))
            response = await self.chat_session.send_message_async(function_responses)
async def main():
    """Start the MCP server, build the agents and run the interactive chat loop.

    Runs SERVER_SCRIPT with the current interpreter through the stdio MCP client,
    lists its tools, and creates three specialist agents (team, food, travel)
    plus an orchestrator whose only tools are async delegates that forward a
    query to one specialist. Console input is then read in a loop until the user
    types 'quit' or 'exit' or stdin reaches end-of-file.

    Blank input is ignored, and an exception raised while answering one message
    is reported without ending the session.
    """
    server_params = StdioServerParameters(
        command=sys.executable,
        args=[SERVER_SCRIPT],
        env=None
    )
    print("Starting A2A Chatbot System...")
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Discover all MCP tools
            all_tools = await session.list_tools()
            tools_map = {t.name: t for t in all_tools.tools}

            def select_tools(agent_name, *tool_names):
                """Return the requested MCP tools that exist, warning about missing ones."""
                selected = []
                for tool_name in tool_names:
                    tool = tools_map.get(tool_name)
                    if tool:
                        selected.append(tool)
                    else:
                        print(f"Warning: MCP tool '{tool_name}' is not available; {agent_name} will run without it.")
                return selected

            def make_delegate(agent):
                """Create the async local tool that forwards a query to `agent`."""
                async def delegate(query: str):
                    return await agent.chat(query)
                return delegate

            def query_schema():
                """Parameter schema shared by all delegates: one required string 'query'."""
                return {"type": content.Type.OBJECT, "properties": {"query": {"type": content.Type.STRING}}, "required": ["query"]}

            # --- Define Sub-Agents ---
            # 1. Team Agent
            # team_tools = [tools_map.get("get_team_members"), tools_map.get("run_cypher_query")]
            team_tools = select_tools("TeamAgent", "run_cypher_query")
            team_agent = Agent(
                name="TeamAgent",
                system_instruction="""You are a Team Management expert.
You have access to the organization's graph database (Neo4j).
**Schema - Nodes**:
- Employee (name, email, employee_id, role, level, joined_on, years_of_experience, phone)
- Team (name, description, formation_date, team_size)
- Project (name, status, description, start_date, end_date, budget, priority)
- Location (city, country, timezone, office_code)
- Skill (name, category, proficiency_levels)
- Department (name, description, budget, head_count)
**Schema - Relationships**:
- (:Employee)-[:WORKS_IN]->(:Team)
- (:Employee)-[:LEADS]->(:Team)
- (:Employee)-[:MANAGES]->(:Employee)
- (:Employee)-[:HAS_SKILL {proficiency, years}]->(:Skill)
- (:Employee)-[:LOCATED_IN]->(:Location)
- (:Employee)-[:CONTRIBUTES_TO {role, allocation}]->(:Project)
- (:Employee)-[:COLLABORATES_WITH {context}]->(:Employee)
- (:Employee)-[:MENTORS {since}]->(:Employee)
- (:Project)-[:OWNED_BY]->(:Team)
- (:Team)-[:BELONGS_TO]->(:Department)
- (:Department)-[:PART_OF]->(:Department)
**CRITICAL RULES**:
1. NEVER use LIMIT in Cypher queries unless explicitly requested
2. When asked about team members, ALWAYS return ALL members with BOTH name AND email
3. Always use COLLECT() to gather all results when multiple records are expected
4. Example query for team members: MATCH (e:Employee)-[:WORKS_IN]->(t:Team {name: 'Alpha Team'}) RETURN e.name AS name, e.email AS email
5. When providing results, format as a complete list with all names and emails
Use 'run_cypher_query' for all questions about employees, teams, skills, projects, or organizational structures.
When asked about any entity, ALWAYS return their full details as it is required to link data in other nodes or databases.""",
                mcp_session=session,
                mcp_tools=team_tools
            )
            # 2. Food Agent
            # food_tools = [tools_map.get("get_food_preferences"), tools_map.get("run_sql_query")]
            food_tools = select_tools("FoodAgent", "run_sql_query")
            food_agent = Agent(
                name="FoodAgent",
                system_instruction="""You are a Food & Catering expert.
You have access to the food preferences database (Postgres).
**Schema - Tables**:
- cuisines (cuisine_id, cuisine_name, origin_country, description, is_popular)
- menu_items (item_id, item_name, cuisine_id, course_type, is_vegetarian, is_vegan, spice_level, calories, price_range, description, ingredients, preparation_time_mins, created_at)
- employee_food_preferences (preference_id, employee_email, item_id, preference_type, preference_level, notes, last_updated)
- dietary_restrictions (restriction_id, employee_email, restriction_type, severity, notes, certified_date)
- food_allergies (allergy_id, employee_email, allergen, severity, diagnosed_date, notes)
- venues (venue_id, venue_name, venue_type, city, country, address, latitude, longitude, phone, email, website, average_rating, price_range, seating_capacity, has_vegetarian_options, has_vegan_options, accepts_group_bookings, parking_available, outdoor_seating, cuisines_offered, opening_hours, created_at)
- venue_menu_items (venue_menu_id, venue_id, item_id, price, is_available, is_signature_dish)
- trip_bookings (booking_id, trip_name, destination_city, destination_country, start_date, end_date, total_participants, organizer_email, budget_per_person, accommodation_booked, transportation_booked, status, special_requirements, created_at, updated_at)
- trip_activities (activity_id, booking_id, activity_date, activity_time, activity_type, venue_id, activity_name, description, estimated_cost, duration_hours, booking_reference, status, notes)
**IMPORTANT NOTES**:
- Employee data is linked via 'employee_email' field which corresponds to Employee.email in Neo4j
- Use joins between employee_food_preferences, menu_items, and cuisines to find food preferences
- To find cuisines someone prefers: JOIN employee_food_preferences -> menu_items -> cuisines
- Do NOT use 'LIMIT' in your SQL queries unless the user explicitly asks for a specific number of results
- When asked to list items (e.g., cuisines, food types), list ALL of them found in the database
**CRITICAL - Data Matching Rules**:
- ALWAYS use ILIKE operator (case-insensitive) when matching any data fields
- NEVER use = or == for data comparisons
- Example: WHERE employee_email ILIKE 'sarah.williams@example.com' (NOT WHERE employee_email = 'Sarah.Williams@example.com')
- For name-based searches, use ILIKE with % wildcards: WHERE employee_email ILIKE '%sarah%'
- Some fields in the database are stored in lowercase and some fields may have both uppercase and lowercase letters.
Use 'run_sql_query' for questions about cuisines, menu items, food preferences, dietary restrictions, allergies, venues, and trip planning.""",
                mcp_session=session,
                mcp_tools=food_tools
            )
            # 3. Travel Agent
            travel_tools = select_tools("TravelAgent", "get_trip_recommendations", "get_country_info")
            travel_agent = Agent(
                name="TravelAgent",
                system_instruction="""You are a Travel Planning expert with access to real-time data.
**Your Tools**:
1. **get_trip_recommendations**: Comprehensive trip planning with real-time weather data
- Parameters:
* location (required): City name (e.g., "Paris", "Mumbai", "Tokyo")
* start_date (optional): Trip start date in YYYY-MM-DD format
* end_date (optional): Trip end date in YYYY-MM-DD format
* num_people (optional): Number of travelers (default: 1)
* preferences (optional): Comma-separated preferences like "adventure, food, culture, relaxation, shopping, nature"
* budget_per_person (optional): Budget per person in local currency
- Provides:
* Real-time weather forecast (if OPENWEATHER_API_KEY is set)
* Preference-based recommendations (adventure, nature, food, culture, sightseeing, relaxation, shopping)
* Group travel tips for multiple travelers
* Budget-aware suggestions
* Weather-based packing advice
* External resource links (TripAdvisor, Google Maps, Booking.com, WikiTravel)
2. **get_country_info**: Get country details (currency, languages, capital, region)
**Strategy for Trip Planning**:
1. If user provides group details (team members), extract the count for num_people
2. If user mentions food preferences from team data, include them in preferences parameter
3. Always ask for or infer trip dates if not provided (tool auto-defaults to upcoming weekend)
4. Extract user preferences from their query (adventure, food, shopping, etc.)
5. If budget is mentioned, include it in the request
6. Parse the JSON response and present it in a user-friendly format
7. Highlight weather conditions and give practical advice
8. If planning for a team, emphasize group-friendly recommendations
**Example Scenarios**:
- "Plan a trip to Paris" → Use minimal parameters, let tool auto-fill dates
- "Plan a 5-day trip to Goa for 4 people who love adventure and food" → Use all relevant parameters
- "Weekend trip to Mumbai for my team" → Get team count from context, use appropriate preferences
Provide detailed, exciting, and practical travel plans based on real-time data.""",
                mcp_session=session,
                mcp_tools=travel_tools
            )
            # --- Define Orchestrator Tools (A2A) ---
            # Each local tool hands the orchestrator's query to one specialist agent.
            orchestrator_local_tools = {
                "ask_team_agent": {
                    "func": make_delegate(team_agent),
                    "description": "Ask the Team Agent about employees, teams, hierarchy, or skills.",
                    "schema": query_schema()
                },
                "ask_food_agent": {
                    "func": make_delegate(food_agent),
                    "description": "Ask the Food Agent about food preferences, cuisines, or dietary restrictions.",
                    "schema": query_schema()
                },
                "ask_travel_agent": {
                    "func": make_delegate(travel_agent),
                    "description": "Ask the Travel Agent about destinations, itineraries, or country info.",
                    "schema": query_schema()
                }
            }
            # 4. Orchestrator Agent
            orchestrator = Agent(
                name="Orchestrator",
                system_instruction="""You are the Main Orchestrator for the Trip Planner application.
Your job is to help users plan trips by coordinating with specialized agents.
You have access to three specialized agents:
1. **Team Agent**: Knows about people, teams, and organization structure (Neo4j).
2. **Food Agent**: Knows about food preferences and menus (Postgres).
3. **Travel Agent**: Knows about destinations and travel advice.
**Strategy**:
- Analyze the user's request.
- Break it down into sub-tasks.
- Call the appropriate agent(s) to get the information.
- **CRITICAL**: When calling an agent, provide the FULL context of what you need.
- **ALWAYS request ALL members/items**, never just one or a sample.
- When you get results from Team Agent with multiple people, you MUST pass ALL of them to Food Agent.
- Synthesize the final answer from the agents' responses.
**Example Workflows**:
Example 1: "Which team members in Alpha Team are vegetarian?"
1. Call `ask_team_agent("Get ALL members of Alpha Team with their names and emails")`
2. Receive list: [(Alice, alice@...), (John, john@...), (Jane, jane@...), (Ravi, ravi@...)]
3. Call `ask_food_agent("Check dietary restrictions for ALL these emails: alice@..., john@..., jane@..., ravi@...")`
4. Present complete results for ALL team members
Example 2: "Plan a trip to Goa for Sarah's team and check their food preferences."
1. Call `ask_team_agent("Get ALL members in Sarah's team with emails")`
2. Call `ask_food_agent("Get complete food preferences for ALL members: [all emails from step 1]")`
3. Count the team members (e.g., 5 people)
4. Summarize preferences (e.g., "3 vegetarian, 1 vegan, preferences: seafood, Indian cuisine")
5. Call `ask_travel_agent("Plan a trip to Goa for 5 people with preferences: food, culture, relaxation. Consider vegetarian and seafood options.")`
6. Combine all information into a final response with weather forecast, activities, and dining suggestions
Example 3: "Plan a weekend adventure trip to Udaipur from Dec 20-25 for 4 friends, budget 5000 per person"
1. Call `ask_travel_agent("Plan a trip to Udaipur from 2025-12-20 to 2025-12-25 for 4 people with preferences: adventure, nature, food. Budget is 5000 per person.")`
2. Present the weather forecast, recommendations, and budget-conscious tips
**NEVER truncate or sample data - always process ALL results from agents.**
Always be helpful and professional.
""",
                mcp_session=None,  # Orchestrator doesn't call MCP tools directly, only other agents
                mcp_tools=[],
                local_tools=orchestrator_local_tools
            )
            print("\n--- A2A Chatbot Ready (Type 'quit' to exit) ---")
            while True:
                # input() is deliberately left blocking: nothing else needs the
                # event loop between turns, and Ctrl-C then interrupts it directly.
                try:
                    user_input = input("\nYou: ").strip()
                except EOFError:
                    # stdin was closed (Ctrl-D or exhausted piped input): exit cleanly.
                    print()
                    break
                if not user_input:
                    # Nothing to send; prompt again instead of calling the model.
                    continue
                if user_input.lower() in ('quit', 'exit'):
                    break
                print("Orchestrator is thinking...", end="", flush=True)
                try:
                    response = await orchestrator.chat(user_input)
                except Exception as exc:
                    # Top-level boundary: one failed turn must not end the session.
                    print(f"\n\nError: {exc}")
                    continue
                print(f"\n\nOrchestrator: {response}")
if __name__ == "__main__":
    # Script entry point: run main() on a fresh event loop. Ctrl-C ends the
    # program with a short farewell instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nGoodbye!")