Skip to main content
Glama
api.py39 kB
"""Google Forms API wrapper.

Provides high-level interface to Google Forms and Drive APIs with error handling.
"""

import csv
import io
import copy
import time
import logging
from typing import Dict, List, Optional, Any

from googleapiclient.errors import HttpError

from ..core.base import BaseAPI

logger = logging.getLogger("gtools")


def _flatten_newlines(text: str) -> str:
    """Return *text* with newlines turned into spaces and carriage returns removed."""
    return text.replace('\n', ' ').replace('\r', '')


def _apply_replacements(text: str, replacements: Dict[str, str]) -> str:
    """Apply every placeholder -> replacement pair to *text*, in dict order.

    Empty placeholders are skipped: ``str.replace('', x)`` would insert ``x``
    between every character of the text.
    """
    for placeholder, replacement in replacements.items():
        if placeholder:
            text = text.replace(placeholder, replacement)
    return text


class FormsAPI(BaseAPI):
    """Google Forms API client with comprehensive form management capabilities.

    Forms calls go through ``self.service`` and Drive calls through
    ``self.drive_service``; both are presumably provided by ``BaseAPI``
    (not visible in this module - confirm there).

    Every public method converts ``HttpError`` into a plain ``Exception`` whose
    message names the failed operation; the original error is chained as
    ``__cause__``.
    """

    API_NAME = "forms"
    API_VERSION = "v1"

    # Public choice-question type name -> value sent as choiceQuestion.type.
    _CHOICE_TYPES = {
        "MULTIPLE_CHOICE": "RADIO",
        "CHECKBOXES": "CHECKBOX",
        "DROPDOWN": "DROP_DOWN",
    }

    # Public grid-question type name -> value sent as grid.columns.type.
    _GRID_TYPES = {
        "MULTIPLE_CHOICE_GRID": "RADIO",
        "CHECKBOX_GRID": "CHECKBOX",
    }

    # Item kinds copied verbatim (no IDs to strip) when duplicating a form.
    _PASSTHROUGH_ITEM_KINDS = ('pageBreakItem', 'textItem', 'imageItem', 'videoItem')

    def _sync_drive_name(self, form_id: str, title: str) -> None:
        """Sync Drive document name with form title.

        Google Forms API sets info.title but NOT the Drive document name.
        This method ensures both are synchronized. Failures are logged and
        swallowed: the rename is best-effort.
        """
        try:
            self.drive_service.files().update(
                fileId=form_id,
                body={'name': title}
            ).execute()
        except HttpError as e:
            logger.warning("Failed to sync Drive name: %s", e)

    def create_form(self, title: str, description: str = "") -> Dict[str, Any]:
        """Create and publish a new form.

        Args:
            title: Form title
            description: Form description (optional)

        Returns:
            Dict with formId, responderUri, and editUri

        Raises:
            Exception: If form creation fails
        """
        try:
            # Create form with ONLY title (API requirement)
            result = self.service.forms().create(body={"info": {"title": title}}).execute()
            form_id = result['formId']

            # Description (and the default quiz settings sent with it) go
            # through batchUpdate, and only when a description was given.
            if description:
                update = {
                    "requests": [
                        {
                            "updateFormInfo": {
                                "info": {"description": description},
                                "updateMask": "description"
                            }
                        },
                        {
                            "updateSettings": {
                                "settings": {
                                    "quizSettings": {
                                        "isQuiz": False  # Default to non-quiz form
                                    }
                                },
                                "updateMask": "quizSettings.isQuiz"
                            }
                        },
                    ]
                }
                self.service.forms().batchUpdate(formId=form_id, body=update).execute()

            # Sync Drive document name with form title
            self._sync_drive_name(form_id, title)

            return {
                "formId": form_id,
                "responderUri": result.get('responderUri'),
                "editUri": f"https://docs.google.com/forms/d/{form_id}/edit"
            }
        except HttpError as e:
            raise Exception(f"Failed to create form: {e}") from e

    def list_forms(self, page_size: int = 50, page_token: Optional[str] = None) -> Dict[str, Any]:
        """List all forms owned by the user via Drive API.

        For each form, the responder URI and a response count are looked up
        on a best-effort basis: if either lookup fails the form is still
        listed, with an empty URI and/or a count of 0.

        Args:
            page_size: Maximum number of forms to return
            page_token: Token for pagination (optional)

        Returns:
            Dict with 'forms' list and optional 'nextPageToken'

        Raises:
            Exception: If the Drive listing fails
        """
        try:
            params = {
                'q': "mimeType='application/vnd.google-apps.form'",
                'pageSize': page_size,
                'fields': 'files(id,name,webViewLink),nextPageToken'
            }
            if page_token:
                params['pageToken'] = page_token

            results = self.drive_service.files().list(**params).execute()

            forms: List[Dict[str, Any]] = []
            for file in results.get('files', []):
                form_id = file['id']
                # Defaults must be bound before the try: a failed lookup must
                # not leave `form` undefined or pointing at the previous form.
                form: Dict[str, Any] = {}
                response_count = 0
                try:
                    form = self.service.forms().get(formId=form_id).execute()
                    responses = self.service.forms().responses().list(formId=form_id).execute()
                    # Counts only the first page of responses returned.
                    response_count = len(responses.get('responses', []))
                except Exception as e:  # best-effort enrichment; keep listing
                    logger.warning("Failed to load details for form %s: %s", form_id, e)

                forms.append({
                    "formId": form_id,
                    "title": file['name'],
                    "responderUri": form.get('responderUri', ''),
                    "responseCount": response_count
                })

            result: Dict[str, Any] = {"forms": forms}
            if 'nextPageToken' in results:
                result['nextPageToken'] = results['nextPageToken']
            return result
        except HttpError as e:
            raise Exception(f"Failed to list forms: {e}") from e

    def get_form(self, form_id: str) -> Dict[str, Any]:
        """Get complete form details.

        Args:
            form_id: The form ID

        Returns:
            Complete Form object

        Raises:
            Exception: If form retrieval fails
        """
        try:
            return self.service.forms().get(formId=form_id).execute()
        except HttpError as e:
            raise Exception(f"Failed to get form: {e}") from e

    def update_form(self, form_id: str, title: Optional[str] = None,
                    description: Optional[str] = None) -> Dict[str, Any]:
        """Update form title and/or description.

        Args:
            form_id: The form ID
            title: New title (optional)
            description: New description (optional)

        Returns:
            Updated Form object

        Raises:
            ValueError: If neither title nor description is given
            Exception: If update fails
        """
        try:
            requests = [
                {"updateFormInfo": {"info": {field: value}, "updateMask": field}}
                for field, value in (("title", title), ("description", description))
                if value is not None
            ]
            if not requests:
                raise ValueError("Must provide either title or description to update")

            self.service.forms().batchUpdate(formId=form_id, body={"requests": requests}).execute()

            # Sync Drive document name when title changes
            if title is not None:
                self._sync_drive_name(form_id, title)

            return self.get_form(form_id)
        except HttpError as e:
            raise Exception(f"Failed to update form: {e}") from e

    def delete_form(self, form_id: str) -> Dict[str, bool]:
        """Delete a form via Drive API.

        Args:
            form_id: The form ID

        Returns:
            Dict with 'deleted': True

        Raises:
            Exception: If deletion fails
        """
        try:
            self.drive_service.files().delete(fileId=form_id).execute()
            return {"deleted": True}
        except HttpError as e:
            raise Exception(f"Failed to delete form: {e}") from e

    def _get_item_index(self, form_id: str, item_id: str) -> int:
        """Helper method to get an item's index by its itemId.

        Args:
            form_id: The form ID
            item_id: The item ID to find

        Returns:
            The index of the item

        Raises:
            ValueError: If item not found
        """
        items = self.get_form(form_id).get('items', [])
        for index, item in enumerate(items):
            if item.get('itemId') == item_id:
                return index
        raise ValueError(f"Item with ID '{item_id}' not found in form")

    def _build_question(self, question_type: str, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """Build the Question body for a non-grid question type.

        Raises:
            ValueError: If question_type is not a supported non-grid type
        """
        question: Dict[str, Any] = {"required": kwargs.get('required', False)}

        if question_type in ("SHORT_ANSWER", "PARAGRAPH"):
            question["textQuestion"] = {"paragraph": question_type == "PARAGRAPH"}
        elif question_type in self._CHOICE_TYPES:
            question["choiceQuestion"] = {
                "type": self._CHOICE_TYPES[question_type],
                "options": [{"value": opt} for opt in kwargs.get('options', [])]
            }
        elif question_type == "LINEAR_SCALE":
            question["scaleQuestion"] = {
                "low": kwargs.get('low', 1),
                "high": kwargs.get('high', 5),
                "lowLabel": kwargs.get('lowLabel', ''),
                "highLabel": kwargs.get('highLabel', '')
            }
        elif question_type == "RATING":
            # Sent as a scale question that always starts at 1, without labels.
            question["scaleQuestion"] = {
                "low": 1,
                "high": kwargs.get('high', 5),
                "lowLabel": '',
                "highLabel": ''
            }
        elif question_type == "DATE":
            question["dateQuestion"] = {
                "includeTime": kwargs.get('includeTime', False),
                "includeYear": kwargs.get('includeYear', True)
            }
        elif question_type == "TIME":
            question["timeQuestion"] = {"duration": kwargs.get('duration', False)}
        elif question_type == "FILE_UPLOAD":
            question["fileUploadQuestion"] = {
                "folderId": kwargs.get('folderId', ''),
                "maxFiles": kwargs.get('maxFiles', 1),
                "maxFileSize": kwargs.get('maxFileSize', 10485760),
                "types": kwargs.get('allowedTypes', [])
            }
        else:
            raise ValueError(f"Unsupported question type: {question_type}")

        return question

    def add_question(self, form_id: str, question_type: str, title: str, **kwargs) -> Dict[str, Any]:
        """Add a question to a form.

        Supported question_type values: SHORT_ANSWER, PARAGRAPH,
        MULTIPLE_CHOICE, CHECKBOXES, DROPDOWN, LINEAR_SCALE, DATE, TIME,
        FILE_UPLOAD, MULTIPLE_CHOICE_GRID, CHECKBOX_GRID, RATING.

        Args:
            form_id: The form ID
            question_type: Type of question (see list above)
            title: Question text
            **kwargs: Additional parameters based on question type:
                position (int, default 0), required (bool, default False),
                options (choice types), low/high/lowLabel/highLabel
                (LINEAR_SCALE; RATING reads only high), includeTime/includeYear
                (DATE), duration (TIME), folderId/maxFiles/maxFileSize/
                allowedTypes (FILE_UPLOAD), rows/columns (grid types)

        Returns:
            Result of the batch update

        Raises:
            ValueError: If question_type is not supported
            Exception: If question creation fails
        """
        try:
            item: Dict[str, Any] = {"title": title}

            if question_type in self._GRID_TYPES:
                # Grids are an item-level kind (questionGroupItem), not a
                # question nested under questionItem; each row is its own
                # question, so `required` is applied per row.
                required = kwargs.get('required', False)
                item["questionGroupItem"] = {
                    "questions": [
                        {"required": required, "rowQuestion": {"title": row}}
                        for row in kwargs.get('rows', [])
                    ],
                    "grid": {
                        "columns": {
                            "type": self._GRID_TYPES[question_type],
                            "options": [{"value": col} for col in kwargs.get('columns', [])]
                        }
                    }
                }
            else:
                item["questionItem"] = {
                    "question": self._build_question(question_type, kwargs)
                }

            request = {
                "requests": [{
                    "createItem": {
                        "item": item,
                        "location": {"index": kwargs.get('position', 0)}
                    }
                }]
            }
            return self.service.forms().batchUpdate(formId=form_id, body=request).execute()
        except HttpError as e:
            raise Exception(f"Failed to add question: {e}") from e

    def update_item(self, form_id: str, item_id: str, **kwargs) -> Dict[str, Any]:
        """Update an existing item (question, section, etc.).

        The Google Forms API requires the item type structure to be included
        in update requests, even when only updating the title, so the current
        item is fetched, copied and sent back with the changed fields.

        Args:
            form_id: The form ID
            item_id: The item ID to update
            **kwargs: Fields to update: title, description, and required
                (required is applied only to items that have a questionItem)

        Returns:
            Result of batch update

        Raises:
            ValueError: If the item is not found or no updatable field is given
            Exception: If update fails
        """
        try:
            items = self.get_form(form_id).get('items', [])
            match = next(
                ((idx, item) for idx, item in enumerate(items) if item.get('itemId') == item_id),
                None
            )
            if match is None:
                raise ValueError(f"Item with ID '{item_id}' not found in form")
            item_index, original_item = match

            # Work on a copy so the fetched form is left untouched.
            update_item = copy.deepcopy(original_item)
            update_item.pop('itemId', None)  # Remove read-only field

            # Remove questionId from nested structures
            if 'question' in update_item.get('questionItem', {}):
                update_item['questionItem']['question'].pop('questionId', None)
            for q in update_item.get('questionGroupItem', {}).get('questions', []):
                q.pop('questionId', None)

            update_mask = []
            for field in ('title', 'description'):
                if field in kwargs:
                    update_item[field] = kwargs[field]
                    update_mask.append(field)
            if 'required' in kwargs and 'questionItem' in update_item:
                update_item['questionItem']['question']['required'] = kwargs['required']
                update_mask.append('questionItem.question.required')

            if not update_mask:
                raise ValueError("No fields to update")

            request = {
                "requests": [{
                    "updateItem": {
                        "item": update_item,
                        "location": {"index": item_index},
                        "updateMask": ','.join(update_mask)
                    }
                }]
            }
            return self.service.forms().batchUpdate(formId=form_id, body=request).execute()
        except HttpError as e:
            raise Exception(f"Failed to update item: {e}") from e

    def update_question(self, form_id: str, item_id: str, **kwargs) -> Dict[str, Any]:
        """Update an existing question.

        Alias for update_item for backwards compatibility.
        """
        return self.update_item(form_id, item_id, **kwargs)

    def delete_question(self, form_id: str, item_id: str) -> Dict[str, Any]:
        """Delete a question from a form.

        Args:
            form_id: The form ID
            item_id: The item ID to delete

        Returns:
            Result of batch update

        Raises:
            ValueError: If the item is not found
            Exception: If deletion fails
        """
        try:
            # The API addresses items by index, so convert itemId to index
            item_index = self._get_item_index(form_id, item_id)
            request = {
                "requests": [{
                    "deleteItem": {
                        "location": {"index": item_index}
                    }
                }]
            }
            return self.service.forms().batchUpdate(formId=form_id, body=request).execute()
        except HttpError as e:
            raise Exception(f"Failed to delete question: {e}") from e

    def move_question(self, form_id: str, item_id: str, new_position: int) -> Dict[str, Any]:
        """Move a question to a new position.

        Args:
            form_id: The form ID
            item_id: The item ID to move
            new_position: New position index

        Returns:
            Result of batch update

        Raises:
            ValueError: If the item is not found
            Exception: If move fails
        """
        try:
            # The API addresses items by index, so convert itemId to index
            item_index = self._get_item_index(form_id, item_id)
            request = {
                "requests": [{
                    "moveItem": {
                        "originalLocation": {"index": item_index},
                        "newLocation": {"index": new_position}
                    }
                }]
            }
            return self.service.forms().batchUpdate(formId=form_id, body=request).execute()
        except HttpError as e:
            raise Exception(f"Failed to move question: {e}") from e

    def add_section(self, form_id: str, title: str, description: str = "",
                    position: Optional[int] = None) -> Dict[str, Any]:
        """Add a section break (page break) to the form.

        Args:
            form_id: The form ID
            title: Section title
            description: Section description (optional)
            position: Position index (optional, defaults to end of form)

        Returns:
            Result of batch update

        Raises:
            Exception: If section creation fails
        """
        try:
            # If position not specified, add to end of form
            if position is None:
                position = len(self.get_form(form_id).get('items', []))

            request = {
                "requests": [{
                    "createItem": {
                        "item": {
                            "title": title,
                            "description": description,
                            "pageBreakItem": {}
                        },
                        "location": {"index": position}
                    }
                }]
            }
            return self.service.forms().batchUpdate(formId=form_id, body=request).execute()
        except HttpError as e:
            raise Exception(f"Failed to add section: {e}") from e

    def list_responses(self, form_id: str, page_size: int = 100,
                       page_token: Optional[str] = None) -> Dict[str, Any]:
        """List one page of responses for a form.

        Args:
            form_id: The form ID
            page_size: Maximum responses to return
            page_token: Pagination token (optional)

        Returns:
            Dict with 'responses' list and optional 'nextPageToken'

        Raises:
            Exception: If listing fails
        """
        try:
            params: Dict[str, Any] = {'pageSize': page_size}
            if page_token:
                params['pageToken'] = page_token
            return self.service.forms().responses().list(formId=form_id, **params).execute()
        except HttpError as e:
            raise Exception(f"Failed to list responses: {e}") from e

    def get_response(self, form_id: str, response_id: str) -> Dict[str, Any]:
        """Get a specific response.

        Args:
            form_id: The form ID
            response_id: The response ID

        Returns:
            Complete FormResponse object

        Raises:
            Exception: If retrieval fails
        """
        try:
            return self.service.forms().responses().get(
                formId=form_id, responseId=response_id
            ).execute()
        except HttpError as e:
            raise Exception(f"Failed to get response: {e}") from e

    def _fetch_all_responses(self, form_id: str, page_size: int = 1000) -> List[Dict[str, Any]]:
        """Collect every response of a form by following nextPageToken."""
        responses: List[Dict[str, Any]] = []
        page_token: Optional[str] = None
        while True:
            page = self.list_responses(form_id, page_size=page_size, page_token=page_token)
            responses.extend(page.get('responses', []))
            page_token = page.get('nextPageToken')
            if not page_token:
                return responses

    @staticmethod
    def _collect_question_columns(items: List[Dict[str, Any]]) -> Dict[str, str]:
        """Map questionId -> CSV column title, in form order.

        Response answers are keyed by questionId (NOT itemId). Grid items
        contribute one column per row, titled "<grid title> - <row title>".
        """
        columns: Dict[str, str] = {}
        for item in items:
            if 'questionItem' in item:
                question_id = item['questionItem'].get('question', {}).get('questionId')
                if question_id:
                    columns[question_id] = item.get('title', f'Question {question_id}')
            elif 'questionGroupItem' in item:
                grid_title = item.get('title', 'Grid')
                for q in item['questionGroupItem'].get('questions', []):
                    question_id = q.get('questionId')
                    if not question_id:
                        continue
                    row_title = q.get('rowQuestion', {}).get('title', '')
                    columns[question_id] = f"{grid_title} - {row_title}" if row_title else grid_title
        return columns

    @staticmethod
    def _format_answer(answer: Dict[str, Any]) -> str:
        """Render one answer as a CSV cell; multiple values are joined with '; '."""
        if 'textAnswers' in answer:
            # Text-based answers (SHORT_ANSWER, PARAGRAPH, CHOICE, SCALE, DATE, TIME, GRID)
            return '; '.join(
                a.get('value', '') for a in answer['textAnswers'].get('answers', [])
            )
        if 'fileUploadAnswers' in answer:
            # File upload answers - export as "filename (Drive ID)"
            return '; '.join(
                f"{f.get('fileName', 'file')} ({f.get('fileId', '')})"
                for f in answer['fileUploadAnswers'].get('answers', [])
            )
        return ''

    def export_responses_csv(self, form_id: str, include_timestamps: bool = True,
                             include_email: bool = True) -> Dict[str, Any]:
        """Export responses to CSV format.

        All response pages are fetched, so the export is not truncated at the
        first page. Columns are: optional Timestamp, optional Email, then one
        column per question (one per row for grid questions).

        Args:
            form_id: The form ID
            include_timestamps: Include submission timestamps
            include_email: Include respondent emails

        Returns:
            Dict with 'csv' string and 'rowCount'; the csv is empty (no header)
            when the form has no responses

        Raises:
            Exception: If export fails
        """
        try:
            form = self.get_form(form_id)
            responses = self._fetch_all_responses(form_id, page_size=1000)

            if not responses:
                return {"csv": "", "rowCount": 0}

            columns = self._collect_question_columns(form.get('items', []))

            header: List[str] = []
            if include_timestamps:
                header.append('Timestamp')
            if include_email:
                header.append('Email')
            # Header is derived from the same mapping the rows iterate over,
            # so columns always line up.
            header.extend(columns.values())

            with io.StringIO() as output:
                writer = csv.writer(output)
                writer.writerow(header)

                for response in responses:
                    row: List[str] = []
                    if include_timestamps:
                        row.append(response.get('createTime', ''))
                    if include_email:
                        row.append(response.get('respondentEmail', ''))

                    answers = response.get('answers', {})
                    for question_id in columns:
                        answer = answers.get(question_id)
                        row.append(self._format_answer(answer) if answer is not None else '')

                    writer.writerow(row)

                csv_content = output.getvalue()

            return {
                "csv": csv_content,
                "rowCount": len(responses)
            }
        except HttpError as e:
            raise Exception(f"Failed to export responses: {e}") from e

    def duplicate_form(self, form_id: str, new_title: str, use_batch: bool = True,
                       chunk_size: int = 100) -> Dict[str, Any]:
        """Duplicate an existing form with optional batch optimization.

        Args:
            form_id: The form ID to copy
            new_title: Title for the new form
            use_batch: Use the batch method (default True), which sends items
                in groups instead of one request per item
            chunk_size: Items per batch for large forms (default 100); used
                only by the batch method

        Returns:
            Dict with newFormId, responderUri, copiedItems count, and method used

        Raises:
            ValueError: If use_batch is True and chunk_size is less than 1
            Exception: If duplication fails
        """
        if use_batch:
            return self._duplicate_form_batch(form_id, new_title, chunk_size)
        return self._duplicate_form_legacy(form_id, new_title)

    @staticmethod
    def _build_settings_request(original: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Build the updateSettings request copying *original*'s settings.

        Returns None when the original form has no 'settings' key. A
        non-quiz quizSettings entry is added if the settings lack one.
        """
        if 'settings' not in original:
            return None
        settings = original['settings'].copy()
        if 'quizSettings' not in settings:
            settings['quizSettings'] = {'isQuiz': False}
        return {
            "updateSettings": {
                "settings": settings,
                "updateMask": "*"
            }
        }

    def _duplicate_form_batch(self, form_id: str, new_title: str,
                              chunk_size: int = 100) -> Dict[str, Any]:
        """Duplicate a form using batched createItem requests.

        The settings request (if any) travels with the first batch. A failing
        batch aborts the duplication; the partially filled new form is left
        in place.

        Raises:
            ValueError: If chunk_size is less than 1
            Exception: If any API call fails
        """
        # Validate before creating anything, so a bad chunk_size cannot leave
        # an empty orphan form behind.
        if chunk_size < 1:
            raise ValueError("chunk_size must be a positive integer")

        start_time = time.time()

        # Step 1: Get original form structure (1 API call)
        original = self.get_form(form_id)
        items = original.get('items', [])

        # Step 2: Create new empty form (1 API call)
        description = original.get('info', {}).get('description', '')
        new_form = self.create_form(new_title, description)
        new_form_id = new_form['formId']

        # Step 3: Build settings + item creation requests
        settings_request = self._build_settings_request(original)
        item_requests = [
            {
                "createItem": {
                    "item": self._clean_item_for_duplication(original_item),
                    "location": {"index": i}
                }
            }
            for i, original_item in enumerate(items)
        ]
        total_requests = len(item_requests) + (1 if settings_request else 0)
        chunked = total_requests > chunk_size

        chunks = [
            item_requests[i:i + chunk_size]
            for i in range(0, len(item_requests), chunk_size)
        ]
        if settings_request:
            if chunks:
                chunks[0] = [settings_request] + chunks[0]
            else:
                chunks = [[settings_request]]

        # Step 4: Execute batch operations
        api_calls = 2  # get_form + create_form
        total_copied = 0
        for chunk_idx, chunk in enumerate(chunks):
            try:
                self.service.forms().batchUpdate(
                    formId=new_form_id,
                    body={"requests": chunk}
                ).execute()
            except HttpError as e:
                if chunked:
                    raise Exception(f"Failed to duplicate chunk {chunk_idx + 1}: {e}") from e
                raise Exception(f"Failed to batch duplicate form: {e}") from e
            api_calls += 1
            total_copied += sum(1 for r in chunk if 'createItem' in r)

        elapsed_time = time.time() - start_time

        return {
            "newFormId": new_form_id,
            "responderUri": new_form['responderUri'],
            "editUri": f"https://docs.google.com/forms/d/{new_form_id}/edit",
            "copiedItems": total_copied,
            "totalItems": len(items),
            "apiCalls": api_calls,
            "executionTime": f"{elapsed_time:.2f}s",
            "method": "batch",
            "chunked": chunked
        }

    def _duplicate_form_legacy(self, form_id: str, new_title: str) -> Dict[str, Any]:
        """Duplicate a form using legacy item-by-item method.

        Unlike the batch method, an item that fails to copy is logged and
        skipped, so copiedItems may be lower than totalItems.

        Raises:
            Exception: If fetching, creating, or copying settings fails
        """
        start_time = time.time()

        try:
            # Step 1: Get original form structure
            original = self.get_form(form_id)

            # Step 2: Create new empty form
            description = original.get('info', {}).get('description', '')
            new_form = self.create_form(new_title, description)
            new_form_id = new_form['formId']
            api_calls = 2

            # Step 3: Copy settings
            settings_request = self._build_settings_request(original)
            if settings_request:
                self.service.forms().batchUpdate(
                    formId=new_form_id,
                    body={"requests": [settings_request]}
                ).execute()
                api_calls += 1

            # Step 4: Copy all items, one request each
            items = original.get('items', [])
            copied_count = 0
            for i, original_item in enumerate(items):
                request = {
                    "requests": [{
                        "createItem": {
                            "item": self._clean_item_for_duplication(original_item),
                            "location": {"index": i}
                        }
                    }]
                }
                try:
                    self.service.forms().batchUpdate(
                        formId=new_form_id,
                        body=request
                    ).execute()
                except HttpError as item_error:
                    logger.warning("Failed to copy item %s: %s", i, item_error)
                    continue
                api_calls += 1
                copied_count += 1

            elapsed_time = time.time() - start_time

            return {
                "newFormId": new_form_id,
                "responderUri": new_form['responderUri'],
                "editUri": f"https://docs.google.com/forms/d/{new_form_id}/edit",
                "copiedItems": copied_count,
                "totalItems": len(items),
                "apiCalls": api_calls,
                "executionTime": f"{elapsed_time:.2f}s",
                "method": "legacy"
            }
        except HttpError as e:
            raise Exception(f"Failed to duplicate form: {e}") from e

    def _clean_item_for_duplication(self, original_item: Dict[str, Any]) -> Dict[str, Any]:
        """Remove IDs and clean invalid characters from item for safe duplication.

        Returns a new item dict holding the title/description (newlines
        flattened) and the item's kind. Question and grid kinds are deep
        copied with questionId removed and their labels/options/row titles
        flattened; other kinds are passed through as-is. The input item is
        not modified.
        """
        clean_item: Dict[str, Any] = {}

        for field in ('title', 'description'):
            if field in original_item:
                clean_item[field] = _flatten_newlines(original_item[field])

        if 'questionItem' in original_item:
            question = copy.deepcopy(original_item['questionItem']['question'])
            question.pop('questionId', None)

            scale = question.get('scaleQuestion', {})
            for label in ('lowLabel', 'highLabel'):
                if label in scale:
                    scale[label] = _flatten_newlines(scale[label])

            for option in question.get('choiceQuestion', {}).get('options', []):
                if 'value' in option:
                    option['value'] = _flatten_newlines(option['value'])

            clean_item['questionItem'] = {'question': question}
            if 'image' in original_item['questionItem']:
                clean_item['questionItem']['image'] = original_item['questionItem']['image']

        elif 'questionGroupItem' in original_item:
            group = copy.deepcopy(original_item['questionGroupItem'])

            for q in group.get('questions', []):
                q.pop('questionId', None)
                # Row titles get the same newline cleanup as every other label.
                row = q.get('rowQuestion', {})
                if 'title' in row:
                    row['title'] = _flatten_newlines(row['title'])

            for option in group.get('grid', {}).get('columns', {}).get('options', []):
                if 'value' in option:
                    option['value'] = _flatten_newlines(option['value'])

            clean_item['questionGroupItem'] = group

        else:
            # An item carries a single kind; copy the first one present.
            for kind in self._PASSTHROUGH_ITEM_KINDS:
                if kind in original_item:
                    clean_item[kind] = original_item[kind]
                    break

        return clean_item

    def personalize_form(self, form_id: str, replacements: Dict[str, str]) -> Dict[str, Any]:
        """Personalize a form by replacing placeholders in titles and descriptions.

        Replaces placeholder text in form title, description, and all item
        titles/descriptions. Item updates are best-effort: a failing item is
        logged and skipped. Empty placeholder keys are ignored.

        Args:
            form_id: The form ID to personalize
            replacements: Dict of placeholder -> replacement text (e.g. {"NAME": "John"})

        Returns:
            Dict with formId, itemsUpdated count, totalItems, and updates list

        Raises:
            Exception: If fetching or updating the form itself fails
        """
        try:
            form = self.get_form(form_id)
            items = form.get('items', [])
            items_updated = 0
            updates_made: List[str] = []

            # Update form title and description
            info = form.get('info', {})
            form_title = info.get('title', '')
            form_desc = info.get('description', '')
            new_title = _apply_replacements(form_title, replacements)
            new_desc = _apply_replacements(form_desc, replacements)

            if new_title != form_title or new_desc != form_desc:
                self.update_form(form_id, title=new_title, description=new_desc)
                updates_made.append("Form title/description updated")

            # Update all items
            for item in items:
                item_id = item.get('itemId')
                title = item.get('title', '')
                description = item.get('description', '')

                # Clean newlines (Google API limitation)
                new_item_title = _flatten_newlines(_apply_replacements(title, replacements)).strip()
                new_item_desc = _flatten_newlines(
                    _apply_replacements(description, replacements)
                ).strip()

                # Compare against the cleaned originals so newline cleanup
                # alone does not count as a change.
                if (new_item_title == _flatten_newlines(title).strip()
                        and new_item_desc == _flatten_newlines(description).strip()):
                    continue

                try:
                    update_kwargs = {'title': new_item_title}
                    if description:
                        update_kwargs['description'] = new_item_desc
                    self.update_item(form_id, item_id, **update_kwargs)
                    items_updated += 1
                    updates_made.append(f"Updated: {new_item_title[:40]}...")
                except Exception as e:  # best-effort per item
                    logger.warning("Failed to update item %s: %s", item_id, e)

            return {
                "formId": form_id,
                "itemsUpdated": items_updated,
                "totalItems": len(items),
                "updates": updates_made
            }
        except HttpError as e:
            raise Exception(f"Failed to personalize form: {e}") from e

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/maksdizzy/google-forms-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.