Skip to main content
Glama
validations.go (5.31 kB)
package streamflow

import (
	"fmt"

	"github.com/rs/zerolog/log"
)

// wildcardCondition is the bucket used for edges that carry no explicit
// condition, so that they still get their own visited set during the DFS.
const wildcardCondition = "*"

// validateFlow validates the flow graph for logical inconsistencies.
//
// Both the request and the response directions are validated independently
// (see validateDirection); the first failure is returned wrapped with the
// name of the offending direction. After both directions pass, the flow is
// rejected if neither direction is defined at all.
func validateFlow(flowGraph *Flow) error {
	if err := validateDirection(flowGraph.request); err != nil {
		return fmt.Errorf("request direction: %w", err)
	}
	if err := validateDirection(flowGraph.response); err != nil {
		return fmt.Errorf("response direction: %w", err)
	}

	if !flowGraph.request.IsDefined() && !flowGraph.response.IsDefined() {
		return fmt.Errorf("flow graph has no flow direction defined")
	}
	return nil
}

// validateDirection validates a single flow direction for logical
// inconsistencies.
//
// An undefined direction is not an error: it is logged at trace level and
// skipped. For a defined direction the checks run in this order, stopping at
// the first failure: valid root (request flows only), edge validity,
// unconnected processors, circular connections.
func validateDirection(flow *FlowDirection) error {
	if !flow.IsDefined() {
		log.Trace().
			Msgf("flow direction '%s' of type '%s' is not defined", flow.flowName, flow.flowType)
		return nil
	}

	// response flow can be without root - it can be a flow that only works with early response
	if flow.GetFlowType().IsRequestType() && !flow.HasValidRoot() {
		return fmt.Errorf("flow graph has no valid root node")
	}

	if err := validateEdges(flow.nodes); err != nil {
		return err
	}
	if err := validateUnconnectedProcessors(flow); err != nil {
		return err
	}
	return detectCircularConnections(flow)
}

// validateEdges validates every outgoing edge of every node in the flow
// graph. It returns an error naming the source processor of the first
// invalid edge it encounters, or nil when all edges are valid.
func validateEdges(nodes map[string]*FlowGraphNode) error {
	for _, node := range nodes {
		for _, edge := range node.GetEdges() {
			if !edge.IsValid() {
				return fmt.Errorf("edge from processor '%s' is invalid", node.processorKey)
			}
		}
	}
	return nil
}

// validateUnconnectedProcessors checks if any processors in the flow graph
// are unconnected.
//
// A processor counts as connected when it has at least one outgoing edge,
// when it is the target of another processor's edge, or when it is the root
// node of a valid root that itself has edges. The first processor found that
// satisfies none of these is reported in the returned error.
func validateUnconnectedProcessors(flow *FlowDirection) error {
	connectedProcessors := make(map[string]bool, len(flow.nodes))

	for processorName, node := range flow.nodes {
		log.Trace().Msgf("Validating connected processor %s", processorName)

		// Mark processors as connected if they have outgoing connections.
		if len(node.edges) > 0 {
			connectedProcessors[processorName] = true
		}

		// Mark target processors of outgoing edges as connected.
		// Edges without a target node are skipped.
		for _, edge := range node.edges {
			if edge.node == nil {
				continue
			}
			log.Trace().Msgf("Marking connected processor %s", edge.node.processorKey)
			connectedProcessors[edge.node.processorKey] = true
		}
	}

	// Mark root node as connected.
	if flow.root != nil && flow.root.IsValid() && len(flow.root.node.GetEdges()) > 0 {
		connectedProcessors[flow.root.node.processorKey] = true
	}

	log.Trace().Msgf("Connected processors: %v", connectedProcessors)

	// Identify any processors not marked as connected. Only true values are
	// ever stored, so a plain lookup is equivalent to an existence check.
	for processorName := range flow.nodes {
		if !connectedProcessors[processorName] {
			return fmt.Errorf("processor '%s' is unconnected", processorName)
		}
	}
	return nil
}

// detectCircularConnections detects circular connections in the flow graph.
//
// A response flow without a valid root is skipped. Otherwise a separate DFS
// (with a fresh visited set) is started from each processor the root node
// points to; the first traversal that finds a cycle produces an error naming
// the processor that traversal started from.
//
// NOTE(review): for request flows this dereferences flowDir.root.node without
// a check; validateDirection rejects request flows lacking a valid root
// before calling this function — presumably HasValidRoot guarantees a non-nil
// root node, confirm against its definition.
func detectCircularConnections(flowDir *FlowDirection) error {
	if flowDir.GetFlowType().IsResponseType() && !flowDir.HasValidRoot() {
		return nil
	}

	for _, connection := range flowDir.root.node.edges {
		if connection.node == nil {
			continue
		}
		proc := connection.node.processorKey

		log.Trace().
			Str("flowGraphName", connection.node.flowGraphName).
			Msgf("Validating no circular connections for processor %s", proc)

		// key - condition, value - set of processorKeys seen under that condition
		visitedByCondition := make(map[string]map[string]bool)

		if !dfsDetectCycles(connection.node, visitedByCondition, proc, connection.condition) {
			return fmt.Errorf("circular connection detected - processor '%s'", proc)
		}
		log.Trace().Msgf("No cycle detected for processor %s", proc)
	}
	return nil
}

// dfsDetectCycles performs a DFS from the given node to detect cycles.
//
// node is the node being visited, current is its processor key, and condition
// is the condition of the edge through which it was reached (an empty
// condition is treated as "*"). visitedByCondition maps a condition to the
// processor keys already seen under it along the current path; the entry for
// current is recorded in the map passed in, and each child receives its own
// copy so sibling branches do not see each other's visits.
//
// It returns true when NO cycle was found below node, and false as soon as a
// processor is reached a second time under the same condition.
func dfsDetectCycles(
	node *FlowGraphNode,
	visitedByCondition map[string]map[string]bool,
	current, condition string,
) bool {
	if condition == "" {
		condition = wildcardCondition
	}
	log.Trace().
		Msgf("dfsDetectCycles: visiting processor %s on condition %s", current, condition)

	// Indexing a missing (nil) inner map is safe and simply reports "not seen".
	if _, seen := visitedByCondition[condition][current]; seen {
		log.Debug().Msgf("Cycle detected: on condition %s -> processor %s", condition, current)
		return false
	}

	if visitedByCondition[condition] == nil {
		visitedByCondition[condition] = make(map[string]bool)
	}
	visitedByCondition[condition][current] = true

	for _, edge := range node.edges {
		if edge.node == nil {
			continue
		}
		// Hand each branch a private copy so visits are tracked per path.
		branchVisits := cloneVisitsMap(visitedByCondition)
		if !dfsDetectCycles(edge.node, branchVisits, edge.node.processorKey, edge.condition) {
			return false
		}
	}
	return true
}

// cloneVisitsMap returns a deep copy of m: both the outer map and every inner
// map are newly allocated, so mutating the result never affects m. The result
// is always non-nil, and a nil inner map is copied as an empty map.
func cloneVisitsMap(m map[string]map[string]bool) map[string]map[string]bool {
	result := make(map[string]map[string]bool, len(m))
	for outerK, outerV := range m {
		inner := make(map[string]bool, len(outerV))
		for innerK, innerV := range outerV {
			inner[innerK] = innerV
		}
		result[outerK] = inner
	}
	return result
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.