"""
Google Drive Integration Module
Full file management operations
"""
import io
import base64
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseUpload, MediaIoBaseDownload
from src.google_auth import get_credentials
def get_drive_service():
    """Return a Drive v3 API client built with the credentials from get_credentials()."""
    return build('drive', 'v3', credentials=get_credentials())
def list_files(folder_id=None, query=None, max_results=50):
    """
    List files in Google Drive, most recently modified first
    folder_id: only list direct children of this folder. When omitted, no
        parent filter is applied at all, so the listing is NOT limited to
        the root folder (pass the Drive alias 'root' for that)
    query: additional Drive search clause, AND-ed with the folder filter
    max_results: page size requested from the API; only the first page
        of results is returned
    Returns {'success': True, 'files': [...], 'total': n} on success, or
    {'success': False, 'error': message} if anything raises
    """
    try:
        service = get_drive_service()
        # Build query
        q_parts = []
        if folder_id:
            # The id is spliced into a quoted literal of the Drive query
            # language: escape backslashes and single quotes so a stray
            # quote cannot break (or rewrite) the query.
            safe_folder_id = (
                str(folder_id).replace('\\', '\\\\').replace("'", "\\'")
            )
            q_parts.append(f"'{safe_folder_id}' in parents")
        if query:
            # Group the caller's clause when combining it with the folder
            # filter so an 'or' inside it cannot escape the filter.
            q_parts.append(f'({query})' if q_parts else query)
        q = ' and '.join(q_parts) if q_parts else None
        results = service.files().list(
            q=q,
            spaces='drive',
            fields='files(id, name, mimeType, size, createdTime, modifiedTime, parents, webViewLink, iconLink)',
            orderBy='modifiedTime desc',
            pageSize=max_results
        ).execute()
        files = results.get('files', [])
        return {
            'success': True,
            'files': [
                {
                    'id': f['id'],
                    'name': f['name'],
                    'mimeType': f.get('mimeType', ''),
                    'size': f.get('size', '0'),
                    'createdTime': f.get('createdTime', ''),
                    'modifiedTime': f.get('modifiedTime', ''),
                    'parents': f.get('parents', []),
                    'webViewLink': f.get('webViewLink', ''),
                    'iconLink': f.get('iconLink', '')
                }
                for f in files
            ],
            'total': len(files)
        }
    except Exception as error:
        # Module convention: report failures as an error envelope.
        return {'success': False, 'error': str(error)}
def create_folder(name, parent_folder_id=None):
    """
    Create a new folder in Google Drive
    name: name of the folder to create
    parent_folder_id: folder to create it in (no parent is sent when omitted)
    Returns the new folder's id, name and webViewLink, or an error envelope
    """
    try:
        drive = get_drive_service()
        metadata = {
            'name': name,
            'mimeType': 'application/vnd.google-apps.folder'
        }
        if parent_folder_id:
            metadata['parents'] = [parent_folder_id]
        create_request = drive.files().create(
            body=metadata,
            fields='id, name, webViewLink'
        )
        created = create_request.execute()
        return {
            'success': True,
            'folderId': created.get('id'),
            'name': created.get('name'),
            'webViewLink': created.get('webViewLink')
        }
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
def _decode_upload_content(content, mime_type):
    """Return the bytes to upload for content, base64-decoding it when plausible."""
    if mime_type.startswith(('image/', 'application/')):
        # Tolerate line-wrapped base64, but nothing else: with strict
        # validation, text that merely contains some base64 characters
        # (e.g. JSON sent as application/json) is no longer "decoded" into
        # garbage by silently dropping the characters that do not fit.
        if isinstance(content, str):
            candidate = content.replace('\r', '').replace('\n', '')
        elif isinstance(content, (bytes, bytearray)):
            candidate = content.replace(b'\r', b'').replace(b'\n', b'')
        else:
            candidate = content
        try:
            return base64.b64decode(candidate, validate=True)
        except (ValueError, TypeError):
            # Not base64 (binascii.Error is a ValueError): upload as given.
            pass
    return content.encode('utf-8') if isinstance(content, str) else content


def upload_file(name, content, mime_type='text/plain', folder_id=None):
    """
    Upload a file to Google Drive
    name: file name to give the uploaded file
    content: file content as string or base64-encoded string for binary
    mime_type: MIME type of the file. For 'image/*' and 'application/*'
        types the content is base64-decoded when it is strictly valid
        base64 (line breaks allowed); otherwise it is uploaded as given.
        NOTE: this is a heuristic - plain text that happens to be valid
        base64 (e.g. "test") is still decoded.
    folder_id: destination folder (no parent is sent when omitted)
    Returns the new file's id, name, mimeType, size and webViewLink, or
    {'success': False, 'error': message} if anything raises
    """
    try:
        service = get_drive_service()
        file_metadata = {'name': name}
        if folder_id:
            file_metadata['parents'] = [folder_id]
        file_content = _decode_upload_content(content, mime_type)
        media = MediaIoBaseUpload(
            io.BytesIO(file_content),
            mimetype=mime_type,
            resumable=True
        )
        file = service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id, name, mimeType, size, webViewLink'
        ).execute()
        return {
            'success': True,
            'fileId': file.get('id'),
            'name': file.get('name'),
            'mimeType': file.get('mimeType'),
            'size': file.get('size'),
            'webViewLink': file.get('webViewLink')
        }
    except Exception as error:
        # Module convention: report failures as an error envelope.
        return {'success': False, 'error': str(error)}
def download_file(file_id):
    """
    Download a file from Google Drive
    file_id: id of the file to fetch
    Google Docs and Slides are exported as PDF, Sheets as XLSX and
    Drawings as PNG; every other file is downloaded as stored.
    Returns the content as text when the resulting MIME type starts with
    'text/', otherwise base64 encoded (see 'isBinary'), or an error envelope
    """
    try:
        drive = get_drive_service()
        # Look up the name and type first: the type decides export vs download
        meta = drive.files().get(
            fileId=file_id,
            fields='name, mimeType'
        ).execute()
        source_mime = meta.get('mimeType', '')
        file_name = meta.get('name', '')
        # Google Workspace types and the format each is exported to
        workspace_exports = {
            'application/vnd.google-apps.document': 'application/pdf',
            'application/vnd.google-apps.spreadsheet': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            'application/vnd.google-apps.presentation': 'application/pdf',
            'application/vnd.google-apps.drawing': 'image/png'
        }
        result_mime = workspace_exports.get(source_mime)
        if result_mime is not None:
            request = drive.files().export_media(
                fileId=file_id,
                mimeType=result_mime
            )
        else:
            request = drive.files().get_media(fileId=file_id)
            result_mime = source_mime
        # Pull the content down chunk by chunk into memory
        buffer = io.BytesIO()
        downloader = MediaIoBaseDownload(buffer, request)
        while True:
            _, finished = downloader.next_chunk()
            if finished:
                break
        payload = buffer.getvalue()
        is_text = result_mime.startswith('text/')
        if is_text:
            # Undecodable bytes are dropped rather than raising
            content = payload.decode('utf-8', errors='ignore')
        else:
            content = base64.b64encode(payload).decode('utf-8')
        return {
            'success': True,
            'fileId': file_id,
            'name': file_name,
            'mimeType': result_mime,
            'content': content,
            'isBinary': not is_text,
            'size': len(payload)
        }
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
def move_file(file_id, new_folder_id):
    """
    Move a file to a different folder
    file_id: id of the file to move
    new_folder_id: folder that becomes the file's parent
    All of the file's current parents are removed in the same update.
    Returns the file's id, name, new parents and webViewLink, or an error envelope
    """
    try:
        drive = get_drive_service()
        # Look up where the file currently lives
        current = drive.files().get(
            fileId=file_id,
            fields='parents'
        ).execute()
        old_parents = ','.join(current.get('parents', []))
        # Swap the parents in a single update call
        update_request = drive.files().update(
            fileId=file_id,
            addParents=new_folder_id,
            removeParents=old_parents,
            fields='id, name, parents, webViewLink'
        )
        moved = update_request.execute()
        return {
            'success': True,
            'fileId': moved.get('id'),
            'name': moved.get('name'),
            'newParents': moved.get('parents', []),
            'webViewLink': moved.get('webViewLink')
        }
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
def copy_file(file_id, new_name=None, folder_id=None):
    """
    Copy a file
    file_id: id of the file to copy
    new_name: name for the copy (not sent when omitted)
    folder_id: folder to place the copy in (not sent when omitted)
    Returns the copy's id, name and webViewLink, or an error envelope
    """
    try:
        drive = get_drive_service()
        overrides = {}
        if new_name:
            overrides['name'] = new_name
        if folder_id:
            overrides['parents'] = [folder_id]
        # No request body is sent at all when nothing is overridden
        duplicate = drive.files().copy(
            fileId=file_id,
            body=overrides or None,
            fields='id, name, webViewLink'
        ).execute()
        return {
            'success': True,
            'fileId': duplicate.get('id'),
            'name': duplicate.get('name'),
            'webViewLink': duplicate.get('webViewLink')
        }
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
def delete_file(file_id):
    """
    Delete a file or folder (moves to trash)
    file_id: id of the item to trash
    Returns a confirmation message, or an error envelope
    """
    try:
        drive = get_drive_service()
        # Flag the item as trashed rather than deleting it outright
        trash_request = drive.files().update(
            fileId=file_id,
            body={'trashed': True}
        )
        trash_request.execute()
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
    return {
        'success': True,
        'fileId': file_id,
        'message': 'File moved to trash'
    }
def permanently_delete_file(file_id):
    """
    Permanently delete a file or folder (cannot be recovered)
    file_id: id of the item to delete
    Returns a confirmation message, or an error envelope
    """
    try:
        drive = get_drive_service()
        delete_request = drive.files().delete(fileId=file_id)
        delete_request.execute()
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
    return {
        'success': True,
        'fileId': file_id,
        'message': 'File permanently deleted'
    }
def search_files(query, max_results=50):
    """
    Search for files in Google Drive, most recently modified first
    query: search query, passed to the API unchanged (Drive query syntax)
    max_results: page size requested from the API
    Examples:
    - "name contains 'report'"
    - "mimeType='application/pdf'"
    - "fullText contains 'budget'"
    - "modifiedTime > '2024-01-01'"
    Returns the query, the matching files and their count, or an error envelope
    """
    # Optional fields copied from each hit, with the value used when absent
    fallbacks = {
        'mimeType': '',
        'size': '0',
        'createdTime': '',
        'modifiedTime': '',
        'webViewLink': ''
    }
    try:
        drive = get_drive_service()
        response = drive.files().list(
            q=query,
            spaces='drive',
            fields='files(id, name, mimeType, size, createdTime, modifiedTime, webViewLink)',
            orderBy='modifiedTime desc',
            pageSize=max_results
        ).execute()
        hits = response.get('files', [])
        summaries = []
        for hit in hits:
            entry = {'id': hit['id'], 'name': hit['name']}
            for key, fallback in fallbacks.items():
                entry[key] = hit.get(key, fallback)
            summaries.append(entry)
        return {
            'success': True,
            'query': query,
            'files': summaries,
            'total': len(hits)
        }
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
def share_file(file_id, email, role='reader', send_notification=True):
    """
    Share a file or folder with a user
    file_id: id of the file or folder to share
    email: address of the user to grant access to
    role: 'reader', 'writer', 'commenter'
    send_notification: forwarded to the API as sendNotificationEmail
    For folders, 'reader' allows viewing, 'writer' allows editing
    Returns the new permission id and what was granted, or an error envelope
    """
    try:
        drive = get_drive_service()
        grant_request = drive.permissions().create(
            fileId=file_id,
            body={
                'type': 'user',
                'role': role,
                'emailAddress': email
            },
            sendNotificationEmail=send_notification
        )
        granted = grant_request.execute()
        return {
            'success': True,
            'permissionId': granted.get('id'),
            'fileId': file_id,
            'sharedWith': email,
            'role': role
        }
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
def share_file_with_anyone(file_id, role='reader'):
    """
    Share a file with anyone who has the link
    file_id: id of the file to share
    role: 'reader' or 'writer'
    Returns the new permission id and the file's webViewLink, or an error envelope
    """
    try:
        drive = get_drive_service()
        granted = drive.permissions().create(
            fileId=file_id,
            body={'type': 'anyone', 'role': role}
        ).execute()
        # Fetch the link to hand back, once the permission exists
        link_info = drive.files().get(
            fileId=file_id,
            fields='webViewLink'
        ).execute()
        return {
            'success': True,
            'permissionId': granted.get('id'),
            'fileId': file_id,
            'role': role,
            'shareableLink': link_info.get('webViewLink')
        }
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
def get_file_metadata(file_id):
    """
    Get detailed metadata about a file
    file_id: id of the file to inspect
    Returns {'success': True, 'file': {...}} including owners, permissions
    and capabilities, with defaults for absent fields, or an error envelope
    """
    try:
        drive = get_drive_service()
        meta = drive.files().get(
            fileId=file_id,
            fields='id, name, mimeType, size, createdTime, modifiedTime, parents, webViewLink, iconLink, thumbnailLink, description, starred, trashed, owners, permissions, capabilities'
        ).execute()
        # Reduce owners and permissions to the few fields that are reported
        owners = []
        for person in meta.get('owners', []):
            owners.append({
                'displayName': person.get('displayName', ''),
                'email': person.get('emailAddress', '')
            })
        permissions = []
        for grant in meta.get('permissions', []):
            permissions.append({
                'id': grant.get('id', ''),
                'type': grant.get('type', ''),
                'role': grant.get('role', ''),
                'email': grant.get('emailAddress', '')
            })
        details = {
            'id': meta.get('id'),
            'name': meta.get('name'),
            'mimeType': meta.get('mimeType', ''),
            'size': meta.get('size', '0'),
            'createdTime': meta.get('createdTime', ''),
            'modifiedTime': meta.get('modifiedTime', ''),
            'parents': meta.get('parents', []),
            'webViewLink': meta.get('webViewLink', ''),
            'iconLink': meta.get('iconLink', ''),
            'thumbnailLink': meta.get('thumbnailLink', ''),
            'description': meta.get('description', ''),
            'starred': meta.get('starred', False),
            'trashed': meta.get('trashed', False),
            'owners': owners,
            'permissions': permissions,
            'capabilities': meta.get('capabilities', {})
        }
        return {'success': True, 'file': details}
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
def rename_file(file_id, new_name):
    """
    Rename a file or folder
    file_id: id of the item to rename
    new_name: name to give it
    Returns the item's id, name and webViewLink, or an error envelope
    """
    try:
        drive = get_drive_service()
        rename_request = drive.files().update(
            fileId=file_id,
            body={'name': new_name},
            fields='id, name, webViewLink'
        )
        renamed = rename_request.execute()
        return {
            'success': True,
            'fileId': renamed.get('id'),
            'name': renamed.get('name'),
            'webViewLink': renamed.get('webViewLink')
        }
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
def list_folders(parent_folder_id=None, max_results=50):
    """
    List only folders in Google Drive, ordered by name
    parent_folder_id: only list folders directly inside this folder. When
        omitted, no parent filter is applied at all
    max_results: page size requested from the API; only the first page
        of results is returned
    Returns {'success': True, 'folders': [...], 'total': n} on success, or
    {'success': False, 'error': message} if anything raises
    """
    try:
        service = get_drive_service()
        q = "mimeType='application/vnd.google-apps.folder'"
        if parent_folder_id:
            # The id is spliced into a quoted literal of the Drive query
            # language: escape backslashes and single quotes so a stray
            # quote cannot break (or rewrite) the query.
            safe_parent_id = (
                str(parent_folder_id).replace('\\', '\\\\').replace("'", "\\'")
            )
            q += f" and '{safe_parent_id}' in parents"
        results = service.files().list(
            q=q,
            spaces='drive',
            fields='files(id, name, createdTime, modifiedTime, webViewLink)',
            orderBy='name',
            pageSize=max_results
        ).execute()
        folders = results.get('files', [])
        return {
            'success': True,
            'folders': [
                {
                    'id': f['id'],
                    'name': f['name'],
                    'createdTime': f.get('createdTime', ''),
                    'modifiedTime': f.get('modifiedTime', ''),
                    'webViewLink': f.get('webViewLink', '')
                }
                for f in folders
            ],
            'total': len(folders)
        }
    except Exception as error:
        # Module convention: report failures as an error envelope.
        return {'success': False, 'error': str(error)}
def get_storage_quota():
    """
    Get storage quota information
    Returns the account's display name and email together with the
    limit/usage figures reported by the API ('0' for any that are absent),
    or an error envelope
    """
    quota_keys = ('limit', 'usage', 'usageInDrive', 'usageInDriveTrash')
    try:
        drive = get_drive_service()
        about = drive.about().get(fields='storageQuota, user').execute()
        raw_quota = about.get('storageQuota', {})
        account = about.get('user', {})
        return {
            'success': True,
            'user': {
                'displayName': account.get('displayName', ''),
                'email': account.get('emailAddress', '')
            },
            'quota': {key: raw_quota.get(key, '0') for key in quota_keys}
        }
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
# Helper function to export emails to Drive
def export_to_drive(file_name, content, folder_id=None, mime_type='text/plain'):
    """
    Export content to a file in Google Drive
    Useful for exporting email summaries, reports, etc.
    file_name: name of the file to create
    content: content to store, handed to upload_file unchanged
    folder_id: destination folder, handed to upload_file unchanged
    mime_type: MIME type of the file
    Returns the new file's id, name and webViewLink with a confirmation
    message, or upload_file's failure result unchanged
    """
    try:
        uploaded = upload_file(
            name=file_name,
            content=content,
            mime_type=mime_type,
            folder_id=folder_id
        )
        if not uploaded['success']:
            # Pass the upload's own error envelope straight through
            return uploaded
        return {
            'success': True,
            'fileId': uploaded['fileId'],
            'name': uploaded['name'],
            'webViewLink': uploaded['webViewLink'],
            'message': f'Successfully exported to {file_name}'
        }
    except Exception as exc:
        return {'success': False, 'error': str(exc)}