"""
Google Drive Integration Module
Full file management operations
"""
import io
import base64
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseUpload, MediaIoBaseDownload
from src.google_auth import get_credentials
def get_drive_service():
creds = get_credentials()
return build("drive", "v3", credentials=creds)
def list_files(folder_id=None, query=None, max_results=50):
"""
List files in Google Drive
folder_id: specific folder to list (None for root)
query: additional search query
"""
try:
service = get_drive_service()
# Build query
q_parts = []
if folder_id:
q_parts.append(f"'{folder_id}' in parents")
if query:
q_parts.append(query)
q = " and ".join(q_parts) if q_parts else None
results = (
service.files()
.list(
q=q,
spaces="drive",
fields="files(id, name, mimeType, size, createdTime, modifiedTime, parents, webViewLink, iconLink)",
orderBy="modifiedTime desc",
pageSize=max_results,
)
.execute()
)
files = results.get("files", [])
return {
"success": True,
"files": [
{
"id": f["id"],
"name": f["name"],
"mimeType": f.get("mimeType", ""),
"size": f.get("size", "0"),
"createdTime": f.get("createdTime", ""),
"modifiedTime": f.get("modifiedTime", ""),
"parents": f.get("parents", []),
"webViewLink": f.get("webViewLink", ""),
"iconLink": f.get("iconLink", ""),
}
for f in files
],
"total": len(files),
}
except Exception as error:
return {"success": False, "error": str(error)}
def create_folder(name, parent_folder_id=None):
"""Create a new folder in Google Drive"""
try:
service = get_drive_service()
file_metadata = {"name": name, "mimeType": "application/vnd.google-apps.folder"}
if parent_folder_id:
file_metadata["parents"] = [parent_folder_id]
folder = (
service.files()
.create(body=file_metadata, fields="id, name, webViewLink")
.execute()
)
return {
"success": True,
"folderId": folder.get("id"),
"name": folder.get("name"),
"webViewLink": folder.get("webViewLink"),
}
except Exception as error:
return {"success": False, "error": str(error)}
def upload_file(name, content, mime_type="text/plain", folder_id=None):
"""
Upload a file to Google Drive
content: file content as string or base64-encoded string for binary
mime_type: MIME type of the file
folder_id: destination folder (None for root)
"""
try:
service = get_drive_service()
file_metadata = {"name": name}
if folder_id:
file_metadata["parents"] = [folder_id]
# Handle content - check if it's base64 encoded
if mime_type.startswith("image/") or mime_type.startswith("application/"):
try:
# Try to decode as base64
file_content = base64.b64decode(content)
except Exception:
# If not base64, treat as raw string
file_content = (
content.encode("utf-8") if isinstance(content, str) else content
)
else:
file_content = (
content.encode("utf-8") if isinstance(content, str) else content
)
media = MediaIoBaseUpload(
io.BytesIO(file_content), mimetype=mime_type, resumable=True
)
file = (
service.files()
.create(
body=file_metadata,
media_body=media,
fields="id, name, mimeType, size, webViewLink",
)
.execute()
)
return {
"success": True,
"fileId": file.get("id"),
"name": file.get("name"),
"mimeType": file.get("mimeType"),
"size": file.get("size"),
"webViewLink": file.get("webViewLink"),
}
except Exception as error:
return {"success": False, "error": str(error)}
def download_file(file_id):
"""
Download a file from Google Drive
Returns file content (base64 encoded for binary files)
"""
try:
service = get_drive_service()
# First get file metadata to determine type
file_metadata = (
service.files().get(fileId=file_id, fields="name, mimeType").execute()
)
mime_type = file_metadata.get("mimeType", "")
file_name = file_metadata.get("name", "")
# Handle Google Workspace files (need to export)
export_mime_types = {
"application/vnd.google-apps.document": "application/pdf",
"application/vnd.google-apps.spreadsheet": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.google-apps.presentation": "application/pdf",
"application/vnd.google-apps.drawing": "image/png",
}
if mime_type in export_mime_types:
# Export Google Workspace file
request = service.files().export_media(
fileId=file_id, mimeType=export_mime_types[mime_type]
)
export_mime = export_mime_types[mime_type]
else:
# Download regular file
request = service.files().get_media(fileId=file_id)
export_mime = mime_type
# Download content
file_content = io.BytesIO()
downloader = MediaIoBaseDownload(file_content, request)
done = False
while not done:
_, done = downloader.next_chunk()
file_content.seek(0)
content_bytes = file_content.read()
# Encode as base64 for binary content
if export_mime.startswith("text/"):
content = content_bytes.decode("utf-8", errors="ignore")
is_binary = False
else:
content = base64.b64encode(content_bytes).decode("utf-8")
is_binary = True
return {
"success": True,
"fileId": file_id,
"name": file_name,
"mimeType": export_mime,
"content": content,
"isBinary": is_binary,
"size": len(content_bytes),
}
except Exception as error:
return {"success": False, "error": str(error)}
def move_file(file_id, new_folder_id):
"""Move a file to a different folder"""
try:
service = get_drive_service()
# Get current parents
file = service.files().get(fileId=file_id, fields="parents").execute()
previous_parents = ",".join(file.get("parents", []))
# Move file
file = (
service.files()
.update(
fileId=file_id,
addParents=new_folder_id,
removeParents=previous_parents,
fields="id, name, parents, webViewLink",
)
.execute()
)
return {
"success": True,
"fileId": file.get("id"),
"name": file.get("name"),
"newParents": file.get("parents", []),
"webViewLink": file.get("webViewLink"),
}
except Exception as error:
return {"success": False, "error": str(error)}
def copy_file(file_id, new_name=None, folder_id=None):
"""Copy a file"""
try:
service = get_drive_service()
body = {}
if new_name:
body["name"] = new_name
if folder_id:
body["parents"] = [folder_id]
copied_file = (
service.files()
.copy(
fileId=file_id,
body=body if body else None,
fields="id, name, webViewLink",
)
.execute()
)
return {
"success": True,
"fileId": copied_file.get("id"),
"name": copied_file.get("name"),
"webViewLink": copied_file.get("webViewLink"),
}
except Exception as error:
return {"success": False, "error": str(error)}
def delete_file(file_id):
"""Delete a file or folder (moves to trash)"""
try:
service = get_drive_service()
# Move to trash instead of permanent delete
service.files().update(fileId=file_id, body={"trashed": True}).execute()
return {"success": True, "fileId": file_id, "message": "File moved to trash"}
except Exception as error:
return {"success": False, "error": str(error)}
def permanently_delete_file(file_id):
"""Permanently delete a file or folder (cannot be recovered)"""
try:
service = get_drive_service()
service.files().delete(fileId=file_id).execute()
return {
"success": True,
"fileId": file_id,
"message": "File permanently deleted",
}
except Exception as error:
return {"success": False, "error": str(error)}
def search_files(query, max_results=50):
"""
Search for files in Google Drive
query: search query (supports Drive query syntax)
Examples:
- "name contains 'report'"
- "mimeType='application/pdf'"
- "fullText contains 'budget'"
- "modifiedTime > '2024-01-01'"
"""
try:
service = get_drive_service()
results = (
service.files()
.list(
q=query,
spaces="drive",
fields="files(id, name, mimeType, size, createdTime, modifiedTime, webViewLink)",
orderBy="modifiedTime desc",
pageSize=max_results,
)
.execute()
)
files = results.get("files", [])
return {
"success": True,
"query": query,
"files": [
{
"id": f["id"],
"name": f["name"],
"mimeType": f.get("mimeType", ""),
"size": f.get("size", "0"),
"createdTime": f.get("createdTime", ""),
"modifiedTime": f.get("modifiedTime", ""),
"webViewLink": f.get("webViewLink", ""),
}
for f in files
],
"total": len(files),
}
except Exception as error:
return {"success": False, "error": str(error)}
def share_file(file_id, email, role="reader", send_notification=True):
"""
Share a file or folder with a user
role: 'reader', 'writer', 'commenter'
For folders, 'reader' allows viewing, 'writer' allows editing
"""
try:
service = get_drive_service()
permission = {"type": "user", "role": role, "emailAddress": email}
result = (
service.permissions()
.create(
fileId=file_id, body=permission, sendNotificationEmail=send_notification
)
.execute()
)
return {
"success": True,
"permissionId": result.get("id"),
"fileId": file_id,
"sharedWith": email,
"role": role,
}
except Exception as error:
return {"success": False, "error": str(error)}
def share_file_with_anyone(file_id, role="reader"):
"""
Share a file with anyone who has the link
role: 'reader' or 'writer'
"""
try:
service = get_drive_service()
permission = {"type": "anyone", "role": role}
result = service.permissions().create(fileId=file_id, body=permission).execute()
# Get the shareable link
file = service.files().get(fileId=file_id, fields="webViewLink").execute()
return {
"success": True,
"permissionId": result.get("id"),
"fileId": file_id,
"role": role,
"shareableLink": file.get("webViewLink"),
}
except Exception as error:
return {"success": False, "error": str(error)}
def get_file_metadata(file_id):
"""Get detailed metadata about a file"""
try:
service = get_drive_service()
file = (
service.files()
.get(
fileId=file_id,
fields="id, name, mimeType, size, createdTime, modifiedTime, parents, webViewLink, iconLink, thumbnailLink, description, starred, trashed, owners, permissions, capabilities",
)
.execute()
)
return {
"success": True,
"file": {
"id": file.get("id"),
"name": file.get("name"),
"mimeType": file.get("mimeType", ""),
"size": file.get("size", "0"),
"createdTime": file.get("createdTime", ""),
"modifiedTime": file.get("modifiedTime", ""),
"parents": file.get("parents", []),
"webViewLink": file.get("webViewLink", ""),
"iconLink": file.get("iconLink", ""),
"thumbnailLink": file.get("thumbnailLink", ""),
"description": file.get("description", ""),
"starred": file.get("starred", False),
"trashed": file.get("trashed", False),
"owners": [
{
"displayName": owner.get("displayName", ""),
"email": owner.get("emailAddress", ""),
}
for owner in file.get("owners", [])
],
"permissions": [
{
"id": perm.get("id", ""),
"type": perm.get("type", ""),
"role": perm.get("role", ""),
"email": perm.get("emailAddress", ""),
}
for perm in file.get("permissions", [])
],
"capabilities": file.get("capabilities", {}),
},
}
except Exception as error:
return {"success": False, "error": str(error)}
def rename_file(file_id, new_name):
"""Rename a file or folder"""
try:
service = get_drive_service()
file = (
service.files()
.update(
fileId=file_id, body={"name": new_name}, fields="id, name, webViewLink"
)
.execute()
)
return {
"success": True,
"fileId": file.get("id"),
"name": file.get("name"),
"webViewLink": file.get("webViewLink"),
}
except Exception as error:
return {"success": False, "error": str(error)}
def list_folders(parent_folder_id=None, max_results=50):
"""List only folders in Google Drive"""
try:
service = get_drive_service()
q = "mimeType='application/vnd.google-apps.folder'"
if parent_folder_id:
q += f" and '{parent_folder_id}' in parents"
results = (
service.files()
.list(
q=q,
spaces="drive",
fields="files(id, name, createdTime, modifiedTime, webViewLink)",
orderBy="name",
pageSize=max_results,
)
.execute()
)
folders = results.get("files", [])
return {
"success": True,
"folders": [
{
"id": f["id"],
"name": f["name"],
"createdTime": f.get("createdTime", ""),
"modifiedTime": f.get("modifiedTime", ""),
"webViewLink": f.get("webViewLink", ""),
}
for f in folders
],
"total": len(folders),
}
except Exception as error:
return {"success": False, "error": str(error)}
def get_storage_quota():
"""Get storage quota information"""
try:
service = get_drive_service()
about = service.about().get(fields="storageQuota, user").execute()
quota = about.get("storageQuota", {})
user = about.get("user", {})
return {
"success": True,
"user": {
"displayName": user.get("displayName", ""),
"email": user.get("emailAddress", ""),
},
"quota": {
"limit": quota.get("limit", "0"),
"usage": quota.get("usage", "0"),
"usageInDrive": quota.get("usageInDrive", "0"),
"usageInDriveTrash": quota.get("usageInDriveTrash", "0"),
},
}
except Exception as error:
return {"success": False, "error": str(error)}
# Helper function to export emails to Drive
def export_to_drive(file_name, content, folder_id=None, mime_type="text/plain"):
"""
Export content to a file in Google Drive
Useful for exporting email summaries, reports, etc.
"""
try:
result = upload_file(
name=file_name, content=content, mime_type=mime_type, folder_id=folder_id
)
if result["success"]:
return {
"success": True,
"fileId": result["fileId"],
"name": result["name"],
"webViewLink": result["webViewLink"],
"message": f"Successfully exported to {file_name}",
}
return result
except Exception as error:
return {"success": False, "error": str(error)}