#!/usr/bin/env python3
"""Test ClipSense video analysis - TEST #3"""
import requests
import os
import time
API_KEY = "cs_sk_pNQhgId_0X8P-gt010CkRfZ4cgVVAejH9JQj_LpPmYg"
BASE_URL = "https://api.clipsense.app/api/v1"
# TEST #3: Tableau Question 2 video
video_path = "/Users/jerlitaburanday/Desktop/ASSORTED FILES/SCHOOL ASSORTED FILES/FALL 2022/TGM 557 Analytics/Tableau Vids/SB1 Q 2.mov"
question = "Analyze this Tableau tutorial video and explain what is being demonstrated."
def get_content_type(filename):
ext = filename.lower().split(".")[-1]
types = {"mp4": "video/mp4", "mov": "video/quicktime", "webm": "video/webm", "avi": "video/x-msvideo"}
return types.get(ext, "video/mp4")
# Check if video exists
if not os.path.exists(video_path):
print(f"ā Video not found: {video_path}")
exit(1)
file_size = os.path.getsize(video_path)
filename = os.path.basename(video_path)
content_type = get_content_type(filename)
print(f"š¹ Video: {video_path}")
print(f"š¦ Size: {file_size / (1024*1024):.2f} MB")
print(f"š Type: {content_type}")
print(f"ā Question: {question}")
print(f"\\nš Getting presigned URL...")
# Step 1: Get presigned URL
presign_response = requests.post(
f"{BASE_URL}/upload/presign",
headers={"Authorization": f"Bearer {API_KEY}"},
json={"filename": filename, "content_type": content_type, "file_size": file_size}
)
if presign_response.status_code != 200:
print(f"ā Presign failed: {presign_response.status_code}")
print(f"Response: {presign_response.text}")
exit(1)
presign_data = presign_response.json()
upload_url = presign_data["upload_url"]
video_key = presign_data["video_key"]
print(f"ā
Video key: {video_key}")
print(f"\\nš¤ Uploading video...")
# Step 2: Upload video
with open(video_path, 'rb') as f:
upload_response = requests.put(upload_url, data=f, headers={'Content-Type': content_type})
if upload_response.status_code not in [200, 204]:
print(f"ā Upload failed: {upload_response.status_code}")
print(f"Response: {upload_response.text}")
exit(1)
print(f"ā
Upload complete!")
print(f"\\nš¤ Starting analysis...")
# Step 3: Start analysis
analyze_response = requests.post(
f"{BASE_URL}/analyze/start",
headers={"Authorization": f"Bearer {API_KEY}"},
json={"video_key": video_key, "filename": filename, "question": question, "analysis_type": "mobile_bug"}
)
if analyze_response.status_code != 200:
print(f"ā Analysis start failed: {analyze_response.status_code}")
print(f"Response: {analyze_response.text}")
exit(1)
job_data = analyze_response.json()
job_id = job_data["id"]
print(f"ā
Job started: {job_id}")
print(f"\\nā³ Polling for results...")
# Step 4: Poll for results
max_attempts = 120
attempt = 0
while attempt < max_attempts:
status_response = requests.get(
f"{BASE_URL}/analyze/jobs/{job_id}/status",
headers={"Authorization": f"Bearer {API_KEY}"}
)
if status_response.status_code != 200:
print(f"ā Status check failed: {status_response.status_code}")
exit(1)
status_data = status_response.json()
status = status_data["status"]
print(f" Attempt {attempt + 1}/{max_attempts}: {status}")
if status == "completed":
job_response = requests.get(
f"{BASE_URL}/analyze/jobs/{job_id}",
headers={"Authorization": f"Bearer {API_KEY}"}
)
if job_response.status_code != 200:
print(f"ā Failed to get job details: {job_response.status_code}")
exit(1)
result = job_response.json()
print(f"\\nā
Analysis complete!")
print(f"\\n{'='*80}")
print(f"ANALYSIS RESULT:")
print(f"{'='*80}")
print(result.get("result", {}).get("response", "No analysis returned"))
print(f"\\n{'='*80}")
print(f"Details:")
print(f" - Frames analyzed: {result.get('frames_extracted', 'N/A')}")
print(f" - Tokens used: {result.get('tokens_used', 0)}")
print(f" - Cost: ${result.get('cost_total', 0):.4f}")
print(f"{'='*80}\\n")
exit(0)
elif status == "failed":
error = status_data.get("error_message", "Unknown error")
print(f"\\nā Analysis failed: {error}")
exit(1)
time.sleep(5)
attempt += 1
print(f"\\nā Analysis timeout after {max_attempts * 5} seconds")