Skip to main content
Glama

Google Drive MCP Server

by danbugs
google_drive_mcp_server.py11.4 kB
#!/usr/bin/env python3 """ Google Drive MCP Server Provides file management capabilities for Google Drive through MCP protocol. """ import os import json import asyncio from typing import Any, Optional from pathlib import Path from mcp.server import Server from mcp.types import Tool, TextContent from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build from googleapiclient.errors import HttpError # If modifying these scopes, delete the token.json file SCOPES = ['https://www.googleapis.com/auth/drive'] # Global service instance drive_service = None def get_drive_service(): """Authenticates and returns Google Drive service instance.""" global drive_service if drive_service is not None: return drive_service creds = None token_path = Path.home() / '.google-drive-mcp' / 'token.json' credentials_path = Path.home() / '.google-drive-mcp' / 'credentials.json' # Create directory if it doesn't exist token_path.parent.mkdir(parents=True, exist_ok=True) # Check for existing token if token_path.exists(): creds = Credentials.from_authorized_user_file(str(token_path), SCOPES) # If no valid credentials, authenticate if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: if not credentials_path.exists(): raise FileNotFoundError( f"Credentials file not found at {credentials_path}\n" "Please download your OAuth credentials from Google Cloud Console " "and save as ~/.google-drive-mcp/credentials.json" ) flow = InstalledAppFlow.from_client_secrets_file( str(credentials_path), SCOPES) creds = flow.run_local_server(port=0) # Save credentials for next run with open(token_path, 'w') as token: token.write(creds.to_json()) drive_service = build('drive', 'v3', credentials=creds) return drive_service def create_folder(name: str, parent_id: Optional[str] = None) -> dict: """Create a new folder in Google Drive.""" service = get_drive_service() file_metadata = { 'name': name, 'mimeType': 'application/vnd.google-apps.folder' } if parent_id: file_metadata['parents'] = [parent_id] try: folder = service.files().create( body=file_metadata, fields='id, name, webViewLink' ).execute() return { 'success': True, 'folder_id': folder.get('id'), 'folder_name': folder.get('name'), 'url': folder.get('webViewLink') } except HttpError as error: return { 'success': False, 'error': str(error) } def move_file(file_id: str, new_parent_id: str) -> dict: """Move a file to a different folder.""" service = get_drive_service() try: # Get current parents file = service.files().get( fileId=file_id, fields='parents, name' ).execute() previous_parents = ','.join(file.get('parents', [])) # Move the file file = service.files().update( fileId=file_id, addParents=new_parent_id, removeParents=previous_parents, fields='id, name, parents, webViewLink' ).execute() return { 'success': True, 'file_id': file.get('id'), 'file_name': file.get('name'), 'new_parent': file.get('parents'), 'url': file.get('webViewLink') } except HttpError as error: return { 'success': False, 'error': str(error) } def rename_file(file_id: str, new_name: str) -> dict: """Rename a file or folder.""" service = get_drive_service() try: file = service.files().update( fileId=file_id, body={'name': new_name}, fields='id, name, webViewLink' ).execute() return { 'success': True, 'file_id': file.get('id'), 'new_name': file.get('name'), 'url': file.get('webViewLink') } except HttpError as error: return { 'success': False, 'error': str(error) } def get_file_info(file_id: str) -> dict: """Get information about a file.""" service = get_drive_service() try: file = service.files().get( fileId=file_id, fields='id, name, mimeType, parents, webViewLink, createdTime, modifiedTime' ).execute() return { 'success': True, 'file_id': file.get('id'), 'name': file.get('name'), 'mime_type': file.get('mimeType'), 'parents': file.get('parents', []), 'url': file.get('webViewLink'), 'created': file.get('createdTime'), 'modified': file.get('modifiedTime') } except HttpError as error: return { 'success': False, 'error': str(error) } def list_folder_contents(folder_id: str = 'root', page_size: int = 100) -> dict: """List contents of a folder.""" service = get_drive_service() try: results = service.files().list( q=f"'{folder_id}' in parents and trashed=false", pageSize=page_size, fields="files(id, name, mimeType, webViewLink)" ).execute() items = results.get('files', []) return { 'success': True, 'count': len(items), 'items': items } except HttpError as error: return { 'success': False, 'error': str(error) } # Create MCP server server = Server("google-drive-mcp") @server.list_tools() async def list_tools() -> list[Tool]: """List available Google Drive management tools.""" return [ Tool( name="create_drive_folder", description="Create a new folder in Google Drive. You can optionally specify a parent folder ID.", inputSchema={ "type": "object", "properties": { "name": { "type": "string", "description": "Name of the folder to create" }, "parent_id": { "type": "string", "description": "Optional: ID of the parent folder. If not provided, creates in root." } }, "required": ["name"] } ), Tool( name="move_drive_file", description="Move a file or folder to a different parent folder in Google Drive.", inputSchema={ "type": "object", "properties": { "file_id": { "type": "string", "description": "ID of the file or folder to move" }, "new_parent_id": { "type": "string", "description": "ID of the destination folder" } }, "required": ["file_id", "new_parent_id"] } ), Tool( name="rename_drive_file", description="Rename a file or folder in Google Drive.", inputSchema={ "type": "object", "properties": { "file_id": { "type": "string", "description": "ID of the file or folder to rename" }, "new_name": { "type": "string", "description": "New name for the file or folder" } }, "required": ["file_id", "new_name"] } ), Tool( name="get_drive_file_info", description="Get detailed information about a file or folder.", inputSchema={ "type": "object", "properties": { "file_id": { "type": "string", "description": "ID of the file or folder" } }, "required": ["file_id"] } ), Tool( name="list_drive_folder_contents", description="List all files and folders within a specific folder.", inputSchema={ "type": "object", "properties": { "folder_id": { "type": "string", "description": "ID of the folder to list. Use 'root' for root directory." }, "page_size": { "type": "integer", "description": "Number of items to return (default: 100)", "default": 100 } }, "required": ["folder_id"] } ) ] @server.call_tool() async def call_tool(name: str, arguments: Any) -> list[TextContent]: """Handle tool calls.""" try: if name == "create_drive_folder": result = create_folder( name=arguments["name"], parent_id=arguments.get("parent_id") ) elif name == "move_drive_file": result = move_file( file_id=arguments["file_id"], new_parent_id=arguments["new_parent_id"] ) elif name == "rename_drive_file": result = rename_file( file_id=arguments["file_id"], new_name=arguments["new_name"] ) elif name == "get_drive_file_info": result = get_file_info( file_id=arguments["file_id"] ) elif name == "list_drive_folder_contents": result = list_folder_contents( folder_id=arguments["folder_id"], page_size=arguments.get("page_size", 100) ) else: result = {"success": False, "error": f"Unknown tool: {name}"} return [TextContent( type="text", text=json.dumps(result, indent=2) )] except Exception as e: return [TextContent( type="text", text=json.dumps({ "success": False, "error": str(e) }, indent=2) )] async def main(): """Run the MCP server.""" from mcp.server.stdio import stdio_server async with stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, server.create_initialization_options() ) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danbugs/google-drive-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server