#!/usr/bin/env python3
"""Test ClipSense API directly"""
import requests
import time
import sys
from pathlib import Path
API_KEY = "cs_sk_pNQhgId_0X8P-gt010CkRfZ4cgVVAejH9JQj_LpPmYg"
API_BASE = "https://api.clipsense.app/api/v1"
VIDEO_PATH = "/Users/jerlitaburanday/Pictures/ASUGradBlastClip.mp4"
def main():
print(f"š¬ Testing ClipSense API with video: {VIDEO_PATH}")
# Check if video exists
video_file = Path(VIDEO_PATH)
if not video_file.exists():
print(f"ā Video file not found: {VIDEO_PATH}")
sys.exit(1)
file_size_mb = video_file.stat().st_size / (1024 * 1024)
print(f"š Video size: {file_size_mb:.2f} MB")
headers = {"Authorization": f"Bearer {API_KEY}"}
# Step 1: Get presigned URL
print("\nš¤ Step 1: Getting presigned upload URL...")
presign_response = requests.post(
f"{API_BASE}/upload/presign",
headers=headers,
json={
"filename": video_file.name,
"content_type": "video/mp4",
"file_size": video_file.stat().st_size
}
)
if presign_response.status_code != 200:
print(f"ā Presign failed: {presign_response.status_code}")
print(presign_response.text)
sys.exit(1)
presign_data = presign_response.json()
print(f"ā
Got upload URL: {presign_data['upload_url']}")
print(f"š Video key: {presign_data['video_key']}")
# Step 2: Upload video to storage
print("\nš¤ Step 2: Uploading video to storage...")
with open(VIDEO_PATH, 'rb') as f:
upload_response = requests.put(
presign_data['upload_url'],
data=f,
headers={'Content-Type': 'video/mp4'}
)
if upload_response.status_code not in [200, 204]:
print(f"ā Upload failed: {upload_response.status_code}")
print(upload_response.text)
sys.exit(1)
print("ā
Video uploaded successfully!")
# Step 3: Start analysis
print("\nš Step 3: Starting analysis job...")
analyze_response = requests.post(
f"{API_BASE}/analyze/start",
headers=headers,
json={
"video_key": presign_data['video_key'],
"filename": video_file.name,
"question": "What's happening in this video?",
"analysis_type": "mobile_bug"
}
)
if analyze_response.status_code != 200:
print(f"ā Analysis start failed: {analyze_response.status_code}")
print(analyze_response.text)
sys.exit(1)
job_data = analyze_response.json()
job_id = job_data['id']
print(f"ā
Analysis job started: {job_id}")
# Step 4: Poll for results
print("\nā³ Step 4: Waiting for analysis to complete...")
max_attempts = 120 # 10 minutes
for attempt in range(max_attempts):
time.sleep(5)
status_response = requests.get(
f"{API_BASE}/analyze/jobs/{job_id}/status",
headers=headers
)
if status_response.status_code != 200:
print(f"ā Status check failed: {status_response.status_code}")
continue
status_data = status_response.json()
status = status_data['status']
print(f"ā³ [{attempt * 5}s] Status: {status}")
if status == "completed":
print("\nā
Analysis completed!")
# Get full job details
job_response = requests.get(
f"{API_BASE}/analyze/jobs/{job_id}",
headers=headers
)
if job_response.status_code == 200:
full_job = job_response.json()
print("\n" + "="*60)
print("š ANALYSIS RESULTS")
print("="*60)
print(f"\n{full_job['result']['response']}")
print("\n" + "="*60)
print(f"š Frames: {full_job.get('frames_extracted', 'N/A')}")
print(f"š° Tokens: {full_job.get('tokens_used', 'N/A')}")
print(f"šµ Cost: ${full_job.get('cost_total', 0):.4f}")
print("="*60)
return
elif status == "failed":
print(f"\nā Analysis failed: {status_data.get('error_message', 'Unknown error')}")
sys.exit(1)
print("\nā° Timeout: Analysis took too long")
sys.exit(1)
if __name__ == "__main__":
main()