Skip to main content
Glama
block_schemas.py3.05 kB
# ============================================================================ # block_schemas.py - Block Schemas Module # ============================================================================ @mcp.tool async def get_block_schemas( limit: Optional[int] = None, offset: Optional[int] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """Get block schemas.""" try: async with get_client() as client: response = await client._client.post( "/block_schemas/filter", json={"limit": limit, "offset": offset} ) return [types.TextContent(type="text", text=str({"block_schemas": response.json()}))] except Exception as e: return [types.TextContent(type="text", text=f"Error: {str(e)}")] @mcp.tool async def get_block_schema( schema_id: str, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """Get a block schema by ID.""" try: async with get_client() as client: response = await client._client.get(f"/block_schemas/{schema_id}") return [types.TextContent(type="text", text=str(response.json()))] except Exception as e: return [types.TextContent(type="text", text=f"Error: {str(e)}")] @mcp.tool async def get_block_schema_by_checksum( checksum: str, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """Get a block schema by checksum.""" try: async with get_client() as client: response = await client._client.get(f"/block_schemas/checksum/{checksum}") return [types.TextContent(type="text", text=str(response.json()))] except Exception as e: return [types.TextContent(type="text", text=f"Error: {str(e)}")] # ============================================================================ # flow_run_states.py - Flow Run States Module # ============================================================================ @mcp.tool async def get_flow_run_state( state_id: str, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """Get a flow run state by ID.""" try: async with get_client() as client: response = await client._client.get(f"/flow_run_states/{state_id}") return [types.TextContent(type="text", text=str(response.json()))] except Exception as e: return [types.TextContent(type="text", text=f"Error: {str(e)}")] @mcp.tool async def get_flow_run_states( flow_run_id: str, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """Get all states for a flow run.""" try: async with get_client() as client: response = await client._client.get(f"/flow_run_states/?flow_run_id={flow_run_id}") return [types.TextContent(type="text", text=str({"states": response.json()}))] except Exception as e: return [types.TextContent(type="text", text=f"Error: {str(e)}")]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/allen-munsch/mcp-prefect'

If you have feedback or need assistance with the MCP directory API, please join our Discord server