Skip to main content
Glama

Google Drive MCP Server

by ducla5
python-client.py (8.86 kB)
#!/usr/bin/env python3
"""
Python MCP Client Example for Google Drive MCP Server

This example demonstrates how to connect to and use the Google Drive MCP Server
from a Python application using the MCP SDK.
"""

import asyncio
import json
import os
import sys
from contextlib import AsyncExitStack
from typing import Dict, List, Optional, Any

# Note: This assumes you have the Python MCP SDK installed
# pip install mcp
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Files larger than this are skipped by the batch example (50MB).
MAX_BATCH_FILE_SIZE_BYTES = 50 * 1024 * 1024


def _truncate(text: str, limit: int) -> str:
    """Return *text* cut to *limit* characters, adding "..." only if it was cut."""
    if len(text) > limit:
        return text[:limit] + "..."
    return text


def _size_in_bytes(metadata: Dict[str, Any]) -> int:
    """Return the ``size`` entry of *metadata* as an int, or 0 if missing/invalid.

    The value is normalised because it may arrive as a string rather than a
    number (NOTE(review): the Drive API serialises sizes as strings; confirm
    what this server forwards), and comparing ``str > int`` raises TypeError.
    """
    try:
        return int(metadata.get("size") or 0)
    except (TypeError, ValueError):
        return 0


class GoogleDriveMCPClient:
    """Thin async wrapper around an MCP ``ClientSession`` for the Drive server.

    The server is started as a ``node`` subprocess speaking MCP over stdio.
    Use either ``connect()`` / ``close()`` explicitly or ``async with``:

        async with GoogleDriveMCPClient(path, env) as client:
            await client.search_files("report")
    """

    def __init__(self, server_path: str, env: Optional[Dict[str, str]] = None):
        """Store the launch configuration; no process is started here.

        Args:
            server_path: Path to the server's JavaScript entry point, passed
                as the single argument to ``node``.
            env: Extra environment variables for the server process. They are
                layered on top of the current process environment.
        """
        self.server_path = server_path
        self.env = env or {}
        self.session = None
        # Owns the stdio transport and the session so they stay open after
        # connect() returns and can be torn down together in close().
        self._exit_stack: Optional[AsyncExitStack] = None

    async def __aenter__(self) -> "GoogleDriveMCPClient":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()

    async def connect(self) -> ClientSession:
        """Connect to the Google Drive MCP Server

        Starts the server subprocess, initializes the MCP session, prints the
        available tool names and returns the live session. Any previous
        connection held by this client is closed first.

        The transport and session contexts are kept open on an
        ``AsyncExitStack``; returning the session from inside ``async with``
        blocks would close it before the caller could use it.

        NOTE(review): the SDK's stdio transport appears to use task-bound
        cancel scopes, so call ``close()`` from the same task that called
        ``connect()`` -- confirm against the SDK documentation.

        Returns:
            The initialized ``ClientSession`` (also stored on ``self.session``).
        """
        await self.close()

        server_params = StdioServerParameters(
            command="node",
            args=[self.server_path],
            env={**os.environ, **self.env},
        )

        stack = AsyncExitStack()
        try:
            read, write = await stack.enter_async_context(stdio_client(server_params))
            session = await stack.enter_async_context(ClientSession(read, write))

            # Initialize the connection
            await session.initialize()

            # List available tools
            tools = await session.list_tools()
        except BaseException:
            # Do not leak the subprocess if start-up fails part-way through.
            await stack.aclose()
            raise

        self._exit_stack = stack
        self.session = session

        print("Connected to Google Drive MCP Server")
        print(f"Available tools: {[tool.name for tool in tools.tools]}")
        return session

    async def close(self) -> None:
        """Close the session and stop the server process. Safe to call twice."""
        stack, self._exit_stack = self._exit_stack, None
        self.session = None
        if stack is not None:
            await stack.aclose()

    async def _call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Call tool *name* and decode the first content item's text as JSON.

        Raises:
            RuntimeError: If not connected, if the result is flagged as an
                error, or if the result carries no content.
            json.JSONDecodeError: If the returned text is not valid JSON.
        """
        if not self.session:
            raise RuntimeError("Not connected to server")

        result = await self.session.call_tool(name, arguments)
        content = result.content

        if getattr(result, "isError", False):
            detail = getattr(content[0], "text", "") if content else ""
            raise RuntimeError(
                f"Tool {name!r} failed: {detail or 'no error details returned'}"
            )
        if not content:
            raise RuntimeError(f"Tool {name!r} returned no content")

        return json.loads(content[0].text)

    async def search_files(self, query: str, **options) -> Dict[str, Any]:
        """Search for files on Google Drive

        Calls ``drive_search_files`` with ``query`` plus any extra *options*
        forwarded verbatim as tool arguments.
        """
        return await self._call_tool("drive_search_files", {"query": query, **options})

    async def get_file(self, file_id: str, include_content: bool = True) -> Dict[str, Any]:
        """Get file information and content

        Calls ``drive_get_file`` with ``fileId`` and ``includeContent``.
        """
        return await self._call_tool(
            "drive_get_file",
            {"fileId": file_id, "includeContent": include_content},
        )

    async def get_content_chunk(self, file_id: str, **options) -> Dict[str, Any]:
        """Get a chunk of file content

        Calls ``drive_get_content_chunk`` with ``fileId`` plus any extra
        *options* forwarded verbatim as tool arguments.
        """
        return await self._call_tool(
            "drive_get_content_chunk",
            {"fileId": file_id, **options},
        )

    async def get_file_metadata(
        self, file_id: str, include_permissions: bool = False
    ) -> Dict[str, Any]:
        """Get file metadata

        Calls ``drive_get_file_metadata`` with ``fileId`` and
        ``includePermissions``.
        """
        return await self._call_tool(
            "drive_get_file_metadata",
            {"fileId": file_id, "includePermissions": include_permissions},
        )

    async def list_folder(self, folder_id: str, **options) -> Dict[str, Any]:
        """List folder contents

        Calls ``drive_list_folder`` with ``folderId`` plus any extra *options*
        forwarded verbatim as tool arguments.
        """
        return await self._call_tool(
            "drive_list_folder",
            {"folderId": folder_id, **options},
        )

    async def get_comments(self, file_id: str, **options) -> Dict[str, Any]:
        """Get file comments

        Calls ``drive_get_comments`` with ``fileId`` plus any extra *options*
        forwarded verbatim as tool arguments.
        """
        return await self._call_tool(
            "drive_get_comments",
            {"fileId": file_id, **options},
        )


async def example_usage():
    """Example usage of the Google Drive MCP Client

    Searches for files, then prints metadata and a content preview for the
    first hit and lists the root folder. Exits with status 1 on any error.
    The server connection is always closed before returning.
    """
    # Configuration
    server_path = "/path/to/google-drive-mcp-server/dist/index.js"
    env = {
        "GOOGLE_CLIENT_ID": "your-client-id.googleusercontent.com",
        "GOOGLE_CLIENT_SECRET": "your-client-secret",
        "LOG_LEVEL": "info",
    }

    client = GoogleDriveMCPClient(server_path, env)

    try:
        # Connect to server (connect() stores the session on the client)
        await client.connect()

        # Example 1: Search for files
        print("\n--- Searching for files ---")
        search_results = await client.search_files("presentation", limit=5)
        files = search_results.get("files", [])
        print(f"Found {len(files)} files")

        if files:
            first_file = files[0]
            print(f"First file: {first_file['name']} ({first_file['id']})")

            # Example 2: Get file metadata
            print("\n--- Getting file metadata ---")
            metadata = await client.get_file_metadata(
                first_file['id'], include_permissions=True
            )
            print(f"File size: {metadata.get('size', 'Unknown')} bytes")
            print(f"Modified: {metadata.get('modifiedTime', 'Unknown')}")
            owners = metadata.get('owners', [])
            if owners:
                print(f"Owner: {owners[0].get('displayName', 'Unknown')}")

            # Example 3: Get content chunk
            print("\n--- Getting content chunk ---")
            chunk = await client.get_content_chunk(
                first_file['id'],
                startChar=0,
                endChar=1000,
            )
            content = chunk.get('content', '')
            if content:
                print(f"Content preview: {_truncate(content, 200)}")

        # Example 4: List root folder
        print("\n--- Listing root folder ---")
        folder_contents = await client.list_folder("root", limit=10)
        folder_files = folder_contents.get("files", [])
        print(f"Root folder contains {len(folder_files)} items")

        for item in folder_files[:3]:  # Show first 3 items
            print(f"  - {item['name']} ({item['mimeType']})")

    except Exception as error:
        print(f"Error: {error}")
        sys.exit(1)
    finally:
        # Runs even on sys.exit(), so the server subprocess is not leaked.
        await client.close()


async def batch_process_example():
    """Example of batch processing multiple files

    Searches for reports, skips files above ``MAX_BATCH_FILE_SIZE_BYTES``,
    collects a short summary of each remaining file and writes the results to
    ``batch_results.json``. A failure on one file is reported and does not
    stop the batch.
    """
    server_path = "/path/to/google-drive-mcp-server/dist/index.js"
    env = {
        "GOOGLE_CLIENT_ID": "your-client-id.googleusercontent.com",
        "GOOGLE_CLIENT_SECRET": "your-client-secret",
    }

    client = GoogleDriveMCPClient(server_path, env)

    try:
        await client.connect()

        # Search for documents to process
        search_results = await client.search_files(
            "report",
            fileType=["application/pdf", "application/vnd.google-apps.document"],
            limit=10,
        )

        files = search_results.get("files", [])
        print(f"Processing {len(files)} files...")

        results = []
        for item in files:
            try:
                # Get file metadata
                metadata = await client.get_file_metadata(item['id'])
                size = _size_in_bytes(metadata)

                # Skip very large files
                if size > MAX_BATCH_FILE_SIZE_BYTES:
                    print(f"Skipping large file: {item['name']}")
                    continue

                # Get content summary (first 2000 characters)
                chunk = await client.get_content_chunk(
                    item['id'],
                    startChar=0,
                    endChar=2000,
                )

                results.append({
                    'name': item['name'],
                    'id': item['id'],
                    'size': size,
                    'modified': metadata.get('modifiedTime'),
                    'summary': _truncate(chunk.get('content') or '', 500),
                })

                print(f"Processed: {item['name']}")

            except Exception as e:
                # Best effort: report the failure and carry on with the rest.
                print(f"Error processing {item['name']}: {e}")

        # Save results
        with open('batch_results.json', 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)

        print("Batch processing complete. Results saved to batch_results.json")

    except Exception as error:
        print(f"Batch processing error: {error}")
    finally:
        await client.close()


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "batch":
        asyncio.run(batch_process_example())
    else:
        asyncio.run(example_usage())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ducla5/gdriver-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.