Skip to main content
Glama
components.py16.1 kB
import json import requests from typing import Optional, Dict, Any, List from mcp.server.fastmcp import FastMCP from httpx import AsyncClient from config import API_ENDPOINTS from utils.api import ( build_management_url, get_management_headers, _handle_response, APIError, ) def register_components(mcp: FastMCP, client: AsyncClient) -> None: """ @mcp.tool() async def fetch_components( component_summary: Optional[bool] = False, include_schema_details: Optional[bool] = True, filter_by_name: Optional[str] = None ) -> Dict[str, Any]: # Fetches components, optionally filtering and shaping the response. try: url = build_management_url("/components") resp = await client.get(url, headers=get_management_headers()) data = _handle_response(resp, url) comps = data.get("components", []) # Name filter if filter_by_name: fl = filter_by_name.lower() comps = [c for c in comps if fl in (c.get("name", "").lower() + c.get("display_name", "").lower())] # Summaries or remove schema if component_summary: comps = [{"id": c["id"], "name": c["name"], "display_name": c["display_name"]} for c in comps] elif not include_schema_details: comps = [{k: v for k, v in c.items() if k != "schema"} for c in comps] return {"components_count": len(comps), "components": comps} except APIError as e: return {"isError": True, "content": [{"type": "text", "text": str(e)}]} """ @mcp.tool() async def fetch_components( component_summary: bool = False, include_schema_details: bool = True, filter_by_name: Optional[str] = None, is_root: Optional[bool] = None, in_group: Optional[int] = None, sort_by: Optional[str] = None, per_page: Optional[int] = None, # not used since non-paginated ) -> Dict[str, Any]: """Fetches components with server-side filters, sorting, and option to include groups.""" try: url = build_management_url("/components") params = {} if filter_by_name: params["search"] = filter_by_name if is_root is not None: params["is_root"] = 1 if is_root else 0 if in_group is not None: params["in_group"] = in_group if sort_by: params["sort_by"] = sort_by if per_page: params["per_page"] = per_page resp = await client.get(url, headers=get_management_headers(), params=params) data = _handle_response(resp, url) components = data.get("components", []) # Summaries or remove schema if requested if component_summary: components = [ {"id": c["id"], "name": c["name"], "display_name": c["display_name"]} for c in components ] elif not include_schema_details: components = [{k: v for k, v in c.items() if k != "schema"} for c in components] # Also fetch component groups (folders) # API returns component_groups at /component_groups endpoint groups_url = build_management_url("/component_groups") grp_resp = await client.get(groups_url, headers=get_management_headers(), params={}) groups_data = _handle_response(grp_resp, groups_url).get("component_groups", []) return { "components_count": len(components), "components": components, "component_groups": groups_data } except APIError as e: return {"isError": True, "content": [{"type": "text", "text": str(e)}]} @mcp.tool() async def get_component(id: str) -> Dict[str, Any]: """Gets a specific component by ID.""" try: url = build_management_url(f"/components/{id}") resp = await client.get(url, headers=get_management_headers()) data = _handle_response(resp, url) return data except APIError as e: return {"isError": True, "content": [{"type": "text", "text": str(e)}]} """ @mcp.tool() async def create_component( name: str, display_name: Optional[str] = None, schema: Dict[str, Any] = {}, is_root: Optional[bool] = False, is_nestable: Optional[bool] = True ) -> Dict[str, Any]: #Creates a new component (block). try: url = build_management_url("/components") payload = {"component": { "name": name, "display_name": display_name or name, "schema": schema, "is_root": is_root, "is_nestable": is_nestable }} resp = await client.post(url, headers=get_management_headers(), content=json.dumps(payload)) return _handle_response(resp, url) except APIError as e: return {"isError": True, "content": [{"type": "text", "text": str(e)}]} """ @mcp.tool() async def create_component( name: str, display_name: Optional[str] = None, schema: Dict[str, Any] = {}, is_root: Optional[bool] = False, is_nestable: Optional[bool] = True, preview_field: Optional[str] = None, preview_tmpl: Optional[str] = None, component_group_uuid: Optional[str] = None, color: Optional[str] = None, icon: Optional[str] = None, internal_tag_ids: Optional[List[str]] = None, content_type_asset_preview: Optional[str] = None, ) -> Dict[str, Any]: """Creates a new component with all supported fields.""" try: url = build_management_url("/components") payload_comp: Dict[str, Any] = { "name": name, "display_name": display_name or name, "schema": schema, "is_root": is_root, "is_nestable": is_nestable, } # Optional fields for key, val in { "preview_field": preview_field, "preview_tmpl": preview_tmpl, "component_group_uuid": component_group_uuid, "color": color, "icon": icon, "internal_tag_ids": internal_tag_ids, "content_type_asset_preview": content_type_asset_preview }.items(): if val is not None: payload_comp[key] = val resp = await client.post( url, headers=get_management_headers(), content=json.dumps({"component": payload_comp}) ) return _handle_response(resp, url) except APIError as e: return {"isError": True, "content": [{"type": "text", "text": str(e)}]} """ @mcp.tool() async def update_component( id: str, name: Optional[str] = None, display_name: Optional[str] = None, schema: Optional[Dict[str, Any]] = None, is_root: Optional[bool] = None, is_nestable: Optional[bool] = None ) -> Dict[str, Any]: #Updates an existing component. try: url = build_management_url(f"/components/{id}") comp_data: Dict[str, Any] = {} for k, v in (("name", name), ("display_name", display_name), ("schema", schema), ("is_root", is_root), ("is_nestable", is_nestable)): if v is not None: comp_data[k] = v resp = await client.put(url, headers=get_management_headers(), content=json.dumps({"component": comp_data})) return _handle_response(resp, url) except APIError as e: return {"isError": True, "content": [{"type": "text", "text": str(e)}]} """ @mcp.tool() async def update_component( id: str, name: Optional[str] = None, display_name: Optional[str] = None, schema: Optional[Dict[str, Any]] = None, image: Optional[str] = None, preview_field: Optional[str] = None, preview_tmpl: Optional[str] = None, is_root: Optional[bool] = None, is_nestable: Optional[bool] = None, component_group_uuid: Optional[str] = None, color: Optional[str] = None, icon: Optional[str] = None, internal_tag_ids: Optional[List[str]] = None, content_type_asset_preview: Optional[str] = None, ) -> Dict[str, Any]: """Updates an existing component with all supported fields.""" try: url = build_management_url(f"/components/{id}") comp_data: Dict[str, Any] = {} # Populate only provided fields for key, val in { "name": name, "display_name": display_name, "schema": schema, "image": image, "preview_field": preview_field, "preview_tmpl": preview_tmpl, "is_root": is_root, "is_nestable": is_nestable, "component_group_uuid": component_group_uuid, "color": color, "icon": icon, "internal_tag_ids": internal_tag_ids, "content_type_asset_preview": content_type_asset_preview }.items(): if val is not None: comp_data[key] = val resp = await client.put( url, headers=get_management_headers(), content=json.dumps({"component": comp_data}) ) return _handle_response(resp, url) except APIError as e: return {"isError": True, "content": [{"type": "text", "text": str(e)}]} @mcp.tool() async def delete_component(id: str) -> Dict[str, Any]: """Deletes a component by ID.""" try: url = build_management_url(f"/components/{id}") resp = await client.delete(url, headers=get_management_headers()) _handle_response(resp, url) return {"message": f"Component {id} has been successfully deleted."} except APIError as e: return {"isError": True, "content": [{"type": "text", "text": str(e)}]} @mcp.tool() async def get_component_usage(component_name: str) -> Dict[str, Any]: """Finds stories where a component is used in content (draft & published).""" MAX_PAGES = 10 PER_PAGE = 100 stories_map: Dict[int, Dict[str, Any]] = {} limit_reached = False async def fetch_page(version: str, page: int): url = (build_management_url("/stories") + f"?page={page}&per_page={PER_PAGE}&with_content=1&version={version}") resp = await client.get(url, headers=get_management_headers()) return _handle_response(resp, url) for version in ("published", "draft"): page = 1 while page <= MAX_PAGES: try: data = await fetch_page(version, page) for st in data.get("stories", []): stories_map[st["id"]] = st if len(data.get("stories", [])) < PER_PAGE: break page += 1 except APIError: break else: limit_reached = True used = [] def search(val: Any) -> bool: if isinstance(val, list): return any(search(v) for v in val) if isinstance(val, dict): if val.get("component") == component_name: return True return any(search(v) for v in val.values()) return False for st in stories_map.values(): if search(st.get("content", {})): used.append({k: st[k] for k in ("id", "name", "slug", "full_slug")}) return { "component_name": component_name, "usage_count": len(used), "stories_analyzed_count": len(stories_map), "search_limit_reached": limit_reached, "used_in_stories": used } @mcp.tool() async def retrieve_component_versions( component_id: str, page: Optional[int] = 1, per_page: Optional[int] = 25 ) -> Dict[str, Any]: """ Retrieves paginated versions of a component. """ try: url = build_management_url("/versions") params = { "model": "components", "model_id": component_id, "page": page, "per_page": min(per_page, 100) } resp = await client.get( url, headers=get_management_headers(), params=params ) data = _handle_response(resp, url) versions = data.get("versions", []) return { "versions": versions, "page": page, "per_page": params["per_page"], "total_versions": len(versions) } except APIError as e: return {"isError": True, "content": [{"type": "text", "text": str(e)}]} @mcp.tool() async def retrieve_single_component_version( component_id: str, version_id: str ) -> Dict[str, Any]: """ Retrieves the schema details of a specific component version. """ try: url = build_management_url( f"/components/{component_id}/component_versions/{version_id}" ) resp = await client.get( url, headers=get_management_headers() ) return _handle_response(resp, url) except APIError as e: return {"isError": True, "content": [{"type": "text", "text": str(e)}]} @mcp.tool() async def restore_component_version( version_id: str, component_id: str, ) -> Dict[str, Any]: """ Restores a component to a previous version. """ try: path = f"/versions/{version_id}" url = build_management_url(path) payload = { "model": "components", "model_id": component_id } resp = await client.put( url, headers=get_management_headers(), content=json.dumps(payload) ) return _handle_response(resp, url) except APIError as e: return {"isError": True, "content": [{"type": "text", "text": str(e)}]} def get_component_schema_by_name(component_name: str, space_id: Optional[str] = None) -> Optional[Dict[str, Any]]: """ Fetches the schema of a component by its name from the Storyblok Management API. Args: component_name (str): The name of the component to retrieve. space_id (Optional[str]): Placeholder for future use (e.g., handling different spaces or credentials). Returns: Optional[Dict[str, Any]]: The schema of the component if found, otherwise None. """ # Future support for space-specific logic if space_id: # Placeholder for handling alternate tokens or clients per space pass endpoint = build_management_url('/components') response = requests.get(endpoint, headers=get_management_headers()) data = _handle_response(response, endpoint) if data and isinstance(data.get("components"), list): for component in data["components"]: if component.get("name") == component_name: return component.get("schema") or None return None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kiran1689/storyblok-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server