publish_draft
Publishes a saved draft tweet or thread to X (Twitter) using its unique draft ID.
Instructions
Publish a draft tweet or thread
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| draft_id | Yes | ID of the draft to publish |
Implementation Reference
- src/x_mcp/server.py:142-154 (registration)Registration of the 'publish_draft' tool in the MCP server's list_tools() method. Defines the tool name, description, and input schema which requires a 'draft_id' string.name="publish_draft", description="Publish a draft tweet or thread", inputSchema={ "type": "object", "properties": { "draft_id": { "type": "string", "description": "ID of the draft to publish", }, }, "required": ["draft_id"], }, ),
- src/x_mcp/server.py:144-153 (schema)Input schema for the publish_draft tool: an object requiring 'draft_id' as a string.inputSchema={ "type": "object", "properties": { "draft_id": { "type": "string", "description": "ID of the draft to publish", }, }, "required": ["draft_id"], },
- src/x_mcp/server.py:659-818 (handler)Main handler implementation for publish_draft tool. Loads draft JSON file, handles various draft types (single tweet, reply, quote tweet, media tweet, thread), publishes via Tweepy Twitter API v2 client, uploads media if needed using v1.1 API, deletes draft on success or configurable failure, returns published tweet/thread IDs.async def handle_publish_draft(arguments: Any) -> Sequence[TextContent]: if not isinstance(arguments, dict) or "draft_id" not in arguments: raise ValueError("Invalid arguments for publish_draft") draft_id = arguments["draft_id"] filepath = os.path.join("drafts", draft_id) if not os.path.exists(filepath): raise ValueError(f"Draft {draft_id} does not exist") # Read the draft first try: with open(filepath, "r") as f: draft = json.load(f) except Exception as e: logger.error(f"Error reading draft {draft_id}: {str(e)}") raise RuntimeError(f"Error reading draft {draft_id}: {str(e)}") # Try to publish the draft try: if "content" in draft: content = draft["content"] # Check if this is a reply draft if draft.get("type") == "reply" and "reply_to_tweet_id" in draft: # Reply to existing tweet reply_to_tweet_id = draft["reply_to_tweet_id"] response = get_write_client().create_tweet(text=content, in_reply_to_tweet_id=reply_to_tweet_id) tweet_id = response.data['id'] logger.info(f"Published reply tweet ID {tweet_id} to tweet {reply_to_tweet_id}") # Only delete the draft after successful publishing os.remove(filepath) return [ TextContent( type="text", text=f"Draft {draft_id} published as reply tweet ID {tweet_id} to tweet {reply_to_tweet_id}", ) ] else: # Single tweet response = get_write_client().create_tweet(text=content) tweet_id = response.data['id'] logger.info(f"Published tweet ID {tweet_id}") # Only delete the draft after successful publishing os.remove(filepath) return [ TextContent( type="text", text=f"Draft {draft_id} published as tweet ID {tweet_id}", ) ] elif "comment" in draft and draft.get("type") == "quote_tweet": # Quote tweet draft comment = draft["comment"] quote_tweet_id = draft["quote_tweet_id"] response = get_write_client().create_tweet(text=comment, quote_tweet_id=quote_tweet_id) tweet_id = response.data['id'] logger.info(f"Published quote tweet ID {tweet_id} quoting tweet {quote_tweet_id}") # Only delete the draft after successful publishing os.remove(filepath) return [ TextContent( type="text", text=f"Draft {draft_id} published as quote tweet ID {tweet_id} quoting tweet {quote_tweet_id}", ) ] elif "media_files" in draft and draft.get("type") == "tweet_with_media": # Tweet with media draft content = draft["content"] media_files = draft["media_files"] # Upload media files and collect media IDs media_ids = [] for media_file in media_files: file_path = media_file["file_path"] media_type = media_file["media_type"] alt_text = media_file.get("alt_text") # Check if file exists if not os.path.exists(file_path): raise ValueError(f"Media file not found: {file_path}") # Upload media media_upload = api.media_upload(filename=file_path) media_id = media_upload.media_id_string media_ids.append(media_id) # Add alt text if provided and media is an image if alt_text and media_type in ["image", "gif"]: api.create_media_metadata(media_id=media_id, alt_text=alt_text) logger.info(f"Uploaded {media_type} for draft: {media_id}") # Create tweet with media response = get_write_client().create_tweet(text=content, media_ids=media_ids) tweet_id = response.data['id'] logger.info(f"Published tweet with media ID {tweet_id}, media IDs: {media_ids}") # Only delete the draft after successful publishing os.remove(filepath) return [ TextContent( type="text", text=f"Draft {draft_id} published as tweet with media ID {tweet_id} ({len(media_ids)} media files)", ) ] elif "contents" in draft: # Thread contents = draft["contents"] # Publish the thread published_tweet_ids = [] last_tweet_id = None try: for i, content in enumerate(contents): if last_tweet_id is None: response = get_write_client().create_tweet(text=content) else: response = get_write_client().create_tweet(text=content, in_reply_to_tweet_id=last_tweet_id) last_tweet_id = response.data['id'] published_tweet_ids.append(last_tweet_id) await asyncio.sleep(1) # Avoid hitting rate limits logger.info(f"Published thread with {len(published_tweet_ids)} tweets, starting with ID {published_tweet_ids[0]}") # Only delete the draft after successful publishing of entire thread os.remove(filepath) return [ TextContent( type="text", text=f"Draft {draft_id} published as thread with {len(published_tweet_ids)} tweets, starting with tweet ID {published_tweet_ids[0]}", ) ] except Exception as thread_error: # If thread publishing fails partway through, log which tweets were published if published_tweet_ids: logger.error(f"Thread publishing failed after {len(published_tweet_ids)} tweets. Published tweet IDs: {published_tweet_ids}") # Delete the draft even if thread partially published delete_draft_on_failure(draft_id, filepath) status_msg = "Draft has been deleted." if AUTO_DELETE_FAILED_DRAFTS else "Draft preserved for retry." raise RuntimeError(f"Thread publishing failed after {len(published_tweet_ids)} tweets. Published tweets: {published_tweet_ids}. {status_msg} Error: {thread_error}") else: # No tweets were published, the error will be handled by the outer exception handler raise thread_error else: raise ValueError(f"Invalid draft format for {draft_id}") except tweepy.TweepError as e: logger.error(f"Twitter API error publishing draft {draft_id}: {e}") delete_draft_on_failure(draft_id, filepath) status_msg = "Draft has been deleted." if AUTO_DELETE_FAILED_DRAFTS else "Draft preserved for retry." raise RuntimeError(f"Twitter API error publishing draft {draft_id}: {e}. {status_msg}") except Exception as e: logger.error(f"Error publishing draft {draft_id}: {str(e)}") delete_draft_on_failure(draft_id, filepath) status_msg = "Draft has been deleted." if AUTO_DELETE_FAILED_DRAFTS else "Draft preserved for retry." raise RuntimeError(f"Error publishing draft {draft_id}: {str(e)}. {status_msg}")
- src/x_mcp/server.py:87-97 (helper)Helper utility called by publish_draft handler on errors to conditionally delete the draft file based on AUTO_DELETE_FAILED_DRAFTS configuration.def delete_draft_on_failure(draft_id: str, filepath: str) -> None: """Delete draft file if auto-delete is enabled""" if AUTO_DELETE_FAILED_DRAFTS: try: os.remove(filepath) logger.info(f"Deleted draft {draft_id} due to publishing failure (auto-delete enabled)") except Exception as delete_error: logger.error(f"Failed to delete draft {draft_id} after publishing error: {delete_error}") else: logger.info(f"Draft {draft_id} preserved for retry (auto-delete disabled)")
- src/x_mcp/server.py:544-545 (registration)Dispatch registration in the @server.call_tool() handler that routes 'publish_draft' calls to the handle_publish_draft function.elif name == "publish_draft": return await handle_publish_draft(arguments)