Skip to main content
Glama
queue.py1.7 kB
from dataclasses import dataclass from typing import Optional, Dict, Union, List from utils.flows.flow import ( KeyValue, KeyMapValue, Processor, ) @dataclass class QueueProcessor: quota_id: str ttl_seconds: int queue_size: int priority_group_by_header: Optional[str] = None priority_groups: Optional[Dict[str, int]] = None processor: str = "Queue" def get_condition_ok(self) -> str: return "allowed" def get_condition_bad(self) -> str: return "blocked" def get_processor(self) -> Processor: params: List[Union[KeyValue, KeyMapValue]] = [ KeyValue(key="quota_id", value=self.quota_id), KeyValue(key="ttl_seconds", value=self.ttl_seconds), KeyValue(key="queue_size", value=self.queue_size), ] if self.priority_group_by_header and not self.priority_groups: raise ValueError( "priority_groups must be provided when priority_group_by_header is provided" ) if not self.priority_group_by_header and self.priority_groups: raise ValueError( "group_by_header must be provided when priority_group_by_header is provided" ) if self.priority_group_by_header and self.priority_groups: params.append( KeyValue( key="priority_group_by_header", value=self.priority_group_by_header ) ) params.append( KeyMapValue(key="priority_groups", value=self.priority_groups) ) return Processor( processor=self.processor, parameters=params, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server