Skip to main content
Glama
talknerdytome-labs

Google Sheets Analytics MCP

mcp_server.py63.3 kB
#!/usr/bin/env python3 """TNTM Google Sheets Analytics MCP Server - Analyze Google Sheets data with natural language queries""" import asyncio import json import sys import os import sqlite3 import re import hashlib import time import logging import gc from datetime import datetime, timedelta from pathlib import Path from typing import Optional, List, Dict, Any # Set up paths SCRIPT_DIR = Path(__file__).parent.absolute() PROJECT_ROOT = SCRIPT_DIR.parent # Go up one level to project root os.chdir(PROJECT_ROOT) # MCP imports from mcp.server.models import InitializationOptions from mcp.server import NotificationOptions, Server from mcp.types import Tool, TextContent import mcp.server.stdio # Google imports from googleapiclient.discovery import build from google.oauth2.credentials import Credentials # Constants SCOPES = ['https://www.googleapis.com/auth/spreadsheets.readonly'] MAX_ROWS_PER_SYNC = 100000 # Default limit - balances completeness with performance # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Create server with TNTM branding app = Server( "tntm-sheets-server", version="1.0.0" ) class GoogleSheetsService: def __init__(self): self.db_path = PROJECT_ROOT / 'data' / 'sheets_data.sqlite' self.db_path.parent.mkdir(exist_ok=True) self._init_database() # Rate limiting and throttling self.api_calls = [] # Track API call timestamps self.max_calls_per_minute = 50 # Conservative limit self.min_delay_between_calls = 0.6 # 600ms between calls self.last_api_call = 0 # Change debouncing self.pending_changes = {} # spreadsheet_id -> last_change_time self.debounce_seconds = 5 # Wait 5 seconds after last change before syncing # Cache management self.cache_ttl_seconds = 300 # 5 minutes default cache TTL self.force_refresh_threshold = 86400 # Force refresh after 24 hours def _init_database(self): """Initialize SQLite database with metadata table and optimal settings for large datasets""" with sqlite3.connect(self.db_path, timeout=30.0) as conn: # Configure SQLite for better concurrency and performance with 1M+ rows conn.execute("PRAGMA journal_mode=WAL") # Write-Ahead Logging for better concurrency conn.execute("PRAGMA synchronous=NORMAL") # Balance between safety and performance conn.execute("PRAGMA cache_size=100000") # Increased cache for large datasets (100K pages) conn.execute("PRAGMA temp_store=memory") # Store temp tables in memory conn.execute("PRAGMA mmap_size=1073741824") # 1GB memory map for large datasets conn.execute("PRAGMA busy_timeout=10000") # 10 second timeout for better concurrency # Metadata table to track synced sheets conn.execute(""" CREATE TABLE IF NOT EXISTS _sheet_metadata ( spreadsheet_id TEXT, spreadsheet_title TEXT, sheet_name TEXT, table_name TEXT, row_count INTEGER, column_count INTEGER, last_modified TIMESTAMP, content_hash TEXT, sync_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (spreadsheet_id, sheet_name) ) """) # Add indexes for commonly queried columns conn.execute(""" CREATE INDEX IF NOT EXISTS idx_metadata_spreadsheet ON _sheet_metadata(spreadsheet_id) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_metadata_sync_time ON _sheet_metadata(sync_time) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_metadata_table_name ON _sheet_metadata(table_name) """) def _get_db_connection(self) -> sqlite3.Connection: """Get a database connection with optimal settings for large datasets""" conn = sqlite3.connect(self.db_path, timeout=30.0) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") conn.execute("PRAGMA cache_size=100000") conn.execute("PRAGMA temp_store=memory") conn.execute("PRAGMA mmap_size=1073741824") conn.execute("PRAGMA busy_timeout=10000") return conn def get_credentials(self) -> Optional[Credentials]: """Get Google credentials""" token_path = PROJECT_ROOT / 'data' / 'token.json' if not token_path.exists(): return None try: creds = Credentials.from_authorized_user_file(str(token_path), SCOPES) # Refresh token if expired if creds and creds.expired and creds.refresh_token: from google.auth.transport.requests import Request creds.refresh(Request()) # Save the refreshed token with open(token_path, 'w') as token: token.write(creds.to_json()) return creds if creds and creds.valid else None except Exception as e: print(f"Error loading credentials: {e}") return None def extract_spreadsheet_id(self, url: str) -> Optional[str]: """Extract spreadsheet ID from URL""" patterns = [ r'https://docs\.google\.com/spreadsheets/d/([a-zA-Z0-9-_]+)', r'^([a-zA-Z0-9-_]+)$' ] for pattern in patterns: match = re.search(pattern, url) if match: return match.group(1) return None def _calculate_content_hash(self, values: List[List[str]]) -> str: """Calculate hash of sheet content for change detection""" content_str = json.dumps(values, sort_keys=True) return hashlib.md5(content_str.encode()).hexdigest() def _calculate_content_hash_streaming(self, chunks) -> str: """Calculate hash progressively for large datasets""" hasher = hashlib.md5() row_count = 0 for chunk in chunks: # For very large datasets, sample every 10th row after first 1000 for i, row in enumerate(chunk): if row_count < 1000 or row_count % 10 == 0: hasher.update(json.dumps(row, sort_keys=True).encode()) row_count += 1 return hasher.hexdigest() async def _fetch_sheet_chunked(self, sheets_service, spreadsheet_id: str, sheet_name: str, total_rows: int, chunk_size: int = 50000): """Fetch sheet data in chunks for large datasets""" chunks_fetched = 0 row_offset = 0 # Determine actual data range (columns) await self._wait_for_rate_limit() self._record_api_call() # Get first row to determine column range sample_range = f"'{sheet_name}'!1:1" result = sheets_service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=sample_range ).execute() first_row = result.get('values', [[]])[0] if result.get('values') else [] if not first_row: return # Calculate actual column range (A to last column with data) last_col = self._number_to_column(len(first_row)) while row_offset < total_rows: # Calculate chunk range start_row = row_offset + 1 end_row = min(row_offset + chunk_size, total_rows) range_name = f"'{sheet_name}'!A{start_row}:{last_col}{end_row}" # Fetch chunk with rate limiting await self._wait_for_rate_limit() self._record_api_call() chunk_result = sheets_service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=range_name ).execute() chunk_data = chunk_result.get('values', []) if chunk_data: # Include headers in first chunk if chunks_fetched == 0 and row_offset > 0: chunk_data = [first_row] + chunk_data yield chunk_data chunks_fetched += 1 row_offset = end_row # Progress callback placeholder progress = (row_offset / total_rows) * 100 logger.info(f"Fetched {row_offset}/{total_rows} rows ({progress:.1f}%)") def _number_to_column(self, n: int) -> str: """Convert column number to letter (1=A, 26=Z, 27=AA, etc.)""" result = "" while n > 0: n -= 1 result = chr(n % 26 + ord('A')) + result n //= 26 return result def _get_sheet_changes(self, cursor: sqlite3.Cursor, spreadsheet_id: str, sheet_name: str, values: List[List[str]]) -> Dict[str, Any]: """Check if sheet has changed since last sync""" current_hash = self._calculate_content_hash(values) current_rows = len(values) - 1 if values else 0 current_cols = len(values[0]) if values else 0 cursor.execute(""" SELECT content_hash, row_count, column_count, sync_time FROM _sheet_metadata WHERE spreadsheet_id = ? AND sheet_name = ? """, (spreadsheet_id, sheet_name)) result = cursor.fetchone() if not result: return { "is_new": True, "has_changes": True, "changes": ["New sheet - first sync"] } old_hash, old_rows, old_cols, last_sync = result changes = [] if current_hash != old_hash: changes.append("Content modified") if current_rows != old_rows: changes.append(f"Row count changed: {old_rows} → {current_rows}") if current_cols != old_cols: changes.append(f"Column count changed: {old_cols} → {current_cols}") return { "is_new": False, "has_changes": len(changes) > 0, "changes": changes, "last_sync": last_sync } def _check_rate_limit(self) -> bool: """Check if we're within rate limits""" now = time.time() # Remove old API calls (older than 1 minute) self.api_calls = [call_time for call_time in self.api_calls if now - call_time < 60] # Check if we're at the limit if len(self.api_calls) >= self.max_calls_per_minute: return False # Check minimum delay between calls if now - self.last_api_call < self.min_delay_between_calls: return False return True async def _wait_for_rate_limit(self): """Wait until we can make another API call""" while not self._check_rate_limit(): now = time.time() # Calculate wait time wait_time = max( self.min_delay_between_calls - (now - self.last_api_call), 0.1 # Minimum 100ms wait ) await asyncio.sleep(wait_time) def _record_api_call(self): """Record an API call for rate limiting""" now = time.time() self.api_calls.append(now) self.last_api_call = now # Prevent memory leak: keep only recent API calls (last 2 minutes) self.api_calls = [call_time for call_time in self.api_calls if now - call_time < 120] def cleanup(self): """Clean up service state (call at end of tool execution)""" # Clear rate limiting state to prevent cross-session contamination self.api_calls.clear() self.last_api_call = 0 # Clear pending changes state self.pending_changes.clear() def _should_debounce(self, spreadsheet_id: str) -> bool: """Check if we should wait before syncing due to recent changes""" if spreadsheet_id not in self.pending_changes: return False time_since_change = time.time() - self.pending_changes[spreadsheet_id] return time_since_change < self.debounce_seconds def _mark_change_pending(self, spreadsheet_id: str): """Mark that a spreadsheet has pending changes""" self.pending_changes[spreadsheet_id] = time.time() def _is_cache_stale(self, cursor: sqlite3.Cursor, spreadsheet_id: str, sheet_name: str) -> Dict[str, Any]: """Check if cached data is stale and needs refresh""" # Get metadata for cache validation cursor.execute(""" SELECT sync_time, table_name, row_count FROM _sheet_metadata WHERE spreadsheet_id = ? AND sheet_name = ? """, (spreadsheet_id, sheet_name)) result = cursor.fetchone() if not result: return { "is_stale": True, "reason": "no_metadata", "action": "full_sync" } sync_time_str, table_name, expected_rows = result # Parse sync time try: sync_time = datetime.fromisoformat(sync_time_str.replace('Z', '+00:00')) age_seconds = (datetime.now() - sync_time.replace(tzinfo=None)).total_seconds() except: return { "is_stale": True, "reason": "invalid_sync_time", "action": "full_sync" } # Check if cache is too old if age_seconds > self.force_refresh_threshold: return { "is_stale": True, "reason": "forced_refresh_threshold", "age_hours": age_seconds / 3600, "action": "full_sync" } # Check if cache TTL exceeded if age_seconds > self.cache_ttl_seconds: return { "is_stale": True, "reason": "cache_ttl_exceeded", "age_minutes": age_seconds / 60, "action": "change_check" } # Verify table actually exists and has expected data try: cursor.execute(f"SELECT COUNT(*) FROM {table_name}") actual_rows = cursor.fetchone()[0] if actual_rows != expected_rows: return { "is_stale": True, "reason": "row_count_mismatch", "expected": expected_rows, "actual": actual_rows, "action": "full_sync" } except sqlite3.OperationalError: return { "is_stale": True, "reason": "table_missing", "action": "full_sync" } return { "is_stale": False, "reason": "cache_valid", "age_minutes": age_seconds / 60, "action": "use_cache" } def _should_force_refresh(self, cache_status: Dict[str, Any]) -> bool: """Determine if we should force refresh regardless of change detection""" return cache_status["action"] in ["full_sync"] def _get_cache_strategy(self, cursor: sqlite3.Cursor, spreadsheet_id: str, sheet_name: str) -> Dict[str, Any]: """Get comprehensive caching strategy for a sheet""" cache_status = self._is_cache_stale(cursor, spreadsheet_id, sheet_name) # Check for pending changes has_pending_changes = spreadsheet_id in self.pending_changes should_debounce = self._should_debounce(spreadsheet_id) strategy = { "cache_status": cache_status, "has_pending_changes": has_pending_changes, "should_debounce": should_debounce, "recommended_action": cache_status["action"] } # Override action based on pending changes if has_pending_changes and not should_debounce: strategy["recommended_action"] = "change_check" elif should_debounce: strategy["recommended_action"] = "wait_for_debounce" return strategy # Service instances will be created per tool call to prevent session state issues @app.list_tools() async def handle_list_tools() -> list[Tool]: """List available tools""" return [ Tool( name="smart_sync", description="Sync Google Sheet data intelligently with row limits", inputSchema={ "type": "object", "properties": { "url": { "type": "string", "description": "Google Sheets URL" }, "max_rows": { "type": "integer", "description": "Maximum rows to sync per sheet (default: 100000)", "default": 100000 }, "sheets": { "type": "array", "items": {"type": "string"}, "description": "Specific sheets to sync (optional, syncs all if not provided)" } }, "required": ["url"] } ), Tool( name="query_sheets", description="Query synced sheet data using SQL (supports JOINs across tabs)", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "SQL query to run" } }, "required": ["query"] } ), Tool( name="list_synced_sheets", description="List all synced sheets and their tables", inputSchema={ "type": "object", "properties": {} } ), Tool( name="analyze_sheets", description="Analyze relationships between sheets and suggest queries", inputSchema={ "type": "object", "properties": { "question": { "type": "string", "description": "What you want to analyze across sheets" } }, "required": ["question"] } ), Tool( name="get_sheet_preview", description="Preview any sheet without syncing", inputSchema={ "type": "object", "properties": { "url": { "type": "string", "description": "Google Sheets URL" }, "sheet_name": { "type": "string", "description": "Specific sheet to preview (optional)" }, "rows": { "type": "integer", "description": "Number of rows to preview (default: 10)", "default": 10 } }, "required": ["url"] } ), Tool( name="check_sheet_changes", description="Check if synced sheets have been modified and need re-syncing", inputSchema={ "type": "object", "properties": { "url": { "type": "string", "description": "Google Sheets URL to check (optional - checks all if not provided)" }, "auto_sync": { "type": "boolean", "description": "Automatically sync changed sheets (default: false)", "default": False }, "batch_size": { "type": "integer", "description": "Number of sheets to check per batch (default: 5)", "default": 5 } } } ), Tool( name="batch_sync_changes", description="Sync multiple changed sheets with rate limiting and progress tracking", inputSchema={ "type": "object", "properties": { "max_sheets": { "type": "integer", "description": "Maximum sheets to sync in one batch (default: 10)", "default": 10 }, "delay_between_sheets": { "type": "number", "description": "Seconds to wait between sheet syncs (default: 1.0)", "default": 1.0 } } } ) ] @app.call_tool() async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]: """Handle tool calls""" if name == "smart_sync": # Create fresh service instance for this tool call service = GoogleSheetsService() url = arguments.get("url") max_rows = arguments.get("max_rows", MAX_ROWS_PER_SYNC) target_sheets = arguments.get("sheets", []) # Enhanced input validation if not url or not isinstance(url, str) or not url.strip(): return [TextContent(type="text", text=json.dumps({ "error": "Google Sheets URL is required", "details": "Please provide a valid Google Sheets URL", "examples": [ "https://docs.google.com/spreadsheets/d/1ABC123.../edit", "1ABC123DEF456..." ], "tip": "Copy the URL from your browser when viewing the Google Sheet" }))] url = url.strip() # Clean the URL spreadsheet_id = service.extract_spreadsheet_id(url) if not spreadsheet_id: return [TextContent(type="text", text=json.dumps({ "error": "Invalid Google Sheets URL format", "details": "Could not extract spreadsheet ID from the provided URL", "examples": [ "https://docs.google.com/spreadsheets/d/1ABC123.../edit", "1ABC123DEF456..." ], "tip": "Make sure to copy the complete URL from your browser" }))] creds = service.get_credentials() if not creds: error_msg = { "error": "Authentication failed", "details": "No valid credentials found. Please ensure token.json exists in the data/ directory.", "token_path": str(PROJECT_ROOT / 'data' / 'token.json'), "token_exists": (PROJECT_ROOT / 'data' / 'token.json').exists() } return [TextContent(type="text", text=json.dumps(error_msg, indent=2))] conn = None try: # Check if we should debounce this sync if service._should_debounce(spreadsheet_id): return [TextContent(type="text", text=json.dumps({ "status": "debounced", "message": f"Recent changes detected. Please wait {service.debounce_seconds} seconds before syncing.", "spreadsheet_id": spreadsheet_id }))] # Wait for rate limit if needed await service._wait_for_rate_limit() sheets_service = build('sheets', 'v4', credentials=creds) # Get spreadsheet metadata service._record_api_call() spreadsheet = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute() title = spreadsheet['properties']['title'] sheets = spreadsheet['sheets'] # Connect to database with optimal settings conn = service._get_db_connection() cursor = conn.cursor() synced_sheets = [] total_rows = 0 for sheet in sheets: sheet_title = sheet['properties']['title'] # Skip if specific sheets requested and this isn't one if target_sheets and sheet_title not in target_sheets: continue # Create safe table name safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', sheet_title.lower()) safe_name = f"sheet_{safe_name}" if safe_name[0].isdigit() else safe_name # Get sheet dimensions to determine if chunking is needed sheet_properties = sheet['properties'] grid_properties = sheet_properties.get('gridProperties', {}) sheet_rows = grid_properties.get('rowCount', 0) sheet_cols = grid_properties.get('columnCount', 0) # Determine actual data rows (may be less than sheet dimensions) actual_rows = min(sheet_rows, max_rows) # Decide whether to use chunked fetching use_chunking = actual_rows > 10000 chunk_size = 50000 if actual_rows > 100000 else 10000 values = [] headers = None if use_chunking and actual_rows > 0: # Use chunked fetching for large sheets logger.info(f"Using chunked fetching for {sheet_title}: {actual_rows} rows") async for chunk in service._fetch_sheet_chunked( sheets_service, spreadsheet_id, sheet_title, actual_rows, chunk_size ): if not headers: headers = chunk[0] if chunk else [] values = chunk else: # Append data rows (skip header in subsequent chunks) values.extend(chunk[1:] if len(chunk) > 1 else []) # Break if we've reached max_rows if len(values) >= max_rows: values = values[:max_rows] break else: # Use traditional single fetch for smaller sheets await service._wait_for_rate_limit() range_name = f"'{sheet_title}'!A1:Z{max_rows}" service._record_api_call() result = sheets_service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=range_name ).execute() values = result.get('values', []) if not values: continue # Initialize change_info for this sheet iteration change_info = None # Get caching strategy for this sheet cache_strategy = service._get_cache_strategy(cursor, spreadsheet_id, sheet_title) # Handle different cache strategies if cache_strategy["recommended_action"] == "use_cache": synced_sheets.append({ "sheet_name": sheet_title, "table_name": safe_name, "rows": cache_strategy["cache_status"].get("expected", len(values) - 1), "status": "cached", "cache_age_minutes": cache_strategy["cache_status"]["age_minutes"], "last_sync": cache_strategy["cache_status"].get("last_sync") }) continue elif cache_strategy["recommended_action"] == "wait_for_debounce": synced_sheets.append({ "sheet_name": sheet_title, "table_name": safe_name, "status": "debounced", "message": f"Waiting {service.debounce_seconds}s for changes to settle" }) continue elif cache_strategy["recommended_action"] == "change_check": # Check for actual changes change_info = service._get_sheet_changes(cursor, spreadsheet_id, sheet_title, values) if not change_info["has_changes"] and not service._should_force_refresh(cache_strategy["cache_status"]): synced_sheets.append({ "sheet_name": sheet_title, "table_name": safe_name, "rows": len(values) - 1, "status": "no_changes", "cache_status": cache_strategy["cache_status"]["reason"], "last_sync": change_info.get("last_sync") }) continue # If we reach here, we need to perform a full sync # Ensure change_info is set for full sync scenarios if change_info is None: change_info = service._get_sheet_changes(cursor, spreadsheet_id, sheet_title, values) # Create table headers = values[0] if values else [] safe_headers = [re.sub(r'[^a-zA-Z0-9_]', '_', h.lower()) for h in headers] # Drop existing table cursor.execute(f"DROP TABLE IF EXISTS {safe_name}") # Create new table with row ID and optimized column types columns = 'row_id INTEGER PRIMARY KEY, ' + ', '.join([f"{h} TEXT" for h in safe_headers]) cursor.execute(f"CREATE TABLE {safe_name} ({columns})") # Prepare data for bulk insert data_rows = values[1:] if len(values) > 1 else [] row_count = len(data_rows) if row_count > 0: # Use bulk insert with executemany for performance bulk_data = [] for idx, row in enumerate(data_rows, 1): # Pad row to match headers padded_row = row + [''] * (len(headers) - len(row)) bulk_data.append([idx] + padded_row) # Insert in batches for optimal performance batch_size = 10000 placeholders = ', '.join(['?' for _ in range(len(headers) + 1)]) for i in range(0, len(bulk_data), batch_size): batch = bulk_data[i:i + batch_size] cursor.executemany(f"INSERT INTO {safe_name} VALUES ({placeholders})", batch) # Log progress for large datasets if row_count > 10000: progress = min(i + batch_size, row_count) logger.info(f"Inserted {progress}/{row_count} rows into {safe_name}") total_rows += row_count # Calculate content hash efficiently if use_chunking and row_count > 100000: # Use sampling for very large datasets sample_size = min(10000, row_count) sample_indices = range(0, row_count, row_count // sample_size) sample_values = [values[0]] + [values[i+1] for i in sample_indices if i+1 < len(values)] content_hash = service._calculate_content_hash(sample_values) else: content_hash = service._calculate_content_hash(values) # Add indexes after bulk insert for better performance if row_count > 10000: # Create indexes on commonly queried columns (first few columns often used) for i, header in enumerate(safe_headers[:3]): try: cursor.execute(f"CREATE INDEX idx_{safe_name}_{header} ON {safe_name}({header})") except: pass # Index creation might fail for some columns # Update metadata cursor.execute(""" INSERT OR REPLACE INTO _sheet_metadata (spreadsheet_id, spreadsheet_title, sheet_name, table_name, row_count, column_count, content_hash) VALUES (?, ?, ?, ?, ?, ?, ?) """, (spreadsheet_id, title, sheet_title, safe_name, row_count, len(headers), content_hash)) synced_sheets.append({ "sheet_name": sheet_title, "table_name": safe_name, "rows": row_count, "columns": safe_headers, "status": "synced", "changes": change_info["changes"] }) # Memory cleanup for large datasets if row_count > 50000: del values if 'bulk_data' in locals(): del bulk_data gc.collect() logger.info(f"Memory cleanup performed after syncing {sheet_title}") conn.commit() conn.close() result = { "status": "success", "spreadsheet": title, "sheets_synced": len(synced_sheets), "total_rows": total_rows, "sheets": synced_sheets, "note": f"Limited to {max_rows} rows per sheet" } except Exception as e: result = {"error": str(e)} finally: # Clean up resources if conn: try: conn.close() except: pass service.cleanup() return [TextContent(type="text", text=json.dumps(result, indent=2))] elif name == "query_sheets": # Create fresh service instance for this tool call service = GoogleSheetsService() query = arguments.get("query", "") # Security: Block destructive SQL operations if query: dangerous_commands = ['DROP', 'DELETE', 'INSERT', 'UPDATE', 'ALTER', 'CREATE', 'PRAGMA'] query_upper = query.strip().upper() if any(cmd in query_upper for cmd in dangerous_commands): return [TextContent(type="text", text=json.dumps({ "error": "Only SELECT queries are allowed for data analysis", "details": "Destructive operations (DROP, DELETE, etc.) are not permitted", "tip": "Use SELECT statements to query your synced data safely" }))] try: with service._get_db_connection() as conn: cursor = conn.cursor() # Add automatic LIMIT if not present for safety query_lower = query.strip().lower() if 'limit' not in query_lower: # Add a reasonable default limit query = f"{query} LIMIT 10000" cursor.execute(query) columns = [desc[0] for desc in cursor.description] if cursor.description else [] # Stream results instead of fetchall for large datasets result_rows = [] total_fetched = 0 max_display_rows = 1000 # Maximum rows to return to client while True: # Fetch in batches batch = cursor.fetchmany(1000) if not batch: break total_fetched += len(batch) # Only keep rows up to display limit if len(result_rows) < max_display_rows: remaining = max_display_rows - len(result_rows) result_rows.extend(batch[:remaining]) result = { "columns": columns, "rows": result_rows, "total_rows": total_fetched, "limited": total_fetched > max_display_rows, "display_limit": max_display_rows } if total_fetched > max_display_rows: result["note"] = f"Showing first {max_display_rows} of {total_fetched} rows. Add LIMIT to your query to control results." except Exception as e: result = {"error": str(e)} return [TextContent(type="text", text=json.dumps(result, indent=2))] elif name == "list_synced_sheets": # Create fresh service instance for this tool call service = GoogleSheetsService() try: with service._get_db_connection() as conn: cursor = conn.cursor() # Get synced sheets info cursor.execute(""" SELECT spreadsheet_title, sheet_name, table_name, row_count, sync_time FROM _sheet_metadata ORDER BY spreadsheet_title, sheet_name """) sheets = cursor.fetchall() # Get all tables cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE '_%'") tables = [t[0] for t in cursor.fetchall()] result = { "synced_sheets": [ { "spreadsheet": s[0], "sheet": s[1], "table": s[2], "rows": s[3], "synced_at": s[4] } for s in sheets ], "tables": tables } except Exception as e: result = {"error": str(e)} return [TextContent(type="text", text=json.dumps(result, indent=2))] elif name == "analyze_sheets": # Create fresh service instance for this tool call service = GoogleSheetsService() question = arguments.get("question", "") try: with service._get_db_connection() as conn: cursor = conn.cursor() # Get all tables and their columns cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE '_%'") tables = cursor.fetchall() table_info = {} common_columns = {} for table in tables: table_name = table[0] cursor.execute(f"PRAGMA table_info({table_name})") columns = [(col[1], col[2]) for col in cursor.fetchall()] table_info[table_name] = columns # Track common columns for JOIN suggestions for col_name, col_type in columns: if col_name not in ['row_id']: if col_name not in common_columns: common_columns[col_name] = [] common_columns[col_name].append(table_name) # Find potential JOIN columns join_candidates = {col: tables for col, tables in common_columns.items() if len(tables) > 1} # Generate suggestions based on question suggestions = [] if any(word in question.lower() for word in ['combine', 'join', 'merge', 'together']): for col, tables in join_candidates.items(): if len(tables) == 2: suggestions.append(f"SELECT * FROM {tables[0]} JOIN {tables[1]} ON {tables[0]}.{col} = {tables[1]}.{col}") if 'all' in question.lower(): suggestions.append("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE '_%'") if any(word in question.lower() for word in ['count', 'how many']): for table in table_info: suggestions.append(f"SELECT COUNT(*) as total_rows FROM {table}") result = { "tables": list(table_info.keys()), "table_schemas": {name: [{"column": col[0], "type": col[1]} for col in cols] for name, cols in table_info.items()}, "common_columns": join_candidates, "suggested_queries": suggestions, "tip": "Use query_sheets to run any SQL query across your synced data" } except Exception as e: result = {"error": str(e)} return [TextContent(type="text", text=json.dumps(result, indent=2))] elif name == "get_sheet_preview": # Create fresh service instance for this tool call service = GoogleSheetsService() url = arguments.get("url", "") sheet_name = arguments.get("sheet_name") rows = arguments.get("rows", 10) spreadsheet_id = service.extract_spreadsheet_id(url) if not spreadsheet_id: return [TextContent(type="text", text=json.dumps({"error": "Invalid URL"}))] creds = service.get_credentials() if not creds: error_msg = { "error": "Authentication failed", "details": "No valid credentials found. Please ensure token.json exists in the data/ directory.", "token_path": str(PROJECT_ROOT / 'data' / 'token.json'), "token_exists": (PROJECT_ROOT / 'data' / 'token.json').exists() } return [TextContent(type="text", text=json.dumps(error_msg, indent=2))] try: sheets_service = build('sheets', 'v4', credentials=creds) # Get spreadsheet info spreadsheet = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute() title = spreadsheet['properties']['title'] sheet_names = [s['properties']['title'] for s in spreadsheet['sheets']] # Get preview of specified or first sheet target_sheet = sheet_name if sheet_name and sheet_name in sheet_names else sheet_names[0] range_name = f"'{target_sheet}'!A1:Z{rows}" result = sheets_service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=range_name ).execute() values = result.get('values', []) preview = { "status": "success", "title": title, "available_sheets": sheet_names, "previewing_sheet": target_sheet, "preview_rows": len(values), "headers": values[0] if values else [], "data": values[1:] if len(values) > 1 else [] } except Exception as e: preview = {"error": str(e)} return [TextContent(type="text", text=json.dumps(preview, indent=2))] elif name == "check_sheet_changes": # Create fresh service instance for this tool call service = GoogleSheetsService() url = arguments.get("url") auto_sync = arguments.get("auto_sync", False) creds = service.get_credentials() if not creds: error_msg = { "error": "Authentication failed", "details": "No valid credentials found. Please ensure token.json exists in the data/ directory.", "token_path": str(PROJECT_ROOT / 'data' / 'token.json'), "token_exists": (PROJECT_ROOT / 'data' / 'token.json').exists() } return [TextContent(type="text", text=json.dumps(error_msg, indent=2))] try: sheets_service = build('sheets', 'v4', credentials=creds) # Get sheets to check sheets_to_check = [] if url: # Check specific spreadsheet spreadsheet_id = service.extract_spreadsheet_id(url) if not spreadsheet_id: return [TextContent(type="text", text=json.dumps({"error": "Invalid URL"}))] with service._get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT spreadsheet_id, spreadsheet_title, sheet_name, table_name FROM _sheet_metadata WHERE spreadsheet_id = ? """, (spreadsheet_id,)) sheets_to_check = cursor.fetchall() else: # Check all synced sheets with service._get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT DISTINCT spreadsheet_id, spreadsheet_title, sheet_name, table_name FROM _sheet_metadata """) sheets_to_check = cursor.fetchall() if not sheets_to_check: return [TextContent(type="text", text=json.dumps({ "status": "no_sheets", "message": "No synced sheets found to check" }))] changes_found = [] synced_count = 0 for sheet_info in sheets_to_check: spreadsheet_id, spreadsheet_title, sheet_name, table_name = sheet_info try: # First check sheet metadata for quick change detection sheet_meta = sheets_service.spreadsheets().get( spreadsheetId=spreadsheet_id, ranges=[f"'{sheet_name}'"], fields="sheets(properties)" ).execute() # Get sheet dimensions for efficient checking sheet_props = sheet_meta['sheets'][0]['properties'] grid_props = sheet_props.get('gridProperties', {}) current_rows = grid_props.get('rowCount', 0) # For large sheets, use sampling instead of fetching all data if current_rows > 10000: # Sample approach: check first 100, last 100, and some random rows sample_ranges = [ f"'{sheet_name}'!A1:Z100", # First 100 rows f"'{sheet_name}'!A{max(1, current_rows-100)}:Z{current_rows}" # Last 100 rows ] batch_result = sheets_service.spreadsheets().values().batchGet( spreadsheetId=spreadsheet_id, ranges=sample_ranges ).execute() # Combine samples for hash calculation values = [] for range_data in batch_result.get('valueRanges', []): values.extend(range_data.get('values', [])) else: # For smaller sheets, get all data range_name = f"'{sheet_name}'!A1:Z{min(current_rows, 5000)}" result = sheets_service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=range_name ).execute() values = result.get('values', []) # Check for changes with service._get_db_connection() as temp_conn: temp_cursor = temp_conn.cursor() # Quick check: compare row counts first temp_cursor.execute(""" SELECT row_count FROM _sheet_metadata WHERE spreadsheet_id = ? AND sheet_name = ? """, (spreadsheet_id, sheet_name)) stored_row_count = temp_cursor.fetchone() if stored_row_count and stored_row_count[0] != current_rows: change_info = { "has_changes": True, "changes": [f"Row count changed: {stored_row_count[0]} → {current_rows}"] } else: change_info = service._get_sheet_changes(temp_cursor, spreadsheet_id, sheet_name, values) if change_info["has_changes"]: change_entry = { "spreadsheet": spreadsheet_title, "sheet": sheet_name, "table": table_name, "changes": change_info["changes"], "last_sync": change_info.get("last_sync"), "synced": False } # Auto-sync if requested if auto_sync: # Re-sync this sheet (simplified version) if values: headers = values[0] if values else [] safe_headers = [re.sub(r'[^a-zA-Z0-9_]', '_', h.lower()) for h in headers] with service._get_db_connection() as conn: cursor = conn.cursor() # Drop and recreate table cursor.execute(f"DROP TABLE IF EXISTS {table_name}") columns = 'row_id INTEGER PRIMARY KEY, ' + ', '.join([f"{h} TEXT" for h in safe_headers]) cursor.execute(f"CREATE TABLE {table_name} ({columns})") # Insert data for idx, row in enumerate(values[1:], 1): padded_row = row + [''] * (len(headers) - len(row)) placeholders = ', '.join(['?' for _ in range(len(headers) + 1)]) cursor.execute(f"INSERT INTO {table_name} VALUES ({placeholders})", [idx] + padded_row) # Update metadata content_hash = service._calculate_content_hash(values) cursor.execute(""" UPDATE _sheet_metadata SET row_count = ?, column_count = ?, content_hash = ?, sync_time = CURRENT_TIMESTAMP WHERE spreadsheet_id = ? AND sheet_name = ? """, (len(values) - 1, len(headers), content_hash, spreadsheet_id, sheet_name)) conn.commit() change_entry["synced"] = True synced_count += 1 changes_found.append(change_entry) except Exception as e: changes_found.append({ "spreadsheet": spreadsheet_title, "sheet": sheet_name, "error": str(e) }) result = { "status": "success", "total_sheets_checked": len(sheets_to_check), "sheets_with_changes": len(changes_found), "auto_synced": synced_count if auto_sync else 0, "changes": changes_found } if not changes_found: result["message"] = "All sheets are up to date" except Exception as e: result = {"error": str(e)} return [TextContent(type="text", text=json.dumps(result, indent=2))] elif name == "batch_sync_changes": # Create fresh service instance for this tool call service = GoogleSheetsService() max_sheets = arguments.get("max_sheets", 10) delay_between_sheets = arguments.get("delay_between_sheets", 1.0) creds = service.get_credentials() if not creds: error_msg = { "error": "Authentication failed", "details": "No valid credentials found. Please ensure token.json exists in the data/ directory.", "token_path": str(PROJECT_ROOT / 'data' / 'token.json'), "token_exists": (PROJECT_ROOT / 'data' / 'token.json').exists() } return [TextContent(type="text", text=json.dumps(error_msg, indent=2))] try: # Get all sheets that need syncing with service._get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT DISTINCT spreadsheet_id, spreadsheet_title, sheet_name, table_name FROM _sheet_metadata ORDER BY sync_time ASC LIMIT ? """, (max_sheets,)) sheets_to_sync = cursor.fetchall() if not sheets_to_sync: return [TextContent(type="text", text=json.dumps({ "status": "no_sheets", "message": "No sheets found to sync" }))] sheets_service = build('sheets', 'v4', credentials=creds) synced_sheets = [] failed_sheets = [] skipped_sheets = [] for i, (spreadsheet_id, spreadsheet_title, sheet_name, table_name) in enumerate(sheets_to_sync): try: # Add delay between sheets if i > 0: await asyncio.sleep(delay_between_sheets) # Check if we should skip due to debouncing if service._should_debounce(spreadsheet_id): skipped_sheets.append({ "spreadsheet": spreadsheet_title, "sheet": sheet_name, "reason": "debounced" }) continue # Wait for rate limit await service._wait_for_rate_limit() # Get sheet dimensions first service._record_api_call() sheet_meta = sheets_service.spreadsheets().get( spreadsheetId=spreadsheet_id, ranges=[f"'{sheet_name}'"], fields="sheets(properties)" ).execute() # Get actual dimensions sheet_props = sheet_meta['sheets'][0]['properties'] grid_props = sheet_props.get('gridProperties', {}) total_rows = grid_props.get('rowCount', 0) total_cols = grid_props.get('columnCount', 0) # Determine column range dynamically last_col = service._number_to_column(min(total_cols, 26)) # Limit to Z for compatibility # Get current sheet data with dynamic range range_name = f"'{sheet_name}'!A1:{last_col}{min(total_rows, 10000)}" service._record_api_call() result = sheets_service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=range_name ).execute() values = result.get('values', []) if not values: continue # Check for changes # Create temporary connection for change detection (this will be fixed in Phase 1b) with service._get_db_connection() as temp_conn: temp_cursor = temp_conn.cursor() change_info = service._get_sheet_changes(temp_cursor, spreadsheet_id, sheet_name, values) if not change_info["has_changes"]: skipped_sheets.append({ "spreadsheet": spreadsheet_title, "sheet": sheet_name, "reason": "no_changes" }) continue # Sync the sheet headers = values[0] if values else [] safe_headers = [re.sub(r'[^a-zA-Z0-9_]', '_', h.lower()) for h in headers] with service._get_db_connection() as conn: cursor = conn.cursor() # Drop and recreate table cursor.execute(f"DROP TABLE IF EXISTS {table_name}") columns = 'row_id INTEGER PRIMARY KEY, ' + ', '.join([f"{h} TEXT" for h in safe_headers]) cursor.execute(f"CREATE TABLE {table_name} ({columns})") # Insert data for idx, row in enumerate(values[1:], 1): padded_row = row + [''] * (len(headers) - len(row)) placeholders = ', '.join(['?' for _ in range(len(headers) + 1)]) cursor.execute(f"INSERT INTO {table_name} VALUES ({placeholders})", [idx] + padded_row) # Update metadata content_hash = service._calculate_content_hash(values) cursor.execute(""" UPDATE _sheet_metadata SET row_count = ?, column_count = ?, content_hash = ?, sync_time = CURRENT_TIMESTAMP WHERE spreadsheet_id = ? AND sheet_name = ? """, (len(values) - 1, len(headers), content_hash, spreadsheet_id, sheet_name)) conn.commit() synced_sheets.append({ "spreadsheet": spreadsheet_title, "sheet": sheet_name, "rows": len(values) - 1, "changes": change_info["changes"] }) except Exception as e: failed_sheets.append({ "spreadsheet": spreadsheet_title, "sheet": sheet_name, "error": str(e) }) result = { "status": "completed", "total_processed": len(sheets_to_sync), "synced": len(synced_sheets), "skipped": len(skipped_sheets), "failed": len(failed_sheets), "synced_sheets": synced_sheets, "skipped_sheets": skipped_sheets, "failed_sheets": failed_sheets, "rate_limit_info": { "api_calls_in_last_minute": len(service.api_calls), "max_calls_per_minute": service.max_calls_per_minute } } except Exception as e: result = {"error": str(e)} return [TextContent(type="text", text=json.dumps(result, indent=2))] else: return [TextContent(type="text", text=json.dumps({"error": f"Unknown tool: {name}"}))] async def main(): """Main entry point with proper cleanup""" try: async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await app.run( read_stream, write_stream, InitializationOptions( server_name="TNTM Google Sheets Analytics", server_version="1.0.0", capabilities=app.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) except KeyboardInterrupt: print("Server shutting down gracefully...", file=sys.stderr) except Exception as e: print(f"Server error: {e}", file=sys.stderr) raise finally: # Clean up any remaining database connections # With our new architecture, this is mostly handled by per-tool cleanup pass if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/talknerdytome-labs/google-sheet-analytics-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server