python-client.py•8.86 kB
#!/usr/bin/env python3
"""
Python MCP Client Example for Google Drive MCP Server
This example demonstrates how to connect to and use the Google Drive MCP Server
from a Python application using the MCP SDK.
"""
import asyncio
import json
import os
import sys
from contextlib import AsyncExitStack
from typing import Dict, List, Optional, Any
# Note: This assumes you have the Python MCP SDK installed
# pip install mcp
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class GoogleDriveMCPClient:
    """Async client for the Google Drive MCP Server, spoken to over stdio.

    The server is launched as a ``node`` subprocess and driven through an MCP
    ``ClientSession``. Call :meth:`connect` before any tool method and
    :meth:`close` when finished, or use the client as an async context
    manager::

        async with GoogleDriveMCPClient(server_path, env) as client:
            results = await client.search_files("report")
    """

    def __init__(self, server_path: str, env: Optional[Dict[str, str]] = None):
        """Store launch settings; nothing is started until :meth:`connect`.

        Args:
            server_path: Path to the server entry point, passed to ``node``.
            env: Extra environment variables for the server process. They
                override same-named variables inherited from ``os.environ``.
        """
        self.server_path = server_path
        self.env = env or {}
        self.session = None
        # Owns the stdio transport and session contexts so that they outlive
        # the connect() call; released again by close().
        self._exit_stack = None

    async def __aenter__(self):
        """Connect and return the client itself."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the connection; exceptions from the body are not suppressed."""
        await self.close()

    async def connect(self):
        """Start the server process and open an initialized MCP session.

        The transport and session contexts are entered on an
        ``AsyncExitStack`` instead of ``async with`` blocks: returning from
        inside ``async with`` would exit both contexts and hand the caller a
        session that is already shut down.

        Returns:
            The live session, which is also stored on ``self.session``. When
            a session is already set, it is returned unchanged.
        """
        if self.session is not None:
            return self.session

        server_params = StdioServerParameters(
            command="node",
            args=[self.server_path],
            env={**os.environ, **self.env},
        )

        stack = AsyncExitStack()
        try:
            read, write = await stack.enter_async_context(stdio_client(server_params))
            session = await stack.enter_async_context(ClientSession(read, write))
            # Initialize the connection
            await session.initialize()
            # List available tools
            tools = await session.list_tools()
        except BaseException:
            # Do not leave a half-started transport behind on failure.
            await stack.aclose()
            raise

        self._exit_stack = stack
        self.session = session
        print("Connected to Google Drive MCP Server")
        print(f"Available tools: {[tool.name for tool in tools.tools]}")
        return session

    async def close(self):
        """Shut down the session and transport opened by :meth:`connect`.

        Safe to call when not connected and safe to call more than once.
        """
        stack = getattr(self, "_exit_stack", None)
        self._exit_stack = None
        self.session = None
        if stack is not None:
            await stack.aclose()

    async def _call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Invoke one server tool and decode its first content item as JSON.

        Raises:
            RuntimeError: If there is no session, the result is flagged as an
                error, or the result carries no text content.
            json.JSONDecodeError: If the returned text is not valid JSON.
        """
        if not self.session:
            raise RuntimeError("Not connected to server")

        result = await self.session.call_tool(tool_name, arguments)
        content = getattr(result, "content", None) or []
        text = getattr(content[0], "text", None) if content else None

        # Error results carry a plain message rather than a JSON payload, so
        # report them directly instead of failing inside json.loads.
        if getattr(result, "isError", False):
            detail = text or "no error details returned"
            raise RuntimeError(f"Tool {tool_name} failed: {detail}")
        if text is None:
            raise RuntimeError(f"Tool {tool_name} returned no text content")
        return json.loads(text)

    async def search_files(self, query: str, **options) -> Dict[str, Any]:
        """Search for files on Google Drive.

        ``options`` are forwarded verbatim as extra tool arguments.
        """
        return await self._call_tool("drive_search_files", {"query": query, **options})

    async def get_file(self, file_id: str, include_content: bool = True) -> Dict[str, Any]:
        """Get file information and, unless disabled, its content."""
        return await self._call_tool(
            "drive_get_file",
            {"fileId": file_id, "includeContent": include_content},
        )

    async def get_content_chunk(self, file_id: str, **options) -> Dict[str, Any]:
        """Get a chunk of file content.

        ``options`` are forwarded verbatim as extra tool arguments.
        """
        return await self._call_tool("drive_get_content_chunk", {"fileId": file_id, **options})

    async def get_file_metadata(self, file_id: str, include_permissions: bool = False) -> Dict[str, Any]:
        """Get file metadata, optionally including permissions."""
        return await self._call_tool(
            "drive_get_file_metadata",
            {"fileId": file_id, "includePermissions": include_permissions},
        )

    async def list_folder(self, folder_id: str, **options) -> Dict[str, Any]:
        """List folder contents.

        ``options`` are forwarded verbatim as extra tool arguments.
        """
        return await self._call_tool("drive_list_folder", {"folderId": folder_id, **options})

    async def get_comments(self, file_id: str, **options) -> Dict[str, Any]:
        """Get file comments.

        ``options`` are forwarded verbatim as extra tool arguments.
        """
        return await self._call_tool("drive_get_comments", {"fileId": file_id, **options})
async def example_usage():
    """Walk through the basic client calls against the server.

    Connects, searches for "presentation" files, prints metadata and a
    content preview for the first hit, then lists the "root" folder. Any
    failure is printed and the process exits with status 1.
    """
    # Configuration -- replace the placeholder path and credentials.
    server_path = "/path/to/google-drive-mcp-server/dist/index.js"
    env = {
        "GOOGLE_CLIENT_ID": "your-client-id.googleusercontent.com",
        "GOOGLE_CLIENT_SECRET": "your-client-secret",
        "LOG_LEVEL": "info"
    }
    client = GoogleDriveMCPClient(server_path, env)
    try:
        # Connect to server
        await client.connect()

        # Example 1: Search for files
        print("\n--- Searching for files ---")
        search_results = await client.search_files("presentation", limit=5)
        files = search_results.get("files", [])
        print(f"Found {len(files)} files")

        if files:
            first_file = files[0]
            print(f"First file: {first_file['name']} ({first_file['id']})")

            # Example 2: Get file metadata
            print("\n--- Getting file metadata ---")
            metadata = await client.get_file_metadata(first_file['id'], include_permissions=True)
            print(f"File size: {metadata.get('size', 'Unknown')} bytes")
            print(f"Modified: {metadata.get('modifiedTime', 'Unknown')}")
            owners = metadata.get('owners', [])
            if owners:
                print(f"Owner: {owners[0].get('displayName', 'Unknown')}")

            # Example 3: Get content chunk
            print("\n--- Getting content chunk ---")
            chunk = await client.get_content_chunk(
                first_file['id'],
                startChar=0,
                endChar=1000
            )
            content = chunk.get('content', '')
            if content:
                # Only mark the preview as truncated when it actually is.
                preview = content[:200] + "..." if len(content) > 200 else content
                print(f"Content preview: {preview}")

        # Example 4: List root folder
        print("\n--- Listing root folder ---")
        folder_contents = await client.list_folder("root", limit=10)
        folder_files = folder_contents.get("files", [])
        print(f"Root folder contains {len(folder_files)} items")
        for file in folder_files[:3]:  # Show first 3 items
            print(f"  - {file['name']} ({file['mimeType']})")
    except Exception as error:
        print(f"Error: {error}")
        sys.exit(1)
    finally:
        # Release the server connection on every exit path; sys.exit() raises
        # SystemExit, so this runs after a failure too. The coroutine is
        # looked up dynamically so a client that has no close() is left alone.
        close = getattr(client, "close", None)
        if close is not None:
            await close()
def _size_in_bytes(raw_size) -> int:
    """Return *raw_size* as an int, or 0 when it is missing or not numeric."""
    try:
        return int(raw_size)
    except (TypeError, ValueError):
        return 0


def _summarize(content, limit: int = 500) -> str:
    """Return at most *limit* characters, adding "..." only when truncated."""
    text = content or ''
    return text[:limit] + "..." if len(text) > limit else text


async def batch_process_example():
    """Example of batch processing multiple files.

    Searches for "report" documents, skips files over 50MB, collects a short
    content summary for each remaining file and writes the collected records
    to ``batch_results.json`` in the current directory. A failure on one file
    is printed and does not stop the batch.
    """
    server_path = "/path/to/google-drive-mcp-server/dist/index.js"
    env = {
        "GOOGLE_CLIENT_ID": "your-client-id.googleusercontent.com",
        "GOOGLE_CLIENT_SECRET": "your-client-secret"
    }
    max_size_bytes = 50 * 1024 * 1024  # 50MB
    client = GoogleDriveMCPClient(server_path, env)
    try:
        await client.connect()

        # Search for documents to process
        search_results = await client.search_files(
            "report",
            fileType=["application/pdf", "application/vnd.google-apps.document"],
            limit=10
        )
        files = search_results.get("files", [])
        print(f"Processing {len(files)} files...")

        results = []
        for file in files:
            try:
                # Get file metadata
                metadata = await client.get_file_metadata(file['id'])

                # Skip very large files. The size is coerced first because a
                # string value would make the comparison raise TypeError.
                # NOTE(review): the Drive API serializes sizes as strings;
                # confirm what this server actually returns.
                if _size_in_bytes(metadata.get('size', 0)) > max_size_bytes:
                    print(f"Skipping large file: {file['name']}")
                    continue

                # Get content summary (first 2000 characters)
                chunk = await client.get_content_chunk(
                    file['id'],
                    startChar=0,
                    endChar=2000
                )
                results.append({
                    'name': file['name'],
                    'id': file['id'],
                    'size': metadata.get('size', 0),
                    'modified': metadata.get('modifiedTime'),
                    'summary': _summarize(chunk.get('content'))
                })
                print(f"Processed: {file['name']}")
            except Exception as e:
                print(f"Error processing {file['name']}: {e}")

        # Save results
        with open('batch_results.json', 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)
        print("Batch processing complete. Results saved to batch_results.json")
    except Exception as error:
        print(f"Batch processing error: {error}")
    finally:
        # Release the server connection on every exit path. The coroutine is
        # looked up dynamically so a client that has no close() is left alone.
        close = getattr(client, "close", None)
        if close is not None:
            await close()
if __name__ == "__main__":
    # Passing "batch" as the first command-line argument selects the batch
    # demo; anything else (or no argument) runs the basic walkthrough.
    wants_batch = sys.argv[1:2] == ["batch"]
    entry_point = batch_process_example if wants_batch else example_usage
    asyncio.run(entry_point())