import asyncio
import sys
import os
import json
from dotenv import load_dotenv
import google.generativeai as genai
from google.ai.generativelanguage_v1beta.types import content
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# Load environment variables (expects GOOGLE_API_KEY in a local .env file)
load_dotenv()
# Configuration
SERVER_SCRIPT = os.path.join(os.path.dirname(__file__), "mcp_server.py")  # MCP server launched as a child process
HISTORY_FILE = "chat_history.json"  # reserved for persistence; not currently read or written (see load_history)
MODEL_NAME = "gemini-2.0-flash"  # Gemini model used for the chat session
class TripChatbot:
    """Interactive CLI chatbot that bridges Gemini function-calling to MCP tools.

    Lifecycle: __init__ validates the API key, run() spawns the MCP server
    subprocess, discovers its tools, builds a Gemini model wired to those
    tools, and drives a REPL; process_turn() handles one user message,
    executing any tool calls Gemini requests via the MCP session.
    """

    def __init__(self):
        # Fail fast if the key is missing: nothing else can work without it.
        self.api_key = os.getenv("GOOGLE_API_KEY")
        if not self.api_key:
            print("Error: GOOGLE_API_KEY not found in .env file.")
            print("Please create a .env file with your API key.")
            sys.exit(1)
        genai.configure(api_key=self.api_key)
        self.model = None  # created in run() once MCP tools are discovered
        self.chat = None   # ChatSession, created alongside the model
        self.history = self.load_history()

    def load_history(self):
        """Return prior chat history (currently always empty).

        The Gemini SDK manages history inside the ChatSession object, so this
        PoC starts fresh each run. HISTORY_FILE is reserved for serializing /
        reconstructing the history if persistence is ever needed.
        """
        return []

    # JSON Schema 'type' names -> Gemini Type enum. Built once at class level
    # instead of on every recursive _transform_schema call.
    _TYPE_MAP = {
        'string': content.Type.STRING,
        'number': content.Type.NUMBER,
        'integer': content.Type.INTEGER,
        'boolean': content.Type.BOOLEAN,
        'array': content.Type.ARRAY,
        'object': content.Type.OBJECT,
    }

    def _transform_schema(self, schema):
        """Recursively convert a JSON Schema dict into Gemini's Schema shape.

        - Drops 'title' keys (not accepted by Gemini).
        - Maps string 'type' values to the content.Type enum
          (unknown names become TYPE_UNSPECIFIED).
        - Recurses into 'properties' values and 'items'.
        Non-dict input is returned unchanged.
        """
        if not isinstance(schema, dict):
            return schema
        new_schema = {}
        for key, value in schema.items():
            if key == 'title':
                continue  # Gemini rejects 'title'
            if key == 'type' and isinstance(value, str):
                new_schema[key] = self._TYPE_MAP.get(value.lower(), content.Type.TYPE_UNSPECIFIED)
            elif key == 'properties':
                new_schema[key] = {pk: self._transform_schema(pv) for pk, pv in value.items()}
            elif key == 'items':
                new_schema[key] = self._transform_schema(value)
            else:
                # Includes a non-string 'type' value, copied through verbatim.
                new_schema[key] = value
        return new_schema

    def mcp_tool_to_gemini(self, tool):
        """Convert one MCP tool descriptor into a Gemini FunctionDeclaration."""
        schema = self._transform_schema(tool.inputSchema)
        return content.FunctionDeclaration(
            name=tool.name,
            description=tool.description,
            parameters=schema
        )

    async def run(self):
        """Spawn the MCP server, wire its tools into Gemini, and run the REPL."""
        # Launch the MCP server as a subprocess using this interpreter.
        server_params = StdioServerParameters(
            command=sys.executable,
            args=[SERVER_SCRIPT],
            env=None
        )
        print("Starting Trip Planner Chatbot (Gemini Powered)...")
        print("Connecting to MCP Server...")
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # 1. Discover tools advertised by the MCP server.
                mcp_tools = await session.list_tools()
                gemini_tools = [self.mcp_tool_to_gemini(t) for t in mcp_tools.tools]
                # Initialize Gemini model with the discovered tools.
                system_instruction = """
                You are a helpful assistant for a Trip Planner application.
                You have access to two databases and external travel APIs:
                1. Neo4j (Graph Database): Contains organizational data (Employees, Teams, Projects, Skills) and their relationships.
                2. Postgres (Relational Database): Contains food preferences, food catalog, trip bookings, venues, and detailed team member records.
                3. External APIs: Real-time weather data and country information.
                **Available Tools**:
                - **run_cypher_query**: Execute Cypher queries against Neo4j for organizational data
                - **run_sql_query**: Execute SQL queries against Postgres for food/venue/trip data
                - **get_trip_recommendations**: Comprehensive trip planning with real-time weather
                * Parameters: location (required), start_date, end_date, num_people, preferences, budget_per_person
                * Preferences: "adventure, food, culture, relaxation, shopping, nature" (comma-separated)
                * Returns: Weather forecast, recommendations, tips, external resource links
                - **get_country_info**: Get country details (currency, languages, capital)
                **Trip Planning Strategy**:
                1. Extract trip parameters from user query:
                - Location (required)
                - Dates (format: YYYY-MM-DD) or default to upcoming weekend
                - Number of people (count from team data if applicable)
                - Preferences (adventure, food, culture, etc.)
                - Budget per person (if mentioned)
                2. For team trips:
                - Use run_cypher_query to get team members count
                - Use run_sql_query to get food preferences/restrictions
                - Incorporate dietary needs into preferences
                - Pass complete info to get_trip_recommendations
                3. Present results in a user-friendly format:
                - Highlight weather conditions and practical advice
                - Summarize recommendations by category
                - Mention external resources for more details
                - Give packing tips based on weather
                **Database Query Guidelines**:
                1. **Fuzzy Matching**: When searching for names (people, teams, etc.), always use case-insensitive partial matching (e.g., ILIKE in SQL, CONTAINS in Cypher).
                2. **Complete Lists**: When the user asks for a list, retrieve ALL matching records (no LIMIT unless specified).
                3. Choose the right tool based on what data is needed:
                - Organizational structure/relationships → run_cypher_query
                - Food preferences/venues/bookings → run_sql_query
                - Travel planning with weather → get_trip_recommendations
                - Country details → get_country_info
                Always be helpful, provide complete information, and give practical travel advice.
                """
                self.model = genai.GenerativeModel(
                    model_name=MODEL_NAME,
                    tools=[content.Tool(function_declarations=gemini_tools)],
                    system_instruction=system_instruction
                )
                # Automatic function calling is disabled: we execute tool
                # calls ourselves so they can be bridged to the MCP session.
                self.chat = self.model.start_chat(enable_automatic_function_calling=False)
                print(f"Loaded {len(gemini_tools)} tools: {[t.name for t in gemini_tools]}")
                print("\n--- Chat Session Started (Type 'quit' to exit) ---")
                # 2. Chat loop. NOTE(review): input() blocks the event loop;
                # acceptable for this single-user CLI PoC.
                while True:
                    user_input = input("\nYou: ")
                    if user_input.lower() in ['quit', 'exit']:
                        break
                    await self.process_turn(session, user_input)

    async def process_turn(self, session, user_input):
        """Handle one user turn.

        Sends the message to Gemini, then loops: while the response contains
        function calls, execute each via the MCP session and feed all results
        back in one message; stop when Gemini returns a final text answer.
        """
        print("Assistant is thinking...", end="", flush=True)
        response = await self.chat.send_message_async(user_input)
        while True:
            if not response.candidates:
                print("\nError: No candidates returned from Gemini.")
                break
            parts = response.candidates[0].content.parts
            # Guard: an empty parts list would make response.text (and the
            # original parts[0] access) blow up.
            if not parts:
                print("\nError: Empty response from Gemini.")
                break
            # BUG FIX: scan ALL parts for function calls instead of only
            # parts[0] — Gemini may emit a text part before the call part,
            # which previously caused tool calls to be silently skipped.
            call_parts = [p for p in parts if p.function_call]
            if call_parts:
                function_responses = []
                for call_part in call_parts:
                    fc = call_part.function_call
                    tool_name = fc.name
                    tool_args = dict(fc.args)
                    print(f"\n[Tool Call] {tool_name}({tool_args})")
                    # Execute the tool via MCP; tool failures are reported
                    # back to the model as text rather than crashing the turn.
                    try:
                        result = await session.call_tool(tool_name, arguments=tool_args)
                        if result.content:
                            tool_output = result.content[0].text
                        else:
                            tool_output = "No content returned from tool."
                    except Exception as e:
                        tool_output = f"Error: {str(e)}"
                    print(f"[Tool Result] {tool_output[:100]}...")
                    function_responses.append(content.Part(
                        function_response=content.FunctionResponse(
                            name=tool_name,
                            response={"result": tool_output}
                        )
                    ))
                # Send ALL results back to Gemini in a single message.
                print("Assistant is thinking...", end="", flush=True)
                response = await self.chat.send_message_async(function_responses)
            else:
                # Final text response — end of this turn.
                print("\n")
                print(f"Assistant: {response.text}")
                break
if __name__ == "__main__":
    # Entry point: build the chatbot and drive its async main loop;
    # Ctrl-C exits with a friendly message instead of a traceback.
    try:
        chatbot = TripChatbot()
        asyncio.run(chatbot.run())
    except KeyboardInterrupt:
        print("\nGoodbye!")