"""Database layer for Travel Company MCP Server"""
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Any, Optional
from pathlib import Path
class Database:
"""Main database connection manager"""
def __init__(self, db_path: str = "data/travel_company.db"):
self.db_path = db_path
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row
def initialize_schema(self):
"""Create database tables"""
cursor = self.conn.cursor()
# Customers table
cursor.execute("""
CREATE TABLE IF NOT EXISTS customers (
customer_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
phone TEXT,
address TEXT,
city TEXT,
state TEXT,
country TEXT,
registration_date DATE NOT NULL,
loyalty_tier TEXT CHECK(loyalty_tier IN ('Bronze', 'Silver', 'Gold', 'Platinum')),
preferences TEXT
)
""")
# Trips table
cursor.execute("""
CREATE TABLE IF NOT EXISTS trips (
trip_id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id INTEGER NOT NULL,
destination TEXT NOT NULL,
start_date DATE NOT NULL,
end_date DATE NOT NULL,
cost DECIMAL(10, 2) NOT NULL,
status TEXT CHECK(status IN ('completed', 'upcoming', 'cancelled')),
booking_date DATE NOT NULL,
num_travelers INTEGER NOT NULL DEFAULT 1,
trip_type TEXT,
notes TEXT,
FOREIGN KEY (customer_id) REFERENCES customers(customer_id)
)
""")
# Requests table
cursor.execute("""
CREATE TABLE IF NOT EXISTS requests (
request_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL,
phone TEXT,
destination_interest TEXT,
travel_dates TEXT,
num_travelers INTEGER,
budget_range TEXT,
message TEXT,
request_date DATE NOT NULL,
status TEXT CHECK(status IN ('pending', 'contacted', 'converted', 'closed'))
)
""")
# Create indexes for performance
cursor.execute("CREATE INDEX IF NOT EXISTS idx_customers_email ON customers(email)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_customers_name ON customers(name)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_trips_customer ON trips(customer_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_trips_destination ON trips(destination)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_trips_dates ON trips(start_date, end_date)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_requests_email ON requests(email)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_requests_status ON requests(status)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_requests_date ON requests(request_date)")
self.conn.commit()
def close(self):
"""Close database connection"""
self.conn.close()
class CustomerDB:
"""Customer data access layer"""
def __init__(self, conn: sqlite3.Connection):
self.conn = conn
def search(self, query: str, search_by: str = "name") -> List[Dict[str, Any]]:
"""
Search for customers
Args:
query: Search query string
search_by: Field to search by (name, email, phone, customer_id)
"""
cursor = self.conn.cursor()
if search_by == "customer_id":
cursor.execute(
"SELECT * FROM customers WHERE customer_id = ? LIMIT 50",
(query,)
)
elif search_by == "email":
cursor.execute(
"SELECT * FROM customers WHERE email LIKE ? LIMIT 50",
(f"%{query}%",)
)
elif search_by == "phone":
cursor.execute(
"SELECT * FROM customers WHERE phone LIKE ? LIMIT 50",
(f"%{query}%",)
)
else: # name
cursor.execute(
"SELECT * FROM customers WHERE name LIKE ? LIMIT 50",
(f"%{query}%",)
)
rows = cursor.fetchall()
return [dict(row) for row in rows]
def get_by_id(self, customer_id: int) -> Optional[Dict[str, Any]]:
"""Get customer by ID"""
cursor = self.conn.cursor()
cursor.execute("SELECT * FROM customers WHERE customer_id = ?", (customer_id,))
row = cursor.fetchone()
return dict(row) if row else None
def get_profile(self, customer_id: int) -> Optional[Dict[str, Any]]:
"""Get customer profile with statistics"""
customer = self.get_by_id(customer_id)
if not customer:
return None
cursor = self.conn.cursor()
# Get trip statistics
cursor.execute("""
SELECT
COUNT(*) as total_trips,
SUM(cost) as lifetime_spending,
MAX(start_date) as last_trip_date
FROM trips
WHERE customer_id = ? AND status != 'cancelled'
""", (customer_id,))
stats = cursor.fetchone()
customer["statistics"] = {
"total_trips": stats["total_trips"] if stats else 0,
"lifetime_spending": float(stats["lifetime_spending"]) if stats and stats["lifetime_spending"] else 0.0,
"last_trip_date": stats["last_trip_date"] if stats else None
}
return customer
class TripDB:
"""Trip data access layer"""
def __init__(self, conn: sqlite3.Connection):
self.conn = conn
def search(self, destination: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
status: Optional[str] = None,
limit: int = 50) -> List[Dict[str, Any]]:
"""
Search trips with filters
Args:
destination: Filter by destination (partial match)
start_date: Filter trips starting after this date
end_date: Filter trips ending before this date
status: Filter by status (completed, upcoming, cancelled)
limit: Maximum number of results
"""
cursor = self.conn.cursor()
query = "SELECT * FROM trips WHERE 1=1"
params = []
if destination:
query += " AND destination LIKE ?"
params.append(f"%{destination}%")
if start_date:
query += " AND start_date >= ?"
params.append(start_date)
if end_date:
query += " AND end_date <= ?"
params.append(end_date)
if status:
query += " AND status = ?"
params.append(status)
query += " ORDER BY start_date DESC LIMIT ?"
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
return [dict(row) for row in rows]
def get_by_customer(self, customer_id: int, limit: int = 50) -> List[Dict[str, Any]]:
"""Get trip history for a customer"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM trips
WHERE customer_id = ?
ORDER BY start_date DESC
LIMIT ?
""", (customer_id, limit))
rows = cursor.fetchall()
return [dict(row) for row in rows]
class RequestDB:
"""Request data access layer"""
def __init__(self, conn: sqlite3.Connection):
self.conn = conn
def search(self, email: Optional[str] = None,
destination: Optional[str] = None,
status: Optional[str] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
limit: int = 50) -> List[Dict[str, Any]]:
"""
Search information requests
Args:
email: Filter by email (partial match)
destination: Filter by destination interest
status: Filter by status
date_from: Filter requests from this date
date_to: Filter requests to this date
limit: Maximum number of results
"""
cursor = self.conn.cursor()
query = "SELECT * FROM requests WHERE 1=1"
params = []
if email:
query += " AND email LIKE ?"
params.append(f"%{email}%")
if destination:
query += " AND destination_interest LIKE ?"
params.append(f"%{destination}%")
if status:
query += " AND status = ?"
params.append(status)
if date_from:
query += " AND request_date >= ?"
params.append(date_from)
if date_to:
query += " AND request_date <= ?"
params.append(date_to)
query += " ORDER BY request_date DESC LIMIT ?"
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
return [dict(row) for row in rows]
def get_pending(self, days_back: int = 30) -> List[Dict[str, Any]]:
"""Get pending requests from the last N days"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM requests
WHERE status = 'pending'
AND request_date >= date('now', '-' || ? || ' days')
ORDER BY request_date DESC
""", (days_back,))
rows = cursor.fetchall()
return [dict(row) for row in rows]