Skip to main content
Glama

mcp-server-gdrive

by asadudin
main.py16.9 kB
import asyncio
import base64
import json
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from urllib.parse import quote

import httpx
from dotenv import load_dotenv
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from mcp.server.fastmcp import FastMCP

# Load environment variables
load_dotenv()

HOST = os.environ.get("HOST", "0.0.0.0")
PORT = int(os.environ.get("PORT", 8055))
SERVICE_ACCOUNT_FILE = os.environ.get(
    "GOOGLE_SERVICE_ACCOUNT_FILE", "mcp-project-458109-5e6bacb68907.json"
)
SCOPES = os.environ.get("GOOGLE_DRIVE_SCOPES", "https://www.googleapis.com/auth/drive")

# Base URLs of the Google APIs used below.  Media uploads go to a dedicated
# "/upload" endpoint; the plain Drive endpoint only handles metadata.
_SHEETS_API_BASE = "https://sheets.googleapis.com/v4"
_DRIVE_API_BASE = "https://www.googleapis.com/drive/v3"
_DRIVE_UPLOAD_BASE = "https://www.googleapis.com/upload/drive/v3"

_SHEETS_METHODS = ("GET", "POST", "PUT", "PATCH")
_DRIVE_METHODS = ("GET", "POST", "PATCH", "DELETE")
# Methods for which a request body is sent.
_BODY_METHODS = ("POST", "PUT", "PATCH")

# Initialize FastMCP server
mcp = FastMCP(
    "gdrive",
    host=HOST,
    port=PORT
)


# ---- Shared helpers ----

def get_google_creds():
    """Build service-account credentials from SERVICE_ACCOUNT_FILE.

    SCOPES may hold a single scope or several scopes separated by commas
    and/or whitespace; a non-string value is passed through unchanged.
    """
    if isinstance(SCOPES, str):
        scopes = SCOPES.replace(",", " ").split()
    else:
        scopes = SCOPES
    return service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE, scopes=scopes
    )


def _load_refreshed_creds():
    """Load the credentials and refresh them so that ``token`` is populated."""
    creds = get_google_creds()
    creds.refresh(Request())
    return creds


async def _auth_headers(extra: Optional[dict] = None) -> dict:
    """Return request headers carrying a fresh bearer token.

    The refresh performs synchronous network I/O, so it runs in a worker
    thread instead of stalling the event loop.
    """
    creds = await asyncio.to_thread(_load_refreshed_creds)
    headers = {
        "Authorization": f"Bearer {creds.token}",
        "Accept": "application/json"
    }
    if extra:
        headers.update(extra)
    return headers


def _parse_response(resp: httpx.Response, raw: bool = False) -> dict:
    """Convert a successful response into the dictionary handed to callers.

    JSON responses are decoded unless ``raw`` is set; anything else is
    returned as ``{"content": <bytes>, "headers": <dict>}``.
    """
    is_json = resp.headers.get("content-type", "").startswith("application/json")
    if is_json and not raw:
        return resp.json()
    return {"content": resp.content, "headers": dict(resp.headers)}


def _error_from_exception(exc: Exception) -> dict:
    """Map an exception raised while calling an API to an error dictionary."""
    if isinstance(exc, httpx.HTTPStatusError):
        try:
            error_detail = exc.response.json()
        except ValueError:
            # The error body was not JSON; keep the raw text instead.
            error_detail = {"response_text": exc.response.text}
        return {
            "error": f"API Error: {exc.response.status_code} - {str(exc)}",
            "status_code": exc.response.status_code,
            "details": error_detail
        }
    if isinstance(exc, httpx.RequestError):
        return {"error": f"Request Error: {str(exc)}"}
    return {"error": f"Unexpected error: {str(exc)}"}


async def _send(verb: str, url: str, *, headers: dict, params: Optional[dict],
                timeout: float, raw: bool = False, **body: Any) -> dict:
    """Send one request and return the parsed response; raises on failure."""
    async with httpx.AsyncClient() as client:
        resp = await client.request(
            verb, url, headers=headers, params=params, timeout=timeout, **body
        )
    resp.raise_for_status()
    return _parse_response(resp, raw=raw)


def _values_endpoint(spreadsheet_id: str, a1_range: str, suffix: str = "") -> str:
    """Build a ``spreadsheets/{id}/values/{range}`` endpoint.

    The range is percent-encoded so characters such as ``?``, ``#`` or ``/``
    in a sheet name cannot alter the URL; ``!`` and ``:`` are left readable.
    """
    return f"spreadsheets/{spreadsheet_id}/values/{quote(a1_range, safe='!:')}{suffix}"


# Helper for Google Sheets API
async def make_sheets_request(endpoint: str, method: str = "GET", params: Optional[dict] = None, data: Optional[dict] = None) -> dict:
    """
    Make a request to the Google Sheets API with proper error handling.

    Args:
        endpoint: The API endpoint to call (relative to /v4/)
        method: HTTP method (GET, POST, PUT, PATCH)
        params: Query parameters
        data: JSON data for the request body

    Returns:
        Response from the API as a dictionary. Failures (including credential
        problems) are reported as a dictionary with an "error" key rather
        than raised.
    """
    verb = method.upper()
    if verb not in _SHEETS_METHODS:
        return {"error": f"Unsupported method: {method}"}

    url = f"{_SHEETS_API_BASE}/{endpoint}"
    body = {"json": data} if verb in _BODY_METHODS else {}
    try:
        headers = await _auth_headers({"Content-Type": "application/json"})
        return await _send(verb, url, headers=headers, params=params, timeout=30.0, **body)
    except Exception as e:
        return _error_from_exception(e)


# ---- Google Sheets Tools ----

@mcp.tool()
async def create_spreadsheet(title: str) -> str:
    """
    Create a new Google Spreadsheet.

    Args:
        title: Title of the new spreadsheet

    Returns:
        JSON string with spreadsheetId
    """
    data = {"properties": {"title": title}}
    response = await make_sheets_request("spreadsheets", method="POST", data=data)
    return json.dumps(response, indent=2)


@mcp.tool()
async def read_sheet_values(spreadsheet_id: str, range: str) -> str:
    """
    Read cell values from a Google Spreadsheet.

    Args:
        spreadsheet_id: Spreadsheet (document) ID
        range: A1 notation range (e.g., "Sheet1!A1:C2")

    Returns:
        JSON string with cell values
    """
    endpoint = _values_endpoint(spreadsheet_id, range)
    response = await make_sheets_request(endpoint, method="GET")
    return json.dumps(response, indent=2)


@mcp.tool()
async def update_sheet_values(spreadsheet_id: str, range: str, values: list, value_input_option: str = "USER_ENTERED") -> str:
    """
    Update cell values in a Google Spreadsheet.

    Args:
        spreadsheet_id: Spreadsheet (document) ID
        range: A1 notation range (e.g., "Sheet1!A1:C2")
        values: 2D array of values
        value_input_option: "RAW" or "USER_ENTERED" (default)

    Returns:
        JSON string with update result
    """
    endpoint = _values_endpoint(spreadsheet_id, range)
    params = {"valueInputOption": value_input_option}
    data = {"values": values}
    response = await make_sheets_request(endpoint, method="PUT", params=params, data=data)
    return json.dumps(response, indent=2)


@mcp.tool()
async def append_sheet_values(spreadsheet_id: str, range: str, values: list, value_input_option: str = "USER_ENTERED") -> str:
    """
    Append values to a Google Spreadsheet.

    Args:
        spreadsheet_id: Spreadsheet (document) ID
        range: A1 notation range (e.g., "Sheet1!A1:C2")
        values: 2D array of values
        value_input_option: "RAW" or "USER_ENTERED" (default)

    Returns:
        JSON string with append result
    """
    endpoint = _values_endpoint(spreadsheet_id, range, ":append")
    params = {"valueInputOption": value_input_option}
    data = {"values": values}
    response = await make_sheets_request(endpoint, method="POST", params=params, data=data)
    return json.dumps(response, indent=2)


@mcp.tool()
async def batch_update_sheet(spreadsheet_id: str, requests: list) -> str:
    """
    Batch update a Google Spreadsheet (formatting, find/replace, etc).

    Args:
        spreadsheet_id: Spreadsheet (document) ID
        requests: List of batch update requests (see Sheets API docs)

    Returns:
        JSON string with batch update result
    """
    endpoint = f"spreadsheets/{spreadsheet_id}:batchUpdate"
    data = {"requests": requests}
    response = await make_sheets_request(endpoint, method="POST", data=data)
    return json.dumps(response, indent=2)


# ---- Google Drive ----

async def make_gdrive_request(endpoint: str, method: str = "GET", params: Optional[dict] = None, data: Optional[dict] = None, files: Optional[dict] = None, multipart: bool = False, raw: bool = False) -> dict:
    """Make a request to the Google Drive API with proper error handling.

    Args:
        endpoint: The API endpoint to call (e.g., 'files', 'files/{fileId}', etc.)
        method: HTTP method (GET, POST, PATCH, DELETE)
        params: Query parameters
        data: JSON data for the request body
        files: Files to upload (for multipart requests)
        multipart: Whether this is a multipart request
        raw: Return the body as bytes under "content" even when the server
            labels it as JSON (needed when downloading file content)

    Returns:
        Response from the API as a dictionary. Failures (including credential
        problems) are reported as a dictionary with an "error" key rather
        than raised.
    """
    verb = method.upper()
    if verb not in _DRIVE_METHODS:
        return {"error": f"Unsupported method: {method}"}

    is_upload = verb in _BODY_METHODS and bool(multipart and files)
    if is_upload:
        # File content must be sent to the dedicated upload endpoint.
        url = f"{_DRIVE_UPLOAD_BASE}/{endpoint}"
        body = {"files": files, "data": data}
        timeout = 60.0
    else:
        url = f"{_DRIVE_API_BASE}/{endpoint}"
        body = {"json": data} if verb in _BODY_METHODS else {}
        timeout = 30.0

    try:
        headers = await _auth_headers()
        return await _send(verb, url, headers=headers, params=params, timeout=timeout, raw=raw, **body)
    except Exception as e:
        return _error_from_exception(e)


@mcp.tool()
async def list_files(page_size: int = 10, q: Optional[str] = None, page_token: Optional[str] = None) -> str:
    """List files in Google Drive with pagination support.

    Args:
        page_size: Maximum number of files to return (default: 10)
        q: Search query in Google Drive query format (e.g., "name contains 'report'")
        page_token: Token for pagination (from a previous list_files call)

    Returns:
        JSON string with files and pagination info
    """
    params = {
        "pageSize": page_size,
        "fields": "nextPageToken, files(id, name, mimeType, modifiedTime, size, webViewLink)"
    }
    if q:
        params["q"] = q
    if page_token:
        params["pageToken"] = page_token

    response = await make_gdrive_request("files", params=params)
    if "error" in response:
        return json.dumps({"error": response["error"]}, indent=2)

    result = {
        "files": response.get("files", []),
        "nextPageToken": response.get("nextPageToken", None)
    }
    return json.dumps(result, indent=2)


@mcp.tool()
async def debug_api_connection() -> str:
    """Debug the Google Drive API connection to help diagnose issues."""
    try:
        # Refresh in a worker thread: the call blocks on network I/O.
        creds = await asyncio.to_thread(_load_refreshed_creds)

        # Test a simple API call
        about_response = await make_gdrive_request("about", params={"fields": "user,storageQuota"})

        return json.dumps({
            "token_valid": creds.valid,
            "token_expiry": str(creds.expiry),
            "scopes": creds.scopes,
            "api_test": "success" if "error" not in about_response else "failed",
            "user_info": about_response.get("user", {}),
            "storage_quota": about_response.get("storageQuota", {})
        }, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2)


@mcp.tool()
async def get_file_info(file_id: str) -> str:
    """Get detailed information about a specific file in Google Drive.

    Args:
        file_id: The ID of the file to get information about

    Returns:
        JSON string with file metadata
    """
    params = {"fields": "id,name,mimeType,size,webViewLink,createdTime,modifiedTime,owners,shared,parents"}
    response = await make_gdrive_request(f"files/{file_id}", params=params)
    if "error" in response:
        return json.dumps({"error": response["error"]}, indent=2)
    return json.dumps(response, indent=2)


@mcp.tool()
async def create_folder(name: str, parent_id: Optional[str] = None) -> str:
    """Create a new folder in Google Drive.

    Args:
        name: Name of the folder to create
        parent_id: Optional ID of the parent folder (if not specified, folder will be created in root)

    Returns:
        JSON string with created folder metadata
    """
    data = {
        "name": name,
        "mimeType": "application/vnd.google-apps.folder"
    }
    if parent_id:
        data["parents"] = [parent_id]

    response = await make_gdrive_request("files", method="POST", data=data)
    if "error" in response:
        return json.dumps({"error": response["error"]}, indent=2)
    return json.dumps(response, indent=2)


@mcp.tool()
async def upload_file(name: str, content: str, mime_type: str = "text/plain", parent_id: Optional[str] = None) -> str:
    """Upload a file to Google Drive.

    Args:
        name: Name of the file to create
        content: Base64-encoded content of the file
        mime_type: MIME type of the file (default: text/plain)
        parent_id: Optional ID of the parent folder (if not specified, file will be created in root)

    Returns:
        JSON string with uploaded file metadata
    """
    try:
        # Decode the base64 content
        file_content = base64.b64decode(content)

        # Metadata for the file
        metadata = {"name": name}
        if parent_id:
            metadata["parents"] = [parent_id]

        # Parameters for the request
        params = {
            "uploadType": "multipart",
            "fields": "id,name,mimeType,size,webViewLink"
        }

        # Create the multipart request; the metadata part must come first.
        files = {
            "metadata": ("metadata", json.dumps(metadata), "application/json"),
            "file": (name, file_content, mime_type)
        }

        response = await make_gdrive_request("files", method="POST", params=params, files=files, multipart=True)
        if "error" in response:
            return json.dumps({"error": response["error"]}, indent=2)
        return json.dumps(response, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Upload failed: {str(e)}"}, indent=2)


@mcp.tool()
async def download_file(file_id: str) -> str:
    """Download a file from Google Drive.

    Args:
        file_id: The ID of the file to download

    Returns:
        JSON string with file content (base64 encoded) and metadata
    """
    try:
        # First get file metadata to get the name and mime type
        file_info = await make_gdrive_request(f"files/{file_id}", params={"fields": "name,mimeType,size"})
        if "error" in file_info:
            return json.dumps({"error": file_info["error"]}, indent=2)

        # Check if file is too large (limit to 10MB for safety)
        size = int(file_info.get("size", 0))
        if size > 10 * 1024 * 1024:  # 10MB
            return json.dumps({"error": f"File too large to download via MCP: {size} bytes"}, indent=2)

        # Download the file content. raw=True keeps the bytes intact even when
        # the file itself is JSON, which would otherwise be parsed into a dict.
        response = await make_gdrive_request(f"files/{file_id}", params={"alt": "media"}, raw=True)
        if "error" in response:
            return json.dumps({"error": response["error"]}, indent=2)

        # The content is in binary, encode as base64
        content = response.get("content", b"")
        encoded_content = base64.b64encode(content).decode("utf-8")

        return json.dumps({
            "name": file_info.get("name"),
            "mimeType": file_info.get("mimeType"),
            "size": file_info.get("size"),
            "content": encoded_content
        }, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Download failed: {str(e)}"}, indent=2)


@mcp.tool()
async def share_file(file_id: str, email: str, role: str = "reader") -> str:
    """Share a file or folder with another user.

    Args:
        file_id: The ID of the file or folder to share
        email: Email address of the user to share with
        role: Permission role to grant (reader, writer, commenter, owner)

    Returns:
        JSON string with created permission
    """
    data = {
        "type": "user",
        "role": role,
        "emailAddress": email
    }
    params = {
        "sendNotificationEmail": "true",
        "fields": "id,type,role,emailAddress"
    }
    if role == "owner":
        # The Drive API requires an explicit acknowledgement before it
        # transfers ownership to another user.
        params["transferOwnership"] = "true"

    response = await make_gdrive_request(f"files/{file_id}/permissions", method="POST", data=data, params=params)
    if "error" in response:
        return json.dumps({"error": response["error"]}, indent=2)
    return json.dumps(response, indent=2)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--transport', type=str, default='sse', help='Transport type (stdio, sse, etc.)')
    args = parser.parse_args()
    mcp.run(transport=args.transport)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/asadudin/mcp-server-gdrive'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.