chatbot.py
import asyncio
import sys
import os
import json

from dotenv import load_dotenv
import google.generativeai as genai
from google.ai.generativelanguage_v1beta.types import content
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Load environment variables
load_dotenv()

# Configuration
SERVER_SCRIPT = os.path.join(os.path.dirname(__file__), "mcp_server.py")
HISTORY_FILE = "chat_history.json"
MODEL_NAME = "gemini-2.0-flash"


class TripChatbot:
    def __init__(self):
        self.api_key = os.getenv("GOOGLE_API_KEY")
        if not self.api_key:
            print("Error: GOOGLE_API_KEY not found in .env file.")
            print("Please create a .env file with your API key.")
            sys.exit(1)

        genai.configure(api_key=self.api_key)
        self.model = None
        self.chat = None
        self.history = self.load_history()

    def load_history(self):
        # For simplicity in this PoC with Gemini, we'll start fresh rather than
        # implement persistence. The Gemini SDK manages history in the ChatSession
        # object, but it could be serialized to HISTORY_FILE if needed. Here we
        # just return an empty list for the SDK to manage.
        return []

    def _transform_schema(self, schema):
        """Recursively transform a JSON schema into a Gemini Schema."""
        if not isinstance(schema, dict):
            return schema
        new_schema = {}
        for k, v in schema.items():
            if k == 'title':
                continue
            elif k == 'type':
                if isinstance(v, str):
                    # Map JSON schema type strings to the Gemini Type enum
                    type_map = {
                        'string': content.Type.STRING,
                        'number': content.Type.NUMBER,
                        'integer': content.Type.INTEGER,
                        'boolean': content.Type.BOOLEAN,
                        'array': content.Type.ARRAY,
                        'object': content.Type.OBJECT,
                    }
                    new_schema[k] = type_map.get(v.lower(), content.Type.TYPE_UNSPECIFIED)
                else:
                    new_schema[k] = v
            elif k == 'properties':
                new_schema[k] = {pk: self._transform_schema(pv) for pk, pv in v.items()}
            elif k == 'items':
                new_schema[k] = self._transform_schema(v)
            else:
                new_schema[k] = v
        return new_schema

    def mcp_tool_to_gemini(self, tool):
        # Transform the MCP tool's schema to match Gemini's requirements
        schema = self._transform_schema(tool.inputSchema)
        return content.FunctionDeclaration(
            name=tool.name,
            description=tool.description,
            parameters=schema
        )

    async def run(self):
        # Start the MCP server as a subprocess over stdio
        server_params = StdioServerParameters(
            command=sys.executable,
            args=[SERVER_SCRIPT],
            env=None
        )

        print("Starting Trip Planner Chatbot (Gemini Powered)...")
        print("Connecting to MCP Server...")

        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                # 1. Discover tools
                mcp_tools = await session.list_tools()
                gemini_tools = [self.mcp_tool_to_gemini(t) for t in mcp_tools.tools]

                # Initialize the Gemini model with the discovered tools
                system_instruction = """
You are a helpful assistant for a Trip Planner application.
You have access to two databases and external travel APIs:
1. Neo4j (Graph Database): Contains organizational data (Employees, Teams, Projects, Skills) and their relationships.
2. Postgres (Relational Database): Contains food preferences, food catalog, trip bookings, venues, and detailed team member records.
3. External APIs: Real-time weather data and country information.

**Available Tools**:
- **run_cypher_query**: Execute Cypher queries against Neo4j for organizational data
- **run_sql_query**: Execute SQL queries against Postgres for food/venue/trip data
- **get_trip_recommendations**: Comprehensive trip planning with real-time weather
  * Parameters: location (required), start_date, end_date, num_people, preferences, budget_per_person
  * Preferences: "adventure, food, culture, relaxation, shopping, nature" (comma-separated)
  * Returns: Weather forecast, recommendations, tips, external resource links
- **get_country_info**: Get country details (currency, languages, capital)

**Trip Planning Strategy**:
1. Extract trip parameters from the user query:
   - Location (required)
   - Dates (format: YYYY-MM-DD) or default to the upcoming weekend
   - Number of people (count from team data if applicable)
   - Preferences (adventure, food, culture, etc.)
   - Budget per person (if mentioned)
2. For team trips:
   - Use run_cypher_query to get the team member count
   - Use run_sql_query to get food preferences/restrictions
   - Incorporate dietary needs into preferences
   - Pass complete info to get_trip_recommendations
3. Present results in a user-friendly format:
   - Highlight weather conditions and practical advice
   - Summarize recommendations by category
   - Mention external resources for more details
   - Give packing tips based on weather

**Database Query Guidelines**:
1. **Fuzzy Matching**: When searching for names (people, teams, etc.), always use case-insensitive partial matching (e.g., ILIKE in SQL, CONTAINS in Cypher).
2. **Complete Lists**: When the user asks for a list, retrieve ALL matching records (no LIMIT unless specified).
3. Choose the right tool based on what data is needed:
   - Organizational structure/relationships → run_cypher_query
   - Food preferences/venues/bookings → run_sql_query
   - Travel planning with weather → get_trip_recommendations
   - Country details → get_country_info

Always be helpful, provide complete information, and give practical travel advice.
"""
                self.model = genai.GenerativeModel(
                    model_name=MODEL_NAME,
                    tools=[content.Tool(function_declarations=gemini_tools)],
                    system_instruction=system_instruction
                )
                # We handle tool execution manually to bridge calls to MCP
                self.chat = self.model.start_chat(enable_automatic_function_calling=False)

                print(f"Loaded {len(gemini_tools)} tools: {[t.name for t in gemini_tools]}")
                print("\n--- Chat Session Started (Type 'quit' to exit) ---")

                # 2. Chat loop
                while True:
                    user_input = input("\nYou: ")
                    if user_input.lower() in ['quit', 'exit']:
                        break
                    # Process with the LLM
                    await self.process_turn(session, user_input)

    async def process_turn(self, session, user_input):
        print("Assistant is thinking...", end="", flush=True)

        # Send the message to Gemini
        response = await self.chat.send_message_async(user_input)

        # Loop to handle tool calls
        while True:
            # Check that at least one candidate was returned
            if not response.candidates:
                print("\nError: No candidates returned from Gemini.")
                break

            part = response.candidates[0].content.parts[0]

            # Check if it's a function call
            if part.function_call:
                # Collect all function calls in this turn
                function_responses = []

                # Iterate through all parts to handle multiple tool calls if present
                # (Gemini 1.5/2.0 may return one or several, so we handle the list)
                for part in response.candidates[0].content.parts:
                    if not part.function_call:
                        continue

                    fc = part.function_call
                    tool_name = fc.name
                    tool_args = dict(fc.args)

                    print(f"\n[Tool Call] {tool_name}({tool_args})")

                    # Execute the tool via MCP
                    try:
                        result = await session.call_tool(tool_name, arguments=tool_args)
                        if result.content:
                            tool_output = result.content[0].text
                        else:
                            tool_output = "No content returned from tool."
                    except Exception as e:
                        tool_output = f"Error: {str(e)}"

                    print(f"[Tool Result] {tool_output[:100]}...")

                    # Construct a FunctionResponse for this specific call
                    function_responses.append(content.Part(
                        function_response=content.FunctionResponse(
                            name=tool_name,
                            response={"result": tool_output}
                        )
                    ))

                # Send ALL results back to Gemini in a single message
                if function_responses:
                    print("Assistant is thinking...", end="", flush=True)
                    response = await self.chat.send_message_async(function_responses)
                else:
                    # Should not happen if part.function_call was true, but a safety check
                    break
            else:
                # Final text response
                print("\n")
                print(f"Assistant: {response.text}")
                break


if __name__ == "__main__":
    try:
        asyncio.run(TripChatbot().run())
    except KeyboardInterrupt:
        print("\nGoodbye!")
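
The mcp_server.py that chatbot.py spawns over stdio is not shown on this page. As context for the tool-discovery and call_tool flow above, here is a minimal sketch of what a compatible server could look like using FastMCP from the MCP Python SDK; the tool body, sample data, and server name are illustrative placeholders, not the project's actual implementation.

# mcp_server_sketch.py -- hypothetical minimal server, not the project's actual mcp_server.py
from mcp.server.fastmcp import FastMCP

# The server name is reported to clients during session.initialize()
mcp = FastMCP("Trip Planner")

@mcp.tool()
def get_country_info(country: str) -> str:
    """Get country details (currency, languages, capital)."""
    # Placeholder data; the real tool presumably calls an external country API
    sample = {"France": "Capital: Paris | Currency: EUR | Languages: French"}
    return sample.get(country, f"No data for {country}")

if __name__ == "__main__":
    # Serve over stdio so stdio_client() in chatbot.py can spawn and talk to this process
    mcp.run(transport="stdio")

A server like this advertises get_country_info via list_tools(); chatbot.py then converts the tool's JSON input schema with _transform_schema() and forwards Gemini's function calls to it through session.call_tool().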
