test_dlt_status.py•1.82 kB
#!/usr/bin/env python3
"""
Test script to check the status of a DLT pipeline.
"""
import os
import sys
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def test_dlt_pipeline_status():
"""Test checking DLT pipeline status"""
# Import the MCP function
from main import mcp_check_dlt_pipeline_status
# Test parameters (from the previous successful run)
pipeline_id = "7f7acc35-6fb8-4170-8ec3-c11c4ef422b5"
update_id = "9684474a-3569-42bd-b62a-8b80d6a422e9"
print("Testing DLT pipeline status check...")
print(f"Pipeline ID: {pipeline_id}")
print(f"Update ID: {update_id}")
print("-" * 50)
try:
# Call the MCP function
result = mcp_check_dlt_pipeline_status(
pipeline_id=pipeline_id,
update_id=update_id
)
print("Result:")
print(f"Status: {result.get('status')}")
if result.get('status') == 'success':
print(f"✅ DLT Pipeline status retrieved successfully!")
print(f"Pipeline ID: {result.get('pipeline_id')}")
print(f"Update ID: {result.get('update_id')}")
print(f"Life Cycle State: {result.get('life_cycle_state')}")
print(f"Result State: {result.get('result_state')}")
print(f"Pipeline URL: {result.get('pipeline_url')}")
print(f"Update URL: {result.get('update_url')}")
print(f"Message: {result.get('message')}")
else:
print(f"❌ DLT Pipeline status check failed:")
print(f"Error: {result.get('detail')}")
except Exception as e:
print(f"❌ Exception occurred: {str(e)}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
test_dlt_pipeline_status()