google_drive_mcp_server.py•11.4 kB
#!/usr/bin/env python3
"""
Google Drive MCP Server
Provides file management capabilities for Google Drive via the Model Context Protocol (MCP).
"""
import os
import json
import asyncio
from typing import Any, Optional
from pathlib import Path
from mcp.server import Server
from mcp.types import Tool, TextContent
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# OAuth scopes requested when authorizing and when loading the saved token.
# If modifying these scopes, delete the token.json file so a new token is issued.
SCOPES = ['https://www.googleapis.com/auth/drive']
# Global service instance: None until get_drive_service() builds it on first
# use, after which the same object is returned on every call.
drive_service = None
def get_drive_service():
    """Authenticate with Google and return the shared Drive v3 service.

    The service is built once and cached in the module-level ``drive_service``
    global; later calls return the cached object without touching disk.

    Credentials are looked up under ``~/.google-drive-mcp``:

    * ``token.json`` - previously saved user token (loaded if present).
    * ``credentials.json`` - OAuth client secrets, needed only when no usable
      token exists and the interactive flow has to run.

    Returns:
        The object returned by ``build('drive', 'v3', credentials=...)``.

    Raises:
        FileNotFoundError: If interactive authorization is required but
            ``credentials.json`` does not exist.
    """
    global drive_service
    if drive_service is not None:
        return drive_service

    # Local imports keep this block self-contained (os/Path come from the
    # file-level imports).
    import contextlib
    import sys

    config_dir = Path.home() / '.google-drive-mcp'
    token_path = config_dir / 'token.json'
    credentials_path = config_dir / 'credentials.json'
    # Create directory if it doesn't exist
    config_dir.mkdir(parents=True, exist_ok=True)

    creds = None
    # Check for existing token
    if token_path.exists():
        creds = Credentials.from_authorized_user_file(str(token_path), SCOPES)

    # If no valid credentials, authenticate
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            if not credentials_path.exists():
                raise FileNotFoundError(
                    f"Credentials file not found at {credentials_path}\n"
                    "Please download your OAuth credentials from Google Cloud Console "
                    "and save as ~/.google-drive-mcp/credentials.json"
                )
            flow = InstalledAppFlow.from_client_secrets_file(
                str(credentials_path), SCOPES)
            # This module runs as a stdio MCP server, where stdout carries the
            # protocol stream. Anything the interactive flow prints (such as
            # the "visit this URL" prompt) is sent to stderr instead so it
            # cannot corrupt that stream.
            with contextlib.redirect_stdout(sys.stderr):
                creds = flow.run_local_server(port=0)

        # Save credentials for next run. The token grants access to the
        # user's Drive, so create the file readable/writable by the owner
        # only rather than with the default umask permissions.
        fd = os.open(token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as token:
            token.write(creds.to_json())
        # The mode passed to os.open() only applies when the file is created;
        # tighten a pre-existing token file as well.
        os.chmod(token_path, 0o600)

    drive_service = build('drive', 'v3', credentials=creds)
    return drive_service
def create_folder(name: str, parent_id: Optional[str] = None) -> dict:
    """Create a folder in Google Drive.

    Args:
        name: Name for the new folder.
        parent_id: ID of the folder to create it in. When falsy, no parent is
            sent with the request.

    Returns:
        On success, a dict with ``success`` (True), ``folder_id``,
        ``folder_name`` and ``url``. If the API call raises ``HttpError``,
        a dict with ``success`` (False) and ``error`` (the error as text).
    """
    drive = get_drive_service()

    metadata = {
        'name': name,
        'mimeType': 'application/vnd.google-apps.folder',
    }
    if parent_id:
        metadata['parents'] = [parent_id]

    try:
        request = drive.files().create(
            body=metadata,
            fields='id, name, webViewLink',
        )
        created = request.execute()
    except HttpError as error:
        return {'success': False, 'error': str(error)}

    return {
        'success': True,
        'folder_id': created.get('id'),
        'folder_name': created.get('name'),
        'url': created.get('webViewLink'),
    }
def move_file(file_id: str, new_parent_id: str) -> dict:
    """Move a file into another folder.

    The file's current parents are fetched first so they can all be removed
    in the same update call that adds the new parent.

    Args:
        file_id: ID of the file to move.
        new_parent_id: ID of the destination folder.

    Returns:
        On success, a dict with ``success`` (True), ``file_id``,
        ``file_name``, ``new_parent`` (the parents reported after the update)
        and ``url``. If either API call raises ``HttpError``, a dict with
        ``success`` (False) and ``error`` (the error as text).
    """
    drive = get_drive_service()

    try:
        # Look up where the file currently lives.
        current = drive.files().get(
            fileId=file_id,
            fields='parents, name',
        ).execute()
        old_parents = ','.join(current.get('parents', []))

        # Swap the old parents for the new one in a single request.
        moved = drive.files().update(
            fileId=file_id,
            addParents=new_parent_id,
            removeParents=old_parents,
            fields='id, name, parents, webViewLink',
        ).execute()
    except HttpError as error:
        return {'success': False, 'error': str(error)}

    return {
        'success': True,
        'file_id': moved.get('id'),
        'file_name': moved.get('name'),
        'new_parent': moved.get('parents'),
        'url': moved.get('webViewLink'),
    }
def rename_file(file_id: str, new_name: str) -> dict:
    """Rename a file or folder.

    Args:
        file_id: ID of the file or folder to rename.
        new_name: Name to assign.

    Returns:
        On success, a dict with ``success`` (True), ``file_id``, ``new_name``
        (the name reported back by the API) and ``url``. If the API call
        raises ``HttpError``, a dict with ``success`` (False) and ``error``
        (the error as text).
    """
    drive = get_drive_service()

    try:
        request = drive.files().update(
            fileId=file_id,
            body={'name': new_name},
            fields='id, name, webViewLink',
        )
        renamed = request.execute()
    except HttpError as error:
        return {'success': False, 'error': str(error)}

    return {
        'success': True,
        'file_id': renamed.get('id'),
        'new_name': renamed.get('name'),
        'url': renamed.get('webViewLink'),
    }
def get_file_info(file_id: str) -> dict:
    """Fetch metadata for a single file or folder.

    Args:
        file_id: ID of the file or folder to look up.

    Returns:
        On success, a dict with ``success`` (True), ``file_id``, ``name``,
        ``mime_type``, ``parents`` (empty list when the response has none),
        ``url``, ``created`` and ``modified``. If the API call raises
        ``HttpError``, a dict with ``success`` (False) and ``error`` (the
        error as text).
    """
    drive = get_drive_service()

    # Only these metadata fields are requested from the API.
    wanted_fields = ', '.join((
        'id',
        'name',
        'mimeType',
        'parents',
        'webViewLink',
        'createdTime',
        'modifiedTime',
    ))

    try:
        meta = drive.files().get(fileId=file_id, fields=wanted_fields).execute()
    except HttpError as error:
        return {'success': False, 'error': str(error)}

    info = {'success': True}
    info['file_id'] = meta.get('id')
    info['name'] = meta.get('name')
    info['mime_type'] = meta.get('mimeType')
    info['parents'] = meta.get('parents', [])
    info['url'] = meta.get('webViewLink')
    info['created'] = meta.get('createdTime')
    info['modified'] = meta.get('modifiedTime')
    return info
def list_folder_contents(folder_id: str = 'root', page_size: int = 100,
                         page_token: Optional[str] = None) -> dict:
    """List the non-trashed items directly inside a folder.

    Only one page of results is fetched per call. When more items exist, the
    returned ``next_page_token`` can be passed back as ``page_token`` to fetch
    the following page.

    Args:
        folder_id: ID of the folder to list; ``'root'`` by default.
        page_size: Maximum number of items requested for this page.
        page_token: Token from a previous call's ``next_page_token``, or
            ``None`` to start from the first page.

    Returns:
        On success, a dict with ``success`` (True), ``count`` (number of items
        in this page), ``items`` (dicts with ``id``, ``name``, ``mimeType``
        and ``webViewLink`` as returned by the API) and ``next_page_token``
        (``None`` when the response carries no further page). If the API call
        raises ``HttpError``, a dict with ``success`` (False) and ``error``
        (the error as text).
    """
    service = get_drive_service()

    # folder_id is embedded in a single-quoted query literal. Escape
    # backslashes first, then quotes, so a stray quote in the ID cannot
    # terminate the literal and change the meaning of the query.
    escaped_id = folder_id.replace('\\', '\\\\').replace("'", "\\'")

    request_args = {
        'q': f"'{escaped_id}' in parents and trashed=false",
        'pageSize': page_size,
        # nextPageToken is requested so truncated listings can be detected.
        'fields': "nextPageToken, files(id, name, mimeType, webViewLink)",
    }
    if page_token:
        request_args['pageToken'] = page_token

    try:
        results = service.files().list(**request_args).execute()
    except HttpError as error:
        return {
            'success': False,
            'error': str(error)
        }

    items = results.get('files', [])
    return {
        'success': True,
        'count': len(items),
        'items': items,
        'next_page_token': results.get('nextPageToken')
    }
# Create MCP server. The list_tools() and call_tool() handlers defined below
# attach themselves to this instance through its decorators.
server = Server("google-drive-mcp")
@server.list_tools()
async def list_tools() -> list[Tool]:
    """Return the tool definitions this server exposes.

    Each tool carries a name, a human-readable description and a JSON schema
    describing its arguments. The names match the ones dispatched in
    ``call_tool``.
    """

    def text_arg(description: str) -> dict:
        # Schema fragment for a string-valued argument.
        return {"type": "string", "description": description}

    def arg_schema(properties: dict, required: list) -> dict:
        # Top-level JSON schema for a tool's argument object.
        return {
            "type": "object",
            "properties": properties,
            "required": required,
        }

    create_folder_tool = Tool(
        name="create_drive_folder",
        description="Create a new folder in Google Drive. You can optionally specify a parent folder ID.",
        inputSchema=arg_schema(
            {
                "name": text_arg("Name of the folder to create"),
                "parent_id": text_arg(
                    "Optional: ID of the parent folder. If not provided, creates in root."
                ),
            },
            ["name"],
        ),
    )

    move_file_tool = Tool(
        name="move_drive_file",
        description="Move a file or folder to a different parent folder in Google Drive.",
        inputSchema=arg_schema(
            {
                "file_id": text_arg("ID of the file or folder to move"),
                "new_parent_id": text_arg("ID of the destination folder"),
            },
            ["file_id", "new_parent_id"],
        ),
    )

    rename_file_tool = Tool(
        name="rename_drive_file",
        description="Rename a file or folder in Google Drive.",
        inputSchema=arg_schema(
            {
                "file_id": text_arg("ID of the file or folder to rename"),
                "new_name": text_arg("New name for the file or folder"),
            },
            ["file_id", "new_name"],
        ),
    )

    file_info_tool = Tool(
        name="get_drive_file_info",
        description="Get detailed information about a file or folder.",
        inputSchema=arg_schema(
            {
                "file_id": text_arg("ID of the file or folder"),
            },
            ["file_id"],
        ),
    )

    list_contents_tool = Tool(
        name="list_drive_folder_contents",
        description="List all files and folders within a specific folder.",
        inputSchema=arg_schema(
            {
                "folder_id": text_arg(
                    "ID of the folder to list. Use 'root' for root directory."
                ),
                "page_size": {
                    "type": "integer",
                    "description": "Number of items to return (default: 100)",
                    "default": 100,
                },
            },
            ["folder_id"],
        ),
    )

    return [
        create_folder_tool,
        move_file_tool,
        rename_file_tool,
        file_info_tool,
        list_contents_tool,
    ]
@server.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Dispatch a tool call to the matching Drive helper.

    Args:
        name: Tool name, one of the names advertised by ``list_tools``.
        arguments: Mapping of argument names to values for the tool. ``None``
            is treated as an empty mapping.

    Returns:
        A single-element list holding a ``TextContent`` whose text is the
        JSON-encoded result dict. Failures (unknown tool, missing arguments,
        or any exception raised while handling the call) are reported as
        ``{"success": false, "error": ...}`` rather than raised.
    """
    # Arguments each tool cannot run without. Checked up front so a missing
    # one yields a readable message instead of the bare key name that
    # str(KeyError) would produce.
    required_args = {
        "create_drive_folder": ("name",),
        "move_drive_file": ("file_id", "new_parent_id"),
        "rename_drive_file": ("file_id", "new_name"),
        "get_drive_file_info": ("file_id",),
        "list_drive_folder_contents": ("folder_id",),
    }

    try:
        args = arguments if arguments is not None else {}

        if name not in required_args:
            result = {"success": False, "error": f"Unknown tool: {name}"}
        else:
            missing = [key for key in required_args[name] if key not in args]
            if missing:
                result = {
                    "success": False,
                    "error": "Missing required argument(s): " + ", ".join(missing)
                }
            elif name == "create_drive_folder":
                result = create_folder(
                    name=args["name"],
                    parent_id=args.get("parent_id")
                )
            elif name == "move_drive_file":
                result = move_file(
                    file_id=args["file_id"],
                    new_parent_id=args["new_parent_id"]
                )
            elif name == "rename_drive_file":
                result = rename_file(
                    file_id=args["file_id"],
                    new_name=args["new_name"]
                )
            elif name == "get_drive_file_info":
                result = get_file_info(
                    file_id=args["file_id"]
                )
            else:  # list_drive_folder_contents
                result = list_folder_contents(
                    folder_id=args["folder_id"],
                    page_size=args.get("page_size", 100)
                )

        payload = json.dumps(result, indent=2)
    except Exception as e:
        # Top-level boundary: report any failure to the client as a result
        # payload instead of letting it propagate out of the handler.
        payload = json.dumps({
            "success": False,
            "error": str(e)
        }, indent=2)

    return [TextContent(type="text", text=payload)]
async def main():
    """Serve MCP requests over stdio until the server's run loop returns."""
    # Imported here, as in the original layout, so the stdio transport is
    # only loaded when the server is actually started.
    from mcp.server.stdio import stdio_server

    async with stdio_server() as streams:
        incoming, outgoing = streams
        await server.run(
            incoming,
            outgoing,
            server.create_initialization_options(),
        )
# Start the server only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())