"""
Google Docs Integration Module
Create, read, update, and format Google Documents
"""
from googleapiclient.discovery import build
from src.google_auth import get_credentials
def get_docs_service():
creds = get_credentials()
return build("docs", "v1", credentials=creds)
def get_drive_service():
creds = get_credentials()
return build("drive", "v3", credentials=creds)
def create_document(title, initial_content=None):
"""Create a new Google Document with optional initial content"""
try:
service = get_docs_service()
# Create empty document
document = service.documents().create(body={"title": title}).execute()
document_id = document.get("documentId")
# Add initial content if provided
if initial_content:
requests = [
{"insertText": {"location": {"index": 1}, "text": initial_content}}
]
service.documents().batchUpdate(
documentId=document_id, body={"requests": requests}
).execute()
return {
"success": True,
"documentId": document_id,
"title": document.get("title"),
"documentUrl": f"https://docs.google.com/document/d/{document_id}/edit",
}
except Exception as error:
return {"success": False, "error": str(error)}
def read_document(document_id):
"""Read the content of a Google Document"""
try:
service = get_docs_service()
document = service.documents().get(documentId=document_id).execute()
# Extract text content from document body
content = document.get("body", {}).get("content", [])
text_content = _extract_text(content)
return {
"success": True,
"documentId": document_id,
"title": document.get("title", ""),
"content": text_content,
"revisionId": document.get("revisionId", ""),
}
except Exception as error:
return {"success": False, "error": str(error)}
def _extract_text(content):
"""Extract plain text from document content structure"""
text_parts = []
for element in content:
if "paragraph" in element:
paragraph = element["paragraph"]
for elem in paragraph.get("elements", []):
if "textRun" in elem:
text_parts.append(elem["textRun"].get("content", ""))
elif "table" in element:
# Extract text from tables
table = element["table"]
for row in table.get("tableRows", []):
for cell in row.get("tableCells", []):
cell_content = cell.get("content", [])
text_parts.append(_extract_text(cell_content))
return "".join(text_parts)
def append_to_document(document_id, content):
"""Append text to the end of a document"""
try:
service = get_docs_service()
# Get current document to find end index
document = service.documents().get(documentId=document_id).execute()
body_content = document.get("body", {}).get("content", [])
# Find the end index (last element's endIndex - 1)
end_index = 1
if body_content:
last_element = body_content[-1]
end_index = last_element.get("endIndex", 1) - 1
requests = [{"insertText": {"location": {"index": end_index}, "text": content}}]
service.documents().batchUpdate(
documentId=document_id, body={"requests": requests}
).execute()
return {
"success": True,
"documentId": document_id,
"appendedLength": len(content),
}
except Exception as error:
return {"success": False, "error": str(error)}
def insert_text_at(document_id, text, index):
"""Insert text at a specific index in the document"""
try:
service = get_docs_service()
requests = [{"insertText": {"location": {"index": index}, "text": text}}]
service.documents().batchUpdate(
documentId=document_id, body={"requests": requests}
).execute()
return {
"success": True,
"documentId": document_id,
"insertedAt": index,
"textLength": len(text),
}
except Exception as error:
return {"success": False, "error": str(error)}
def format_text(document_id, start_index, end_index, format_options):
"""
Apply formatting to a range of text
format_options can include:
- bold: bool
- italic: bool
- underline: bool
- strikethrough: bool
- fontSize: int (in points)
- fontFamily: str
- foregroundColor: {red, green, blue} (0-1 values)
- backgroundColor: {red, green, blue} (0-1 values)
- link: str (URL to link to)
"""
try:
service = get_docs_service()
text_style = {}
fields = []
if "bold" in format_options:
text_style["bold"] = format_options["bold"]
fields.append("bold")
if "italic" in format_options:
text_style["italic"] = format_options["italic"]
fields.append("italic")
if "underline" in format_options:
text_style["underline"] = format_options["underline"]
fields.append("underline")
if "strikethrough" in format_options:
text_style["strikethrough"] = format_options["strikethrough"]
fields.append("strikethrough")
if "fontSize" in format_options:
text_style["fontSize"] = {
"magnitude": format_options["fontSize"],
"unit": "PT",
}
fields.append("fontSize")
if "fontFamily" in format_options:
text_style["weightedFontFamily"] = {
"fontFamily": format_options["fontFamily"]
}
fields.append("weightedFontFamily")
if "foregroundColor" in format_options:
text_style["foregroundColor"] = {
"color": {"rgbColor": format_options["foregroundColor"]}
}
fields.append("foregroundColor")
if "backgroundColor" in format_options:
text_style["backgroundColor"] = {
"color": {"rgbColor": format_options["backgroundColor"]}
}
fields.append("backgroundColor")
if "link" in format_options:
text_style["link"] = {"url": format_options["link"]}
fields.append("link")
requests = [
{
"updateTextStyle": {
"range": {"startIndex": start_index, "endIndex": end_index},
"textStyle": text_style,
"fields": ",".join(fields),
}
}
]
service.documents().batchUpdate(
documentId=document_id, body={"requests": requests}
).execute()
return {
"success": True,
"documentId": document_id,
"formattedRange": {"start": start_index, "end": end_index},
"appliedFormat": format_options,
}
except Exception as error:
return {"success": False, "error": str(error)}
def apply_paragraph_style(document_id, start_index, end_index, style_options):
"""
Apply paragraph styling
style_options can include:
- namedStyleType: 'HEADING_1', 'HEADING_2', 'HEADING_3', 'TITLE', 'SUBTITLE', 'NORMAL_TEXT'
- alignment: 'START', 'CENTER', 'END', 'JUSTIFIED'
- lineSpacing: float (100 = single spacing)
- spaceAbove: int (in points)
- spaceBelow: int (in points)
- indentFirstLine: int (in points)
- indentStart: int (in points)
"""
try:
service = get_docs_service()
paragraph_style = {}
fields = []
if "namedStyleType" in style_options:
paragraph_style["namedStyleType"] = style_options["namedStyleType"]
fields.append("namedStyleType")
if "alignment" in style_options:
paragraph_style["alignment"] = style_options["alignment"]
fields.append("alignment")
if "lineSpacing" in style_options:
paragraph_style["lineSpacing"] = style_options["lineSpacing"]
fields.append("lineSpacing")
if "spaceAbove" in style_options:
paragraph_style["spaceAbove"] = {
"magnitude": style_options["spaceAbove"],
"unit": "PT",
}
fields.append("spaceAbove")
if "spaceBelow" in style_options:
paragraph_style["spaceBelow"] = {
"magnitude": style_options["spaceBelow"],
"unit": "PT",
}
fields.append("spaceBelow")
requests = [
{
"updateParagraphStyle": {
"range": {"startIndex": start_index, "endIndex": end_index},
"paragraphStyle": paragraph_style,
"fields": ",".join(fields),
}
}
]
service.documents().batchUpdate(
documentId=document_id, body={"requests": requests}
).execute()
return {
"success": True,
"documentId": document_id,
"styledRange": {"start": start_index, "end": end_index},
"appliedStyle": style_options,
}
except Exception as error:
return {"success": False, "error": str(error)}
def insert_table(document_id, rows, cols, data=None, index=None):
"""
Insert a table into the document
data: 2D array of cell values (optional)
index: position to insert (if None, appends to end)
"""
try:
service = get_docs_service()
# Get current document to find insert position
if index is None:
document = service.documents().get(documentId=document_id).execute()
body_content = document.get("body", {}).get("content", [])
index = 1
if body_content:
last_element = body_content[-1]
index = last_element.get("endIndex", 1) - 1
# Insert table
requests = [
{
"insertTable": {
"rows": rows,
"columns": cols,
"location": {"index": index},
}
}
]
service.documents().batchUpdate(
documentId=document_id, body={"requests": requests}
).execute()
# If data provided, populate table cells
if data:
# Re-fetch document to get table structure
document = service.documents().get(documentId=document_id).execute()
table_element = _find_table_at_index(document, index)
if table_element:
populate_requests = _create_table_populate_requests(table_element, data)
if populate_requests:
service.documents().batchUpdate(
documentId=document_id, body={"requests": populate_requests}
).execute()
return {
"success": True,
"documentId": document_id,
"tableSize": {"rows": rows, "cols": cols},
"insertedAt": index,
}
except Exception as error:
return {"success": False, "error": str(error)}
def _find_table_at_index(document, target_index):
"""Find a table near the target index"""
content = document.get("body", {}).get("content", [])
for element in content:
if "table" in element:
start = element.get("startIndex", 0)
if start >= target_index - 1:
return element
return None
def _create_table_populate_requests(table_element, data):
"""Create requests to populate table cells with data"""
requests = []
table = table_element.get("table", {})
table_rows = table.get("tableRows", [])
for row_idx, row in enumerate(table_rows):
if row_idx >= len(data):
break
cells = row.get("tableCells", [])
for col_idx, cell in enumerate(cells):
if col_idx >= len(data[row_idx]):
break
cell_content = cell.get("content", [])
if cell_content:
# Find the paragraph in the cell
for elem in cell_content:
if "paragraph" in elem:
start_index = elem.get("startIndex", 0)
text_value = str(data[row_idx][col_idx])
if text_value:
requests.append(
{
"insertText": {
"location": {"index": start_index},
"text": text_value,
}
}
)
break
# Reverse to insert from end to start (to avoid index shifts)
return list(reversed(requests))
def list_documents(max_results=50):
"""List all Google Docs in Drive"""
try:
drive_service = get_drive_service()
results = (
drive_service.files()
.list(
q="mimeType='application/vnd.google-apps.document'",
spaces="drive",
fields="files(id, name, createdTime, modifiedTime, webViewLink)",
orderBy="modifiedTime desc",
pageSize=max_results,
)
.execute()
)
files = results.get("files", [])
return {
"success": True,
"documents": [
{
"id": f["id"],
"name": f["name"],
"createdTime": f.get("createdTime", ""),
"modifiedTime": f.get("modifiedTime", ""),
"webViewLink": f.get("webViewLink", ""),
}
for f in files
],
"total": len(files),
}
except Exception as error:
return {"success": False, "error": str(error)}
def share_document(document_id, email, role="reader"):
"""
Share a document with a user
role: 'reader', 'writer', 'commenter'
"""
try:
drive_service = get_drive_service()
permission = {"type": "user", "role": role, "emailAddress": email}
result = (
drive_service.permissions()
.create(fileId=document_id, body=permission, sendNotificationEmail=True)
.execute()
)
return {
"success": True,
"permissionId": result.get("id"),
"sharedWith": email,
"role": role,
}
except Exception as error:
return {"success": False, "error": str(error)}
def insert_bullet_list(document_id, items, index=None):
"""Insert a bullet list at the specified position"""
try:
service = get_docs_service()
# Get current document to find insert position
if index is None:
document = service.documents().get(documentId=document_id).execute()
body_content = document.get("body", {}).get("content", [])
index = 1
if body_content:
last_element = body_content[-1]
index = last_element.get("endIndex", 1) - 1
# Insert all items as text first
text_to_insert = "\n".join(items) + "\n"
insert_request = {
"insertText": {"location": {"index": index}, "text": text_to_insert}
}
service.documents().batchUpdate(
documentId=document_id, body={"requests": [insert_request]}
).execute()
# Apply bullet formatting
end_index = index + len(text_to_insert)
bullet_request = {
"createParagraphBullets": {
"range": {"startIndex": index, "endIndex": end_index},
"bulletPreset": "BULLET_DISC_CIRCLE_SQUARE",
}
}
service.documents().batchUpdate(
documentId=document_id, body={"requests": [bullet_request]}
).execute()
return {
"success": True,
"documentId": document_id,
"itemCount": len(items),
"insertedAt": index,
}
except Exception as error:
return {"success": False, "error": str(error)}
def insert_numbered_list(document_id, items, index=None):
"""Insert a numbered list at the specified position"""
try:
service = get_docs_service()
# Get current document to find insert position
if index is None:
document = service.documents().get(documentId=document_id).execute()
body_content = document.get("body", {}).get("content", [])
index = 1
if body_content:
last_element = body_content[-1]
index = last_element.get("endIndex", 1) - 1
# Insert all items as text first
text_to_insert = "\n".join(items) + "\n"
insert_request = {
"insertText": {"location": {"index": index}, "text": text_to_insert}
}
service.documents().batchUpdate(
documentId=document_id, body={"requests": [insert_request]}
).execute()
# Apply numbered list formatting
end_index = index + len(text_to_insert)
numbered_request = {
"createParagraphBullets": {
"range": {"startIndex": index, "endIndex": end_index},
"bulletPreset": "NUMBERED_DECIMAL_ALPHA_ROMAN",
}
}
service.documents().batchUpdate(
documentId=document_id, body={"requests": [numbered_request]}
).execute()
return {
"success": True,
"documentId": document_id,
"itemCount": len(items),
"insertedAt": index,
}
except Exception as error:
return {"success": False, "error": str(error)}
# Helper function for creating reports
def create_daily_report(title=None, sections=None):
"""
Create a daily report document with predefined sections
sections: dict with keys like 'summary', 'tasks_completed', 'tasks_pending', 'notes'
"""
try:
from datetime import datetime
if title is None:
title = f"Daily Report - {datetime.now().strftime('%Y-%m-%d')}"
# Create document
result = create_document(title)
if not result["success"]:
return result
document_id = result["documentId"]
service = get_docs_service()
# Build content
content_parts = []
if sections is None:
sections = {
"summary": "",
"tasks_completed": [],
"tasks_pending": [],
"notes": "",
}
# Title
content_parts.append(f"{title}\n\n")
# Summary section
content_parts.append("Summary\n")
content_parts.append(f"{sections.get('summary', 'Enter summary here...')}\n\n")
# Tasks Completed
content_parts.append("Tasks Completed\n")
tasks_completed = sections.get("tasks_completed", [])
if tasks_completed:
for task in tasks_completed:
content_parts.append(f"- {task}\n")
else:
content_parts.append("- No tasks completed\n")
content_parts.append("\n")
# Tasks Pending
content_parts.append("Tasks Pending\n")
tasks_pending = sections.get("tasks_pending", [])
if tasks_pending:
for task in tasks_pending:
content_parts.append(f"- {task}\n")
else:
content_parts.append("- No pending tasks\n")
content_parts.append("\n")
# Notes
content_parts.append("Notes\n")
content_parts.append(f"{sections.get('notes', 'Additional notes...')}\n")
full_content = "".join(content_parts)
# Insert all content
requests = [{"insertText": {"location": {"index": 1}, "text": full_content}}]
service.documents().batchUpdate(
documentId=document_id, body={"requests": requests}
).execute()
# Apply heading styles
# Title (first line)
apply_paragraph_style(
document_id, 1, len(title) + 1, {"namedStyleType": "TITLE"}
)
return {
"success": True,
"documentId": document_id,
"documentUrl": result["documentUrl"],
"title": title,
"message": "Daily report created with Summary, Tasks Completed, Tasks Pending, and Notes sections",
}
except Exception as error:
return {"success": False, "error": str(error)}