Skip to main content
Glama
leeguooooo
by leeguooooo
test_n8n_api.py3.48 kB
#!/usr/bin/env python3 """ 测试 n8n API 连接 """ import os import sys import requests from pathlib import Path # 添加项目根目录到 Python 路径 repo_root = Path(__file__).resolve().parents[1] sys.path.insert(0, str(repo_root)) # 加载 .env 文件(如果存在) try: from dotenv import load_dotenv env_path = repo_root / '.env' if env_path.exists(): load_dotenv(env_path) print(f"📝 已加载 .env 文件\n") except ImportError: pass # python-dotenv 不是必需的 def test_n8n_connection(): """测试 n8n API 连接""" n8n_url = os.getenv('N8N_URL', 'https://n8n.ifoodme.com') n8n_api_key = os.getenv('N8N_API_KEY') if not n8n_api_key: print("❌ 错误: 未设置 N8N_API_KEY 环境变量") print("\n请设置:") print(" export N8N_API_KEY='your_api_key'") return False print(f"🔍 测试 n8n API 连接...") print(f" URL: {n8n_url}") print(f" API Key: {'*' * 20}{n8n_api_key[-8:]}") print() headers = { 'X-N8N-API-KEY': n8n_api_key, 'Content-Type': 'application/json', 'Accept': 'application/json' } try: # 测试获取工作流列表 response = requests.get( f'{n8n_url}/api/v1/workflows', headers=headers, timeout=10 ) if response.status_code == 200: workflows = response.json() workflow_list = workflows.get('data', []) print(f"✅ n8n API 连接成功!") print(f" 当前工作流数量: {len(workflow_list)}") if workflow_list: print(f"\n📋 现有工作流:") for wf in workflow_list[:5]: # 只显示前5个 status = "🟢 激活" if wf.get('active') else "⚪ 未激活" print(f" {status} {wf.get('name')} (ID: {wf.get('id')})") if len(workflow_list) > 5: print(f" ... 还有 {len(workflow_list) - 5} 个工作流") print(f"\n✨ 可以开始导入工作流了!") print(f" 运行: ./setup_n8n.sh") return True elif response.status_code == 401: print(f"❌ 认证失败 (HTTP 401)") print(f" 请检查 N8N_API_KEY 是否正确") return False elif response.status_code == 403: print(f"❌ 权限不足 (HTTP 403)") print(f" 请确认 API Key 有足够的权限") return False else: print(f"❌ 请求失败: HTTP {response.status_code}") print(f" 响应: {response.text}") return False except requests.exceptions.ConnectionError: print(f"❌ 连接失败: 无法连接到 {n8n_url}") print(f" 请检查:") print(f" 1. URL 是否正确") print(f" 2. 网络连接是否正常") print(f" 3. n8n 服务是否运行") return False except requests.exceptions.Timeout: print(f"❌ 请求超时") print(f" 请检查网络连接") return False except Exception as e: print(f"❌ 发生错误: {e}") return False if __name__ == "__main__": print("🚀 n8n API 连接测试\n") success = test_n8n_connection() sys.exit(0 if success else 1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server