from typing import Dict, List, Set, Tuple

from .schemas import Edge, Finding, FlowInput, Node
class LinterRules:
    """Static lint rules for a flow graph made of nodes and directed edges.

    Every public rule takes the whole flow and returns a list of ``Finding``
    objects, which is empty when the rule is satisfied. The rules only read
    ``flow.nodes`` and ``flow.edges``; they never modify the flow.
    """

    @staticmethod
    def check_unreachable_node(flow: FlowInput) -> List[Finding]:
        """Report every node that cannot be reached from the Start node.

        Only the first node whose type is ``"Start"`` is used as the root of
        the traversal. Edges whose source or target is not a known node id
        are ignored.

        Returns:
            A single ``missing_start_node`` finding when the flow has no
            Start node; otherwise one ``unreachable_node`` finding per
            unvisited node, in ``flow.nodes`` order.
        """
        start_nodes = [node for node in flow.nodes if node.type == "Start"]
        if not start_nodes:
            return [Finding(
                code="missing_start_node",
                message="Flow must have a Start node",
                severity="error"
            )]

        # Adjacency list keyed by node id; it doubles as the set of known ids.
        adjacency = {node.id: [] for node in flow.nodes}
        for edge in flow.edges:
            if edge.from_node in adjacency:
                adjacency[edge.from_node].append(edge.to_node)

        # Traversal order does not affect which nodes end up visited, so a
        # stack (O(1) pop) is used instead of popping from the list front.
        root = start_nodes[0].id
        visited = {root}
        pending = [root]
        while pending:
            current = pending.pop()
            for neighbor in adjacency[current]:
                if neighbor in adjacency and neighbor not in visited:
                    visited.add(neighbor)
                    pending.append(neighbor)

        return [
            Finding(
                code="unreachable_node",
                node_id=node.id,
                message=f"Node '{node.id}' is not reachable from Start",
                fix="Connect this node to the main flow or remove it",
                severity="error"
            )
            for node in flow.nodes
            if node.id not in visited
        ]

    @staticmethod
    def check_dead_end_not_end(flow: FlowInput) -> List[Finding]:
        """Report every non-End node that has no outgoing edge.

        Returns:
            One ``dead_end_not_end`` finding per offending node, in
            ``flow.nodes`` order.
        """
        # Ids that appear as the source of at least one edge.
        sources = {edge.from_node for edge in flow.edges}
        return [
            Finding(
                code="dead_end_not_end",
                node_id=node.id,
                message=f"Node '{node.id}' ({node.type}) has no outgoing edges but is not an End node",
                fix="Add an outgoing edge or change type to End",
                severity="error"
            )
            for node in flow.nodes
            if node.type != "End" and node.id not in sources
        ]

    @staticmethod
    def _is_fallback_edge(edge: Edge) -> bool:
        """Return True when an edge is flagged or named like a fallback path.

        An edge counts as a fallback when ``is_fallback`` is truthy, when its
        label contains "timeout", "no_match" or "invalid", or when its
        condition contains "timeout" or "no_match" (case-insensitive).
        """
        label = (edge.label or "").lower()
        condition = (edge.condition or "").lower()
        # NOTE(review): "invalid" is matched in labels only, not conditions;
        # kept as-is because it is unclear whether that asymmetry is intended.
        return bool(
            edge.is_fallback
            or any(word in label for word in ("timeout", "no_match", "invalid"))
            or any(word in condition for word in ("timeout", "no_match"))
        )

    @staticmethod
    def check_missing_fallback_on_ask(flow: FlowInput) -> List[Finding]:
        """Report every Ask node that has no fallback edge leaving it.

        Returns:
            One ``missing_fallback_on_ask`` finding per Ask node without a
            fallback edge, in ``flow.nodes`` order.
        """
        # Group edges by source once instead of rescanning all edges per node.
        outgoing = {}
        for edge in flow.edges:
            outgoing.setdefault(edge.from_node, []).append(edge)

        findings = []
        for node in flow.nodes:
            if node.type != "Ask":
                continue
            if any(LinterRules._is_fallback_edge(e) for e in outgoing.get(node.id, [])):
                continue
            findings.append(Finding(
                code="missing_fallback_on_ask",
                node_id=node.id,
                message=f"Ask node '{node.id}' missing fallback (timeout/no-match) path",
                fix="Add an edge with is_fallback=true or label='timeout'",
                severity="error"
            ))
        return findings

    @staticmethod
    def check_var_used_not_defined(flow: FlowInput) -> List[Finding]:
        """Report variables that a node uses but no node in the flow defines.

        This is a deliberate simplification: a variable counts as defined if
        it appears in ``vars_defined`` of any node, regardless of whether
        that node is reachable or runs before the use. Only variables that
        are never defined anywhere are reported.

        Returns:
            One ``var_used_not_defined`` finding per (node, used variable)
            occurrence whose variable is never defined.
        """
        defined = set()
        for node in flow.nodes:
            defined.update(node.vars_defined or ())

        findings = []
        for node in flow.nodes:
            for var in node.vars_used or ():
                if var not in defined:
                    findings.append(Finding(
                        code="var_used_not_defined",
                        node_id=node.id,
                        message=f"Variable '{var}' used in '{node.id}' is never defined in flow",
                        fix=f"Define '{var}' in a SetVar or Input node",
                        severity="error"
                    ))
        return findings

    @staticmethod
    def check_prompt_too_long(flow: FlowInput, max_length: int = 220) -> List[Finding]:
        """Report every node whose prompt is longer than ``max_length``.

        Args:
            flow: The flow to inspect.
            max_length: Largest allowed prompt length in characters.

        Returns:
            One ``prompt_too_long`` warning per offending node.
        """
        return [
            Finding(
                code="prompt_too_long",
                node_id=node.id,
                message=f"Prompt in '{node.id}' exceeds {max_length} chars ({len(node.prompt)})",
                fix="Shorten the prompt text",
                severity="warning"
            )
            for node in flow.nodes
            if node.prompt and len(node.prompt) > max_length
        ]

    @staticmethod
    def check_multi_question(flow: FlowInput, max_questions: int = 2) -> List[Finding]:
        """Report every node whose prompt has more than ``max_questions`` '?'.

        Args:
            flow: The flow to inspect.
            max_questions: Largest allowed number of question marks.

        Returns:
            One ``multi_question`` warning per offending node.
        """
        return [
            Finding(
                code="multi_question",
                node_id=node.id,
                message=f"Prompt in '{node.id}' asks too many questions (>{max_questions})",
                fix="Split into multiple nodes",
                severity="warning"
            )
            for node in flow.nodes
            if node.prompt and node.prompt.count('?') > max_questions
        ]

    @staticmethod
    def check_no_confirmation(flow: FlowInput) -> List[Finding]:
        """Warn about sensitive actions with no confirmation right before them.

        A node is sensitive when its type is ``"Transfer"`` or its
        ``meta["action"]`` is "cancel", "pay" or "transfer". It passes when
        at least one direct predecessor has "confirm", "seguro" or "sure" in
        its prompt or label (case-insensitive). Nodes with no incoming edge
        are not reported. This is a simple heuristic: edge labels and
        non-adjacent ancestors are not inspected.

        Returns:
            One ``no_confirmation_for_irreversible_action`` warning per
            sensitive, connected node lacking a confirming predecessor.
        """
        sensitive_types = {"Transfer"}
        sensitive_actions = ("cancel", "pay", "transfer")
        confirm_words = ("confirm", "seguro", "sure")

        findings = []
        for node in flow.nodes:
            # A missing or None ``meta`` is treated as "no explicit action".
            meta = getattr(node, "meta", None) or {}
            action = meta.get("action")
            if node.type not in sensitive_types and action not in sensitive_actions:
                continue

            incoming = {e.from_node for e in flow.edges if e.to_node == node.id}
            if not incoming:
                # Only warn for nodes that are actually connected.
                continue

            has_confirm = False
            for parent in flow.nodes:
                if parent.id not in incoming:
                    continue
                # Both fields may be absent or None; lower-case both so the
                # keyword match is case-insensitive, and keep them separated
                # so a keyword cannot match across the prompt/label boundary.
                prompt = (parent.prompt or "").lower()
                label = (getattr(parent, "label", None) or "").lower()
                text = prompt + " " + label
                if any(word in text for word in confirm_words):
                    has_confirm = True
                    break

            if not has_confirm:
                findings.append(Finding(
                    code="no_confirmation_for_irreversible_action",
                    node_id=node.id,
                    message=f"Sensitive action in '{node.id}' missing preceding confirmation",
                    fix="Add a confirmation Ask node before this step",
                    severity="warning"
                ))
        return findings
def collect_findings(flow: FlowInput) -> Tuple[List[Finding], List[Finding]]:
    """Run every lint rule on ``flow`` and return ``(errors, warnings)``.

    Findings are bucketed by the rule that produced them, not by inspecting
    each finding's ``severity``. Within each bucket the findings keep the
    order in which the rules are listed below.

    Args:
        flow: The flow to lint.

    Returns:
        A 2-tuple: findings from the error rules, then findings from the
        warning rules.
    """
    error_rules = (
        LinterRules.check_unreachable_node,
        LinterRules.check_dead_end_not_end,
        LinterRules.check_missing_fallback_on_ask,
        LinterRules.check_var_used_not_defined,
    )
    warning_rules = (
        LinterRules.check_prompt_too_long,
        LinterRules.check_multi_question,
        LinterRules.check_no_confirmation,
    )

    errors = []
    for rule in error_rules:
        errors.extend(rule(flow))

    warnings = []
    for rule in warning_rules:
        warnings.extend(rule(flow))

    return errors, warnings