We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/shifusen329/n8n-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
# Configuration: input and output directories both live under the third
# ancestor directory of this file.
_PROJECT_ROOT = Path(__file__).parent.parent.parent
WORKFLOWS_DIR = _PROJECT_ROOT / "workflows"
OUTPUT_DIR = _PROJECT_ROOT / "processed-workflows"

# Create the output directory (and any missing parents) as soon as the module
# is loaded; an already-existing directory is not an error.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
def extract_category(filename: str) -> str:
    """Return the category prefix of a ``category:name`` style filename.

    The category is the text before the first colon. A filename without any
    colon has no category and is reported as ``"uncategorized"``.
    """
    prefix, separator, _ = filename.partition(":")
    if not separator:
        return "uncategorized"
    return prefix
def extract_name(filename: str) -> str:
    """Extract the workflow name from a filename.

    For a ``category:name.json`` filename the name is the segment between the
    first and second colon; for a filename without a colon it is the whole
    filename. In both cases a trailing ``.json`` extension is removed.

    Args:
        filename: Bare filename of the workflow file.

    Returns:
        The name without its ``.json`` extension.
    """
    extension = ".json"
    parts = filename.split(":")
    name = parts[1] if len(parts) > 1 else filename
    # Strip the extension only at the end: str.replace() would also delete
    # any ".json" that happens to appear inside the name itself.
    if name.endswith(extension):
        name = name[: -len(extension)]
    return name
def get_node_type_str(node: Dict[str, Any]) -> Optional[str]:
    """Return the node's ``type`` as a string, or ``None`` when it has none.

    String (and bytes) types are returned unchanged. Any other value is
    serialised with ``json.dumps``; if it is not JSON-serialisable, ``str()``
    is used instead.

    Args:
        node: A single node mapping from a workflow's ``nodes`` list.

    Returns:
        The type string, or ``None`` if the ``type`` key is missing or null.
    """
    node_type = node.get("type")
    if node_type is None:
        # json.dumps(None) yields the truthy string "null"; returning it would
        # make the `if node_type:` checks in this file count type-less nodes
        # as a real node type.
        return None
    if isinstance(node_type, (str, bytes)):
        return node_type
    try:
        return json.dumps(node_type)
    except TypeError:
        return str(node_type)
def generate_description(workflow: Dict[str, Any]) -> str:
    """Generate a plain-text description of a workflow from its content.

    The description is assembled, in order, from:

    * the workflow name,
    * a count of each node type,
    * the sorted list of distinct node names,
    * the content of every sticky-note node.

    A ``nodes`` value that is not a list (missing, null, or any other type)
    is treated as "no nodes" in every section.

    Args:
        workflow: Parsed workflow JSON object.

    Returns:
        The description lines joined with newlines; an empty string when the
        workflow offers nothing to describe.
    """
    lines: List[str] = []
    if workflow.get("name"):
        # The trailing newline leaves a blank line after the name once joined.
        lines.append(f"Workflow Name: {workflow['name']}\n")

    # Normalise once so every section below sees the same, safe node list.
    raw_nodes = workflow.get("nodes")
    nodes = raw_nodes if isinstance(raw_nodes, list) else []

    type_counts: Dict[str, int] = {}
    node_names: Set[str] = set()
    for node in nodes:
        node_type = get_node_type_str(node)
        if node_type:
            type_counts[node_type] = type_counts.get(node_type, 0) + 1
        if node.get("name"):
            node_names.add(node["name"])

    if type_counts:
        lines.append("Node Types:")
        lines.extend(
            f"- {type_name}: {count}" for type_name, count in type_counts.items()
        )

    if node_names:
        lines.append("\nNode Names:")
        lines.extend(f"- {name}" for name in sorted(node_names))

    sticky_notes = [
        node for node in nodes if node.get("type") == "n8n-nodes-base.stickyNote"
    ]
    if sticky_notes:
        # The heading is emitted whenever sticky notes exist, even if none of
        # them carries any content.
        lines.append("\nWorkflow Documentation:")
        for note in sticky_notes:
            parameters = note.get("parameters")
            # A null or non-mapping "parameters" value simply has no content.
            content = parameters.get("content") if isinstance(parameters, dict) else None
            if content:
                lines.append(content)

    return "\n".join(lines)
def extract_tags(workflow: Dict[str, Any]) -> List[str]:
    """Extract a sorted, de-duplicated tag list from a workflow.

    Tags come from three places:

    * the workflow's own ``tags`` list (plain strings, or mappings with a
      ``name`` key),
    * the last dot-separated segment of each node's type,
    * a fixed set of service keywords found anywhere in a node's type.

    Args:
        workflow: Parsed workflow JSON object.

    Returns:
        All collected tags in sorted order.
    """
    # Built once per call rather than once per node.
    service_keywords = (
        "gmail", "google", "openai", "langchain", "webhook",
        "http", "database", "postgres", "supabase",
    )
    tags: Set[str] = set()

    declared_tags = workflow.get("tags")
    if isinstance(declared_tags, list):
        for tag in declared_tags:
            if isinstance(tag, dict) and "name" in tag:
                tags.add(tag["name"])
            elif isinstance(tag, str):
                tags.add(tag)

    nodes = workflow.get("nodes")
    if isinstance(nodes, list):
        for node in nodes:
            node_type = get_node_type_str(node)
            if not node_type:
                continue
            base_type = node_type.split(".")[-1]
            if base_type:
                tags.add(base_type)
            # Node types are camelCase (as in "stickyNote"), so compare in
            # lower case; otherwise a keyword such as "openai" would miss a
            # type spelled "openAi".
            lowered_type = node_type.lower()
            tags.update(
                keyword for keyword in service_keywords if keyword in lowered_type
            )

    return sorted(tags)
def analyze_complexity(workflow: Dict[str, Any]) -> Dict[str, Any]:
    """Compute simple size metrics for a workflow and classify its complexity.

    Only connections listed under each entry's ``"main"`` key are counted.
    Malformed entries (non-mapping connection values, non-list outputs) are
    ignored rather than aborting the analysis.

    Args:
        workflow: Parsed workflow JSON object.

    Returns:
        A dict with ``nodeCount``, ``connectionCount``, ``uniqueNodeTypes``
        and ``complexity``. ``complexity`` is ``"complex"`` above 15 nodes or
        20 connections, ``"moderate"`` above 7 nodes or 10 connections, and
        ``"simple"`` otherwise.
    """
    metrics: Dict[str, Any] = {
        "nodeCount": 0,
        "connectionCount": 0,
        "uniqueNodeTypes": 0,
        "complexity": "simple",
    }

    nodes = workflow.get("nodes")
    if isinstance(nodes, list):
        metrics["nodeCount"] = len(nodes)
        # Resolve each node's type once, then drop the empty/missing ones.
        resolved_types = (get_node_type_str(node) for node in nodes)
        metrics["uniqueNodeTypes"] = len({t for t in resolved_types if t})

    connections = workflow.get("connections")
    if isinstance(connections, dict):
        connection_count = 0
        for conn in connections.values():
            # A null or otherwise non-mapping entry contributes no connections.
            outputs = conn.get("main") if isinstance(conn, dict) else None
            if isinstance(outputs, list):
                connection_count += sum(
                    len(targets) for targets in outputs if isinstance(targets, list)
                )
        metrics["connectionCount"] = connection_count

    if metrics["nodeCount"] > 15 or metrics["connectionCount"] > 20:
        metrics["complexity"] = "complex"
    elif metrics["nodeCount"] > 7 or metrics["connectionCount"] > 10:
        metrics["complexity"] = "moderate"

    return metrics
def process_workflow(file_path: Path) -> Optional[Dict[str, Any]]:
    """Load one workflow JSON file and enrich it with derived metadata.

    Args:
        file_path: Path of the workflow ``.json`` file.

    Returns:
        A dict holding the workflow id, original filename, category, name,
        generated description, tags, complexity metrics and the original
        workflow; or ``None`` when the file is empty, does not contain a JSON
        object, or cannot be processed (the reason is printed).
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()

        # A file holding only whitespace has no JSON to parse either.
        if not content.strip():
            print(f"Warning: Skipping empty file {file_path}")
            return None

        workflow = json.loads(content)
        # Everything below calls workflow.get(...); reject other JSON values
        # here with a clear message instead of an incidental AttributeError.
        if not isinstance(workflow, dict):
            print(f"Warning: Skipping {file_path}: top-level JSON value is not an object")
            return None

        filename = file_path.name
        category = extract_category(filename)
        name = extract_name(filename)
        description = generate_description(workflow)
        tags = extract_tags(workflow)
        complexity = analyze_complexity(workflow)

        # NOTE(review): the fallback id has one-second resolution, so two
        # id-less files with the same ctime second get the same id.
        workflow_id = workflow.get("id") or f"generated-{int(file_path.stat().st_ctime)}"

        return {
            "id": workflow_id,
            "originalFilename": filename,
            "category": category,
            "name": name,
            "description": description,
            "tags": tags,
            "complexity": complexity,
            "originalWorkflow": workflow,
        }
    except Exception as e:
        # Best effort per file: report the failure and let the caller move on.
        print(f"Error processing workflow {file_path}: {e}")
        return None
def process_all_workflows() -> List[Dict[str, Any]]:
    """Process every ``*.json`` file in ``WORKFLOWS_DIR``.

    Each successfully processed workflow is written to its own
    ``<id>.json`` file in ``OUTPUT_DIR``, and all of them together are written
    to ``workflows-summary.json`` in the same directory.

    Returns:
        The list of processed workflows, or an empty list if the run failed
        (the error is printed).
    """
    try:
        # Sort so the processing order and the summary do not depend on the
        # order in which the filesystem happens to list the directory.
        workflow_files = sorted(WORKFLOWS_DIR.glob("*.json"))
        print(f"Found {len(workflow_files)} workflow files to process")

        # Harmless if the directory already exists; protects against it
        # having been removed since the module was loaded.
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

        processed_workflows = []
        for file_path in workflow_files:
            processed_workflow = process_workflow(file_path)
            if not processed_workflow:
                continue
            processed_workflows.append(processed_workflow)

            # The id may come straight from the workflow JSON. Keep only its
            # final path component so a value containing path separators
            # cannot point outside OUTPUT_DIR.
            safe_name = Path(str(processed_workflow["id"])).name
            output_path = OUTPUT_DIR / f"{safe_name}.json"
            with open(output_path, "w", encoding="utf-8") as f:
                json.dump(processed_workflow, f, indent=2)

        summary_path = OUTPUT_DIR / "workflows-summary.json"
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump(processed_workflows, f, indent=2)

        print(f"Successfully processed {len(processed_workflows)} workflows")
        print(f"Results saved to {OUTPUT_DIR}")
        return processed_workflows
    except Exception as e:
        print(f"Error processing workflows: {e}")
        return []
# Run the whole batch only when this file is executed as a script; importing
# it as a module does not start processing.
if __name__ == "__main__":
    process_all_workflows()