Skip to main content
Glama
validate_workflow.py (15.7 kB)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Validate workflow requirements: commit messages, branch names, issue closure.

Intended to be called from git hooks and CI. Each check prints colored
diagnostics to stdout and ``main`` turns the combined result into a process
exit code (0 = OK, 1 = at least one validation failed).
"""
import argparse
import json
import os
import re
import subprocess
import sys
from typing import List, Tuple

# Single source of truth for the accepted "Prefix: subject" commit prefixes.
DEFAULT_PREFIXES = [
    "Feature:", "Fix:", "Docs:", "Documentation:", "Refactor:", "Test:",
    "Chore:", "CI:", "Build:", "Perf:", "Hotfix:", "Release:",
]

# Conventional Commits types accepted as an alternative to the prefix style.
CC_TYPES = (
    "feat", "fix", "docs", "refactor", "test", "chore",
    "ci", "build", "perf", "revert", "release", "style",
)

# Bare label names that count as a "type" label even without a "type:" prefix.
_TYPE_LABELS = frozenset({
    "bug", "feature", "enhancement", "documentation", "test",
    "chore", "refactor", "ci", "dependencies",
})

# Accepts "#123", "issue-123" (any case) or an UPPERCASE Jira-style key such
# as "ABC-123". Case-insensitivity is scoped to "issue" only; applying it to
# the Jira alternative would treat words like "utf-8" as issue references.
_ISSUE_REF_RE = re.compile(r"#[0-9]+|(?i:issue)-[0-9]+|[A-Z][A-Z0-9]+-[0-9]+")

# Line git inserts above the diff for `git commit --verbose`.
_SCISSORS_RE = re.compile(r"^# -+ >8 -+\s*$")


class Color:
    """ANSI escape sequences used to colorize terminal output."""

    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    BOLD = "\033[1m"
    END = "\033[0m"


def _disable_colors():
    """Blank out every ANSI sequence so output is plain text."""
    Color.RED = ""
    Color.GREEN = ""
    Color.YELLOW = ""
    Color.BLUE = ""
    Color.BOLD = ""
    Color.END = ""


def print_error(message: str) -> None:
    """Print *message* with a red ``ERROR:`` tag."""
    print(f"{Color.RED}{Color.BOLD}ERROR:{Color.END} {message}")


def print_warning(message: str) -> None:
    """Print *message* with a yellow ``WARNING:`` tag."""
    print(f"{Color.YELLOW}{Color.BOLD}WARNING:{Color.END} {message}")


def print_success(message: str) -> None:
    """Print *message* with a green ``SUCCESS:`` tag."""
    print(f"{Color.GREEN}{Color.BOLD}SUCCESS:{Color.END} {message}")


def print_info(message: str) -> None:
    """Print *message* with a blue ``INFO:`` tag."""
    print(f"{Color.BLUE}{Color.BOLD}INFO:{Color.END} {message}")


def run_command(command: List[str]) -> Tuple[str, int]:
    """Run *command* without a shell and return ``(output, exit_code)``.

    stderr is merged into stdout. A missing executable yields ``("", 127)``;
    any other failure to launch yields ``(str(error), 1)``.
    """
    try:
        result = subprocess.run(
            command,
            check=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
        return result.stdout.strip(), result.returncode
    except FileNotFoundError:
        return "", 127
    except Exception as e:  # boundary: report instead of crashing the hook
        return str(e), 1


# ------------------------ Commit Message Validation ------------------------- #

def _allowed_prefixes_from_env(defaults: List[str]) -> List[str]:
    """Return commit prefixes from ``ALLOWED_PREFIXES`` or *defaults*.

    The variable is split on commas and/or whitespace; every token is
    normalized to end with ``:`` so it works with the ``startswith`` check.
    """
    raw = os.environ.get("ALLOWED_PREFIXES", "")
    if not raw:
        return list(defaults)
    tokens = [t.strip() for t in re.split(r"[\s,]+", raw) if t.strip()]
    normalized = [t if t.endswith(":") else t + ":" for t in tokens]
    return normalized or list(defaults)


def _title_max_len() -> int:
    """Return the ``COMMIT_TITLE_MAXLEN`` limit; 0 means the check is off."""
    raw = os.environ.get("COMMIT_TITLE_MAXLEN", "0").strip()
    try:
        value = int(raw or "0")
    except ValueError:
        # A typo in the environment must not crash the commit hook.
        print_warning(f"Ignoring invalid COMMIT_TITLE_MAXLEN value '{raw}'")
        return 0
    return max(value, 0)


def _strip_git_comments(msg: str) -> str:
    """Drop git's template comments from a commit message file's content.

    The commit-msg hook sees the raw editor buffer, which can contain lines
    such as ``# On branch feature/issue-123`` that would otherwise satisfy
    the issue-reference check. Lines that are a bare ``#`` or start with
    ``# `` are removed and everything from the scissors line on is discarded.
    A line like ``#123`` (no space) is kept because it may be the reference.
    NOTE(review): assumes git's default comment character ``#``.
    """
    kept = []
    for line in msg.splitlines():
        if _SCISSORS_RE.match(line):
            break
        if line == "#" or line.startswith("# ") or line.startswith("#\t"):
            continue
        kept.append(line)
    return "\n".join(kept)


def validate_commit_message_text(msg: str, allow_no_issue: bool = False) -> bool:
    """Validate commit message against repo conventions.

    Accepts either our "Prefix: subject" style or Conventional Commits
    (e.g., "feat(scope): subject"). Requires an issue reference unless
    bypassed via *allow_no_issue* or ``ALLOW_NO_ISSUE=1|true|yes``.

    Returns True when the message is acceptable, False otherwise.
    """
    # The title is the first non-blank line (git itself drops leading blanks).
    first = next((ln.strip() for ln in (msg or "").splitlines() if ln.strip()), "")

    max_len = _title_max_len()
    if max_len and len(first) > max_len:
        print_warning(f"Commit title is {len(first)} chars (>{max_len}). Consider tightening.")

    # Auto-allow merges/reverts/releases
    if first.startswith(("Merge ", "Revert ", "Release ")):
        print_info("Merge/Revert/Release commit detected — skipping prefix check")
        return True

    # 1) Prefix style (configurable)
    allowed_prefixes = _allowed_prefixes_from_env(DEFAULT_PREFIXES)
    lowered_first = first.lower()
    prefix_ok = any(lowered_first.startswith(p.lower()) for p in allowed_prefixes)

    # 2) Conventional Commits style (type(scope)!?: ...)
    cc_re = re.compile(
        rf"^(?:{'|'.join(CC_TYPES)})(?:\([^)]+\))?!?:\s+.+", re.IGNORECASE
    )
    cc_ok = bool(cc_re.match(first))

    if not (prefix_ok or cc_ok):
        print_error(
            "Commit message must start with one of: "
            + ", ".join(allowed_prefixes)
            + " OR match Conventional Commits (e.g., 'feat: ...')."
        )
        print_error(f"First line was: '{first}'")
        return False

    # Issue reference requirement
    allow_no_issue = allow_no_issue or os.environ.get("ALLOW_NO_ISSUE", "").lower() in {"1", "true", "yes"}
    if not allow_no_issue and not _ISSUE_REF_RE.search(msg or ""):
        print_error("Commit message must reference an issue (e.g., #123, issue-123, or ABC-123)")
        return False

    print_success("Commit message format OK")
    return True


def validate_commit_message_file(path: str, allow_no_issue: bool = False) -> bool:
    """Read the commit message at *path* and validate it.

    Returns False (after printing a warning) when the file cannot be read.
    Git template comments are removed before validation.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            msg = f.read()
    except (OSError, UnicodeDecodeError) as e:
        print_warning(f"Could not read commit message file '{path}': {e}")
        return False
    return validate_commit_message_text(_strip_git_comments(msg), allow_no_issue=allow_no_issue)


# --------------------------- Branch Name Validation ------------------------- #

def get_current_branch() -> str:
    """Return the current git branch name (or 'HEAD' if detached).

    Returns an empty string when git fails or prints nothing.
    """
    out, code = run_command(["git", "rev-parse", "--abbrev-ref", "HEAD"])
    if code != 0 or not out:
        print_warning(f"Could not determine current branch: {out}")
        return ""
    return out.strip()


def validate_branch_name_text(name: str) -> bool:
    """Validate branch name against repo conventions.

    Allowed:
      - protected branches: main, master, develop, staging, production
      - type/TICKET[-slug] where type ∈ {feature, fix, hotfix, chore, docs,
        documentation, refactor, test, ci, perf, build, release, codex, claude}
        and TICKET ∈ {issue-<n>, <JIRAKEY>-<n>, <n>}
        slug is optional and may contain [a-z0-9._-]
    """
    protected = {"main", "master", "develop", "staging", "production"}
    if name in protected or name == "HEAD":
        print_success(f"Branch '{name}' is allowed")
        return True

    if " " in name:
        print_error("Branch name must not contain spaces")
        return False

    allowed_types = (
        "feature|fix|hotfix|chore|docs|documentation|refactor|test|ci|perf|build|release|codex|claude"
    )
    pattern = re.compile(
        rf"^(?:{allowed_types})/(?:issue-\d+|[A-Z][A-Z0-9]+-\d+|\d+)(?:-[a-z0-9._-]+)*$"
    )
    if pattern.fullmatch(name):
        print_success(f"Branch '{name}' matches naming convention")
        return True

    print_error(
        "Invalid branch name. Expected 'type/TICKET[-slug]' where type is one of "
        "feature, fix, hotfix, chore, docs, documentation, refactor, test, ci, perf, build, release, codex, claude "
        "and TICKET is issue-<n> / ABC-<n> / <n>."
    )
    lowered = name.lower()
    if lowered != name:
        print_info(f"Suggestion: use lowercase for the slug → '{lowered}'")

    # Reuse any number already present in the name to make the examples concrete.
    parts = re.split(r"[/_-]", name)
    guess_issue = next((p for p in parts if p.isdigit()), "123")
    print_info("Examples: feature/issue-" + guess_issue + "-short-slug | fix/ABC-" + guess_issue)
    return False


# ---------------------------- Issue Closure Check --------------------------- #

def _ensure_gh() -> bool:
    """Return True when the GitHub CLI can be executed, else print an error."""
    _, code = run_command(["gh", "--version"])
    if code == 0:
        return True
    print_error("GitHub CLI ('gh') not found. Install it or ensure it's on PATH.")
    return False


def _append_summary(lines: List[str]) -> None:
    """Append *lines* to the file named by ``$GITHUB_STEP_SUMMARY``, if set.

    Best effort: a write failure is reported as a warning and never changes
    the validation result.
    """
    path = os.environ.get("GITHUB_STEP_SUMMARY")
    if not path:
        return
    try:
        with open(path, "a", encoding="utf-8") as f:
            f.write("\n".join(lines) + "\n")
    except (OSError, UnicodeError) as e:
        print_warning(f"Could not write step summary to '{path}': {e}")


def validate_issue_closure(issue_id: str) -> bool:
    """Validate issue closure requirements with detailed feedback.

    - Checks presence of required label categories (priority/type/status/area)
    - Verifies required narrative sections (``REQUIRE_SECTIONS``) and the
      verification phrase (``REQUIRE_VERIFICATION``) when requested
    - Counts unchecked Markdown task list items ("- [ ] ...")
    - Appends a summary to $GITHUB_STEP_SUMMARY when that variable is set

    Section, verification and checkbox findings are errors in strict mode
    and warnings otherwise. ``AC_STRICT_MODE`` is ``off``, ``auto`` or
    ``strict``; ``auto`` is strict for P0–P2 issues or issues labelled
    ``enforce:strict``.

    Returns True when the issue may be closed, False otherwise.
    """
    if not _ensure_gh():
        return False

    # One round-trip: the JSON view carries everything the checks need.
    output, exit_code = run_command(
        ["gh", "issue", "view", issue_id, "--json", "labels,body,title,number"]
    )
    if exit_code != 0:
        print_error(f"Failed to retrieve issue #{issue_id}: {output}")
        return False

    try:
        issue_data = json.loads(output)
        label_names = [label["name"] for label in issue_data.get("labels", [])]
        issue_body = issue_data.get("body", "") or ""
    except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
        print_error(f"Failed to parse labels for issue #{issue_id}: {e}")
        return False

    labels_lower = [name.lower() for name in label_names]

    # Label categories
    has_priority = any(re.fullmatch(r"p[0-5]", l) for l in labels_lower)
    has_type = any(l.startswith("type:") or l in _TYPE_LABELS for l in labels_lower)
    has_area = any(l.startswith("area:") for l in labels_lower)
    has_status = any(l.startswith("status:") for l in labels_lower)

    missing_label_categories = []
    if not has_priority:
        missing_label_categories.append("Priority (P0–P5)")
    if not has_type:
        missing_label_categories.append("Type (type:bug|feature|enhancement|documentation|test|chore|refactor|ci|dependencies)")
    if not has_area:
        missing_label_categories.append("Area (area:*)")
    if not has_status:
        missing_label_categories.append("Status (status:*)")

    if missing_label_categories:
        _append_summary([
            f"### Issue #{issue_id} validation",
            f"- ❌ Missing label categories: {', '.join(missing_label_categories)}"
        ])
        print_error(f"Issue #{issue_id} is missing required label categories: {', '.join(missing_label_categories)}")
        return False

    # ----- Acceptance Criteria policy -----
    # AC is enforced based on strict-mode and priority
    priority = next((l for l in labels_lower if re.fullmatch(r"p[0-5]", l)), "")
    is_high_priority = priority in ("p0", "p1", "p2")

    # Count checkboxes if any are present
    task_any = re.compile(r"(?m)^\s*[-*]\s*\[[ xX]\]\s+")
    task_open = re.compile(r"(?m)^\s*[-*]\s*\[\s\]\s+")
    total_boxes = len(task_any.findall(issue_body))
    unchecked_boxes = len(task_open.findall(issue_body))

    # Read global args from env to keep function signature simple
    strict_mode = os.environ.get("AC_STRICT_MODE", "auto").strip().lower()  # off|auto|strict
    require_sections = os.environ.get("REQUIRE_SECTIONS", "")  # comma-separated
    require_verification = os.environ.get("REQUIRE_VERIFICATION", "false").lower() == "true"

    def is_strict() -> bool:
        if strict_mode == "strict":
            return True
        if strict_mode == "off":
            return False
        # auto mode (also the fallback for unrecognized values)
        return is_high_priority or ("enforce:strict" in labels_lower)

    # Enforce sections only if provided
    if require_sections:
        sections = [s.strip() for s in require_sections.split(",") if s.strip()]
        missing = [s for s in sections if s not in issue_body]
        if missing:
            if is_strict():
                print_error(f"Issue #{issue_id} missing required sections: {', '.join(missing)}")
                return False
            print_warning(f"Issue #{issue_id} missing optional sections: {', '.join(missing)}")

    # Enforce verification phrase only if requested
    if require_verification and not re.search(r"(✅\s*)?verification\s*:", issue_body, re.IGNORECASE):
        if is_strict():
            print_error(f"Issue #{issue_id} missing verification section")
            return False
        print_warning(f"Issue #{issue_id} missing verification section")

    # Acceptance Criteria enforcement
    if total_boxes > 0 and unchecked_boxes > 0:
        msg = f"Issue #{issue_id} has {unchecked_boxes} unchecked acceptance criteria"
        if is_strict():
            print_error(msg)
            return False
        print_warning(msg)

    ac_status = "OK" if unchecked_boxes == 0 else f"{unchecked_boxes} unchecked"
    _append_summary([
        f"### Issue #{issue_id} validation",
        f"- Labels OK: {', '.join(label_names)}",
        f"- Acceptance criteria: {ac_status}",
    ])

    print_success(f"Issue #{issue_id} meets all closure requirements")
    return True


# ---------------------------------- Main ----------------------------------- #

def main() -> int:
    """Parse command-line flags, run the requested checks, return an exit code.

    Every requested validation runs; the result is 1 if any of them failed,
    otherwise 0. With no validation flag the help text is printed.
    """
    parser = argparse.ArgumentParser(description="Validate Attio MCP workflow requirements")
    parser.add_argument("--commit-msg-file", metavar="PATH", help="Validate a commit message file (Husky commit-msg hook passes $1)")
    parser.add_argument("--pre-commit", action="store_true", help="Run pre-commit validations")
    parser.add_argument("--validate-branch", action="store_true", help="Validate current git branch name")
    parser.add_argument("--issue-close", help="Validate issue closure requirements for the specified issue ID")
    parser.add_argument("--strict-mode", choices=["off", "auto", "strict"], default="auto")
    parser.add_argument("--require-sections", nargs="*", default=[])
    parser.add_argument("--require-verification", action="store_true")
    parser.add_argument("--allow-no-issue", action="store_true", help="Allow commit messages without issue references (or set ALLOW_NO_ISSUE=1)")
    parser.add_argument("--no-color", action="store_true", help="Disable ANSI colors (or set NO_COLOR=1)")
    parser.add_argument("--print-config", action="store_true", help="Print current validation config and exit")

    args = parser.parse_args()

    # Color control
    if args.no_color or os.environ.get("NO_COLOR") or not sys.stdout.isatty():
        _disable_colors()

    if args.print_config:
        no_issue_ok = args.allow_no_issue or os.environ.get("ALLOW_NO_ISSUE", "").lower() in {"1", "true", "yes"}
        print_info("Allowed commit prefixes: " + ", ".join(_allowed_prefixes_from_env(DEFAULT_PREFIXES)))
        print_info("Conventional Commit types allowed: " + ", ".join(CC_TYPES))
        print_info("Require issue reference: " + ("no" if no_issue_ok else "yes"))
        print_info(f"Strict-mode: {args.strict_mode}")
        if args.require_sections:
            print_info("Required sections: " + ", ".join(args.require_sections))
        print_info("Require verification: " + ("yes" if args.require_verification else "no"))
        return 0

    # Back-compat: tolerate --pre-commit alone
    if args.pre_commit and not (args.commit_msg_file or args.issue_close or args.validate_branch):
        print_info("--pre-commit: no-op (use lint-staged & dedicated hooks)")
        return 0

    ran_any = False
    all_ok = True

    if args.validate_branch:
        ran_any = True
        branch = get_current_branch()
        if not branch or not validate_branch_name_text(branch):
            all_ok = False

    if args.commit_msg_file:
        ran_any = True
        if not validate_commit_message_file(args.commit_msg_file, allow_no_issue=args.allow_no_issue):
            all_ok = False

    if args.issue_close:
        ran_any = True
        # Pass policy via env (keeps current function signature)
        os.environ["AC_STRICT_MODE"] = args.strict_mode
        os.environ["REQUIRE_SECTIONS"] = ",".join(args.require_sections or [])
        os.environ["REQUIRE_VERIFICATION"] = "true" if args.require_verification else "false"
        if not validate_issue_closure(args.issue_close):
            all_ok = False

    if not ran_any:
        parser.print_help()
        return 0
    return 0 if all_ok else 1


if __name__ == "__main__":
    sys.exit(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kesslerio/attio-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.