We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/allen-munsch/mcp-prefect'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
from typing import Any, Callable, Dict, List, Optional, Union
from uuid import UUID
import mcp.types as types
from prefect import get_client
def get_all_functions() -> list[tuple[Callable, str, str]]:
return [
(get_concurrency_limits, "get_concurrency_limits", "Get all concurrency limits"),
(get_concurrency_limit, "get_concurrency_limit", "Get a concurrency limit by ID"),
(get_concurrency_limit_by_tag, "get_concurrency_limit_by_tag", "Get a concurrency limit by tag"),
(create_concurrency_limit, "create_concurrency_limit", "Create a concurrency limit"),
(update_concurrency_limit, "update_concurrency_limit", "Update a concurrency limit"),
(delete_concurrency_limit, "delete_concurrency_limit", "Delete a concurrency limit"),
(increment_concurrency_limit, "increment_concurrency_limit", "Increment a concurrency limit"),
(decrement_concurrency_limit, "decrement_concurrency_limit", "Decrement a concurrency limit"),
(reset_concurrency_limit, "reset_concurrency_limit", "Reset a concurrency limit by tag"),
]
async def get_concurrency_limits() -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
"""
Get all concurrency limits.
Returns:
A list of concurrency limits
"""
async with get_client() as client:
try:
# Prefect API doesn't support limit/offset on concurrency limits endpoint directly
# Use the filter endpoint instead
concurrency_limits = await client.read_concurrency_limits_filter()
limits_result = {
"concurrency_limits": [limit.dict() for limit in concurrency_limits]
}
return [types.TextContent(type="text", text=str(limits_result))]
except Exception as e:
error_message = f"Error fetching concurrency limits: {str(e)}"
return [types.TextContent(type="text", text=error_message)]
async def get_concurrency_limit(
limit_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
"""
Get a concurrency limit by ID.
Args:
limit_id: The concurrency limit UUID
Returns:
Concurrency limit details
"""
async with get_client() as client:
try:
concurrency_limit = await client.read_concurrency_limit(UUID(limit_id))
return [types.TextContent(type="text", text=str(concurrency_limit.dict()))]
except Exception as e:
error_message = f"Error fetching concurrency limit {limit_id}: {str(e)}"
return [types.TextContent(type="text", text=error_message)]
async def get_concurrency_limit_by_tag(
tag: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
"""
Get a concurrency limit by tag.
Args:
tag: The concurrency limit tag
Returns:
Concurrency limit details
"""
async with get_client() as client:
try:
concurrency_limit = await client.read_concurrency_limit_by_tag(tag)
return [types.TextContent(type="text", text=str(concurrency_limit.dict()))]
except Exception as e:
error_message = f"Error fetching concurrency limit for tag '{tag}': {str(e)}"
return [types.TextContent(type="text", text=error_message)]
async def create_concurrency_limit(
tag: str,
concurrency_limit: int,
active: Optional[bool] = True,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
"""
Create a concurrency limit.
Args:
tag: The tag to limit
concurrency_limit: The maximum allowed concurrency
active: Whether the limit is active
Returns:
Details of the created concurrency limit
"""
async with get_client() as client:
try:
limit = await client.create_concurrency_limit(
tag=tag,
concurrency_limit=concurrency_limit,
active=active
)
return [types.TextContent(type="text", text=str(limit.dict()))]
except Exception as e:
error_message = f"Error creating concurrency limit: {str(e)}"
return [types.TextContent(type="text", text=error_message)]
async def update_concurrency_limit(
limit_id: str,
concurrency_limit: Optional[int] = None,
active: Optional[bool] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
"""
Update a concurrency limit.
Args:
limit_id: The concurrency limit UUID
concurrency_limit: The new maximum allowed concurrency
active: Whether the limit is active
Returns:
Details of the updated concurrency limit
"""
async with get_client() as client:
try:
# Prepare update data
update_data = {}
if concurrency_limit is not None:
update_data["concurrency_limit"] = concurrency_limit
if active is not None:
update_data["active"] = active
updated_limit = await client.update_concurrency_limit(
id=UUID(limit_id),
**update_data
)
return [types.TextContent(type="text", text=str(updated_limit.dict()))]
except Exception as e:
error_message = f"Error updating concurrency limit {limit_id}: {str(e)}"
return [types.TextContent(type="text", text=error_message)]
async def delete_concurrency_limit(
limit_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
"""
Delete a concurrency limit by ID.
Args:
limit_id: The concurrency limit UUID
Returns:
Confirmation message
"""
async with get_client() as client:
try:
await client.delete_concurrency_limit(UUID(limit_id))
return [types.TextContent(type="text", text=f"Concurrency limit '{limit_id}' deleted successfully.")]
except Exception as e:
error_message = f"Error deleting concurrency limit {limit_id}: {str(e)}"
return [types.TextContent(type="text", text=error_message)]
async def increment_concurrency_limit(
tag: str,
delta: int = 1,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
"""
Increment a concurrency limit by tag.
Args:
tag: The concurrency limit tag
delta: Amount to increment by (default 1)
Returns:
Updated concurrency limit details
"""
async with get_client() as client:
try:
updated_limit = await client.increment_concurrency_limit(
tag=tag,
delta=delta
)
return [types.TextContent(type="text", text=str(updated_limit.dict()))]
except Exception as e:
error_message = f"Error incrementing concurrency limit for tag '{tag}': {str(e)}"
return [types.TextContent(type="text", text=error_message)]
async def decrement_concurrency_limit(
tag: str,
delta: int = 1,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
"""
Decrement a concurrency limit by tag.
Args:
tag: The concurrency limit tag
delta: Amount to decrement by (default 1)
Returns:
Updated concurrency limit details
"""
async with get_client() as client:
try:
updated_limit = await client.decrement_concurrency_limit(
tag=tag,
delta=delta
)
return [types.TextContent(type="text", text=str(updated_limit.dict()))]
except Exception as e:
error_message = f"Error decrementing concurrency limit for tag '{tag}': {str(e)}"
return [types.TextContent(type="text", text=error_message)]
async def reset_concurrency_limit(
tag: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
"""
Reset a concurrency limit by tag, setting its current count to 0.
Args:
tag: The concurrency limit tag
Returns:
Updated concurrency limit details
"""
async with get_client() as client:
try:
updated_limit = await client.reset_concurrency_limit(tag=tag)
return [types.TextContent(type="text", text=str(updated_limit.dict()))]
except Exception as e:
error_message = f"Error resetting concurrency limit for tag '{tag}': {str(e)}"
return [types.TextContent(type="text", text=error_message)]