#!/usr/bin/env python3
"""
Simple Google Sheets MCP Server - Bridge between MCP clients and Google Sheets API
"""
import os
import sys
import logging
import json
from datetime import datetime, timezone
from pathlib import Path
from mcp.server.fastmcp import FastMCP
# Google API imports
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# Configure logging to stderr
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
stream=sys.stderr
)
logger = logging.getLogger("googlesheets-server")
# Initialize MCP server - NO PROMPT PARAMETER!
mcp = FastMCP("googlesheets")
# Configuration
SCOPES = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive']
GOOGLE_APPLICATION_CREDENTIALS = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "")
GOOGLE_CREDENTIALS_JSON = os.environ.get("GOOGLE_CREDENTIALS_JSON", "")
SERVICE_ACCOUNT_EMAIL = os.environ.get("SERVICE_ACCOUNT_EMAIL", "")
OAUTH_REDIRECT_URI = os.environ.get("OAUTH_REDIRECT_URI", "")
DRIVE_FOLDER_ID = os.environ.get("DRIVE_FOLDER_ID", "")
# Global service objects
_sheets_service = None
_drive_service = None
# === UTILITY FUNCTIONS ===
def get_credentials():
"""Get Google API credentials from service account or OAuth."""
creds = None
# Debug: Log environment variables (without sensitive data)
logger.info(f"GOOGLE_CREDENTIALS_JSON length: {len(GOOGLE_CREDENTIALS_JSON) if GOOGLE_CREDENTIALS_JSON else 0}")
logger.info(f"GOOGLE_APPLICATION_CREDENTIALS: {GOOGLE_APPLICATION_CREDENTIALS[:50] if GOOGLE_APPLICATION_CREDENTIALS else 'Not set'}...")
logger.info(f"SERVICE_ACCOUNT_EMAIL: {SERVICE_ACCOUNT_EMAIL if SERVICE_ACCOUNT_EMAIL else 'Not set'}")
# Try JSON credentials from environment variable first (for Docker MCP)
if GOOGLE_CREDENTIALS_JSON:
try:
creds_info = json.loads(GOOGLE_CREDENTIALS_JSON)
creds = service_account.Credentials.from_service_account_info(
creds_info, scopes=SCOPES)
logger.info("Using service account credentials from JSON env var")
return creds
except Exception as e:
logger.warning(f"Service account JSON auth failed: {e}")
# Try service account file
if GOOGLE_APPLICATION_CREDENTIALS and os.path.exists(GOOGLE_APPLICATION_CREDENTIALS):
try:
creds = service_account.Credentials.from_service_account_file(
GOOGLE_APPLICATION_CREDENTIALS, scopes=SCOPES)
logger.info("Using service account credentials from file")
return creds
except Exception as e:
logger.warning(f"Service account file auth failed: {e}")
# Try OAuth token
token_path = os.path.join(os.path.expanduser("~"), ".google_sheets_token.json")
if os.path.exists(token_path):
try:
creds = Credentials.from_authorized_user_file(token_path, SCOPES)
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
if creds and creds.valid:
logger.info("Using OAuth credentials")
return creds
except Exception as e:
logger.warning(f"OAuth token load failed: {e}")
logger.error("No valid credentials found")
return None
def get_sheets_service():
"""Get or create Google Sheets service."""
global _sheets_service
if _sheets_service is None:
creds = get_credentials()
if creds:
_sheets_service = build('sheets', 'v4', credentials=creds)
return _sheets_service
def get_drive_service():
"""Get or create Google Drive service."""
global _drive_service
if _drive_service is None:
creds = get_credentials()
if creds:
_drive_service = build('drive', 'v3', credentials=creds)
return _drive_service
def format_error(e):
"""Format error message."""
if isinstance(e, HttpError):
return f"API Error {e.resp.status}: {e.error_details}"
return str(e)
# === MCP TOOLS ===
@mcp.tool()
async def list_spreadsheets(folder_id: str = "") -> str:
"""Lists spreadsheets in the configured Drive folder or accessible by the user."""
logger.info("Executing list_spreadsheets")
try:
drive = get_drive_service()
if not drive:
return "❌ Error: Unable to authenticate with Google API"
folder = folder_id.strip() or DRIVE_FOLDER_ID
query = "mimeType='application/vnd.google-apps.spreadsheet'"
if folder:
query += f" and '{folder}' in parents"
results = drive.files().list(
q=query,
pageSize=100,
fields="files(id, name)"
).execute()
files = results.get('files', [])
if not files:
return "📊 No spreadsheets found"
output = "📊 Spreadsheets:\n"
for f in files:
output += f"- {f['name']} (ID: {f['id']})\n"
return output
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def create_spreadsheet(title: str = "") -> str:
"""Creates a new spreadsheet with the specified title."""
logger.info(f"Executing create_spreadsheet with title={title}")
if not title.strip():
return "❌ Error: Title is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
spreadsheet = {
'properties': {
'title': title
}
}
result = sheets.spreadsheets().create(body=spreadsheet).execute()
return f"✅ Created spreadsheet: {result['properties']['title']}\nID: {result['spreadsheetId']}\nURL: https://docs.google.com/spreadsheets/d/{result['spreadsheetId']}"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def get_sheet_data(spreadsheet_id: str = "", sheet: str = "", range_notation: str = "", include_grid_data: str = "false") -> str:
"""Reads data from a range in a sheet (A1 notation like 'A1:C10' or 'Sheet1!B2:D')."""
logger.info(f"Executing get_sheet_data with spreadsheet_id={spreadsheet_id}, sheet={sheet}, range={range_notation}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
full_range = f"{sheet}!{range_notation}" if range_notation.strip() else sheet
include_grid = include_grid_data.lower() == "true"
if include_grid:
result = sheets.spreadsheets().get(
spreadsheetId=spreadsheet_id,
ranges=[full_range],
includeGridData=True
).execute()
return f"✅ Grid data:\n{json.dumps(result.get('sheets', []), indent=2)}"
else:
result = sheets.spreadsheets().values().get(
spreadsheetId=spreadsheet_id,
range=full_range
).execute()
values = result.get('values', [])
if not values:
return "📊 No data found in range"
output = f"📊 Data from {full_range}:\n"
for row in values:
output += f"{', '.join(str(cell) for cell in row)}\n"
return output
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def get_sheet_formulas(spreadsheet_id: str = "", sheet: str = "", range_notation: str = "") -> str:
"""Reads formulas from a range in a sheet."""
logger.info(f"Executing get_sheet_formulas with spreadsheet_id={spreadsheet_id}, sheet={sheet}, range={range_notation}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
full_range = f"{sheet}!{range_notation}" if range_notation.strip() else sheet
result = sheets.spreadsheets().values().get(
spreadsheetId=spreadsheet_id,
range=full_range,
valueRenderOption='FORMULA'
).execute()
values = result.get('values', [])
if not values:
return "📊 No formulas found in range"
output = f"📊 Formulas from {full_range}:\n"
for row in values:
output += f"{', '.join(str(cell) for cell in row)}\n"
return output
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def update_cells(spreadsheet_id: str = "", sheet: str = "", range_notation: str = "", data: str = "") -> str:
"""Writes data to a specific range (overwrites existing data). Data should be JSON 2D array like [[1,2],[3,4]]."""
logger.info(f"Executing update_cells with spreadsheet_id={spreadsheet_id}, sheet={sheet}, range={range_notation}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not range_notation.strip():
return "❌ Error: range is required"
if not data.strip():
return "❌ Error: data is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Parse data as JSON
try:
values = json.loads(data)
except json.JSONDecodeError:
return "❌ Error: data must be valid JSON 2D array"
full_range = f"{sheet}!{range_notation}"
body = {
'values': values
}
result = sheets.spreadsheets().values().update(
spreadsheetId=spreadsheet_id,
range=full_range,
valueInputOption='USER_ENTERED',
body=body
).execute()
return f"✅ Updated {result.get('updatedCells', 0)} cells in {full_range}"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def batch_update_cells(spreadsheet_id: str = "", sheet: str = "", ranges: str = "") -> str:
"""Updates multiple ranges in one call. Ranges should be JSON object mapping range to 2D array like {\"A1:B2\":[[1,2],[3,4]],\"D5\":[[\"Hello\"]]}."""
logger.info(f"Executing batch_update_cells with spreadsheet_id={spreadsheet_id}, sheet={sheet}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not ranges.strip():
return "❌ Error: ranges is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Parse ranges as JSON
try:
ranges_dict = json.loads(ranges)
except json.JSONDecodeError:
return "❌ Error: ranges must be valid JSON object"
data = []
for range_key, values in ranges_dict.items():
data.append({
'range': f"{sheet}!{range_key}",
'values': values
})
body = {
'valueInputOption': 'USER_ENTERED',
'data': data
}
result = sheets.spreadsheets().values().batchUpdate(
spreadsheetId=spreadsheet_id,
body=body
).execute()
return f"✅ Batch updated {result.get('totalUpdatedCells', 0)} cells across {len(data)} ranges"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def add_rows(spreadsheet_id: str = "", sheet: str = "", data: str = "") -> str:
"""Appends rows to the end of a sheet (after the last row with data). Data should be JSON 2D array."""
logger.info(f"Executing add_rows with spreadsheet_id={spreadsheet_id}, sheet={sheet}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not data.strip():
return "❌ Error: data is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Parse data as JSON
try:
values = json.loads(data)
except json.JSONDecodeError:
return "❌ Error: data must be valid JSON 2D array"
body = {
'values': values
}
result = sheets.spreadsheets().values().append(
spreadsheetId=spreadsheet_id,
range=sheet,
valueInputOption='USER_ENTERED',
body=body
).execute()
return f"✅ Appended {result.get('updates', {}).get('updatedRows', 0)} rows to {sheet}"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def list_sheets(spreadsheet_id: str = "") -> str:
"""Lists all sheet names within a spreadsheet."""
logger.info(f"Executing list_sheets with spreadsheet_id={spreadsheet_id}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
result = sheets.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
sheet_list = result.get('sheets', [])
if not sheet_list:
return "📊 No sheets found"
output = "📊 Sheets:\n"
for sheet in sheet_list:
props = sheet.get('properties', {})
output += f"- {props.get('title', 'Untitled')} (ID: {props.get('sheetId', 'N/A')})\n"
return output
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def create_sheet(spreadsheet_id: str = "", title: str = "") -> str:
"""Adds a new sheet (tab) to a spreadsheet."""
logger.info(f"Executing create_sheet with spreadsheet_id={spreadsheet_id}, title={title}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not title.strip():
return "❌ Error: title is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
body = {
'requests': [{
'addSheet': {
'properties': {
'title': title
}
}
}]
}
result = sheets.spreadsheets().batchUpdate(
spreadsheetId=spreadsheet_id,
body=body
).execute()
new_sheet = result.get('replies', [{}])[0].get('addSheet', {}).get('properties', {})
return f"✅ Created sheet: {new_sheet.get('title', title)}\nSheet ID: {new_sheet.get('sheetId', 'N/A')}"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def get_multiple_sheet_data(queries: str = "") -> str:
"""Fetches data from multiple ranges. Queries should be JSON array like [{\"spreadsheet_id\":\"abc\",\"sheet\":\"Sheet1\",\"range\":\"A1:B2\"}]."""
logger.info("Executing get_multiple_sheet_data")
if not queries.strip():
return "❌ Error: queries is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Parse queries as JSON
try:
query_list = json.loads(queries)
except json.JSONDecodeError:
return "❌ Error: queries must be valid JSON array"
results = []
for query in query_list:
sid = query.get('spreadsheet_id', '')
sheet = query.get('sheet', '')
range_notation = query.get('range', '')
if not sid or not sheet:
results.append({
'query': query,
'error': 'Missing spreadsheet_id or sheet'
})
continue
try:
full_range = f"{sheet}!{range_notation}" if range_notation else sheet
result = sheets.spreadsheets().values().get(
spreadsheetId=sid,
range=full_range
).execute()
results.append({
'query': query,
'data': result.get('values', [])
})
except Exception as e:
results.append({
'query': query,
'error': format_error(e)
})
return f"✅ Fetched {len(results)} ranges:\n{json.dumps(results, indent=2)}"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def get_multiple_spreadsheet_summary(spreadsheet_ids: str = "", rows_to_fetch: str = "5") -> str:
"""Gets titles, sheet names, headers, and first few rows for multiple spreadsheets. spreadsheet_ids should be JSON array of IDs."""
logger.info("Executing get_multiple_spreadsheet_summary")
if not spreadsheet_ids.strip():
return "❌ Error: spreadsheet_ids is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Parse spreadsheet_ids as JSON
try:
id_list = json.loads(spreadsheet_ids)
except json.JSONDecodeError:
return "❌ Error: spreadsheet_ids must be valid JSON array"
rows = int(rows_to_fetch) if rows_to_fetch.strip() else 5
summaries = []
for sid in id_list:
try:
# Get spreadsheet metadata
result = sheets.spreadsheets().get(spreadsheetId=sid).execute()
title = result.get('properties', {}).get('title', 'Untitled')
sheet_list = result.get('sheets', [])
sheet_summaries = []
for sheet in sheet_list:
sheet_title = sheet.get('properties', {}).get('title', '')
# Get first N rows
data_result = sheets.spreadsheets().values().get(
spreadsheetId=sid,
range=f"{sheet_title}!A1:Z{rows}"
).execute()
values = data_result.get('values', [])
headers = values[0] if values else []
preview_rows = values[1:] if len(values) > 1 else []
sheet_summaries.append({
'sheet_name': sheet_title,
'headers': headers,
'preview_rows': preview_rows
})
summaries.append({
'spreadsheet_id': sid,
'title': title,
'sheets': sheet_summaries
})
except Exception as e:
summaries.append({
'spreadsheet_id': sid,
'error': format_error(e)
})
return f"✅ Summaries:\n{json.dumps(summaries, indent=2)}"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def share_spreadsheet(spreadsheet_id: str = "", recipients: str = "", send_notification: str = "true") -> str:
"""Shares a spreadsheet with users. Recipients should be JSON array like [{\"email_address\":\"user@example.com\",\"role\":\"writer\"}]. Roles: reader, commenter, writer."""
logger.info(f"Executing share_spreadsheet with spreadsheet_id={spreadsheet_id}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not recipients.strip():
return "❌ Error: recipients is required"
try:
drive = get_drive_service()
if not drive:
return "❌ Error: Unable to authenticate with Google API"
# Parse recipients as JSON
try:
recipient_list = json.loads(recipients)
except json.JSONDecodeError:
return "❌ Error: recipients must be valid JSON array"
send_email = send_notification.lower() == "true"
successes = []
failures = []
for recipient in recipient_list:
email = recipient.get('email_address', '')
role = recipient.get('role', 'reader')
if not email:
failures.append({'recipient': recipient, 'error': 'Missing email_address'})
continue
try:
permission = {
'type': 'user',
'role': role,
'emailAddress': email
}
drive.permissions().create(
fileId=spreadsheet_id,
body=permission,
sendNotificationEmail=send_email
).execute()
successes.append(email)
except Exception as e:
failures.append({'email': email, 'error': format_error(e)})
output = f"✅ Shared with {len(successes)} users"
if successes:
output += f"\nSuccesses: {', '.join(successes)}"
if failures:
output += f"\nFailures: {json.dumps(failures, indent=2)}"
return output
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def add_columns(spreadsheet_id: str = "", sheet: str = "", num_columns: str = "1", position: str = "") -> str:
"""Adds columns to a sheet. Position is 1-based index after which to insert (omit to add at end)."""
logger.info(f"Executing add_columns with spreadsheet_id={spreadsheet_id}, sheet={sheet}, num_columns={num_columns}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Get sheet ID
result = sheets.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
sheet_id = None
for s in result.get('sheets', []):
if s.get('properties', {}).get('title') == sheet:
sheet_id = s.get('properties', {}).get('sheetId')
break
if sheet_id is None:
return f"❌ Error: Sheet '{sheet}' not found"
num_cols = int(num_columns) if num_columns.strip() else 1
request = {
'appendDimension' if not position.strip() else 'insertDimension': {
'sheetId': sheet_id,
'dimension': 'COLUMNS',
'length': num_cols
}
}
if position.strip():
pos = int(position)
request['insertDimension']['range'] = {
'sheetId': sheet_id,
'dimension': 'COLUMNS',
'startIndex': pos,
'endIndex': pos + num_cols
}
body = {'requests': [request]}
sheets.spreadsheets().batchUpdate(
spreadsheetId=spreadsheet_id,
body=body
).execute()
return f"✅ Added {num_cols} column(s) to {sheet}"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def copy_sheet(spreadsheet_id: str = "", sheet_id: str = "", new_title: str = "", destination_spreadsheet_id: str = "") -> str:
"""Duplicates a sheet within a spreadsheet or to another spreadsheet."""
logger.info(f"Executing copy_sheet with spreadsheet_id={spreadsheet_id}, sheet_id={sheet_id}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet_id.strip():
return "❌ Error: sheet_id is required"
if not new_title.strip():
return "❌ Error: new_title is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
sid = int(sheet_id)
dest_id = destination_spreadsheet_id.strip() or spreadsheet_id
body = {
'destinationSpreadsheetId': dest_id
}
result = sheets.spreadsheets().sheets().copyTo(
spreadsheetId=spreadsheet_id,
sheetId=sid,
body=body
).execute()
# Rename the copied sheet
copied_sheet_id = result.get('sheetId')
rename_body = {
'requests': [{
'updateSheetProperties': {
'properties': {
'sheetId': copied_sheet_id,
'title': new_title
},
'fields': 'title'
}
}]
}
sheets.spreadsheets().batchUpdate(
spreadsheetId=dest_id,
body=rename_body
).execute()
return f"✅ Copied sheet to {dest_id} as '{new_title}'"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def rename_sheet(spreadsheet_id: str = "", sheet_id: str = "", new_title: str = "") -> str:
"""Renames an existing sheet."""
logger.info(f"Executing rename_sheet with spreadsheet_id={spreadsheet_id}, sheet_id={sheet_id}, new_title={new_title}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet_id.strip():
return "❌ Error: sheet_id is required"
if not new_title.strip():
return "❌ Error: new_title is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
sid = int(sheet_id)
body = {
'requests': [{
'updateSheetProperties': {
'properties': {
'sheetId': sid,
'title': new_title
},
'fields': 'title'
}
}]
}
sheets.spreadsheets().batchUpdate(
spreadsheetId=spreadsheet_id,
body=body
).execute()
return f"✅ Renamed sheet to '{new_title}'"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def add_conditional_formatting(spreadsheet_id: str = "", sheet: str = "", range_notation: str = "", rule_config: str = "") -> str:
"""Adds conditional formatting rule to a range. rule_config should be JSON with condition type, values, and format properties."""
logger.info(f"Executing add_conditional_formatting with spreadsheet_id={spreadsheet_id}, sheet={sheet}, range={range_notation}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not range_notation.strip():
return "❌ Error: range_notation is required"
if not rule_config.strip():
return "❌ Error: rule_config is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
try:
config = json.loads(rule_config)
except json.JSONDecodeError:
return "❌ Error: rule_config must be valid JSON object"
result = sheets.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
sheet_id = None
for s in result.get('sheets', []):
if s.get('properties', {}).get('title') == sheet:
sheet_id = s.get('properties', {}).get('sheetId')
break
if sheet_id is None:
return f"❌ Error: Sheet '{sheet}' not found"
full_range = f"{sheet}!{range_notation}"
range_parts = range_notation.split(':')
start_cell = range_parts[0] if range_parts else 'A1'
end_cell = range_parts[1] if len(range_parts) > 1 else start_cell
def col_to_index(col):
index = 0
for char in col:
if char.isalpha():
index = index * 26 + (ord(char.upper()) - ord('A') + 1)
return index - 1
def parse_cell(cell):
col = ''
row = ''
for char in cell:
if char.isalpha():
col += char
else:
row += char
return col_to_index(col), int(row) - 1 if row else 0
start_col, start_row = parse_cell(start_cell)
end_col, end_row = parse_cell(end_cell)
ranges_obj = [{
'sheetId': sheet_id,
'startRowIndex': start_row,
'endRowIndex': end_row + 1,
'startColumnIndex': start_col,
'endColumnIndex': end_col + 1
}]
rule = {'ranges': ranges_obj}
rule_type = config.get('type', 'boolean')
if rule_type == 'boolean':
boolean_rule = {}
condition = config.get('condition', {})
if condition:
boolean_rule['condition'] = condition
format_spec = config.get('format', {})
if format_spec:
boolean_rule['format'] = format_spec
rule['booleanRule'] = boolean_rule
elif rule_type == 'gradient':
gradient_rule = config.get('gradientRule', {})
if gradient_rule:
rule['gradientRule'] = gradient_rule
body = {
'requests': [{
'addConditionalFormatRule': {
'rule': rule,
'index': 0
}
}]
}
sheets.spreadsheets().batchUpdate(
spreadsheetId=spreadsheet_id,
body=body
).execute()
return f"✅ Added conditional formatting rule to {full_range}"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def update_conditional_formatting(spreadsheet_id: str = "", sheet_id: str = "", rule_index: str = "", rule_config: str = "", new_index: str = "") -> str:
"""Updates or moves an existing conditional formatting rule. Provide either rule_config to replace or new_index to move."""
logger.info(f"Executing update_conditional_formatting with spreadsheet_id={spreadsheet_id}, sheet_id={sheet_id}, rule_index={rule_index}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet_id.strip():
return "❌ Error: sheet_id is required"
if not rule_index.strip():
return "❌ Error: rule_index is required"
if not rule_config.strip() and not new_index.strip():
return "❌ Error: Either rule_config or new_index must be provided"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
sid = int(sheet_id)
idx = int(rule_index)
request_obj = {
'updateConditionalFormatRule': {
'sheetId': sid,
'index': idx
}
}
if rule_config.strip():
try:
config = json.loads(rule_config)
except json.JSONDecodeError:
return "❌ Error: rule_config must be valid JSON object"
ranges_obj = config.get('ranges', [])
rule = {'ranges': ranges_obj}
rule_type = config.get('type', 'boolean')
if rule_type == 'boolean':
boolean_rule = {}
condition = config.get('condition', {})
if condition:
boolean_rule['condition'] = condition
format_spec = config.get('format', {})
if format_spec:
boolean_rule['format'] = format_spec
rule['booleanRule'] = boolean_rule
elif rule_type == 'gradient':
gradient_rule = config.get('gradientRule', {})
if gradient_rule:
rule['gradientRule'] = gradient_rule
request_obj['updateConditionalFormatRule']['rule'] = rule
if new_index.strip():
request_obj['updateConditionalFormatRule']['newIndex'] = int(new_index)
body = {'requests': [request_obj]}
sheets.spreadsheets().batchUpdate(
spreadsheetId=spreadsheet_id,
body=body
).execute()
return f"✅ Updated conditional formatting rule at index {idx}"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
# === TABLE-LEVEL OPERATIONS ===
@mcp.tool()
async def list_tables(spreadsheet_id: str = "", sheet: str = "") -> str:
"""Lists all defined tables (named ranges or header-based logical tables) within a sheet or spreadsheet."""
logger.info(f"Executing list_tables with spreadsheet_id={spreadsheet_id}, sheet={sheet}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
result = sheets.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
named_ranges = result.get('namedRanges', [])
if not named_ranges:
return "📊 No tables (named ranges) found"
output = "📊 Tables:\n"
for nr in named_ranges:
name = nr.get('name', 'Untitled')
range_info = nr.get('range', {})
sheet_id = range_info.get('sheetId')
# Find sheet name
sheet_name = 'Unknown'
for s in result.get('sheets', []):
if s.get('properties', {}).get('sheetId') == sheet_id:
sheet_name = s.get('properties', {}).get('title', 'Unknown')
break
if sheet and sheet_name != sheet:
continue
output += f"- {name} (Sheet: {sheet_name})\n"
return output
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def create_table(spreadsheet_id: str = "", sheet: str = "", table_name: str = "", headers: str = "", data: str = "") -> str:
"""Creates a new logical table with headers and optional initial data. Headers should be JSON array, data should be JSON 2D array."""
logger.info(f"Executing create_table with spreadsheet_id={spreadsheet_id}, sheet={sheet}, table_name={table_name}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not table_name.strip():
return "❌ Error: table_name is required"
if not headers.strip():
return "❌ Error: headers is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Parse headers as JSON
try:
header_list = json.loads(headers)
except json.JSONDecodeError:
return "❌ Error: headers must be valid JSON array"
# Parse data if provided
data_list = []
if data.strip():
try:
data_list = json.loads(data)
except json.JSONDecodeError:
return "❌ Error: data must be valid JSON 2D array"
# Append header row and data
all_rows = [header_list] + data_list
body = {'values': all_rows}
result = sheets.spreadsheets().values().append(
spreadsheetId=spreadsheet_id,
range=sheet,
valueInputOption='USER_ENTERED',
body=body
).execute()
updated_range = result.get('updates', {}).get('updatedRange', '')
return f"✅ Created table '{table_name}' with {len(header_list)} columns and {len(data_list)} data rows\nRange: {updated_range}"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def get_table_data(spreadsheet_id: str = "", sheet: str = "", table_name: str = "", filters: str = "", limit: str = "", offset: str = "") -> str:
"""Reads all or part of a table by name. Filters should be JSON object like {\"column_name\":\"value\"}."""
logger.info(f"Executing get_table_data with spreadsheet_id={spreadsheet_id}, sheet={sheet}, table_name={table_name}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not table_name.strip():
return "❌ Error: table_name is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Get all data from sheet
result = sheets.spreadsheets().values().get(
spreadsheetId=spreadsheet_id,
range=sheet
).execute()
values = result.get('values', [])
if not values:
return "📊 No data found in sheet"
headers = values[0]
data_rows = values[1:]
# Apply filters if provided
if filters.strip():
try:
filter_dict = json.loads(filters)
filtered_rows = []
for row in data_rows:
match = True
for col_name, col_value in filter_dict.items():
if col_name in headers:
col_idx = headers.index(col_name)
if col_idx < len(row) and str(row[col_idx]) != str(col_value):
match = False
break
if match:
filtered_rows.append(row)
data_rows = filtered_rows
except json.JSONDecodeError:
return "❌ Error: filters must be valid JSON object"
# Apply offset and limit
start = int(offset) if offset.strip() else 0
end = start + int(limit) if limit.strip() else len(data_rows)
data_rows = data_rows[start:end]
output = f"📊 Table '{table_name}' data ({len(data_rows)} rows):\n"
output += f"Headers: {', '.join(headers)}\n"
for row in data_rows:
output += f"{', '.join(str(cell) for cell in row)}\n"
return output
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def insert_table_rows(spreadsheet_id: str = "", sheet: str = "", table_name: str = "", rows: str = "", position: str = "") -> str:
"""Inserts rows into a table. Rows should be JSON 2D array. Position is 1-based row index (excluding header, omit for end)."""
logger.info(f"Executing insert_table_rows with spreadsheet_id={spreadsheet_id}, sheet={sheet}, table_name={table_name}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not table_name.strip():
return "❌ Error: table_name is required"
if not rows.strip():
return "❌ Error: rows is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Parse rows as JSON
try:
row_list = json.loads(rows)
except json.JSONDecodeError:
return "❌ Error: rows must be valid JSON 2D array"
# For simplicity, append to end (full implementation would handle position)
body = {'values': row_list}
result = sheets.spreadsheets().values().append(
spreadsheetId=spreadsheet_id,
range=sheet,
valueInputOption='USER_ENTERED',
body=body
).execute()
return f"✅ Inserted {len(row_list)} row(s) into table '{table_name}'"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def update_table_rows(spreadsheet_id: str = "", sheet: str = "", table_name: str = "", match_conditions: str = "", updates: str = "") -> str:
"""Updates rows in a table matching criteria. Both match_conditions and updates should be JSON objects like {\"column_name\":\"value\"}."""
logger.info(f"Executing update_table_rows with spreadsheet_id={spreadsheet_id}, sheet={sheet}, table_name={table_name}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not table_name.strip():
return "❌ Error: table_name is required"
if not match_conditions.strip():
return "❌ Error: match_conditions is required"
if not updates.strip():
return "❌ Error: updates is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Parse JSON
try:
match_dict = json.loads(match_conditions)
update_dict = json.loads(updates)
except json.JSONDecodeError:
return "❌ Error: match_conditions and updates must be valid JSON objects"
# Get all data
result = sheets.spreadsheets().values().get(
spreadsheetId=spreadsheet_id,
range=sheet
).execute()
values = result.get('values', [])
if not values:
return "📊 No data found in sheet"
headers = values[0]
data_rows = values[1:]
# Find and update matching rows
updated_count = 0
for i, row in enumerate(data_rows):
match = True
for col_name, col_value in match_dict.items():
if col_name in headers:
col_idx = headers.index(col_name)
if col_idx >= len(row) or str(row[col_idx]) != str(col_value):
match = False
break
if match:
for col_name, new_value in update_dict.items():
if col_name in headers:
col_idx = headers.index(col_name)
while len(row) <= col_idx:
row.append('')
row[col_idx] = new_value
updated_count += 1
# Write back all data
if updated_count > 0:
body = {'values': [headers] + data_rows}
sheets.spreadsheets().values().update(
spreadsheetId=spreadsheet_id,
range=sheet,
valueInputOption='USER_ENTERED',
body=body
).execute()
return f"✅ Updated {updated_count} row(s) in table '{table_name}'"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def delete_table_rows(spreadsheet_id: str = "", sheet: str = "", table_name: str = "", match_conditions: str = "") -> str:
"""Deletes rows in a table matching criteria. match_conditions should be JSON object like {\"column_name\":\"value\"}."""
logger.info(f"Executing delete_table_rows with spreadsheet_id={spreadsheet_id}, sheet={sheet}, table_name={table_name}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not table_name.strip():
return "❌ Error: table_name is required"
if not match_conditions.strip():
return "❌ Error: match_conditions is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Parse JSON
try:
match_dict = json.loads(match_conditions)
except json.JSONDecodeError:
return "❌ Error: match_conditions must be valid JSON object"
# Get all data
result = sheets.spreadsheets().values().get(
spreadsheetId=spreadsheet_id,
range=sheet
).execute()
values = result.get('values', [])
if not values:
return "📊 No data found in sheet"
headers = values[0]
data_rows = values[1:]
# Filter out matching rows
new_data_rows = []
deleted_count = 0
for row in data_rows:
match = True
for col_name, col_value in match_dict.items():
if col_name in headers:
col_idx = headers.index(col_name)
if col_idx >= len(row) or str(row[col_idx]) != str(col_value):
match = False
break
if match:
deleted_count += 1
else:
new_data_rows.append(row)
# Write back filtered data
if deleted_count > 0:
body = {'values': [headers] + new_data_rows}
sheets.spreadsheets().values().clear(
spreadsheetId=spreadsheet_id,
range=sheet
).execute()
sheets.spreadsheets().values().update(
spreadsheetId=spreadsheet_id,
range=sheet,
valueInputOption='USER_ENTERED',
body=body
).execute()
return f"✅ Deleted {deleted_count} row(s) from table '{table_name}'"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def add_table_columns(spreadsheet_id: str = "", sheet: str = "", table_name: str = "", new_columns: str = "", position: str = "") -> str:
"""Adds new columns to an existing table. new_columns should be JSON array of column names."""
logger.info(f"Executing add_table_columns with spreadsheet_id={spreadsheet_id}, sheet={sheet}, table_name={table_name}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not table_name.strip():
return "❌ Error: table_name is required"
if not new_columns.strip():
return "❌ Error: new_columns is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Parse new_columns as JSON
try:
col_list = json.loads(new_columns)
except json.JSONDecodeError:
return "❌ Error: new_columns must be valid JSON array"
# Get current headers
result = sheets.spreadsheets().values().get(
spreadsheetId=spreadsheet_id,
range=f"{sheet}!1:1"
).execute()
headers = result.get('values', [[]])[0]
# Add new columns to headers
if position.strip():
pos = int(position)
headers = headers[:pos] + col_list + headers[pos:]
else:
headers.extend(col_list)
# Update header row
body = {'values': [headers]}
sheets.spreadsheets().values().update(
spreadsheetId=spreadsheet_id,
range=f"{sheet}!1:1",
valueInputOption='USER_ENTERED',
body=body
).execute()
return f"✅ Added {len(col_list)} column(s) to table '{table_name}'"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def rename_table_column(spreadsheet_id: str = "", sheet: str = "", table_name: str = "", old_name: str = "", new_name: str = "") -> str:
"""Renames a header in an existing table."""
logger.info(f"Executing rename_table_column with spreadsheet_id={spreadsheet_id}, sheet={sheet}, table_name={table_name}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not table_name.strip():
return "❌ Error: table_name is required"
if not old_name.strip():
return "❌ Error: old_name is required"
if not new_name.strip():
return "❌ Error: new_name is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Get current headers
result = sheets.spreadsheets().values().get(
spreadsheetId=spreadsheet_id,
range=f"{sheet}!1:1"
).execute()
headers = result.get('values', [[]])[0]
if old_name not in headers:
return f"❌ Error: Column '{old_name}' not found"
# Rename column
idx = headers.index(old_name)
headers[idx] = new_name
# Update header row
body = {'values': [headers]}
sheets.spreadsheets().values().update(
spreadsheetId=spreadsheet_id,
range=f"{sheet}!1:1",
valueInputOption='USER_ENTERED',
body=body
).execute()
return f"✅ Renamed column '{old_name}' to '{new_name}' in table '{table_name}'"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def export_table_as_csv(spreadsheet_id: str = "", sheet: str = "", table_name: str = "", include_headers: str = "true") -> str:
"""Exports a table to CSV format. Returns CSV data as string."""
logger.info(f"Executing export_table_as_csv with spreadsheet_id={spreadsheet_id}, sheet={sheet}, table_name={table_name}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not table_name.strip():
return "❌ Error: table_name is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Get all data
result = sheets.spreadsheets().values().get(
spreadsheetId=spreadsheet_id,
range=sheet
).execute()
values = result.get('values', [])
if not values:
return "📊 No data found in sheet"
include_hdrs = include_headers.lower() == "true"
csv_lines = []
start_idx = 0 if include_hdrs else 1
for row in values[start_idx:]:
csv_line = ','.join(f'"{str(cell)}"' for cell in row)
csv_lines.append(csv_line)
csv_data = '\n'.join(csv_lines)
return f"✅ CSV export of table '{table_name}':\n{csv_data}"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
@mcp.tool()
async def import_csv_to_table(spreadsheet_id: str = "", sheet: str = "", table_name: str = "", csv_content: str = "", overwrite: str = "false") -> str:
"""Imports CSV content into a table. csv_content should be CSV formatted string."""
logger.info(f"Executing import_csv_to_table with spreadsheet_id={spreadsheet_id}, sheet={sheet}, table_name={table_name}")
if not spreadsheet_id.strip():
return "❌ Error: spreadsheet_id is required"
if not sheet.strip():
return "❌ Error: sheet is required"
if not table_name.strip():
return "❌ Error: table_name is required"
if not csv_content.strip():
return "❌ Error: csv_content is required"
try:
sheets = get_sheets_service()
if not sheets:
return "❌ Error: Unable to authenticate with Google API"
# Parse CSV content
import csv
from io import StringIO
csv_reader = csv.reader(StringIO(csv_content))
rows = list(csv_reader)
if not rows:
return "❌ Error: No rows in CSV content"
should_overwrite = overwrite.lower() == "true"
if should_overwrite:
# Clear existing data
sheets.spreadsheets().values().clear(
spreadsheetId=spreadsheet_id,
range=sheet
).execute()
# Write CSV data
body = {'values': rows}
if should_overwrite:
result = sheets.spreadsheets().values().update(
spreadsheetId=spreadsheet_id,
range=sheet,
valueInputOption='USER_ENTERED',
body=body
).execute()
else:
result = sheets.spreadsheets().values().append(
spreadsheetId=spreadsheet_id,
range=sheet,
valueInputOption='USER_ENTERED',
body=body
).execute()
return f"✅ Imported {len(rows)} rows into table '{table_name}' ({len(rows[0])} columns)"
except Exception as e:
logger.error(f"Error: {e}")
return f"❌ Error: {format_error(e)}"
# === SERVER STARTUP ===
if __name__ == "__main__":
logger.info("Starting Google Sheets MCP server...")
# Check credentials on startup
if not GOOGLE_APPLICATION_CREDENTIALS and not os.path.exists(os.path.expanduser("~/.google_sheets_token.json")):
logger.warning("No credentials found. Set GOOGLE_APPLICATION_CREDENTIALS or run OAuth flow.")
try:
mcp.run(transport='stdio')
except Exception as e:
logger.error(f"Server error: {e}", exc_info=True)
sys.exit(1)