from mcp.server.fastmcp import FastMCP
from db_utils import get_postgres_connection, get_neo4j_driver
import psycopg2.extras
import httpx
# Initialize the MCP Server
mcp = FastMCP("TripPlanner")
# @mcp.tool()
# def get_team_members(manager_name: str) -> list[str]:
# """
# Get the list of team members reporting to a specific manager from the Neo4j organization graph.
# """
# driver = get_neo4j_driver()
# query = """
# MATCH (m:Employee)-[:MANAGES]->(e:Employee)
# WHERE toLower(m.name) CONTAINS toLower($manager_name)
# RETURN e.name AS employee_name
# """
# try:
# with driver.session() as session:
# result = session.run(query, manager_name=manager_name)
# team = [record["employee_name"] for record in result]
# return team
# except Exception as e:
# return [f"Error querying Neo4j: {str(e)}"]
# finally:
# driver.close()
# @mcp.tool()
# def get_food_preferences(person_name: str) -> list[str]:
# """
# Get food preferences (dietary restrictions, favorites, allergies) for a person from Postgres.
# """
# conn = get_postgres_connection()
# try:
# with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
# cur.execute(
# "SELECT preference_type, detail FROM food_preferences WHERE person_name ILIKE %s",
# (f"%{person_name}%",)
# )
# rows = cur.fetchall()
# if not rows:
# return [f"No preferences found for {person_name}"]
# prefs = [f"{row['preference_type']}: {row['detail']}" for row in rows]
# return prefs
# except Exception as e:
# return [f"Error querying Postgres: {str(e)}"]
# finally:
# conn.close()
@mcp.tool()
def get_trip_recommendations(
location: str,
start_date: str = None,
end_date: str = None,
num_people: int = 1,
preferences: str = None,
budget_per_person: float = None
) -> str:
"""
Get comprehensive travel recommendations for a specific location using real-time data.
Args:
location: City name (e.g., "Udaipur", "Paris", "Tokyo")
start_date: Trip start date in YYYY-MM-DD format (optional)
end_date: Trip end date in YYYY-MM-DD format (optional)
num_people: Number of travelers (default: 1)
preferences: User preferences like "adventure, food, culture, relaxation, shopping, nature" (optional)
budget_per_person: Budget per person in local currency (optional)
Returns:
Comprehensive trip plan with weather data, activities, and recommendations
"""
import json
from datetime import datetime, timedelta
# If preferences not provided, use default
if not preferences:
preferences = "sightseeing, culture, food"
# If dates not provided, use upcoming weekend
if not start_date:
today = datetime.now()
days_until_friday = (4 - today.weekday()) % 7
start_date = (today + timedelta(days=days_until_friday)).strftime("%Y-%m-%d")
if not end_date:
end_date = (datetime.strptime(start_date, "%Y-%m-%d") + timedelta(days=2)).strftime("%Y-%m-%d")
result = {
"location": location,
"trip_details": {
"start_date": start_date,
"end_date": end_date,
"duration_days": (datetime.strptime(end_date, "%Y-%m-%d") - datetime.strptime(start_date, "%Y-%m-%d")).days + 1,
"num_people": num_people,
"preferences": preferences.split(",") if isinstance(preferences, str) else preferences,
"budget_per_person": budget_per_person
},
"weather_forecast": None,
"recommendations": [],
"tips": []
}
try:
# Get weather data from OpenWeatherMap
# Note: User needs to set OPENWEATHER_API_KEY environment variable
# Get free API key from: https://openweathermap.org/api
import os
api_key = os.getenv("OPENWEATHER_API_KEY")
if api_key:
with httpx.Client(timeout=10.0) as client:
# Use the forecast API directly with city name - it will find coordinates automatically
weather_url = f"http://api.openweathermap.org/data/2.5/forecast?q={location}&appid={api_key}&units=metric"
weather_response = client.get(weather_url)
if weather_response.status_code == 200:
weather_data = weather_response.json()
# Extract city info (includes coordinates)
city_info = weather_data.get("city", {})
lat = city_info.get("coord", {}).get("lat")
lon = city_info.get("coord", {}).get("lon")
country = city_info.get("country")
city_name = city_info.get("name", location)
# Parse weather forecasts
forecasts = []
for item in weather_data.get("list", [])[:8]: # Next 24 hours
forecasts.append({
"datetime": item.get("dt_txt"),
"temp": f"{item['main']['temp']}°C",
"feels_like": f"{item['main']['feels_like']}°C",
"weather": item['weather'][0]['description'],
"humidity": f"{item['main']['humidity']}%",
"wind_speed": f"{item['wind']['speed']} m/s"
})
result["weather_forecast"] = {
"location": f"{city_name}, {country}" if country else city_name,
"coordinates": {"lat": lat, "lon": lon},
"forecasts": forecasts,
"summary": f"Weather in {city_name}: {forecasts[0]['weather']}, {forecasts[0]['temp']}"
}
elif weather_response.status_code == 404:
result["weather_forecast"] = {
"error": f"Could not find location: {location}",
"suggestion": "Try adding country name (e.g., 'Udaipur, India' or 'Paris, France')",
"status_code": 404
}
elif weather_response.status_code == 401:
result["weather_forecast"] = {
"error": "Invalid API key",
"message": "Please check your OPENWEATHER_API_KEY in .env file",
"status_code": 401
}
else:
result["weather_forecast"] = {
"error": f"Weather API error: {weather_response.status_code}",
"message": "Could not reach weather service"
}
else:
result["weather_forecast"] = {
"note": "Weather data unavailable. Set OPENWEATHER_API_KEY environment variable.",
"instructions": "Get free API key from https://openweathermap.org/api"
}
except Exception as e:
result["weather_forecast"] = {"error": f"Failed to fetch weather: {str(e)}"}
# Generate recommendations based on preferences
pref_list = [p.strip().lower() for p in preferences.split(",")] if isinstance(preferences, str) else []
recommendations = []
# General recommendations
recommendations.append({
"category": "Getting Started",
"suggestions": [
f"Research local customs and etiquette for {location}",
"Book accommodations in advance for better rates",
"Check visa requirements and travel insurance",
"Download offline maps and translation apps"
]
})
# Preference-based recommendations
if "adventure" in pref_list or "nature" in pref_list:
recommendations.append({
"category": "Adventure & Nature",
"suggestions": [
"Look for hiking trails and outdoor activities",
"Book adventure sports in advance (if available)",
"Check weather conditions for outdoor activities",
"Bring appropriate gear (hiking shoes, water bottles)"
]
})
if "food" in pref_list or "culture" in pref_list:
recommendations.append({
"category": "Food & Culture",
"suggestions": [
"Try local street food and traditional restaurants",
"Join food walking tours to discover hidden gems",
"Visit local markets and food halls",
"Check Tripadvisor or Google reviews for top-rated restaurants"
]
})
if "sightseeing" in pref_list or "culture" in pref_list:
recommendations.append({
"category": "Sightseeing & Culture",
"suggestions": [
"Book skip-the-line tickets for popular attractions",
"Consider guided tours for historical context",
"Visit museums on free entry days (if available)",
"Plan visits to main attractions early morning to avoid crowds"
]
})
if "relaxation" in pref_list:
recommendations.append({
"category": "Relaxation",
"suggestions": [
"Look for spa services and wellness centers",
"Find peaceful parks or waterfront areas",
"Book accommodations with relaxation amenities",
"Schedule downtime between activities"
]
})
if "shopping" in pref_list:
recommendations.append({
"category": "Shopping",
"suggestions": [
"Visit local markets for authentic souvenirs",
"Check opening hours of shopping districts",
"Research tax refund policies for tourists",
"Bring reusable bags for shopping"
]
})
result["recommendations"] = recommendations
# Budget tips
if budget_per_person:
result["tips"].append(f"With a budget of {budget_per_person} per person, prioritize free attractions and local eateries")
# Group travel tips
if num_people > 1:
result["tips"].extend([
f"Traveling with {num_people} people - consider group discounts for activities",
"Book group accommodations like vacation rentals for better value",
"Use group transportation (minibus/van) for cost efficiency"
])
# Weather-based tips
if result.get("weather_forecast") and "forecasts" in result["weather_forecast"]:
first_forecast = result["weather_forecast"]["forecasts"][0]
if "rain" in first_forecast["weather"].lower():
result["tips"].append("Pack rain gear and plan indoor activities")
temp = float(first_forecast["temp"].replace("°C", ""))
if temp > 30:
result["tips"].append("Hot weather expected - stay hydrated and use sun protection")
elif temp < 10:
result["tips"].append("Cold weather expected - pack warm clothing")
# External resources
result["useful_resources"] = {
"weather": f"https://openweathermap.org/city/{location.replace(' ', '-')}",
"tripadvisor": f"https://www.tripadvisor.com/Search?q={location.replace(' ', '+')}",
"google_maps": f"https://www.google.com/maps/search/?api=1&query={location.replace(' ', '+')}",
"booking": f"https://www.booking.com/searchresults.html?ss={location.replace(' ', '+')}",
"wikitravel": f"https://wikitravel.org/en/{location.replace(' ', '_')}"
}
return json.dumps(result, indent=2, default=str)
@mcp.tool()
def run_cypher_query(query: str) -> str:
"""
Execute a raw Cypher query against the Neo4j database.
Use this tool when you need to perform complex graph traversals or flexible queries.
Returns a JSON string with ALL matching records.
Schema / Nodes available:
- Employee (name, email, employee_id, role, level, joined_on, years_of_experience, phone)
- Team (name, description, formation_date, team_size)
- Project (name, status, description, start_date, end_date, budget, priority)
- Location (city, country, timezone, office_code)
- Skill (name, category, proficiency_levels)
- Department (name, description, budget, head_count)
Relationships:
- (:Employee)-[:WORKS_IN]->(:Team)
- (:Employee)-[:LEADS]->(:Team)
- (:Employee)-[:MANAGES]->(:Employee)
- (:Employee)-[:HAS_SKILL {proficiency, years}]->(:Skill)
- (:Employee)-[:LOCATED_IN]->(:Location)
- (:Employee)-[:CONTRIBUTES_TO {role, allocation}]->(:Project)
- (:Employee)-[:COLLABORATES_WITH {context}]->(:Employee)
- (:Employee)-[:MENTORS {since}]->(:Employee)
- (:Project)-[:OWNED_BY]->(:Team)
- (:Team)-[:BELONGS_TO]->(:Department)
- (:Department)-[:PART_OF]->(:Department)
IMPORTANT: This function returns ALL matching records. Do not add LIMIT unless explicitly needed.
"""
driver = get_neo4j_driver()
try:
with driver.session() as session:
result = session.run(query)
# Convert ALL Neo4j records to standard dicts for JSON serialization
records = []
for record in result:
records.append(dict(record))
# Log for debugging
print(f"[run_cypher_query] Query: {query}")
print(f"[run_cypher_query] Returned {len(records)} records")
if records:
print(f"[run_cypher_query] First record: {records[0]}")
if len(records) > 1:
print(f"[run_cypher_query] Last record: {records[-1]}")
# Return as JSON string to ensure all data is passed through
import json
result_json = json.dumps(records, default=str)
print(f"[run_cypher_query] JSON length: {len(result_json)} chars")
return result_json
except Exception as e:
error_msg = f"Error executing Cypher: {str(e)}"
print(f"[run_cypher_query] {error_msg}")
import json
return json.dumps([{"error": error_msg}])
finally:
driver.close()
import json
import sys
@mcp.tool()
def run_sql_query(query: str) -> str:
"""
Execute a raw SQL query against the Postgres database.
Use this tool for flexible relational data retrieval.
Returns a JSON string representation of the result.
Schema / Tables available:
- cuisines (cuisine_id, cuisine_name, origin_country, description, is_popular)
- menu_items (item_id, item_name, cuisine_id, course_type, is_vegetarian, is_vegan, spice_level, calories, price_range, description, ingredients, preparation_time_mins, created_at)
- employee_food_preferences (preference_id, employee_email, item_id, preference_type, preference_level, notes, last_updated)
- dietary_restrictions (restriction_id, employee_email, restriction_type, severity, notes, certified_date)
- food_allergies (allergy_id, employee_email, allergen, severity, diagnosed_date, notes)
- venues (venue_id, venue_name, venue_type, city, country, address, latitude, longitude, phone, email, website, average_rating, price_range, seating_capacity, has_vegetarian_options, has_vegan_options, accepts_group_bookings, parking_available, outdoor_seating, cuisines_offered, opening_hours, created_at)
- venue_menu_items (venue_menu_id, venue_id, item_id, price, is_available, is_signature_dish)
- trip_bookings (booking_id, trip_name, destination_city, destination_country, start_date, end_date, total_participants, organizer_email, budget_per_person, accommodation_booked, transportation_booked, status, special_requirements, created_at, updated_at)
- trip_activities (activity_id, booking_id, activity_date, activity_time, activity_type, venue_id, activity_name, description, estimated_cost, duration_hours, booking_reference, status, notes)
Note: employee_email fields link to Neo4j Employee.email for cross-database relationships.
"""
conn = get_postgres_connection()
try:
# Use RealDictCursor to get column names in the result
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(query)
if cur.description:
results = [dict(row) for row in cur.fetchall()]
return json.dumps(results, default=str)
else:
conn.commit()
return json.dumps([{"status": "success"}])
except Exception as e:
return json.dumps([{"error": f"Error executing SQL: {str(e)}"}])
finally:
conn.close()
@mcp.tool()
def get_country_info(country_name: str) -> dict:
"""
Fetch country information (currency, languages, etc.) from the public RestCountries API.
Useful for travel planning.
"""
# Using a public API for demonstration
url = f"https://restcountries.com/v3.1/name/{country_name}"
try:
with httpx.Client() as client:
response = client.get(url)
if response.status_code == 404:
return {"error": "Country not found"}
response.raise_for_status()
data = response.json()[0]
return {
"name": data.get("name", {}).get("common"),
"capital": data.get("capital", ["N/A"])[0],
"region": data.get("region"),
"currencies": list(data.get("currencies", {}).keys()),
"languages": list(data.get("languages", {}).values())
}
except Exception as e:
return {"error": f"Failed to fetch country info: {str(e)}"}
if __name__ == "__main__":
# This runs the server over stdio by default
mcp.run()