"""
Google Docs Integration Module
Create, read, update, and format Google Documents
"""
from googleapiclient.discovery import build
from src.google_auth import get_credentials
def get_docs_service():
creds = get_credentials()
return build('docs', 'v1', credentials=creds)
def get_drive_service():
creds = get_credentials()
return build('drive', 'v3', credentials=creds)
def create_document(title, initial_content=None):
"""Create a new Google Document with optional initial content"""
try:
service = get_docs_service()
# Create empty document
document = service.documents().create(body={'title': title}).execute()
document_id = document.get('documentId')
# Add initial content if provided
if initial_content:
requests = [{
'insertText': {
'location': {'index': 1},
'text': initial_content
}
}]
service.documents().batchUpdate(
documentId=document_id,
body={'requests': requests}
).execute()
return {
'success': True,
'documentId': document_id,
'title': document.get('title'),
'documentUrl': f"https://docs.google.com/document/d/{document_id}/edit"
}
except Exception as error:
return {'success': False, 'error': str(error)}
def read_document(document_id):
"""Read the content of a Google Document"""
try:
service = get_docs_service()
document = service.documents().get(documentId=document_id).execute()
# Extract text content from document body
content = document.get('body', {}).get('content', [])
text_content = _extract_text(content)
return {
'success': True,
'documentId': document_id,
'title': document.get('title', ''),
'content': text_content,
'revisionId': document.get('revisionId', '')
}
except Exception as error:
return {'success': False, 'error': str(error)}
def _extract_text(content):
"""Extract plain text from document content structure"""
text_parts = []
for element in content:
if 'paragraph' in element:
paragraph = element['paragraph']
for elem in paragraph.get('elements', []):
if 'textRun' in elem:
text_parts.append(elem['textRun'].get('content', ''))
elif 'table' in element:
# Extract text from tables
table = element['table']
for row in table.get('tableRows', []):
for cell in row.get('tableCells', []):
cell_content = cell.get('content', [])
text_parts.append(_extract_text(cell_content))
return ''.join(text_parts)
def append_to_document(document_id, content):
"""Append text to the end of a document"""
try:
service = get_docs_service()
# Get current document to find end index
document = service.documents().get(documentId=document_id).execute()
body_content = document.get('body', {}).get('content', [])
# Find the end index (last element's endIndex - 1)
end_index = 1
if body_content:
last_element = body_content[-1]
end_index = last_element.get('endIndex', 1) - 1
requests = [{
'insertText': {
'location': {'index': end_index},
'text': content
}
}]
service.documents().batchUpdate(
documentId=document_id,
body={'requests': requests}
).execute()
return {
'success': True,
'documentId': document_id,
'appendedLength': len(content)
}
except Exception as error:
return {'success': False, 'error': str(error)}
def insert_text_at(document_id, text, index):
"""Insert text at a specific index in the document"""
try:
service = get_docs_service()
requests = [{
'insertText': {
'location': {'index': index},
'text': text
}
}]
service.documents().batchUpdate(
documentId=document_id,
body={'requests': requests}
).execute()
return {
'success': True,
'documentId': document_id,
'insertedAt': index,
'textLength': len(text)
}
except Exception as error:
return {'success': False, 'error': str(error)}
def format_text(document_id, start_index, end_index, format_options):
"""
Apply formatting to a range of text
format_options can include:
- bold: bool
- italic: bool
- underline: bool
- strikethrough: bool
- fontSize: int (in points)
- fontFamily: str
- foregroundColor: {red, green, blue} (0-1 values)
- backgroundColor: {red, green, blue} (0-1 values)
- link: str (URL to link to)
"""
try:
service = get_docs_service()
text_style = {}
fields = []
if 'bold' in format_options:
text_style['bold'] = format_options['bold']
fields.append('bold')
if 'italic' in format_options:
text_style['italic'] = format_options['italic']
fields.append('italic')
if 'underline' in format_options:
text_style['underline'] = format_options['underline']
fields.append('underline')
if 'strikethrough' in format_options:
text_style['strikethrough'] = format_options['strikethrough']
fields.append('strikethrough')
if 'fontSize' in format_options:
text_style['fontSize'] = {
'magnitude': format_options['fontSize'],
'unit': 'PT'
}
fields.append('fontSize')
if 'fontFamily' in format_options:
text_style['weightedFontFamily'] = {
'fontFamily': format_options['fontFamily']
}
fields.append('weightedFontFamily')
if 'foregroundColor' in format_options:
text_style['foregroundColor'] = {
'color': {'rgbColor': format_options['foregroundColor']}
}
fields.append('foregroundColor')
if 'backgroundColor' in format_options:
text_style['backgroundColor'] = {
'color': {'rgbColor': format_options['backgroundColor']}
}
fields.append('backgroundColor')
if 'link' in format_options:
text_style['link'] = {'url': format_options['link']}
fields.append('link')
requests = [{
'updateTextStyle': {
'range': {
'startIndex': start_index,
'endIndex': end_index
},
'textStyle': text_style,
'fields': ','.join(fields)
}
}]
service.documents().batchUpdate(
documentId=document_id,
body={'requests': requests}
).execute()
return {
'success': True,
'documentId': document_id,
'formattedRange': {'start': start_index, 'end': end_index},
'appliedFormat': format_options
}
except Exception as error:
return {'success': False, 'error': str(error)}
def apply_paragraph_style(document_id, start_index, end_index, style_options):
"""
Apply paragraph styling
style_options can include:
- namedStyleType: 'HEADING_1', 'HEADING_2', 'HEADING_3', 'TITLE', 'SUBTITLE', 'NORMAL_TEXT'
- alignment: 'START', 'CENTER', 'END', 'JUSTIFIED'
- lineSpacing: float (100 = single spacing)
- spaceAbove: int (in points)
- spaceBelow: int (in points)
- indentFirstLine: int (in points)
- indentStart: int (in points)
"""
try:
service = get_docs_service()
paragraph_style = {}
fields = []
if 'namedStyleType' in style_options:
paragraph_style['namedStyleType'] = style_options['namedStyleType']
fields.append('namedStyleType')
if 'alignment' in style_options:
paragraph_style['alignment'] = style_options['alignment']
fields.append('alignment')
if 'lineSpacing' in style_options:
paragraph_style['lineSpacing'] = style_options['lineSpacing']
fields.append('lineSpacing')
if 'spaceAbove' in style_options:
paragraph_style['spaceAbove'] = {
'magnitude': style_options['spaceAbove'],
'unit': 'PT'
}
fields.append('spaceAbove')
if 'spaceBelow' in style_options:
paragraph_style['spaceBelow'] = {
'magnitude': style_options['spaceBelow'],
'unit': 'PT'
}
fields.append('spaceBelow')
requests = [{
'updateParagraphStyle': {
'range': {
'startIndex': start_index,
'endIndex': end_index
},
'paragraphStyle': paragraph_style,
'fields': ','.join(fields)
}
}]
service.documents().batchUpdate(
documentId=document_id,
body={'requests': requests}
).execute()
return {
'success': True,
'documentId': document_id,
'styledRange': {'start': start_index, 'end': end_index},
'appliedStyle': style_options
}
except Exception as error:
return {'success': False, 'error': str(error)}
def insert_table(document_id, rows, cols, data=None, index=None):
"""
Insert a table into the document
data: 2D array of cell values (optional)
index: position to insert (if None, appends to end)
"""
try:
service = get_docs_service()
# Get current document to find insert position
if index is None:
document = service.documents().get(documentId=document_id).execute()
body_content = document.get('body', {}).get('content', [])
index = 1
if body_content:
last_element = body_content[-1]
index = last_element.get('endIndex', 1) - 1
# Insert table
requests = [{
'insertTable': {
'rows': rows,
'columns': cols,
'location': {'index': index}
}
}]
service.documents().batchUpdate(
documentId=document_id,
body={'requests': requests}
).execute()
# If data provided, populate table cells
if data:
# Re-fetch document to get table structure
document = service.documents().get(documentId=document_id).execute()
table_element = _find_table_at_index(document, index)
if table_element:
populate_requests = _create_table_populate_requests(table_element, data)
if populate_requests:
service.documents().batchUpdate(
documentId=document_id,
body={'requests': populate_requests}
).execute()
return {
'success': True,
'documentId': document_id,
'tableSize': {'rows': rows, 'cols': cols},
'insertedAt': index
}
except Exception as error:
return {'success': False, 'error': str(error)}
def _find_table_at_index(document, target_index):
"""Find a table near the target index"""
content = document.get('body', {}).get('content', [])
for element in content:
if 'table' in element:
start = element.get('startIndex', 0)
if start >= target_index - 1:
return element
return None
def _create_table_populate_requests(table_element, data):
"""Create requests to populate table cells with data"""
requests = []
table = table_element.get('table', {})
table_rows = table.get('tableRows', [])
for row_idx, row in enumerate(table_rows):
if row_idx >= len(data):
break
cells = row.get('tableCells', [])
for col_idx, cell in enumerate(cells):
if col_idx >= len(data[row_idx]):
break
cell_content = cell.get('content', [])
if cell_content:
# Find the paragraph in the cell
for elem in cell_content:
if 'paragraph' in elem:
start_index = elem.get('startIndex', 0)
text_value = str(data[row_idx][col_idx])
if text_value:
requests.append({
'insertText': {
'location': {'index': start_index},
'text': text_value
}
})
break
# Reverse to insert from end to start (to avoid index shifts)
return list(reversed(requests))
def list_documents(max_results=50):
"""List all Google Docs in Drive"""
try:
drive_service = get_drive_service()
results = drive_service.files().list(
q="mimeType='application/vnd.google-apps.document'",
spaces='drive',
fields='files(id, name, createdTime, modifiedTime, webViewLink)',
orderBy='modifiedTime desc',
pageSize=max_results
).execute()
files = results.get('files', [])
return {
'success': True,
'documents': [
{
'id': f['id'],
'name': f['name'],
'createdTime': f.get('createdTime', ''),
'modifiedTime': f.get('modifiedTime', ''),
'webViewLink': f.get('webViewLink', '')
}
for f in files
],
'total': len(files)
}
except Exception as error:
return {'success': False, 'error': str(error)}
def share_document(document_id, email, role='reader'):
"""
Share a document with a user
role: 'reader', 'writer', 'commenter'
"""
try:
drive_service = get_drive_service()
permission = {
'type': 'user',
'role': role,
'emailAddress': email
}
result = drive_service.permissions().create(
fileId=document_id,
body=permission,
sendNotificationEmail=True
).execute()
return {
'success': True,
'permissionId': result.get('id'),
'sharedWith': email,
'role': role
}
except Exception as error:
return {'success': False, 'error': str(error)}
def insert_bullet_list(document_id, items, index=None):
"""Insert a bullet list at the specified position"""
try:
service = get_docs_service()
# Get current document to find insert position
if index is None:
document = service.documents().get(documentId=document_id).execute()
body_content = document.get('body', {}).get('content', [])
index = 1
if body_content:
last_element = body_content[-1]
index = last_element.get('endIndex', 1) - 1
# Insert all items as text first
text_to_insert = '\n'.join(items) + '\n'
insert_request = {
'insertText': {
'location': {'index': index},
'text': text_to_insert
}
}
service.documents().batchUpdate(
documentId=document_id,
body={'requests': [insert_request]}
).execute()
# Apply bullet formatting
end_index = index + len(text_to_insert)
bullet_request = {
'createParagraphBullets': {
'range': {
'startIndex': index,
'endIndex': end_index
},
'bulletPreset': 'BULLET_DISC_CIRCLE_SQUARE'
}
}
service.documents().batchUpdate(
documentId=document_id,
body={'requests': [bullet_request]}
).execute()
return {
'success': True,
'documentId': document_id,
'itemCount': len(items),
'insertedAt': index
}
except Exception as error:
return {'success': False, 'error': str(error)}
def insert_numbered_list(document_id, items, index=None):
"""Insert a numbered list at the specified position"""
try:
service = get_docs_service()
# Get current document to find insert position
if index is None:
document = service.documents().get(documentId=document_id).execute()
body_content = document.get('body', {}).get('content', [])
index = 1
if body_content:
last_element = body_content[-1]
index = last_element.get('endIndex', 1) - 1
# Insert all items as text first
text_to_insert = '\n'.join(items) + '\n'
insert_request = {
'insertText': {
'location': {'index': index},
'text': text_to_insert
}
}
service.documents().batchUpdate(
documentId=document_id,
body={'requests': [insert_request]}
).execute()
# Apply numbered list formatting
end_index = index + len(text_to_insert)
numbered_request = {
'createParagraphBullets': {
'range': {
'startIndex': index,
'endIndex': end_index
},
'bulletPreset': 'NUMBERED_DECIMAL_ALPHA_ROMAN'
}
}
service.documents().batchUpdate(
documentId=document_id,
body={'requests': [numbered_request]}
).execute()
return {
'success': True,
'documentId': document_id,
'itemCount': len(items),
'insertedAt': index
}
except Exception as error:
return {'success': False, 'error': str(error)}
# Helper function for creating reports
def create_daily_report(title=None, sections=None):
"""
Create a daily report document with predefined sections
sections: dict with keys like 'summary', 'tasks_completed', 'tasks_pending', 'notes'
"""
try:
from datetime import datetime
if title is None:
title = f"Daily Report - {datetime.now().strftime('%Y-%m-%d')}"
# Create document
result = create_document(title)
if not result['success']:
return result
document_id = result['documentId']
service = get_docs_service()
# Build content
content_parts = []
if sections is None:
sections = {
'summary': '',
'tasks_completed': [],
'tasks_pending': [],
'notes': ''
}
# Title
content_parts.append(f"{title}\n\n")
# Summary section
content_parts.append("Summary\n")
content_parts.append(f"{sections.get('summary', 'Enter summary here...')}\n\n")
# Tasks Completed
content_parts.append("Tasks Completed\n")
tasks_completed = sections.get('tasks_completed', [])
if tasks_completed:
for task in tasks_completed:
content_parts.append(f"- {task}\n")
else:
content_parts.append("- No tasks completed\n")
content_parts.append("\n")
# Tasks Pending
content_parts.append("Tasks Pending\n")
tasks_pending = sections.get('tasks_pending', [])
if tasks_pending:
for task in tasks_pending:
content_parts.append(f"- {task}\n")
else:
content_parts.append("- No pending tasks\n")
content_parts.append("\n")
# Notes
content_parts.append("Notes\n")
content_parts.append(f"{sections.get('notes', 'Additional notes...')}\n")
full_content = ''.join(content_parts)
# Insert all content
requests = [{
'insertText': {
'location': {'index': 1},
'text': full_content
}
}]
service.documents().batchUpdate(
documentId=document_id,
body={'requests': requests}
).execute()
# Apply heading styles
# Title (first line)
apply_paragraph_style(document_id, 1, len(title) + 1, {'namedStyleType': 'TITLE'})
return {
'success': True,
'documentId': document_id,
'documentUrl': result['documentUrl'],
'title': title,
'message': 'Daily report created with Summary, Tasks Completed, Tasks Pending, and Notes sections'
}
except Exception as error:
return {'success': False, 'error': str(error)}