Skip to main content
Glama

Coconuts MCP Server

by tchardonnens
database.py11.8 kB
import sqlite3 import json from typing import List, Dict, Optional, Any from dataclasses import dataclass from datetime import datetime import os @dataclass class SavedPlace: """Data model for a Google Maps saved place""" id: Optional[int] = None name: str = "" address: str = "" latitude: float = 0.0 longitude: float = 0.0 place_id: Optional[str] = None types: List[str] = None rating: Optional[float] = None user_ratings_total: Optional[int] = None price_level: Optional[int] = None website: Optional[str] = None phone_number: Optional[str] = None opening_hours: Optional[Dict[str, Any]] = None photos: Optional[List[str]] = None notes: Optional[str] = None tags: Optional[List[str]] = None created_at: Optional[str] = None updated_at: Optional[str] = None def __post_init__(self): if self.types is None: self.types = [] if self.photos is None: self.photos = [] if self.tags is None: self.tags = [] if self.created_at is None: self.created_at = datetime.now().isoformat() if self.updated_at is None: self.updated_at = datetime.now().isoformat() class GoogleMapsDatabase: """SQLite database manager for Google Maps saved places""" def __init__(self, db_path: str = "data/google_maps_places.db"): self.db_path = db_path # Ensure the directory exists os.makedirs(os.path.dirname(db_path), exist_ok=True) self.init_database() def get_connection(self) -> sqlite3.Connection: """Get a database connection""" conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row return conn def init_database(self) -> None: """Initialize the database with the required tables""" with self.get_connection() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS saved_places ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, address TEXT NOT NULL, latitude REAL NOT NULL, longitude REAL NOT NULL, place_id TEXT UNIQUE, types TEXT, -- JSON array of place types rating REAL, user_ratings_total INTEGER, price_level INTEGER, website TEXT, phone_number TEXT, opening_hours TEXT, -- JSON object photos TEXT, -- JSON array of photo URLs notes TEXT, tags TEXT, -- JSON array of user tags created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) # Create indexes for better performance conn.execute("CREATE INDEX IF NOT EXISTS idx_place_id ON saved_places(place_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_location ON saved_places(latitude, longitude)") conn.execute("CREATE INDEX IF NOT EXISTS idx_name ON saved_places(name)") conn.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON saved_places(created_at)") conn.commit() def save_place(self, place: SavedPlace) -> int: """Save a place to the database and return its ID""" place.updated_at = datetime.now().isoformat() with self.get_connection() as conn: cursor = conn.execute(""" INSERT OR REPLACE INTO saved_places (name, address, latitude, longitude, place_id, types, rating, user_ratings_total, price_level, website, phone_number, opening_hours, photos, notes, tags, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( place.name, place.address, place.latitude, place.longitude, place.place_id, json.dumps(place.types), place.rating, place.user_ratings_total, place.price_level, place.website, place.phone_number, json.dumps(place.opening_hours) if place.opening_hours else None, json.dumps(place.photos), place.notes, json.dumps(place.tags), place.created_at, place.updated_at )) conn.commit() return cursor.lastrowid def get_place(self, place_id: int) -> Optional[SavedPlace]: """Get a place by its database ID""" with self.get_connection() as conn: row = conn.execute("SELECT * FROM saved_places WHERE id = ?", (place_id,)).fetchone() if row: return self._row_to_place(row) return None def get_place_by_google_id(self, google_place_id: str) -> Optional[SavedPlace]: """Get a place by its Google Place ID""" with self.get_connection() as conn: row = conn.execute("SELECT * FROM saved_places WHERE place_id = ?", (google_place_id,)).fetchone() if row: return self._row_to_place(row) return None def get_all_places(self, limit: Optional[int] = None, offset: int = 0) -> List[SavedPlace]: """Get all saved places with optional pagination""" with self.get_connection() as conn: query = "SELECT * FROM saved_places ORDER BY created_at DESC" params = [] if limit: query += " LIMIT ? OFFSET ?" params.extend([limit, offset]) rows = conn.execute(query, params).fetchall() return [self._row_to_place(row) for row in rows] def search_places(self, query: str, limit: Optional[int] = None) -> List[SavedPlace]: """Search places by name, address, or tags""" with self.get_connection() as conn: search_query = """ SELECT * FROM saved_places WHERE name LIKE ? OR address LIKE ? OR tags LIKE ? ORDER BY created_at DESC """ params = [f"%{query}%", f"%{query}%", f"%{query}%"] if limit: search_query += " LIMIT ?" params.append(limit) rows = conn.execute(search_query, params).fetchall() return [self._row_to_place(row) for row in rows] def get_places_by_location(self, latitude: float, longitude: float, radius_km: float = 10.0) -> List[SavedPlace]: """Get places within a radius of the given coordinates""" with self.get_connection() as conn: # Using Haversine formula for distance calculation query = """ SELECT *, (6371 * acos(cos(radians(?)) * cos(radians(latitude)) * cos(radians(longitude) - radians(?)) + sin(radians(?)) * sin(radians(latitude)))) AS distance FROM saved_places WHERE distance <= ? ORDER BY distance """ rows = conn.execute(query, (latitude, longitude, latitude, radius_km)).fetchall() return [self._row_to_place(row) for row in rows] def get_places_by_tag(self, tag: str) -> List[SavedPlace]: """Get all places with a specific tag""" with self.get_connection() as conn: rows = conn.execute( "SELECT * FROM saved_places WHERE tags LIKE ? ORDER BY created_at DESC", (f'%"{tag}"%',) ).fetchall() return [self._row_to_place(row) for row in rows] def update_place(self, place_id: int, **updates) -> bool: """Update a place with the given fields""" if not updates: return False place = self.get_place(place_id) if not place: return False # Update the place object for key, value in updates.items(): if hasattr(place, key): setattr(place, key, value) place.updated_at = datetime.now().isoformat() with self.get_connection() as conn: conn.execute(""" UPDATE saved_places SET name = ?, address = ?, latitude = ?, longitude = ?, place_id = ?, types = ?, rating = ?, user_ratings_total = ?, price_level = ?, website = ?, phone_number = ?, opening_hours = ?, photos = ?, notes = ?, tags = ?, updated_at = ? WHERE id = ? """, ( place.name, place.address, place.latitude, place.longitude, place.place_id, json.dumps(place.types), place.rating, place.user_ratings_total, place.price_level, place.website, place.phone_number, json.dumps(place.opening_hours) if place.opening_hours else None, json.dumps(place.photos), place.notes, json.dumps(place.tags), place.updated_at, place_id )) conn.commit() return True def delete_place(self, place_id: int) -> bool: """Delete a place by its ID""" with self.get_connection() as conn: cursor = conn.execute("DELETE FROM saved_places WHERE id = ?", (place_id,)) conn.commit() return cursor.rowcount > 0 def get_statistics(self) -> Dict[str, Any]: """Get database statistics""" with self.get_connection() as conn: total_places = conn.execute("SELECT COUNT(*) FROM saved_places").fetchone()[0] # Get most common tags all_tags = [] rows = conn.execute("SELECT tags FROM saved_places WHERE tags IS NOT NULL").fetchall() for row in rows: tags = json.loads(row['tags']) all_tags.extend(tags) tag_counts = {} for tag in all_tags: tag_counts[tag] = tag_counts.get(tag, 0) + 1 # Get places by rating avg_rating = conn.execute("SELECT AVG(rating) FROM saved_places WHERE rating IS NOT NULL").fetchone()[0] return { "total_places": total_places, "average_rating": round(avg_rating, 2) if avg_rating else None, "most_common_tags": sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)[:10] } def _row_to_place(self, row: sqlite3.Row) -> SavedPlace: """Convert a database row to a SavedPlace object""" return SavedPlace( id=row['id'], name=row['name'], address=row['address'], latitude=row['latitude'], longitude=row['longitude'], place_id=row['place_id'], types=json.loads(row['types']) if row['types'] else [], rating=row['rating'], user_ratings_total=row['user_ratings_total'], price_level=row['price_level'], website=row['website'], phone_number=row['phone_number'], opening_hours=json.loads(row['opening_hours']) if row['opening_hours'] else None, photos=json.loads(row['photos']) if row['photos'] else [], notes=row['notes'], tags=json.loads(row['tags']) if row['tags'] else [], created_at=row['created_at'], updated_at=row['updated_at'] )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tchardonnens/coconuts-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server