Skip to main content
Glama
google_drive.py17.6 kB
""" Google Drive Integration Module Full file management operations """ import io import base64 from googleapiclient.discovery import build from googleapiclient.http import MediaIoBaseUpload, MediaIoBaseDownload from src.google_auth import get_credentials def get_drive_service(): creds = get_credentials() return build('drive', 'v3', credentials=creds) def list_files(folder_id=None, query=None, max_results=50): """ List files in Google Drive folder_id: specific folder to list (None for root) query: additional search query """ try: service = get_drive_service() # Build query q_parts = [] if folder_id: q_parts.append(f"'{folder_id}' in parents") if query: q_parts.append(query) q = ' and '.join(q_parts) if q_parts else None results = service.files().list( q=q, spaces='drive', fields='files(id, name, mimeType, size, createdTime, modifiedTime, parents, webViewLink, iconLink)', orderBy='modifiedTime desc', pageSize=max_results ).execute() files = results.get('files', []) return { 'success': True, 'files': [ { 'id': f['id'], 'name': f['name'], 'mimeType': f.get('mimeType', ''), 'size': f.get('size', '0'), 'createdTime': f.get('createdTime', ''), 'modifiedTime': f.get('modifiedTime', ''), 'parents': f.get('parents', []), 'webViewLink': f.get('webViewLink', ''), 'iconLink': f.get('iconLink', '') } for f in files ], 'total': len(files) } except Exception as error: return {'success': False, 'error': str(error)} def create_folder(name, parent_folder_id=None): """Create a new folder in Google Drive""" try: service = get_drive_service() file_metadata = { 'name': name, 'mimeType': 'application/vnd.google-apps.folder' } if parent_folder_id: file_metadata['parents'] = [parent_folder_id] folder = service.files().create( body=file_metadata, fields='id, name, webViewLink' ).execute() return { 'success': True, 'folderId': folder.get('id'), 'name': folder.get('name'), 'webViewLink': folder.get('webViewLink') } except Exception as error: return {'success': False, 'error': str(error)} def 
upload_file(name, content, mime_type='text/plain', folder_id=None): """ Upload a file to Google Drive content: file content as string or base64-encoded string for binary mime_type: MIME type of the file folder_id: destination folder (None for root) """ try: service = get_drive_service() file_metadata = {'name': name} if folder_id: file_metadata['parents'] = [folder_id] # Handle content - check if it's base64 encoded if mime_type.startswith('image/') or mime_type.startswith('application/'): try: # Try to decode as base64 file_content = base64.b64decode(content) except Exception: # If not base64, treat as raw string file_content = content.encode('utf-8') if isinstance(content, str) else content else: file_content = content.encode('utf-8') if isinstance(content, str) else content media = MediaIoBaseUpload( io.BytesIO(file_content), mimetype=mime_type, resumable=True ) file = service.files().create( body=file_metadata, media_body=media, fields='id, name, mimeType, size, webViewLink' ).execute() return { 'success': True, 'fileId': file.get('id'), 'name': file.get('name'), 'mimeType': file.get('mimeType'), 'size': file.get('size'), 'webViewLink': file.get('webViewLink') } except Exception as error: return {'success': False, 'error': str(error)} def download_file(file_id): """ Download a file from Google Drive Returns file content (base64 encoded for binary files) """ try: service = get_drive_service() # First get file metadata to determine type file_metadata = service.files().get( fileId=file_id, fields='name, mimeType' ).execute() mime_type = file_metadata.get('mimeType', '') file_name = file_metadata.get('name', '') # Handle Google Workspace files (need to export) export_mime_types = { 'application/vnd.google-apps.document': 'application/pdf', 'application/vnd.google-apps.spreadsheet': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', 'application/vnd.google-apps.presentation': 'application/pdf', 'application/vnd.google-apps.drawing': 'image/png' } 
if mime_type in export_mime_types: # Export Google Workspace file request = service.files().export_media( fileId=file_id, mimeType=export_mime_types[mime_type] ) export_mime = export_mime_types[mime_type] else: # Download regular file request = service.files().get_media(fileId=file_id) export_mime = mime_type # Download content file_content = io.BytesIO() downloader = MediaIoBaseDownload(file_content, request) done = False while not done: _, done = downloader.next_chunk() file_content.seek(0) content_bytes = file_content.read() # Encode as base64 for binary content if export_mime.startswith('text/'): content = content_bytes.decode('utf-8', errors='ignore') is_binary = False else: content = base64.b64encode(content_bytes).decode('utf-8') is_binary = True return { 'success': True, 'fileId': file_id, 'name': file_name, 'mimeType': export_mime, 'content': content, 'isBinary': is_binary, 'size': len(content_bytes) } except Exception as error: return {'success': False, 'error': str(error)} def move_file(file_id, new_folder_id): """Move a file to a different folder""" try: service = get_drive_service() # Get current parents file = service.files().get( fileId=file_id, fields='parents' ).execute() previous_parents = ','.join(file.get('parents', [])) # Move file file = service.files().update( fileId=file_id, addParents=new_folder_id, removeParents=previous_parents, fields='id, name, parents, webViewLink' ).execute() return { 'success': True, 'fileId': file.get('id'), 'name': file.get('name'), 'newParents': file.get('parents', []), 'webViewLink': file.get('webViewLink') } except Exception as error: return {'success': False, 'error': str(error)} def copy_file(file_id, new_name=None, folder_id=None): """Copy a file""" try: service = get_drive_service() body = {} if new_name: body['name'] = new_name if folder_id: body['parents'] = [folder_id] copied_file = service.files().copy( fileId=file_id, body=body if body else None, fields='id, name, webViewLink' ).execute() return { 
'success': True, 'fileId': copied_file.get('id'), 'name': copied_file.get('name'), 'webViewLink': copied_file.get('webViewLink') } except Exception as error: return {'success': False, 'error': str(error)} def delete_file(file_id): """Delete a file or folder (moves to trash)""" try: service = get_drive_service() # Move to trash instead of permanent delete service.files().update( fileId=file_id, body={'trashed': True} ).execute() return { 'success': True, 'fileId': file_id, 'message': 'File moved to trash' } except Exception as error: return {'success': False, 'error': str(error)} def permanently_delete_file(file_id): """Permanently delete a file or folder (cannot be recovered)""" try: service = get_drive_service() service.files().delete(fileId=file_id).execute() return { 'success': True, 'fileId': file_id, 'message': 'File permanently deleted' } except Exception as error: return {'success': False, 'error': str(error)} def search_files(query, max_results=50): """ Search for files in Google Drive query: search query (supports Drive query syntax) Examples: - "name contains 'report'" - "mimeType='application/pdf'" - "fullText contains 'budget'" - "modifiedTime > '2024-01-01'" """ try: service = get_drive_service() results = service.files().list( q=query, spaces='drive', fields='files(id, name, mimeType, size, createdTime, modifiedTime, webViewLink)', orderBy='modifiedTime desc', pageSize=max_results ).execute() files = results.get('files', []) return { 'success': True, 'query': query, 'files': [ { 'id': f['id'], 'name': f['name'], 'mimeType': f.get('mimeType', ''), 'size': f.get('size', '0'), 'createdTime': f.get('createdTime', ''), 'modifiedTime': f.get('modifiedTime', ''), 'webViewLink': f.get('webViewLink', '') } for f in files ], 'total': len(files) } except Exception as error: return {'success': False, 'error': str(error)} def share_file(file_id, email, role='reader', send_notification=True): """ Share a file or folder with a user role: 'reader', 'writer', 
'commenter' For folders, 'reader' allows viewing, 'writer' allows editing """ try: service = get_drive_service() permission = { 'type': 'user', 'role': role, 'emailAddress': email } result = service.permissions().create( fileId=file_id, body=permission, sendNotificationEmail=send_notification ).execute() return { 'success': True, 'permissionId': result.get('id'), 'fileId': file_id, 'sharedWith': email, 'role': role } except Exception as error: return {'success': False, 'error': str(error)} def share_file_with_anyone(file_id, role='reader'): """ Share a file with anyone who has the link role: 'reader' or 'writer' """ try: service = get_drive_service() permission = { 'type': 'anyone', 'role': role } result = service.permissions().create( fileId=file_id, body=permission ).execute() # Get the shareable link file = service.files().get( fileId=file_id, fields='webViewLink' ).execute() return { 'success': True, 'permissionId': result.get('id'), 'fileId': file_id, 'role': role, 'shareableLink': file.get('webViewLink') } except Exception as error: return {'success': False, 'error': str(error)} def get_file_metadata(file_id): """Get detailed metadata about a file""" try: service = get_drive_service() file = service.files().get( fileId=file_id, fields='id, name, mimeType, size, createdTime, modifiedTime, parents, webViewLink, iconLink, thumbnailLink, description, starred, trashed, owners, permissions, capabilities' ).execute() return { 'success': True, 'file': { 'id': file.get('id'), 'name': file.get('name'), 'mimeType': file.get('mimeType', ''), 'size': file.get('size', '0'), 'createdTime': file.get('createdTime', ''), 'modifiedTime': file.get('modifiedTime', ''), 'parents': file.get('parents', []), 'webViewLink': file.get('webViewLink', ''), 'iconLink': file.get('iconLink', ''), 'thumbnailLink': file.get('thumbnailLink', ''), 'description': file.get('description', ''), 'starred': file.get('starred', False), 'trashed': file.get('trashed', False), 'owners': [ { 'displayName': 
owner.get('displayName', ''), 'email': owner.get('emailAddress', '') } for owner in file.get('owners', []) ], 'permissions': [ { 'id': perm.get('id', ''), 'type': perm.get('type', ''), 'role': perm.get('role', ''), 'email': perm.get('emailAddress', '') } for perm in file.get('permissions', []) ], 'capabilities': file.get('capabilities', {}) } } except Exception as error: return {'success': False, 'error': str(error)} def rename_file(file_id, new_name): """Rename a file or folder""" try: service = get_drive_service() file = service.files().update( fileId=file_id, body={'name': new_name}, fields='id, name, webViewLink' ).execute() return { 'success': True, 'fileId': file.get('id'), 'name': file.get('name'), 'webViewLink': file.get('webViewLink') } except Exception as error: return {'success': False, 'error': str(error)} def list_folders(parent_folder_id=None, max_results=50): """List only folders in Google Drive""" try: service = get_drive_service() q = "mimeType='application/vnd.google-apps.folder'" if parent_folder_id: q += f" and '{parent_folder_id}' in parents" results = service.files().list( q=q, spaces='drive', fields='files(id, name, createdTime, modifiedTime, webViewLink)', orderBy='name', pageSize=max_results ).execute() folders = results.get('files', []) return { 'success': True, 'folders': [ { 'id': f['id'], 'name': f['name'], 'createdTime': f.get('createdTime', ''), 'modifiedTime': f.get('modifiedTime', ''), 'webViewLink': f.get('webViewLink', '') } for f in folders ], 'total': len(folders) } except Exception as error: return {'success': False, 'error': str(error)} def get_storage_quota(): """Get storage quota information""" try: service = get_drive_service() about = service.about().get(fields='storageQuota, user').execute() quota = about.get('storageQuota', {}) user = about.get('user', {}) return { 'success': True, 'user': { 'displayName': user.get('displayName', ''), 'email': user.get('emailAddress', '') }, 'quota': { 'limit': quota.get('limit', '0'), 
'usage': quota.get('usage', '0'), 'usageInDrive': quota.get('usageInDrive', '0'), 'usageInDriveTrash': quota.get('usageInDriveTrash', '0') } } except Exception as error: return {'success': False, 'error': str(error)} # Helper function to export emails to Drive def export_to_drive(file_name, content, folder_id=None, mime_type='text/plain'): """ Export content to a file in Google Drive Useful for exporting email summaries, reports, etc. """ try: result = upload_file( name=file_name, content=content, mime_type=mime_type, folder_id=folder_id ) if result['success']: return { 'success': True, 'fileId': result['fileId'], 'name': result['name'], 'webViewLink': result['webViewLink'], 'message': f'Successfully exported to {file_name}' } return result except Exception as error: return {'success': False, 'error': str(error)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pbulbule13/google-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.