Skip to main content
Glama

Google Workspace MCP Connector

by bd01010
connector_hybrid.py20.3 kB
""" ChatGPT Google Workspace MCP Connector - Hybrid Version Includes both required search/fetch tools AND Google Workspace tools """ import os, json, asyncio, typing, httpx from uuid import uuid4 from fastapi import FastAPI, Request, Depends, HTTPException, Form from fastapi.responses import StreamingResponse, RedirectResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from starlette.middleware.sessions import SessionMiddleware from google_auth_oauthlib.flow import Flow from google.oauth2.credentials import Credentials from googleapiclient.discovery import build # ─── Configuration ──────────────────────────────────────────────────────── GOOGLE_CLIENT_ID = "72500811727-amrpcqfmfqc6jd9q0qq64lobrbt7mm7s.apps.googleusercontent.com" GOOGLE_CLIENT_SECRET = "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" BASE_URL = "https://chatgpt-mcp-connector.vercel.app" SESSION_SECRET = "1234" REDIRECT_URI = f"{BASE_URL}/oauth/callback" SCOPES = [ "https://www.googleapis.com/auth/drive", "https://www.googleapis.com/auth/documents", "https://www.googleapis.com/auth/presentations", "https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive.file", ] # ─── FastAPI setup ─────────────────────────────────────────────────────── app = FastAPI() app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) app.add_middleware(SessionMiddleware, secret_key=SESSION_SECRET) # ─── Google OAuth flow helper ──────────────────────────────────────────── def google_flow() -> Flow: return Flow.from_client_config( { "web": { "client_id": GOOGLE_CLIENT_ID, "client_secret": GOOGLE_CLIENT_SECRET, "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "redirect_uris": [REDIRECT_URI], } }, scopes=SCOPES, redirect_uri=REDIRECT_URI, ) # ─── Health route ───────────────────────────────────────────────────────── @app.get("/") def health(): return {"status": "ok"} # ─── Drive helpers ──────────────────────────────────────────────────────── def _creds(req: Request) -> Credentials: if "creds" not in req.session: raise HTTPException(401, "missing_credentials") return Credentials.from_authorized_user_info(req.session["creds"]) def _drive(c: Credentials): return build("drive", "v3", credentials=c, cache_discovery=False) def _docs(c: Credentials): return build("docs", "v1", credentials=c, cache_discovery=False) def _sheets(c: Credentials): return build("sheets", "v4", credentials=c, cache_discovery=False) def _slides(c: Credentials): return build("slides", "v1", credentials=c, cache_discovery=False) # ─── Required search/fetch tools for ChatGPT ───────────────────────────── def search_fn(creds: Credentials, query: str): """Required search tool that returns document IDs""" try: # Search Google Drive files results = _drive(creds).files().list( pageSize=10, q=f"name contains '{query}' or fullText contains '{query}'", fields="files(id,name,mimeType,modifiedTime)" ).execute() files = results.get('files', []) # Return in the required format return {"ids": [f["id"] for f in files]} except: return {"ids": []} def fetch_fn(creds: Credentials, id: str): """Required fetch tool that returns document details""" try: # Get file metadata file = _drive(creds).files().get(fileId=id, fields="id,name,mimeType,modifiedTime,webViewLink").execute() # Try to get content for text files content = "" if file.get('mimeType', '').startswith('text/'): try: content = _drive(creds).files().get_media(fileId=id).execute() content = content.decode('utf-8') if isinstance(content, bytes) else str(content) except: content = "Content not available" # Return in the required format return { "id": file['id'], "title": file['name'], "text": content or f"File: {file['name']} (Type: {file.get('mimeType', 'unknown')})", "metadata": { "mimeType": file.get('mimeType'), "modifiedTime": file.get('modifiedTime'), "url": file.get('webViewLink', '') } } except: return {"error": f"Document not found: {id}"} # ─── Google Workspace specific tools ────────────────────────────────────── def drive_list_fn(creds: Credentials): return _drive(creds).files().list( pageSize=20, fields="files(id,name,mimeType,modifiedTime)" ).execute()["files"] def drive_create_folder_fn(creds: Credentials, name: str = "New Folder"): meta = {"name": name, "mimeType": "application/vnd.google-apps.folder"} return _drive(creds).files().create(body=meta, fields="id,name").execute() def drive_create_file_fn(creds: Credentials, name: str, content: str = "", mimeType: str = "text/plain"): import io from googleapiclient.http import MediaIoBaseUpload file_metadata = {"name": name} media = MediaIoBaseUpload(io.BytesIO(content.encode()), mimetype=mimeType) return _drive(creds).files().create(body=file_metadata, media_body=media, fields="id,name").execute() def drive_read_file_fn(creds: Credentials, fileId: str): try: content = _drive(creds).files().get_media(fileId=fileId).execute() return {"content": content.decode('utf-8') if isinstance(content, bytes) else str(content)} except: return {"error": "Could not read file"} def drive_update_file_fn(creds: Credentials, fileId: str, content: str): import io from googleapiclient.http import MediaIoBaseUpload media = MediaIoBaseUpload(io.BytesIO(content.encode()), mimetype="text/plain") return _drive(creds).files().update(fileId=fileId, media_body=media, fields="id,name,modifiedTime").execute() def drive_delete_file_fn(creds: Credentials, fileId: str): _drive(creds).files().delete(fileId=fileId).execute() return {"deleted": True, "fileId": fileId} def docs_create_fn(creds: Credentials, title: str = "New Document"): body = {"title": title} return _docs(creds).documents().create(body=body).execute() def sheets_create_fn(creds: Credentials, title: str = "New Spreadsheet"): body = {"properties": {"title": title}} return _sheets(creds).spreadsheets().create(body=body).execute() def slides_create_fn(creds: Credentials, title: str = "New Presentation"): body = {"title": title} return _slides(creds).presentations().create(body=body).execute() # ─── Tools registry ─────────────────────────────────────────────────────── TOOLS_REGISTRY = { # Required tools for ChatGPT "search": { "func": search_fn, "parameters": {"query": "string"}, }, "fetch": { "func": fetch_fn, "parameters": {"id": "string"}, }, # Google Workspace tools "drive_list": { "func": drive_list_fn, "parameters": {}, }, "drive_create_folder": { "func": drive_create_folder_fn, "parameters": {"name": "string"}, }, "drive_create_file": { "func": drive_create_file_fn, "parameters": {"name": "string", "content": "string", "mimeType": "string"}, }, "drive_read_file": { "func": drive_read_file_fn, "parameters": {"fileId": "string"}, }, "drive_update_file": { "func": drive_update_file_fn, "parameters": {"fileId": "string", "content": "string"}, }, "drive_delete_file": { "func": drive_delete_file_fn, "parameters": {"fileId": "string"}, }, "docs_create": { "func": docs_create_fn, "parameters": {"title": "string"}, }, "sheets_create": { "func": sheets_create_fn, "parameters": {"title": "string"}, }, "slides_create": { "func": slides_create_fn, "parameters": {"title": "string"}, }, } # ─── SSE first frame (tools list) ──────────────────────────────────────── TOOLS_EVENT = ( "event: tools\n" + "data: " + json.dumps({ "tools": [ # Required tools for ChatGPT { "name": "search", "description": "Search for documents matching the query", "inputSchema": { "type": "object", "properties": { "query": {"type": "string", "description": "Search terms"} }, "required": ["query"] } }, { "name": "fetch", "description": "Fetch a document by ID", "inputSchema": { "type": "object", "properties": { "id": {"type": "string", "description": "Document ID"} }, "required": ["id"] } }, # Google Workspace tools { "name": "drive_list", "description": "List files in Google Drive", "inputSchema": { "type": "object", "properties": {}, "required": [] } }, { "name": "drive_create_folder", "description": "Create a folder in Google Drive", "inputSchema": { "type": "object", "properties": { "name": {"type": "string", "description": "Folder name"} }, "required": [] } }, { "name": "drive_create_file", "description": "Create a file in Google Drive", "inputSchema": { "type": "object", "properties": { "name": {"type": "string", "description": "File name"}, "content": {"type": "string", "description": "File content"}, "mimeType": {"type": "string", "description": "MIME type"} }, "required": ["name"] } }, { "name": "drive_read_file", "description": "Read a file from Google Drive", "inputSchema": { "type": "object", "properties": { "fileId": {"type": "string", "description": "File ID"} }, "required": ["fileId"] } }, { "name": "drive_update_file", "description": "Update a file in Google Drive", "inputSchema": { "type": "object", "properties": { "fileId": {"type": "string", "description": "File ID"}, "content": {"type": "string", "description": "New content"} }, "required": ["fileId", "content"] } }, { "name": "drive_delete_file", "description": "Delete a file from Google Drive", "inputSchema": { "type": "object", "properties": { "fileId": {"type": "string", "description": "File ID"} }, "required": ["fileId"] } }, { "name": "docs_create", "description": "Create a new Google Doc", "inputSchema": { "type": "object", "properties": { "title": {"type": "string", "description": "Document title"} }, "required": [] } }, { "name": "sheets_create", "description": "Create a new Google Sheet", "inputSchema": { "type": "object", "properties": { "title": {"type": "string", "description": "Spreadsheet title"} }, "required": [] } }, { "name": "slides_create", "description": "Create a new Google Slides presentation", "inputSchema": { "type": "object", "properties": { "title": {"type": "string", "description": "Presentation title"} }, "required": [] } } ] }) + "\n\n" ) # ─── SSE GET handler ───────────────────────────────────────────────────── @app.get("/sse") async def sse(req: Request): async def gen(): yield TOOLS_EVENT while not await req.is_disconnected(): yield "event: ping\ndata: {}\n\n" await asyncio.sleep(25) return StreamingResponse(gen(), media_type="text/event-stream") @app.get("/sse/") async def sse_slash(req: Request): async def gen(): yield TOOLS_EVENT while not await req.is_disconnected(): yield "event: ping\ndata: {}\n\n" await asyncio.sleep(25) return StreamingResponse(gen(), media_type="text/event-stream") # ─── Streamable‑HTTP POST handler ──────────────────────────────────────── @app.post("/sse") async def sse_invoke(req: Request): payload = await req.json() call_id = payload.get("id") tool = payload.get("tool", payload.get("name")) # Support both formats args = payload.get("args", payload.get("arguments", {})) if tool not in TOOLS_REGISTRY: raise HTTPException(400, f"Unknown tool: {tool}") creds = _creds(req) try: # Handle different tool signatures if tool in ["search", "fetch"]: # These tools expect specific parameters if tool == "search": result = TOOLS_REGISTRY[tool]["func"](creds, query=args.get("query", "")) else: # fetch result = TOOLS_REGISTRY[tool]["func"](creds, id=args.get("id", "")) elif tool == "drive_list": result = TOOLS_REGISTRY[tool]["func"](creds) else: result = TOOLS_REGISTRY[tool]["func"](creds, **args) except Exception as e: result = {"error": str(e)} response_body = {"id": call_id, "result": result} return response_body @app.post("/sse/") async def sse_invoke_slash(req: Request): payload = await req.json() call_id = payload.get("id") tool = payload.get("tool", payload.get("name")) # Support both formats args = payload.get("args", payload.get("arguments", {})) if tool not in TOOLS_REGISTRY: raise HTTPException(400, f"Unknown tool: {tool}") creds = _creds(req) try: # Handle different tool signatures if tool in ["search", "fetch"]: # These tools expect specific parameters if tool == "search": result = TOOLS_REGISTRY[tool]["func"](creds, query=args.get("query", "")) else: # fetch result = TOOLS_REGISTRY[tool]["func"](creds, id=args.get("id", "")) elif tool == "drive_list": result = TOOLS_REGISTRY[tool]["func"](creds) else: result = TOOLS_REGISTRY[tool]["func"](creds, **args) except Exception as e: result = {"error": str(e)} response_body = {"id": call_id, "result": result} return response_body # ─── OAuth endpoints ───────────────────────────────────────────────────── @app.get("/oauth/authorize") def oauth_authorize(): url, _ = google_flow().authorization_url(access_type="offline", prompt="consent", state=uuid4().hex) return RedirectResponse(url, status_code=302) @app.get("/oauth/callback") def oauth_callback(req: Request, code: str): flow = google_flow() flow.fetch_token(code=code) req.session["creds"] = json.loads(flow.credentials.to_json()) return JSONResponse({"ok": True}) @app.post("/oauth/token") def oauth_token(grant_type: str = Form(...), code: str | None = Form(None), refresh_token: str | None = Form(None)): if grant_type == "authorization_code": flow = google_flow() flow.fetch_token(code=code) creds = flow.credentials elif grant_type == "refresh_token": resp = httpx.post( "https://oauth2.googleapis.com/token", data={ "client_id": GOOGLE_CLIENT_ID, "client_secret": GOOGLE_CLIENT_SECRET, "refresh_token": refresh_token, "grant_type": "refresh_token", }, ).json() if "access_token" not in resp: raise HTTPException(400, resp.get("error", "refresh_failed")) creds = Credentials(token=resp["access_token"], refresh_token=refresh_token, client_id=GOOGLE_CLIENT_ID, client_secret=GOOGLE_CLIENT_SECRET, token_uri="https://oauth2.googleapis.com/token") else: raise HTTPException(400, "unsupported grant_type") return { "access_token": creds.token, "refresh_token": creds.refresh_token, "token_type": "Bearer", "expires_in": 3600, } # ─── Discovery docs ────────────────────────────────────────────────────── @app.get("/.well-known/oauth-authorization-server") def disc_auth(req: Request): base = f"{req.url.scheme}://{req.url.netloc}" return { "issuer": base, "authorization_endpoint": f"{base}/oauth/authorize", "token_endpoint": f"{base}/oauth/token", "response_types_supported": ["code"], "grant_types_supported": ["authorization_code", "refresh_token"], "token_endpoint_auth_methods_supported": ["client_secret_post"], "code_challenge_methods_supported": ["S256"], } @app.get("/.well-known/mcp.json") def well_known_mcp(): return { "mcpServers": {"gptdrive": {"url": f"{BASE_URL}/sse"}}, "schemaVersion": 1, } @app.get("/mcp/manifest.json") def manifest(): return { "id": "gptdrive", "name": "Google Workspace", "version": "1.0.0", "description": "Access Google Drive, Docs, Sheets, and Slides", "mcp_server_url": f"{BASE_URL}/sse", "capabilities": {"tools": {}, "prompts": {}, "resources": {}}, "jsonrpc": "2.0", } @app.get("/openapi.json") def openapi(): return {"openapi": "3.0.0", "info": {"title": "Google Workspace MCP", "version": "1.0.0"}}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bd01010/chatgpt-workspace-connector'

If you have feedback or need assistance with the MCP directory API, please join our Discord server