Skip to main content
Glama
google_sheets.py15.1 kB
""" Google Sheets Integration Module Full CRUD operations with formatting support """ from googleapiclient.discovery import build from src.google_auth import get_credentials def get_sheets_service(): creds = get_credentials() return build('sheets', 'v4', credentials=creds) def get_drive_service(): creds = get_credentials() return build('drive', 'v3', credentials=creds) def create_spreadsheet(title, sheet_names=None): """Create a new spreadsheet with optional multiple sheets/tabs""" try: service = get_sheets_service() spreadsheet_body = { 'properties': {'title': title} } if sheet_names: spreadsheet_body['sheets'] = [ {'properties': {'title': name}} for name in sheet_names ] spreadsheet = service.spreadsheets().create( body=spreadsheet_body, fields='spreadsheetId,spreadsheetUrl,sheets' ).execute() return { 'success': True, 'spreadsheetId': spreadsheet.get('spreadsheetId'), 'spreadsheetUrl': spreadsheet.get('spreadsheetUrl'), 'sheets': [ { 'sheetId': sheet['properties']['sheetId'], 'title': sheet['properties']['title'] } for sheet in spreadsheet.get('sheets', []) ] } except Exception as error: return {'success': False, 'error': str(error)} def read_sheet_data(spreadsheet_id, range_name): """Read data from a Google Sheet range""" try: service = get_sheets_service() result = service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=range_name ).execute() values = result.get('values', []) return { 'success': True, 'data': values, 'range': result.get('range', range_name) } except Exception as error: return {'success': False, 'error': str(error)} def write_sheet_data(spreadsheet_id, range_name, values): """Write data to a Google Sheet range""" try: service = get_sheets_service() result = service.spreadsheets().values().update( spreadsheetId=spreadsheet_id, range=range_name, valueInputOption='USER_ENTERED', body={'values': values} ).execute() return { 'success': True, 'updatedCells': result.get('updatedCells', 0), 'updatedRange': result.get('updatedRange', range_name) } except Exception as error: return {'success': False, 'error': str(error)} def append_sheet_data(spreadsheet_id, range_name, values): """Append data to a Google Sheet""" try: service = get_sheets_service() result = service.spreadsheets().values().append( spreadsheetId=spreadsheet_id, range=range_name, valueInputOption='USER_ENTERED', insertDataOption='INSERT_ROWS', body={'values': values} ).execute() updates = result.get('updates', {}) return { 'success': True, 'updatedCells': updates.get('updatedCells', 0), 'updatedRange': updates.get('updatedRange', range_name) } except Exception as error: return {'success': False, 'error': str(error)} def get_sheet_info(spreadsheet_id): """Get information about a spreadsheet""" try: service = get_sheets_service() result = service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute() properties = result.get('properties', {}) sheets_info = [] for sheet_info in result.get('sheets', []): sheet_props = sheet_info.get('properties', {}) grid_props = sheet_props.get('gridProperties', {}) sheets_info.append({ 'title': sheet_props.get('title', ''), 'sheetId': sheet_props.get('sheetId', ''), 'rowCount': grid_props.get('rowCount', 0), 'columnCount': grid_props.get('columnCount', 0) }) return { 'success': True, 'title': properties.get('title', ''), 'spreadsheetId': spreadsheet_id, 'sheets': sheets_info } except Exception as error: return {'success': False, 'error': str(error)} def create_sheet_tab(spreadsheet_id, tab_name): """Create a new tab/worksheet in an existing spreadsheet""" try: service = get_sheets_service() request_body = { 'requests': [{ 'addSheet': { 'properties': {'title': tab_name} } }] } response = service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet_id, body=request_body ).execute() reply = response.get('replies', [{}])[0] sheet_props = reply.get('addSheet', {}).get('properties', {}) return { 'success': True, 'sheetId': sheet_props.get('sheetId'), 'title': sheet_props.get('title') } except Exception as error: return {'success': False, 'error': str(error)} def list_spreadsheets(max_results=50): """List all spreadsheets in Google Drive""" try: drive_service = get_drive_service() results = drive_service.files().list( q="mimeType='application/vnd.google-apps.spreadsheet'", spaces='drive', fields='files(id, name, createdTime, modifiedTime, webViewLink)', orderBy='modifiedTime desc', pageSize=max_results ).execute() files = results.get('files', []) return { 'success': True, 'spreadsheets': [ { 'id': f['id'], 'name': f['name'], 'createdTime': f.get('createdTime', ''), 'modifiedTime': f.get('modifiedTime', ''), 'webViewLink': f.get('webViewLink', '') } for f in files ], 'total': len(files) } except Exception as error: return {'success': False, 'error': str(error)} def format_cells(spreadsheet_id, range_name, format_options): """ Format cells in a spreadsheet format_options can include: - bold: bool - italic: bool - fontSize: int - foregroundColor: {red, green, blue} (0-1 values) - backgroundColor: {red, green, blue} (0-1 values) - horizontalAlignment: 'LEFT', 'CENTER', 'RIGHT' - verticalAlignment: 'TOP', 'MIDDLE', 'BOTTOM' - wrapStrategy: 'OVERFLOW_CELL', 'CLIP', 'WRAP' - borders: {top, bottom, left, right} with {style, color} """ try: service = get_sheets_service() # First get sheet info to resolve range to grid coordinates sheet_info = service.spreadsheets().get( spreadsheetId=spreadsheet_id, ranges=[range_name], includeGridData=False ).execute() # Parse range to get sheet ID and grid range sheets = sheet_info.get('sheets', []) if not sheets: return {'success': False, 'error': 'Sheet not found'} sheet_id = sheets[0]['properties']['sheetId'] # Parse range string (e.g., "Sheet1!A1:B5" or "A1:B5") grid_range = _parse_range(range_name, sheet_id) # Build format request cell_format = {} # Text format text_format = {} if 'bold' in format_options: text_format['bold'] = format_options['bold'] if 'italic' in format_options: text_format['italic'] = format_options['italic'] if 'fontSize' in format_options: text_format['fontSize'] = format_options['fontSize'] if 'foregroundColor' in format_options: text_format['foregroundColor'] = format_options['foregroundColor'] if text_format: cell_format['textFormat'] = text_format # Background color if 'backgroundColor' in format_options: cell_format['backgroundColor'] = format_options['backgroundColor'] # Alignment if 'horizontalAlignment' in format_options: cell_format['horizontalAlignment'] = format_options['horizontalAlignment'] if 'verticalAlignment' in format_options: cell_format['verticalAlignment'] = format_options['verticalAlignment'] # Wrap strategy if 'wrapStrategy' in format_options: cell_format['wrapStrategy'] = format_options['wrapStrategy'] # Build the request requests = [{ 'repeatCell': { 'range': grid_range, 'cell': {'userEnteredFormat': cell_format}, 'fields': 'userEnteredFormat' } }] # Add borders if specified if 'borders' in format_options: border_request = _build_border_request(grid_range, format_options['borders']) if border_request: requests.append(border_request) response = service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet_id, body={'requests': requests} ).execute() return { 'success': True, 'updatedRange': range_name, 'appliedFormat': format_options } except Exception as error: return {'success': False, 'error': str(error)} def _parse_range(range_str, sheet_id): """Parse a range string like 'Sheet1!A1:B5' into grid coordinates""" # Remove sheet name if present if '!' in range_str: range_str = range_str.split('!')[1] # Split into start and end cells if ':' in range_str: start, end = range_str.split(':') else: start = end = range_str start_col, start_row = _parse_cell(start) end_col, end_row = _parse_cell(end) return { 'sheetId': sheet_id, 'startRowIndex': start_row - 1, 'endRowIndex': end_row, 'startColumnIndex': start_col, 'endColumnIndex': end_col + 1 } def _parse_cell(cell_str): """Parse a cell reference like 'A1' into (column_index, row_number)""" col_str = '' row_str = '' for char in cell_str: if char.isalpha(): col_str += char.upper() else: row_str += char # Convert column letters to index (A=0, B=1, ..., AA=26, etc.) col_index = 0 for char in col_str: col_index = col_index * 26 + (ord(char) - ord('A') + 1) col_index -= 1 row_num = int(row_str) if row_str else 1 return col_index, row_num def _build_border_request(grid_range, borders): """Build a border update request""" border_style = { 'style': 'SOLID', 'color': {'red': 0, 'green': 0, 'blue': 0} } update_borders = {} for side in ['top', 'bottom', 'left', 'right']: if side in borders: side_config = borders[side] if isinstance(side_config, bool) and side_config: update_borders[side] = border_style elif isinstance(side_config, dict): update_borders[side] = { 'style': side_config.get('style', 'SOLID'), 'color': side_config.get('color', {'red': 0, 'green': 0, 'blue': 0}) } if update_borders: return { 'updateBorders': { 'range': grid_range, **update_borders } } return None def add_formula(spreadsheet_id, cell, formula): """Add a formula to a specific cell""" try: service = get_sheets_service() result = service.spreadsheets().values().update( spreadsheetId=spreadsheet_id, range=cell, valueInputOption='USER_ENTERED', body={'values': [[formula]]} ).execute() return { 'success': True, 'updatedCell': cell, 'formula': formula } except Exception as error: return {'success': False, 'error': str(error)} def share_spreadsheet(spreadsheet_id, email, role='reader'): """ Share a spreadsheet with a user role: 'reader', 'writer', 'commenter' """ try: drive_service = get_drive_service() permission = { 'type': 'user', 'role': role, 'emailAddress': email } result = drive_service.permissions().create( fileId=spreadsheet_id, body=permission, sendNotificationEmail=True ).execute() return { 'success': True, 'permissionId': result.get('id'), 'sharedWith': email, 'role': role } except Exception as error: return {'success': False, 'error': str(error)} # Helper function for creating job tracker def create_job_tracker(title="Job Applications Tracker"): """Create a job tracker spreadsheet with predefined columns""" try: # Create spreadsheet result = create_spreadsheet(title, ['Applications', 'Interviews', 'Offers']) if not result['success']: return result spreadsheet_id = result['spreadsheetId'] # Add headers to Applications sheet headers = [ ['Company', 'Position', 'Location', 'Salary Range', 'Date Applied', 'Status', 'Contact', 'Email', 'Notes', 'Follow-up Date'] ] write_sheet_data(spreadsheet_id, 'Applications!A1:J1', headers) # Add headers to Interviews sheet interview_headers = [ ['Company', 'Position', 'Interview Date', 'Interview Type', 'Interviewer', 'Status', 'Notes', 'Next Steps'] ] write_sheet_data(spreadsheet_id, 'Interviews!A1:H1', interview_headers) # Add headers to Offers sheet offer_headers = [ ['Company', 'Position', 'Base Salary', 'Bonus', 'Benefits', 'Start Date', 'Deadline', 'Status', 'Notes'] ] write_sheet_data(spreadsheet_id, 'Offers!A1:I1', offer_headers) # Format headers (bold) format_cells(spreadsheet_id, 'Applications!A1:J1', {'bold': True, 'backgroundColor': {'red': 0.9, 'green': 0.9, 'blue': 0.9}}) format_cells(spreadsheet_id, 'Interviews!A1:H1', {'bold': True, 'backgroundColor': {'red': 0.9, 'green': 0.9, 'blue': 0.9}}) format_cells(spreadsheet_id, 'Offers!A1:I1', {'bold': True, 'backgroundColor': {'red': 0.9, 'green': 0.9, 'blue': 0.9}}) return { 'success': True, 'spreadsheetId': spreadsheet_id, 'spreadsheetUrl': result['spreadsheetUrl'], 'message': 'Job tracker created with Applications, Interviews, and Offers sheets' } except Exception as error: return {'success': False, 'error': str(error)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pbulbule13/google-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server