Skip to main content
Glama
concurrency_limits.py8.89 kB
from typing import Any, Callable, Dict, List, Optional, Union from uuid import UUID import mcp.types as types from prefect import get_client def get_all_functions() -> list[tuple[Callable, str, str]]: return [ (get_concurrency_limits, "get_concurrency_limits", "Get all concurrency limits"), (get_concurrency_limit, "get_concurrency_limit", "Get a concurrency limit by ID"), (get_concurrency_limit_by_tag, "get_concurrency_limit_by_tag", "Get a concurrency limit by tag"), (create_concurrency_limit, "create_concurrency_limit", "Create a concurrency limit"), (update_concurrency_limit, "update_concurrency_limit", "Update a concurrency limit"), (delete_concurrency_limit, "delete_concurrency_limit", "Delete a concurrency limit"), (increment_concurrency_limit, "increment_concurrency_limit", "Increment a concurrency limit"), (decrement_concurrency_limit, "decrement_concurrency_limit", "Decrement a concurrency limit"), (reset_concurrency_limit, "reset_concurrency_limit", "Reset a concurrency limit by tag"), ] async def get_concurrency_limits() -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Get all concurrency limits. Returns: A list of concurrency limits """ async with get_client() as client: try: # Prefect API doesn't support limit/offset on concurrency limits endpoint directly # Use the filter endpoint instead concurrency_limits = await client.read_concurrency_limits_filter() limits_result = { "concurrency_limits": [limit.dict() for limit in concurrency_limits] } return [types.TextContent(type="text", text=str(limits_result))] except Exception as e: error_message = f"Error fetching concurrency limits: {str(e)}" return [types.TextContent(type="text", text=error_message)] async def get_concurrency_limit( limit_id: str, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Get a concurrency limit by ID. Args: limit_id: The concurrency limit UUID Returns: Concurrency limit details """ async with get_client() as client: try: concurrency_limit = await client.read_concurrency_limit(UUID(limit_id)) return [types.TextContent(type="text", text=str(concurrency_limit.dict()))] except Exception as e: error_message = f"Error fetching concurrency limit {limit_id}: {str(e)}" return [types.TextContent(type="text", text=error_message)] async def get_concurrency_limit_by_tag( tag: str, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Get a concurrency limit by tag. Args: tag: The concurrency limit tag Returns: Concurrency limit details """ async with get_client() as client: try: concurrency_limit = await client.read_concurrency_limit_by_tag(tag) return [types.TextContent(type="text", text=str(concurrency_limit.dict()))] except Exception as e: error_message = f"Error fetching concurrency limit for tag '{tag}': {str(e)}" return [types.TextContent(type="text", text=error_message)] async def create_concurrency_limit( tag: str, concurrency_limit: int, active: Optional[bool] = True, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Create a concurrency limit. Args: tag: The tag to limit concurrency_limit: The maximum allowed concurrency active: Whether the limit is active Returns: Details of the created concurrency limit """ async with get_client() as client: try: limit = await client.create_concurrency_limit( tag=tag, concurrency_limit=concurrency_limit, active=active ) return [types.TextContent(type="text", text=str(limit.dict()))] except Exception as e: error_message = f"Error creating concurrency limit: {str(e)}" return [types.TextContent(type="text", text=error_message)] async def update_concurrency_limit( limit_id: str, concurrency_limit: Optional[int] = None, active: Optional[bool] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Update a concurrency limit. Args: limit_id: The concurrency limit UUID concurrency_limit: The new maximum allowed concurrency active: Whether the limit is active Returns: Details of the updated concurrency limit """ async with get_client() as client: try: # Prepare update data update_data = {} if concurrency_limit is not None: update_data["concurrency_limit"] = concurrency_limit if active is not None: update_data["active"] = active updated_limit = await client.update_concurrency_limit( id=UUID(limit_id), **update_data ) return [types.TextContent(type="text", text=str(updated_limit.dict()))] except Exception as e: error_message = f"Error updating concurrency limit {limit_id}: {str(e)}" return [types.TextContent(type="text", text=error_message)] async def delete_concurrency_limit( limit_id: str, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Delete a concurrency limit by ID. Args: limit_id: The concurrency limit UUID Returns: Confirmation message """ async with get_client() as client: try: await client.delete_concurrency_limit(UUID(limit_id)) return [types.TextContent(type="text", text=f"Concurrency limit '{limit_id}' deleted successfully.")] except Exception as e: error_message = f"Error deleting concurrency limit {limit_id}: {str(e)}" return [types.TextContent(type="text", text=error_message)] async def increment_concurrency_limit( tag: str, delta: int = 1, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Increment a concurrency limit by tag. Args: tag: The concurrency limit tag delta: Amount to increment by (default 1) Returns: Updated concurrency limit details """ async with get_client() as client: try: updated_limit = await client.increment_concurrency_limit( tag=tag, delta=delta ) return [types.TextContent(type="text", text=str(updated_limit.dict()))] except Exception as e: error_message = f"Error incrementing concurrency limit for tag '{tag}': {str(e)}" return [types.TextContent(type="text", text=error_message)] async def decrement_concurrency_limit( tag: str, delta: int = 1, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Decrement a concurrency limit by tag. Args: tag: The concurrency limit tag delta: Amount to decrement by (default 1) Returns: Updated concurrency limit details """ async with get_client() as client: try: updated_limit = await client.decrement_concurrency_limit( tag=tag, delta=delta ) return [types.TextContent(type="text", text=str(updated_limit.dict()))] except Exception as e: error_message = f"Error decrementing concurrency limit for tag '{tag}': {str(e)}" return [types.TextContent(type="text", text=error_message)] async def reset_concurrency_limit( tag: str, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Reset a concurrency limit by tag, setting its current count to 0. Args: tag: The concurrency limit tag Returns: Updated concurrency limit details """ async with get_client() as client: try: updated_limit = await client.reset_concurrency_limit(tag=tag) return [types.TextContent(type="text", text=str(updated_limit.dict()))] except Exception as e: error_message = f"Error resetting concurrency limit for tag '{tag}': {str(e)}" return [types.TextContent(type="text", text=error_message)]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/allen-munsch/mcp-prefect'

If you have feedback or need assistance with the MCP directory API, please join our Discord server