Skip to main content
Glama
stories.py (26.9 kB)
import json
from typing import Any, Optional, Dict, List, Union

from mcp.server.fastmcp import FastMCP
from httpx import AsyncClient

from utils.api import (
    build_management_url,
    get_management_headers,
    _handle_response,
    APIError,
)
from tools.components import get_component_schema_by_name


# Bookkeeping keys that appear in story content next to the schema-defined
# fields; they must not be reported as "extraneous" during validation.
_RESERVED_CONTENT_KEYS = frozenset({"_uid", "component", "_editable"})


def _error_result(message: Any) -> Dict[str, Any]:
    """Build the error payload shape returned by every tool in this module."""
    return {"isError": True, "content": [{"type": "text", "text": str(message)}]}


def _encode_query_params(filters: Dict[str, Any]) -> Dict[str, Any]:
    """Drop ``None`` values and encode bools as 1/0 and dicts as JSON strings."""
    encoded: Dict[str, Any] = {}
    for key, val in filters.items():
        if val is None:
            continue
        if isinstance(val, bool):
            encoded[key] = 1 if val else 0
        elif isinstance(val, dict):
            encoded[key] = json.dumps(val)
        else:
            encoded[key] = val
    return encoded


def _dedupe(items: List[str]) -> List[str]:
    """Remove duplicates while keeping first-seen order (deterministic output)."""
    return list(dict.fromkeys(items))


def register_stories(mcp: FastMCP, client: AsyncClient) -> None:
    """Register all story-related tools on ``mcp``.

    Every tool issues its HTTP requests through the shared ``client`` and
    converts ``APIError`` into an ``{"isError": True, ...}`` result (or a
    per-item error entry for the bulk tools) instead of raising.
    """

    @mcp.tool()
    async def fetch_stories(
        page: Optional[int] = 1,
        per_page: Optional[int] = 25,
        contain_component: Optional[str] = None,
        text_search: Optional[str] = None,
        sort_by: Optional[str] = None,
        pinned: Optional[bool] = None,
        excluding_ids: Optional[str] = None,
        by_ids: Optional[str] = None,
        by_uuids: Optional[str] = None,
        with_tag: Optional[str] = None,
        folder_only: Optional[bool] = None,
        story_only: Optional[bool] = None,
        with_parent: Optional[int] = None,
        starts_with: Optional[str] = None,
        in_trash: Optional[bool] = None,
        search: Optional[str] = None,
        filter_query: Optional[Union[str, Dict[str, Any]]] = None,
        in_release: Optional[int] = None,
        is_published: Optional[bool] = None,
        by_slugs: Optional[str] = None,
        mine: Optional[bool] = None,
        excluding_slugs: Optional[str] = None,
        in_workflow_stages: Optional[str] = None,
        by_uuids_ordered: Optional[str] = None,
        with_slug: Optional[str] = None,
        with_summary: Optional[bool] = None,
        scheduled_at_gt: Optional[str] = None,
        scheduled_at_lt: Optional[str] = None,
        favourite: Optional[bool] = None,
        reference_search: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Fetch multiple stories from Storyblok with advanced filtering and pagination.

        Each non-None argument is forwarded as a query parameter of the same
        name; booleans are sent as 1/0 and a dict ``filter_query`` is sent as
        a JSON string.

        Returns:
            A dict with ``stories``, ``page``, ``per_page`` and ``total``.
            Note that ``total`` is the number of stories in this response,
            not the number of matching stories in the space.
        """
        try:
            url = build_management_url("/stories")

            # Enumerate the filters explicitly: building this from locals()
            # also captured the local ``url`` and sent it as a query parameter.
            params = _encode_query_params({
                "page": page,
                "per_page": per_page,
                "contain_component": contain_component,
                "text_search": text_search,
                "sort_by": sort_by,
                "pinned": pinned,
                "excluding_ids": excluding_ids,
                "by_ids": by_ids,
                "by_uuids": by_uuids,
                "with_tag": with_tag,
                "folder_only": folder_only,
                "story_only": story_only,
                "with_parent": with_parent,
                "starts_with": starts_with,
                "in_trash": in_trash,
                "search": search,
                "filter_query": filter_query,
                "in_release": in_release,
                "is_published": is_published,
                "by_slugs": by_slugs,
                "mine": mine,
                "excluding_slugs": excluding_slugs,
                "in_workflow_stages": in_workflow_stages,
                "by_uuids_ordered": by_uuids_ordered,
                "with_slug": with_slug,
                "with_summary": with_summary,
                "scheduled_at_gt": scheduled_at_gt,
                "scheduled_at_lt": scheduled_at_lt,
                "favourite": favourite,
                "reference_search": reference_search,
            })

            resp = await client.get(url, headers=get_management_headers(), params=params)
            data = _handle_response(resp, url)
            stories = data.get("stories", [])
            return {
                "stories": stories,
                "total": len(stories),
                "page": page,
                "per_page": per_page,
            }
        except APIError as e:
            return _error_result(e)

    @mcp.tool()
    async def get_story(
        story_id: int
    ) -> Any:
        """
        Retrieves a specific story by its ID.
        """
        try:
            url = build_management_url(f"/stories/{story_id}")
            resp = await client.get(url, headers=get_management_headers())
            return _handle_response(resp, url)
        except APIError as e:
            return _error_result(e)

    @mcp.tool()
    async def create_story(
        name: str,
        slug: str,
        content: Dict[str, Any],
        parent_id: Optional[int] = None,
        group_id: Optional[str] = None,
        sort_by_date: Optional[str] = None,
        is_folder: bool = False,
        default_root: Optional[str] = None,
        disable_fe_editor: Optional[bool] = None,
        is_startpage: bool = False,
        meta_data: Optional[Dict[str, Any]] = None,
        pinned: Optional[bool] = None,
        translated_slugs_attributes: Optional[List[Dict[str, Any]]] = None,
        position: Optional[int] = None,
        publish: Optional[bool] = False,
        release_id: Optional[int] = None,
    ) -> Any:
        """
        Creates a new Storyblok story. Supports all documented fields including publishing.

        ``name``, ``slug`` and ``content`` are always sent; the remaining
        story fields are included only when they are not None. ``publish``
        and ``release_id`` are request-level options sent next to the
        ``story`` object rather than inside it.
        """
        try:
            story_payload: Dict[str, Any] = {
                "name": name,
                "slug": slug,
                "content": content,
            }
            # Only include optional properties if provided.
            optional_fields = {
                "parent_id": parent_id,
                "group_id": group_id,
                "sort_by_date": sort_by_date,
                "is_folder": is_folder,
                "default_root": default_root,
                "disable_fe_editor": disable_fe_editor,
                "is_startpage": is_startpage,
                "meta_data": meta_data,
                "pinned": pinned,
                "translated_slugs_attributes": translated_slugs_attributes,
                "position": position,
            }
            story_payload.update(
                {key: val for key, val in optional_fields.items() if val is not None}
            )

            payload: Dict[str, Any] = {"story": story_payload}
            if publish:
                payload["publish"] = 1
            if release_id is not None:
                payload["release_id"] = release_id

            url = build_management_url("/stories")
            resp = await client.post(
                url,
                headers=get_management_headers(),
                json=payload
            )
            return _handle_response(resp, url)
        except APIError as e:
            return _error_result(e)

    @mcp.tool()
    async def update_story(
        story_id: int,
        name: Optional[str] = None,
        slug: Optional[str] = None,
        content: Optional[Dict[str, Any]] = None,
        parent_id: Optional[int] = None,
        group_id: Optional[str] = None,
        sort_by_date: Optional[str] = None,
        tag_list: Optional[List[str]] = None,
        is_folder: Optional[bool] = None,
        path: Optional[str] = None,
        default_root: Optional[str] = None,
        disable_fe_editor: Optional[bool] = None,
        is_startpage: Optional[bool] = None,
        meta_data: Optional[Dict[str, Any]] = None,
        pinned: Optional[bool] = None,
        first_published_at: Optional[str] = None,
        translated_slugs_attributes: Optional[List[Dict[str, Any]]] = None,
        position: Optional[int] = None,
        force_update: Optional[Union[bool, int]] = None,
        release_id: Optional[int] = None,
        publish: Optional[bool] = False,
        lang: Optional[str] = None
    ) -> Any:
        """
        Updates an existing Storyblok story by ID. Supports all documented fields including publishing.

        Story fields that are not None are sent inside the ``story`` object.
        ``force_update``, ``publish``, ``release_id`` and ``lang`` are
        request-level options sent next to it. Returns an error result when
        neither a story field nor ``publish`` is supplied.
        """
        try:
            story_fields = {
                "name": name,
                "slug": slug,
                "content": content,
                "parent_id": parent_id,
                "group_id": group_id,
                "sort_by_date": sort_by_date,
                "tag_list": tag_list,
                "is_folder": is_folder,
                "path": path,
                "default_root": default_root,
                "disable_fe_editor": disable_fe_editor,
                "is_startpage": is_startpage,
                "meta_data": meta_data,
                "pinned": pinned,
                "first_published_at": first_published_at,
                "translated_slugs_attributes": translated_slugs_attributes,
                "position": position,
            }
            payload_story: Dict[str, Any] = {
                key: val for key, val in story_fields.items() if val is not None
            }

            # Check the assembled payload, not just name/slug/content, so that
            # e.g. a tag_list-only or parent_id-only update is accepted.
            if not payload_story and not publish:
                return _error_result("No update fields or publish flag provided.")

            payload: Dict[str, Any] = {"story": payload_story}
            if force_update:
                payload["force_update"] = "1"
            if publish:
                payload["publish"] = 1
            if release_id is not None:
                payload["release_id"] = release_id
            if lang is not None:
                payload["lang"] = lang

            url = build_management_url(f"/stories/{story_id}")
            resp = await client.put(url, headers=get_management_headers(), json=payload)
            return _handle_response(resp, url)
        except APIError as e:
            return _error_result(e)

    @mcp.tool()
    async def delete_story(id: str) -> Any:
        """Deletes a story by ID."""
        try:
            url = build_management_url(f"/stories/{id}")
            resp = await client.delete(url, headers=get_management_headers())
            # Called for its error check only; the response body is discarded.
            _handle_response(resp, url)
            return {"message": f"Story {id} has been successfully deleted."}
        except APIError as e:
            return _error_result(e)

    @mcp.tool()
    async def publish_story(
        story_id: int,
        lang: Optional[str] = None,
        release_id: Optional[int] = None
    ) -> Any:
        """
        Publishes a Storyblok story by its ID.

        ``lang`` and ``release_id`` are forwarded as query parameters when given.
        """
        try:
            url = build_management_url(f"/stories/{story_id}/publish")
            params: Dict[str, Any] = {}
            if lang:
                params["lang"] = lang
            if release_id is not None:
                params["release_id"] = release_id
            resp = await client.get(url, headers=get_management_headers(), params=params)
            return _handle_response(resp, url)
        except APIError as e:
            return _error_result(e)

    @mcp.tool()
    async def unpublish_story(
        story_id: int,
        lang: Optional[str] = None
    ) -> Any:
        """
        Unpublishes a Storyblok story by its ID.

        ``lang`` is forwarded as a query parameter when given.
        """
        try:
            url = build_management_url(f"/stories/{story_id}/unpublish")
            params: Dict[str, Any] = {}
            if lang:
                params["lang"] = lang
            resp = await client.get(url, headers=get_management_headers(), params=params)
            return _handle_response(resp, url)
        except APIError as e:
            return _error_result(e)

    @mcp.tool()
    async def get_story_versions(
        by_story_id: int,
        version_id: Optional[int] = None,
        by_release_id: Optional[int] = None,
        page: Optional[int] = 1,
        per_page: Optional[int] = 25,
        show_content: Optional[bool] = False
    ) -> Any:
        """
        Retrieves versions (revisions) of stories.

        ``per_page`` is capped at 100. A ``None`` value for ``page`` or
        ``per_page`` falls back to the signature default (1 and 25).

        Returns:
            A dict with ``versions``, ``page``, ``per_page`` (the value
            actually sent) and ``total`` (None when the response has none).
        """
        try:
            url = build_management_url("/story_versions")
            # Guard against explicit None: min(None, 100) raises TypeError.
            effective_page = 1 if page is None else page
            effective_per_page = 25 if per_page is None else min(per_page, 100)

            params: Dict[str, Any] = {
                "by_story_id": by_story_id,
                "page": effective_page,
                "per_page": effective_per_page,
            }
            if version_id is not None:
                params["version_id"] = version_id
            if by_release_id is not None:
                params["by_release_id"] = by_release_id
            if show_content:
                params["show_content"] = 1

            resp = await client.get(url, headers=get_management_headers(), params=params)
            data = _handle_response(resp, url)
            return {
                "versions": data.get("story_versions", []),
                "page": effective_page,
                "per_page": effective_per_page,
                "total": data.get("total", None),
            }
        except APIError as e:
            return _error_result(e)

    @mcp.tool()
    async def restore_story(id: str, version_id: str) -> Any:
        """Restores a story to a specific version."""
        try:
            url = build_management_url(f"/stories/{id}/restore/{version_id}")
            resp = await client.post(url, headers=get_management_headers())
            return _handle_response(resp, url)
        except APIError as e:
            return _error_result(e)

    @mcp.tool()
    async def validate_story_content(
        component_name: str,
        story_id: Optional[str] = None,
        story_content: Optional[Dict[str, Any]] = None,
        space_id: Optional[str] = None  # forwarded to the schema lookup
    ) -> Any:
        """
        Validates a story's content against a component schema.
        Either provide story_id (to fetch) or story_content directly.

        A schema field marked ``required`` whose value is missing or None is
        reported as ``missing_required``. A content key absent from the
        schema is reported as ``extraneous_field``, except for the
        bookkeeping keys ``_uid``, ``component`` and ``_editable``.
        """
        try:
            schema = await get_component_schema_by_name(component_name, space_id)
            if not schema:
                return _error_result(
                    f"Error: Component schema '{component_name}' not found."
                )

            content = story_content
            if not content and story_id:
                url = build_management_url(f"/stories/{story_id}")
                resp = await client.get(url, headers=get_management_headers())
                story_data = _handle_response(resp, url)
                content = (story_data.get("story") or {}).get("content")

            if not content:
                return _error_result(
                    "Error: story_id or story_content must be provided and valid."
                )

            errors: List[Dict[str, Any]] = []
            missing: List[str] = []
            extraneous: List[str] = []

            for field, defn in schema.items():
                # Skip schema entries that are not dicts instead of crashing.
                is_required = isinstance(defn, dict) and defn.get("required")
                if is_required and content.get(field) is None:
                    missing.append(field)
                    errors.append({
                        "field": field,
                        "type": "missing_required",
                        "message": f"Field '{field}' is required.",
                    })

            for field in content:
                if field in _RESERVED_CONTENT_KEYS:
                    continue
                if field not in schema:
                    extraneous.append(field)
                    errors.append({
                        "field": field,
                        "type": "extraneous_field",
                        "message": f"Field '{field}' not in schema.",
                    })

            return {
                "isValid": not errors,
                "errors": errors,
                "missingFields": missing,
                "extraneousFields": extraneous,
                "validatedComponentName": component_name,
                "storyIdProcessed": story_id or "N/A",
            }
        except APIError as e:
            return _error_result(e)

    @mcp.tool()
    async def debug_story_access(story_id: str) -> Any:
        """Debug access to a specific story via various fetch parameters.

        Fetches the story once per scenario (default, published, draft, and
        both again with ``with_content=1``), records each attempt, and derives
        a list of detected issues and suggestions from the outcomes.
        """
        api_call_attempts: List[Dict[str, Any]] = []
        issues: List[str] = []
        suggestions: List[str] = []
        draft_details = {"accessible": False, "contentPresent": False, "fromScenario": ""}
        pub_details = {"accessible": False, "contentPresent": False, "fromScenario": ""}

        scenarios = [
            ("Default (likely draft)", {}),
            ("Published", {"version": "published"}),
            ("Draft explicit", {"version": "draft"}),
            ("Draft with content", {"version": "draft", "with_content": "1"}),
            ("Published with content", {"version": "published", "with_content": "1"}),
        ]

        for name, params in scenarios:
            attempt: Dict[str, Any] = {
                "scenarioName": name,
                "paramsUsed": {**params, "story_id": story_id},
            }
            try:
                resp = await client.get(
                    build_management_url(f"/stories/{story_id}"),
                    headers=get_management_headers(),
                    params=params
                )
                data = _handle_response(resp, str(resp.url))
                story = data.get("story") or {}
                # ``content`` may be present but None; normalise before .get().
                story_content = story.get("content") or {}
                content_present = bool(story_content)
                attempt.update({
                    "status": resp.status_code,
                    "responseData": {
                        "id": story.get("id"),
                        "name": story.get("name"),
                        "published_at": story.get("published_at"),
                        "full_slug": story.get("full_slug"),
                        "content_present": content_present,
                        "content_component": story_content.get("component"),
                        "version": story.get("version"),
                    }
                })

                # Track the best result per version: the first accessible
                # scenario, upgraded if a later one also returns content.
                if params.get("version") == "published":
                    details = pub_details
                    if story.get("published_at") is None:
                        issues.append(
                            f"Scenario '{name}': fetched as published but no published_at."
                        )
                else:
                    details = draft_details
                if not details["accessible"] or (
                    content_present and not details["contentPresent"]
                ):
                    details.update({
                        "accessible": True,
                        "contentPresent": content_present,
                        "fromScenario": name,
                    })

                if params.get("with_content") and not content_present:
                    issues.append(
                        f"Scenario '{name}': with_content=1 used but no content present."
                    )
            except APIError as e:
                # The error text is parsed as JSON whose "error" value starts
                # with the HTTP status code; anything else is reported raw.
                try:
                    parsed = json.loads(str(e))
                    attempt["status"] = int(parsed.get("error", "").split(" ")[0])
                    attempt["errorDetails"] = parsed
                except (ValueError, TypeError, AttributeError):
                    attempt["status"] = "ERROR"
                    attempt["errorDetails"] = str(e)
            api_call_attempts.append(attempt)

        # Analyze and generate suggestions
        draft_ok = draft_details["accessible"]
        pub_ok = pub_details["accessible"]

        if draft_ok and not pub_ok:
            suggestions.append("Accessible in draft but not published. Might be unpublished.")
        if pub_ok and not draft_ok:
            issues.append("Accessible in published but not draft.")
        if draft_ok and pub_ok:
            if draft_details["contentPresent"] and not pub_details["contentPresent"]:
                suggestions.append("Published version doesn't include content; try with_content=1.")
            if pub_details["contentPresent"] and not draft_details["contentPresent"]:
                suggestions.append("Draft version doesn't include content; try with_content=1.")
        if not draft_ok and not pub_ok:
            issues.append("Story not accessible in any scenario.")
            suggestions.append("Check story ID and token permissions.")

        if all(att.get("status") == 404 for att in api_call_attempts):
            issues.append("All attempts returned 404 Not Found.")
            suggestions.append("Verify the story exists and isn't deleted.")
        if any(att.get("status") == 403 for att in api_call_attempts):
            issues.append("One or more attempts resulted in 403 Forbidden.")
            suggestions.append("Check that your API token has proper permissions.")

        return {
            "storyId": story_id,
            "accessibleAsDraftDetails": draft_details,
            "accessibleAsPublishedDetails": pub_details,
            "issuesDetected": _dedupe(issues),
            "suggestions": _dedupe(suggestions),
            "apiCallAttempts": api_call_attempts,
        }

    @mcp.tool()
    async def bulk_publish_stories(story_ids: List[str]) -> Any:
        """Publishes multiple stories by ID.

        Stories are processed one at a time; a failure is recorded in
        ``results`` and does not stop the remaining stories.
        """
        results: List[Dict[str, Any]] = []
        success = fail = 0
        for sid in story_ids:
            try:
                # GET, matching publish_story; this previously used POST.
                resp = await client.get(
                    build_management_url(f"/stories/{sid}/publish"),
                    headers=get_management_headers()
                )
                data = _handle_response(resp, str(resp.url))
                results.append({"id": sid, "status": "success", "data": data})
                success += 1
            except APIError as e:
                results.append({"id": sid, "status": "error", "error": str(e)})
                fail += 1
        return {
            "total_processed": len(story_ids),
            "successful_operations": success,
            "failed_operations": fail,
            "results": results,
        }

    @mcp.tool()
    async def bulk_delete_stories(story_ids: List[str]) -> Any:
        """Deletes multiple stories in Storyblok.

        Stories are processed one at a time; a failure is recorded in
        ``results`` and does not stop the remaining stories.
        """
        results: List[Dict[str, Any]] = []
        success = fail = 0
        for sid in story_ids:
            try:
                resp = await client.delete(
                    build_management_url(f"/stories/{sid}"),
                    headers=get_management_headers()
                )
                _handle_response(resp, str(resp.url))
                results.append({"id": sid, "status": "success"})
                success += 1
            except APIError as e:
                results.append({"id": sid, "status": "error", "error": str(e)})
                fail += 1
        return {
            "total_processed": len(story_ids),
            "successful_operations": success,
            "failed_operations": fail,
            "results": results,
        }

    @mcp.tool()
    async def bulk_update_stories(
        stories: List[Dict[str, Any]]
    ) -> Any:
        """Updates multiple stories in Storyblok, optionally publishing them.

        Each entry must contain ``id``. An optional ``publish`` flag triggers
        a publish request after a successful update; all other non-None keys
        are sent as story fields. The input dicts are not modified.

        A failed publish does not turn a successful update into an error: the
        result keeps ``status == "success"`` with ``published == False`` and
        the failure text under ``publish_error``.
        """
        results: List[Dict[str, Any]] = []
        success = fail = 0
        for story_update in stories:
            sid = story_update.get("id")
            if sid is None:
                # Without an id the request would target "/stories/None".
                results.append({
                    "id": sid,
                    "status": "error",
                    "error": "Missing 'id' in story update.",
                })
                fail += 1
                continue

            publish = story_update.get("publish", False)
            # Build a fresh dict instead of pop()-ing from the caller's input.
            update_fields = {
                k: v for k, v in story_update.items()
                if k != "publish" and v is not None
            }
            try:
                resp = await client.put(
                    build_management_url(f"/stories/{sid}"),
                    headers=get_management_headers(),
                    json={"story": update_fields}
                )
                data = _handle_response(resp, str(resp.url))

                entry: Dict[str, Any] = {
                    "id": sid,
                    "status": "success",
                    "data": data,
                    "published": False,
                }
                if publish:
                    try:
                        # GET, matching publish_story; this previously used POST.
                        publish_resp = await client.get(
                            build_management_url(f"/stories/{sid}/publish"),
                            headers=get_management_headers()
                        )
                        _handle_response(publish_resp, str(publish_resp.url))
                        entry["published"] = True
                    except APIError as publish_err:
                        # Publishing errors don't block update success, but
                        # they are reported rather than silently dropped.
                        entry["publish_error"] = str(publish_err)

                results.append(entry)
                success += 1
            except APIError as e:
                results.append({"id": sid, "status": "error", "error": str(e)})
                fail += 1
        return {
            "total_processed": len(stories),
            "successful_operations": success,
            "failed_operations": fail,
            "results": results,
        }

    @mcp.tool()
    async def bulk_create_stories(
        stories: List[Dict[str, Any]]
    ) -> Any:
        """Creates multiple stories in Storyblok.

        Each entry is sent unchanged as the ``story`` object of a create
        request. A failure is recorded in ``results`` and does not stop the
        remaining entries.
        """
        results: List[Dict[str, Any]] = []
        success = fail = 0
        for story_input in stories:
            try:
                resp = await client.post(
                    build_management_url("/stories"),
                    headers=get_management_headers(),
                    json={"story": story_input}
                )
                data = _handle_response(resp, str(resp.url))
                created = data.get("story") or {}
                results.append({
                    "input": story_input,
                    "id": created.get("id"),
                    "slug": created.get("slug"),
                    "status": "success",
                    "data": data,
                })
                success += 1
            except APIError as e:
                results.append({
                    "input": story_input,
                    "slug": story_input.get("slug"),
                    "status": "error",
                    "error": str(e),
                })
                fail += 1
        return {
            "total_processed": len(stories),
            "successful_operations": success,
            "failed_operations": fail,
            "results": results,
        }

    @mcp.tool()
    async def get_unpublished_dependencies(
        story_ids: List[int],
        release_id: Optional[int] = None
    ) -> Any:
        """
        Retrieves unpublished dependencies for one or more stories.

        ``release_id`` is added to the request body only when given.
        """
        try:
            url = build_management_url("/stories/unpublished_dependencies")
            payload: Dict[str, Any] = {"story_ids": story_ids}
            if release_id is not None:
                payload["release_id"] = release_id
            resp = await client.post(
                url,
                headers=get_management_headers(),
                json=payload
            )
            return _handle_response(resp, url)
        except APIError as e:
            return _error_result(e)

    @mcp.tool()
    async def ai_translate_story(
        space_id: int,
        story_id: int,
        lang: str,
        code: str,
        overwrite: bool = False,
        release_id: Optional[int] = None
    ) -> Any:
        """
        Translates a story's content into a specified language using AI.

        ``lang``, ``code`` and ``overwrite`` are always sent in the request
        body; ``release_id`` is added only when given.
        """
        try:
            # NOTE(review): the path includes /spaces/{space_id}, unlike every
            # other tool here - confirm build_management_url does not already
            # add a space prefix.
            url = build_management_url(f"/spaces/{space_id}/stories/{story_id}/ai_translate")
            payload: Dict[str, Any] = {
                "lang": lang,
                "code": code,
                "overwrite": overwrite,
            }
            if release_id is not None:
                payload["release_id"] = release_id
            resp = await client.put(
                url,
                headers=get_management_headers(),
                json=payload
            )
            return _handle_response(resp, url)
        except APIError as e:
            return _error_result(e)

    @mcp.tool()
    async def compare_story_versions(
        story_id: int,
        version_v2: int
    ) -> Any:
        """
        Compares two versions of a story to identify changes.

        ``version_v2`` is sent as the only query parameter of the compare request.
        """
        try:
            url = build_management_url(f"/stories/{story_id}/compare")
            params = {"version_v2": version_v2}
            resp = await client.get(
                url,
                headers=get_management_headers(),
                params=params
            )
            return _handle_response(resp, url)
        except APIError as e:
            return _error_result(e)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kiran1689/storyblok-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.