Skip to main content
Glama
google_docs.py (21.4 kB)
"""
Google Docs Integration Module
Create, read, update, and format Google Documents
"""

from datetime import datetime

from googleapiclient.discovery import build
from src.google_auth import get_credentials


# ---------------------------------------------------------------------------
# Service construction
# ---------------------------------------------------------------------------

def get_docs_service():
    """Build a Google Docs v1 API client from the stored credentials."""
    creds = get_credentials()
    return build('docs', 'v1', credentials=creds)


def get_drive_service():
    """Build a Google Drive v3 API client from the stored credentials."""
    creds = get_credentials()
    return build('drive', 'v3', credentials=creds)


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _utf16_len(text):
    """Return the length of *text* in UTF-16 code units.

    The Docs API measures indexes in UTF-16 code units, so characters
    outside the Basic Multilingual Plane (e.g. most emoji) occupy two index
    positions while Python's ``len`` counts them once.
    """
    return len(text.encode('utf-16-le', errors='surrogatepass')) // 2


def _get_end_index(service, document_id):
    """Fetch the document and return the index just before its final newline.

    Returns 1 (the first valid body index) when the body has no content or
    the last element carries no usable ``endIndex``.
    """
    document = service.documents().get(documentId=document_id).execute()
    body_content = document.get('body', {}).get('content', [])
    if not body_content:
        return 1
    # Clamp so a missing/odd endIndex never yields the invalid index 0.
    return max(1, body_content[-1].get('endIndex', 1) - 1)


def _as_is(value):
    """Pass a style value through unchanged."""
    return value


def _points(value):
    """Wrap a number as a Docs API dimension expressed in points."""
    return {'magnitude': value, 'unit': 'PT'}


def _rgb(value):
    """Wrap an ``{red, green, blue}`` dict as a Docs API optional color."""
    return {'color': {'rgbColor': value}}


def _font_family(value):
    """Wrap a font family name as a Docs API weighted font family."""
    return {'fontFamily': value}


def _link(value):
    """Wrap a URL as a Docs API link."""
    return {'url': value}


# Caller-facing option name -> (Docs API style field, value converter).
# Dict order determines the order of entries in the request's field mask.
_TEXT_STYLE_SPEC = {
    'bold': ('bold', _as_is),
    'italic': ('italic', _as_is),
    'underline': ('underline', _as_is),
    'strikethrough': ('strikethrough', _as_is),
    'fontSize': ('fontSize', _points),
    'fontFamily': ('weightedFontFamily', _font_family),
    'foregroundColor': ('foregroundColor', _rgb),
    'backgroundColor': ('backgroundColor', _rgb),
    'link': ('link', _link),
}

_PARAGRAPH_STYLE_SPEC = {
    'namedStyleType': ('namedStyleType', _as_is),
    'alignment': ('alignment', _as_is),
    'lineSpacing': ('lineSpacing', _as_is),
    'spaceAbove': ('spaceAbove', _points),
    'spaceBelow': ('spaceBelow', _points),
    'indentFirstLine': ('indentFirstLine', _points),
    'indentStart': ('indentStart', _points),
}


def _build_style(options, spec):
    """Translate caller options into ``(style_dict, field_names)`` using *spec*.

    Options not listed in *spec* are ignored.
    """
    style = {}
    fields = []
    for option, (field, convert) in spec.items():
        if option in options:
            style[field] = convert(options[option])
            fields.append(field)
    return style, fields


# ---------------------------------------------------------------------------
# Document creation and reading
# ---------------------------------------------------------------------------

def create_document(title, initial_content=None):
    """Create a new Google Document with optional initial content.

    Returns a dict with ``success``, ``documentId``, ``title`` and
    ``documentUrl`` on success, or ``success=False`` and ``error`` on failure.
    """
    try:
        service = get_docs_service()

        # Create empty document
        document = service.documents().create(body={'title': title}).execute()
        document_id = document.get('documentId')

        # Add initial content if provided
        if initial_content:
            requests = [{
                'insertText': {
                    'location': {'index': 1},
                    'text': initial_content,
                }
            }]
            service.documents().batchUpdate(
                documentId=document_id,
                body={'requests': requests},
            ).execute()

        return {
            'success': True,
            'documentId': document_id,
            'title': document.get('title'),
            'documentUrl': f"https://docs.google.com/document/d/{document_id}/edit",
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def read_document(document_id):
    """Read the content of a Google Document.

    Returns a dict with ``success``, ``documentId``, ``title``, ``content``
    (plain text) and ``revisionId``, or ``success=False`` and ``error``.
    """
    try:
        service = get_docs_service()
        document = service.documents().get(documentId=document_id).execute()

        # Extract text content from document body
        content = document.get('body', {}).get('content', [])

        return {
            'success': True,
            'documentId': document_id,
            'title': document.get('title', ''),
            'content': _extract_text(content),
            'revisionId': document.get('revisionId', ''),
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def _extract_text(content):
    """Extract plain text from a list of document structural elements.

    Paragraph text runs are concatenated in order; tables are flattened by
    recursing into each cell's content. Other element kinds are skipped.
    """
    text_parts = []
    for element in content:
        if 'paragraph' in element:
            for elem in element['paragraph'].get('elements', []):
                if 'textRun' in elem:
                    text_parts.append(elem['textRun'].get('content', ''))
        elif 'table' in element:
            # Extract text from tables
            for row in element['table'].get('tableRows', []):
                for cell in row.get('tableCells', []):
                    text_parts.append(_extract_text(cell.get('content', [])))
    return ''.join(text_parts)


# ---------------------------------------------------------------------------
# Text insertion
# ---------------------------------------------------------------------------

def append_to_document(document_id, content):
    """Append text to the end of a document.

    Returns a dict with ``success``, ``documentId`` and ``appendedLength``
    (``len(content)``), or ``success=False`` and ``error``.
    """
    try:
        service = get_docs_service()
        end_index = _get_end_index(service, document_id)

        requests = [{
            'insertText': {
                'location': {'index': end_index},
                'text': content,
            }
        }]
        service.documents().batchUpdate(
            documentId=document_id,
            body={'requests': requests},
        ).execute()

        return {
            'success': True,
            'documentId': document_id,
            'appendedLength': len(content),
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def insert_text_at(document_id, text, index):
    """Insert text at a specific index in the document.

    Returns a dict with ``success``, ``documentId``, ``insertedAt`` and
    ``textLength`` (``len(text)``), or ``success=False`` and ``error``.
    """
    try:
        service = get_docs_service()

        requests = [{
            'insertText': {
                'location': {'index': index},
                'text': text,
            }
        }]
        service.documents().batchUpdate(
            documentId=document_id,
            body={'requests': requests},
        ).execute()

        return {
            'success': True,
            'documentId': document_id,
            'insertedAt': index,
            'textLength': len(text),
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


# ---------------------------------------------------------------------------
# Formatting
# ---------------------------------------------------------------------------

def format_text(document_id, start_index, end_index, format_options):
    """
    Apply formatting to a range of text

    format_options can include:
    - bold: bool
    - italic: bool
    - underline: bool
    - strikethrough: bool
    - fontSize: int (in points)
    - fontFamily: str
    - foregroundColor: {red, green, blue} (0-1 values)
    - backgroundColor: {red, green, blue} (0-1 values)
    - link: str (URL to link to)

    Returns a dict with ``success``, ``documentId``, ``formattedRange`` and
    ``appliedFormat``, or ``success=False`` and ``error``. Supplying none of
    the options above is reported as an error without contacting the API.
    """
    try:
        text_style, fields = _build_style(format_options, _TEXT_STYLE_SPEC)
        if not fields:
            # An updateTextStyle request needs a non-empty field mask.
            return {
                'success': False,
                'error': 'No supported format options provided',
            }

        service = get_docs_service()
        requests = [{
            'updateTextStyle': {
                'range': {
                    'startIndex': start_index,
                    'endIndex': end_index,
                },
                'textStyle': text_style,
                'fields': ','.join(fields),
            }
        }]
        service.documents().batchUpdate(
            documentId=document_id,
            body={'requests': requests},
        ).execute()

        return {
            'success': True,
            'documentId': document_id,
            'formattedRange': {'start': start_index, 'end': end_index},
            'appliedFormat': format_options,
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def apply_paragraph_style(document_id, start_index, end_index, style_options):
    """
    Apply paragraph styling

    style_options can include:
    - namedStyleType: 'HEADING_1', 'HEADING_2', 'HEADING_3', 'TITLE', 'SUBTITLE', 'NORMAL_TEXT'
    - alignment: 'START', 'CENTER', 'END', 'JUSTIFIED'
    - lineSpacing: float (100 = single spacing)
    - spaceAbove: int (in points)
    - spaceBelow: int (in points)
    - indentFirstLine: int (in points)
    - indentStart: int (in points)

    Returns a dict with ``success``, ``documentId``, ``styledRange`` and
    ``appliedStyle``, or ``success=False`` and ``error``. Supplying none of
    the options above is reported as an error without contacting the API.
    """
    try:
        paragraph_style, fields = _build_style(style_options, _PARAGRAPH_STYLE_SPEC)
        if not fields:
            # An updateParagraphStyle request needs a non-empty field mask.
            return {
                'success': False,
                'error': 'No supported style options provided',
            }

        service = get_docs_service()
        requests = [{
            'updateParagraphStyle': {
                'range': {
                    'startIndex': start_index,
                    'endIndex': end_index,
                },
                'paragraphStyle': paragraph_style,
                'fields': ','.join(fields),
            }
        }]
        service.documents().batchUpdate(
            documentId=document_id,
            body={'requests': requests},
        ).execute()

        return {
            'success': True,
            'documentId': document_id,
            'styledRange': {'start': start_index, 'end': end_index},
            'appliedStyle': style_options,
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


# ---------------------------------------------------------------------------
# Tables
# ---------------------------------------------------------------------------

def insert_table(document_id, rows, cols, data=None, index=None):
    """
    Insert a table into the document

    data: 2D array of cell values (optional)
    index: position to insert (if None, appends to end)

    Returns a dict with ``success``, ``documentId``, ``tableSize`` and
    ``insertedAt``, or ``success=False`` and ``error``.
    """
    try:
        service = get_docs_service()

        # Get current document to find insert position
        if index is None:
            index = _get_end_index(service, document_id)

        # Insert table
        requests = [{
            'insertTable': {
                'rows': rows,
                'columns': cols,
                'location': {'index': index},
            }
        }]
        service.documents().batchUpdate(
            documentId=document_id,
            body={'requests': requests},
        ).execute()

        # If data provided, populate table cells
        if data:
            # Re-fetch document to get table structure
            document = service.documents().get(documentId=document_id).execute()
            table_element = _find_table_at_index(document, index)

            if table_element:
                populate_requests = _create_table_populate_requests(table_element, data)
                if populate_requests:
                    service.documents().batchUpdate(
                        documentId=document_id,
                        body={'requests': populate_requests},
                    ).execute()

        return {
            'success': True,
            'documentId': document_id,
            'tableSize': {'rows': rows, 'cols': cols},
            'insertedAt': index,
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def _find_table_at_index(document, target_index):
    """Return the first table element starting at or after ``target_index - 1``.

    Returns None when the document body contains no such table.
    """
    content = document.get('body', {}).get('content', [])
    for element in content:
        if 'table' in element and element.get('startIndex', 0) >= target_index - 1:
            return element
    return None


def _create_table_populate_requests(table_element, data):
    """Create insertText requests that fill table cells from *data*.

    Rows/columns beyond the bounds of *data* are left untouched, and cells
    whose value is None or stringifies to '' produce no request. The requests
    are returned last-cell-first so that each insertion happens after every
    index that is still to be used, leaving the earlier indexes valid.
    """
    requests = []
    table_rows = table_element.get('table', {}).get('tableRows', [])

    for row, row_data in zip(table_rows, data):
        for cell, value in zip(row.get('tableCells', []), row_data):
            # None means "no value"; do not write the literal text "None".
            text_value = '' if value is None else str(value)
            if not text_value:
                continue
            # Target the first paragraph in the cell, if any.
            for elem in cell.get('content', []):
                if 'paragraph' in elem:
                    requests.append({
                        'insertText': {
                            'location': {'index': elem.get('startIndex', 0)},
                            'text': text_value,
                        }
                    })
                    break

    # Reverse to insert from end to start (to avoid index shifts)
    requests.reverse()
    return requests


# ---------------------------------------------------------------------------
# Drive operations
# ---------------------------------------------------------------------------

def list_documents(max_results=50):
    """List Google Docs in Drive, most recently modified first.

    Returns a dict with ``success``, ``documents`` (id, name, createdTime,
    modifiedTime, webViewLink per file) and ``total``, or ``success=False``
    and ``error``. At most one page of ``max_results`` files is fetched.
    """
    try:
        drive_service = get_drive_service()

        results = drive_service.files().list(
            q="mimeType='application/vnd.google-apps.document'",
            spaces='drive',
            fields='files(id, name, createdTime, modifiedTime, webViewLink)',
            orderBy='modifiedTime desc',
            pageSize=max_results,
        ).execute()

        files = results.get('files', [])

        return {
            'success': True,
            'documents': [
                {
                    'id': f['id'],
                    'name': f['name'],
                    'createdTime': f.get('createdTime', ''),
                    'modifiedTime': f.get('modifiedTime', ''),
                    'webViewLink': f.get('webViewLink', ''),
                }
                for f in files
            ],
            'total': len(files),
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def share_document(document_id, email, role='reader'):
    """
    Share a document with a user

    role: 'reader', 'writer', 'commenter'

    A notification email is sent to the recipient. Returns a dict with
    ``success``, ``permissionId``, ``sharedWith`` and ``role``, or
    ``success=False`` and ``error``.
    """
    try:
        drive_service = get_drive_service()

        permission = {
            'type': 'user',
            'role': role,
            'emailAddress': email,
        }
        result = drive_service.permissions().create(
            fileId=document_id,
            body=permission,
            sendNotificationEmail=True,
        ).execute()

        return {
            'success': True,
            'permissionId': result.get('id'),
            'sharedWith': email,
            'role': role,
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


# ---------------------------------------------------------------------------
# Lists
# ---------------------------------------------------------------------------

def _insert_list(document_id, items, index, bullet_preset):
    """Insert *items* as one paragraph each and apply *bullet_preset* to them.

    Shared implementation for the bullet and numbered list entry points.
    The text insertion and the bullet formatting are sent in a single
    batchUpdate so the document never ends up with the text but no bullets.
    """
    try:
        service = get_docs_service()
        items = list(items)

        # Get current document to find insert position
        if index is None:
            index = _get_end_index(service, document_id)

        text_to_insert = '\n'.join(str(item) for item in items) + '\n'
        # Docs indexes count UTF-16 code units, not Python code points.
        end_index = index + _utf16_len(text_to_insert)

        requests = [
            {
                'insertText': {
                    'location': {'index': index},
                    'text': text_to_insert,
                }
            },
            {
                'createParagraphBullets': {
                    'range': {
                        'startIndex': index,
                        'endIndex': end_index,
                    },
                    'bulletPreset': bullet_preset,
                }
            },
        ]
        service.documents().batchUpdate(
            documentId=document_id,
            body={'requests': requests},
        ).execute()

        return {
            'success': True,
            'documentId': document_id,
            'itemCount': len(items),
            'insertedAt': index,
        }
    except Exception as error:
        return {'success': False, 'error': str(error)}


def insert_bullet_list(document_id, items, index=None):
    """Insert a bullet list at the specified position.

    items: iterable of list entries (converted with ``str``)
    index: position to insert (if None, appends to end)

    Returns a dict with ``success``, ``documentId``, ``itemCount`` and
    ``insertedAt``, or ``success=False`` and ``error``.
    """
    return _insert_list(document_id, items, index, 'BULLET_DISC_CIRCLE_SQUARE')


def insert_numbered_list(document_id, items, index=None):
    """Insert a numbered list at the specified position.

    items: iterable of list entries (converted with ``str``)
    index: position to insert (if None, appends to end)

    Returns a dict with ``success``, ``documentId``, ``itemCount`` and
    ``insertedAt``, or ``success=False`` and ``error``.
    """
    return _insert_list(document_id, items, index, 'NUMBERED_DECIMAL_ALPHA_ROMAN')


# ---------------------------------------------------------------------------
# Helper function for creating reports
# ---------------------------------------------------------------------------

def _task_lines(tasks, empty_message):
    """Render tasks as '- item' lines, or a single '- empty_message' line."""
    if tasks:
        return [f"- {task}\n" for task in tasks]
    return [f"- {empty_message}\n"]


def create_daily_report(title=None, sections=None):
    """
    Create a daily report document with predefined sections

    sections: dict with keys like 'summary', 'tasks_completed', 'tasks_pending', 'notes'

    title defaults to "Daily Report - YYYY-MM-DD" using the local date.
    Returns a dict with ``success``, ``documentId``, ``documentUrl``,
    ``title`` and ``message`` (plus ``warning`` if the title could not be
    styled), or ``success=False`` and ``error``.
    """
    try:
        if title is None:
            title = f"Daily Report - {datetime.now().strftime('%Y-%m-%d')}"

        if sections is None:
            sections = {
                'summary': '',
                'tasks_completed': [],
                'tasks_pending': [],
                'notes': '',
            }

        # Build content
        content_parts = [
            # Title
            f"{title}\n\n",
            # Summary section
            "Summary\n",
            f"{sections.get('summary', 'Enter summary here...')}\n\n",
            # Tasks Completed
            "Tasks Completed\n",
            *_task_lines(sections.get('tasks_completed', []), "No tasks completed"),
            "\n",
            # Tasks Pending
            "Tasks Pending\n",
            *_task_lines(sections.get('tasks_pending', []), "No pending tasks"),
            "\n",
            # Notes
            "Notes\n",
            f"{sections.get('notes', 'Additional notes...')}\n",
        ]
        full_content = ''.join(content_parts)

        # Create the document and insert all content at index 1
        result = create_document(title, full_content)
        if not result['success']:
            return result

        document_id = result['documentId']

        # Apply heading styles
        # Title (first line); the range end is in UTF-16 code units.
        style_result = apply_paragraph_style(
            document_id, 1, _utf16_len(title) + 1,
            {'namedStyleType': 'TITLE'},
        )

        response = {
            'success': True,
            'documentId': document_id,
            'documentUrl': result['documentUrl'],
            'title': title,
            'message': 'Daily report created with Summary, Tasks Completed, Tasks Pending, and Notes sections',
        }
        if not style_result.get('success'):
            # Styling is best-effort: the report exists, so report success
            # but surface why the title was left unstyled.
            response['warning'] = f"Title styling failed: {style_result.get('error')}"
        return response
    except Exception as error:
        return {'success': False, 'error': str(error)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pbulbule13/google-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.