Skip to main content
Glama
flow.py2.27 kB
import yaml from dataclasses import dataclass, field, asdict from typing import List, Dict, Optional, Any from utils.consts import * from utils.flows.flow import Filter, Processor @dataclass class FlowRef: name: str at: str @dataclass class StreamRef: name: str at: str @dataclass class ProcessorRef: name: str condition: Optional[str] = None @dataclass class Connection: stream: Optional[StreamRef] = None flow: Optional[FlowRef] = None processor: Optional[ProcessorRef] = None @dataclass class FlowConnection: from_: Connection to: Connection @dataclass class Flow: request: List[FlowConnection] = field(default_factory=list) response: List[FlowConnection] = field(default_factory=list) @dataclass class FlowRepresentation: name: Optional[str] = field(default=None) filter: Filter = field(default_factory=Filter) processors: Dict[str, Processor] = field(default_factory=dict) flow: Flow = field(default_factory=Flow) def add_processor(self, key: str, processor: Processor): self.processors[key] = processor def add_flow_request(self, from_: Connection, to: Connection): self.flow.request.append(FlowConnection(from_, to)) def add_flow_response(self, from_: Connection, to: Connection): self.flow.response.append(FlowConnection(from_, to)) def to_dict(self): def dict_factory(data: Any) -> Dict[str, Any]: return { k: v for k, v in data if v is not None and (not isinstance(v, list) or v) } return asdict(self, dict_factory=dict_factory) def build_yaml(self): def custom_represent_dict(dumper: Any, data: Any): new_data = {} for k, v in data.items(): if isinstance(k, Enum): k = str(k) if isinstance(v, Enum): v = str(v) cleaned_key = k.rstrip("_") new_data[cleaned_key] = v return dumper.represent_dict(new_data) yaml.add_representer(dict, custom_represent_dict) return yaml.dump( self.to_dict(), default_flow_style=False, sort_keys=False, canonical=False )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server