"""
Simple test script for IRIS case management client.
Tests the following operations:
1. Create a case
2. Add a comment
3. Add notes (additional comments)
4. Add assets
5. Add IOC (observables)
6. Add tasks
All operations use the Python functions directly from src/integrations/case_management/iris/*
"""
import os
import sys
from datetime import datetime
# Add project root to path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../.."))
sys.path.insert(0, project_root)
from src.core.config_storage import load_config_from_file
from src.integrations.case_management.iris.iris_client import IRISCaseManagementClient
from src.api.case_management import Case, CaseStatus, CasePriority, CaseObservable
def main():
"""Run all test operations."""
print("=" * 70)
print("IRIS Case Management Client Test")
print("=" * 70)
# Load configuration
try:
config = load_config_from_file("config.json")
if not config.iris or not config.iris.base_url or not config.iris.api_key:
print("ERROR: IRIS configuration not found in config.json")
return
except Exception as e:
print(f"ERROR: Failed to load config: {e}")
return
# Create IRIS client
try:
client = IRISCaseManagementClient.from_config(config)
print("✓ IRIS client created successfully")
except Exception as e:
print(f"ERROR: Failed to create IRIS client: {e}")
return
# Test 1: Create a case
print("\n" + "=" * 70)
print("Test 1: Create a case")
print("=" * 70)
try:
test_case = Case(
id=None,
title="Test Case - IRIS Client",
description="This is a test case created by the IRIS client test script",
status=CaseStatus.OPEN,
priority=CasePriority.MEDIUM,
tags=["test", "automation"],
created_at=datetime.now(),
)
created_case = client.create_case(test_case)
case_id = created_case.id
print(f"✓ Case created successfully")
print(f" Case ID: {case_id}")
print(f" Title: {created_case.title}")
print(f" Status: {created_case.status}")
print(f" Priority: {created_case.priority}")
except Exception as e:
print(f"✗ Failed to create case: {e}")
import traceback
traceback.print_exc()
return
# Test 2: Add a comment
print("\n" + "=" * 70)
print("Test 2: Add a comment")
print("=" * 70)
print("Note: Comments may not be supported in this IRIS API version")
try:
comment = client.add_case_comment(
case_id=case_id,
content="This is a test comment added to the case",
author="test-automation",
)
print(f"✓ Comment added successfully")
print(f" Comment ID: {comment.id}")
print(f" Content: {comment.content[:50]}...")
print(f" Author: {comment.author}")
except Exception as e:
print(f"⚠ Comment endpoint not available (may be API version issue): {e}")
print(" This is expected if IRIS API doesn't support comments endpoint")
# Test 3: Add notes (additional comments)
print("\n" + "=" * 70)
print("Test 3: Add notes (additional comments)")
print("=" * 70)
print("Note: Notes use the same comment endpoint - may not be supported")
try:
note1 = client.add_case_comment(
case_id=case_id,
content="First note: Initial investigation started",
author="analyst-1",
)
print(f"✓ Note 1 added successfully")
print(f" Note ID: {note1.id}")
note2 = client.add_case_comment(
case_id=case_id,
content="Second note: Reviewing evidence and logs",
author="analyst-2",
)
print(f"✓ Note 2 added successfully")
print(f" Note ID: {note2.id}")
except Exception as e:
print(f"⚠ Notes endpoint not available (may be API version issue): {e}")
print(" This is expected if IRIS API doesn't support comments endpoint")
# Test 4: Add assets
print("\n" + "=" * 70)
print("Test 4: Add assets")
print("=" * 70)
try:
asset1 = client.add_case_asset(
case_id=case_id,
asset_name="workstation-01",
asset_type="endpoint",
description="Suspicious workstation under investigation",
ip_address="192.168.1.100",
tags=["compromised", "investigation"],
)
print(f"✓ Asset 1 added successfully")
print(f" Asset ID: {asset1.get('asset_id', 'N/A')}")
print(f" Asset Name: {asset1.get('asset_name', 'N/A')}")
asset2 = client.add_case_asset(
case_id=case_id,
asset_name="server-01",
asset_type="server",
description="Affected web server",
ip_address="10.0.0.50",
tags=["production"],
)
print(f"✓ Asset 2 added successfully")
print(f" Asset ID: {asset2.get('asset_id', 'N/A')}")
print(f" Asset Name: {asset2.get('asset_name', 'N/A')}")
except Exception as e:
print(f"✗ Failed to add assets: {e}")
import traceback
traceback.print_exc()
# Test 5: Add IOC (observables)
print("\n" + "=" * 70)
print("Test 5: Add IOC (observables)")
print("=" * 70)
try:
observable1 = CaseObservable(
type="ip",
value="192.168.1.100",
description="Suspicious source IP address",
tags=["malicious", "c2"],
)
ioc1 = client.add_case_observable(case_id=case_id, observable=observable1)
print(f"✓ IOC 1 (IP) added successfully")
print(f" Type: {ioc1.type}")
print(f" Value: {ioc1.value}")
observable2 = CaseObservable(
type="domain",
value="malicious.example.com",
description="C2 domain",
tags=["c2", "phishing"],
)
ioc2 = client.add_case_observable(case_id=case_id, observable=observable2)
print(f"✓ IOC 2 (Domain) added successfully")
print(f" Type: {ioc2.type}")
print(f" Value: {ioc2.value}")
observable3 = CaseObservable(
type="hash",
value="abc123def4567890123456789012345678901234567890123456789012345678",
description="Malware hash (SHA256)",
tags=["malware"],
)
ioc3 = client.add_case_observable(case_id=case_id, observable=observable3)
print(f"✓ IOC 3 (Hash) added successfully")
print(f" Type: {ioc3.type}")
print(f" Value: {ioc3.value[:50]}...")
except Exception as e:
print(f"✗ Failed to add IOC: {e}")
import traceback
traceback.print_exc()
# Test 6: Add tasks
print("\n" + "=" * 70)
print("Test 6: Add tasks")
print("=" * 70)
print("Note: Tasks may have API version compatibility issues")
task1_id = None
task2_id = None
task3_id = None
try:
task1 = client.add_case_task(
case_id=case_id,
title="Block malicious IP",
description="Add 192.168.1.100 to firewall blocklist",
priority="high",
status="pending",
)
# IRIS returns task data directly (not wrapped in 'data' field)
task1_id = task1.get('id') or task1.get('task_id')
print(f"✓ Task 1 added successfully")
print(f" Task ID: {task1_id}")
print(f" Title: {task1.get('task_title', 'N/A')}")
print(f" Status: {task1.get('task_status_id', 'N/A')}")
task2 = client.add_case_task(
case_id=case_id,
title="Investigate domain",
description="Perform DNS analysis on malicious.example.com",
priority="medium",
status="pending",
)
task2_id = task2.get('id') or task2.get('task_id')
print(f"✓ Task 2 added successfully")
print(f" Task ID: {task2_id}")
print(f" Title: {task2.get('task_title', 'N/A')}")
print(f" Status: {task2.get('task_status_id', 'N/A')}")
task3 = client.add_case_task(
case_id=case_id,
title="Review evidence",
description="Analyze collected evidence files",
priority="high",
status="in_progress",
)
task3_id = task3.get('id') or task3.get('task_id')
print(f"✓ Task 3 added successfully")
print(f" Task ID: {task3_id}")
print(f" Title: {task3.get('task_title', 'N/A')}")
print(f" Status: {task3.get('task_status_id', 'N/A')}")
except Exception as e:
print(f"⚠ Task creation failed (may be API version issue): {e}")
print(" IRIS API may have restrictions on task creation")
import traceback
traceback.print_exc()
# Test 6b: Update task status
print("\n" + "=" * 70)
print("Test 6b: Update task status")
print("=" * 70)
if task1_id:
try:
# Update task1 from pending to in_progress
print(f"\nUpdating task {task1_id} from 'pending' to 'in_progress'...")
updated_task1 = client.update_case_task_status(
case_id=case_id,
task_id=str(task1_id),
status="in_progress",
)
print(f"✓ Task status updated successfully")
print(f" Task ID: {updated_task1.get('data', {}).get('id', task1_id)}")
print(f" Status ID: {updated_task1.get('data', {}).get('task_status_id', 'N/A')}")
print(f" Message: {updated_task1.get('message', 'N/A')}")
# Update task1 from in_progress to completed
print(f"\nUpdating task {task1_id} from 'in_progress' to 'completed'...")
completed_task1 = client.update_case_task_status(
case_id=case_id,
task_id=str(task1_id),
status="completed",
)
print(f"✓ Task status updated successfully")
print(f" Task ID: {completed_task1.get('data', {}).get('id', task1_id)}")
print(f" Status ID: {completed_task1.get('data', {}).get('task_status_id', 'N/A')}")
print(f" Message: {completed_task1.get('message', 'N/A')}")
# Test updating task2 from pending to in_progress
if task2_id:
print(f"\nUpdating task {task2_id} from 'pending' to 'in_progress'...")
updated_task2 = client.update_case_task_status(
case_id=case_id,
task_id=str(task2_id),
status="in_progress",
)
print(f"✓ Task status updated successfully")
print(f" Task ID: {updated_task2.get('data', {}).get('id', task2_id)}")
print(f" Status ID: {updated_task2.get('data', {}).get('task_status_id', 'N/A')}")
print(f" Message: {updated_task2.get('message', 'N/A')}")
# List all tasks to verify status changes
print(f"\nListing all tasks to verify status changes...")
all_tasks = client.list_case_tasks(case_id)
print(f"✓ Found {len(all_tasks)} tasks")
for task in all_tasks:
task_id_val = task.get('id') or task.get('task_id')
status_id = task.get('task_status_id')
status_name = {1: 'pending', 2: 'in_progress', 3: 'completed', 4: 'blocked'}.get(status_id, 'unknown')
print(f" Task {task_id_val}: '{task.get('task_title', 'N/A')}' - Status: {status_name} (ID: {status_id})")
except Exception as e:
print(f"✗ Task status update failed: {e}")
import traceback
traceback.print_exc()
else:
print("⚠ Skipping task status update test - no task IDs available")
# Test 7: Add timeline events
print("\n" + "=" * 70)
print("Test 7: Add timeline events")
print("=" * 70)
try:
event1 = client.add_case_timeline_event(
case_id=case_id,
title="Case created",
content="Case created and initial triage started.",
tags=["status:open", "phase:initial"],
)
print("✓ Timeline event 1 added")
event2 = client.add_case_timeline_event(
case_id=case_id,
title="IOC analysis completed",
content="IOC triage completed and indicators reviewed.",
tags=["ioc", "analysis"],
)
print("✓ Timeline event 2 added")
event3 = client.add_case_timeline_event(
case_id=case_id,
title="Containment actions",
content="Containment actions executed on affected assets.",
tags=["containment", "response"],
)
print("✓ Timeline event 3 added")
# Fetch timeline events to verify they exist
events = client.list_case_timeline_events(case_id)
print(f"Timeline events count: {len(events)}")
if events:
first_titles = [e.get('event_title') for e in events[:3]]
print(f" First events: {first_titles}")
except Exception as e:
print(f"⚠ Timeline events creation failed (may be API version issue): {e}")
print(" IRIS API may have restrictions on timeline endpoints")
# Test 8: Add evidence with types
print("\n" + "=" * 70)
print("Test 8: Add evidence with types")
print("=" * 70)
try:
import tempfile
# Pick a valid evidence type from IRIS, preferring log-related types
types = client.list_evidence_types()
chosen_type = None
for t in types:
name = t.get("name", "")
if name.startswith("Logs - "):
chosen_type = name
break
if not chosen_type:
if types:
chosen_type = types[0]["name"]
else:
raise RuntimeError("No evidence types available in IRIS")
print(f"Using evidence type: {chosen_type}")
# Create a temporary evidence file
tmp = tempfile.NamedTemporaryFile("w+", delete=False, suffix=".txt")
tmp.write("This is a test evidence file for IRIS integration.\n")
tmp.flush()
tmp_path = tmp.name
tmp.close()
evidence_result = client.add_case_evidence(
case_id=case_id,
file_path=tmp_path,
description="Test evidence with explicit type",
evidence_type=chosen_type,
custom_attributes={"source": "integration-test"},
)
print("✓ Evidence added successfully")
print(f" Evidence ID: {evidence_result.get('id', 'N/A')}")
print(f" Filename: {evidence_result.get('filename', 'N/A')}")
# List evidences to verify
evidences = client.list_case_evidence(case_id)
print(f"Case evidences count: {len(evidences)}")
for ev in evidences[:3]:
print(f" Evidence: id={ev.get('id')}, filename={ev.get('filename')}, type_id={ev.get('type_id')}")
except Exception as e:
print(f"⚠ Evidence creation failed (may be API version issue): {e}")
print(" IRIS API may have restrictions on evidence endpoints")
# Summary
print("\n" + "=" * 70)
print("Test Summary")
print("=" * 70)
print(f"Case ID: {case_id}")
print("\nTest Results:")
print(" ✓ Case creation: SUCCESS")
print(" ⚠ Comments/Notes: May not be supported (API version issue)")
print(" ✓ Assets: SUCCESS")
print(" ✓ IOC (Observables): SUCCESS")
print(" ⚠ Tasks: May have API compatibility issues")
if task1_id:
print(" ✓ Task status updates: SUCCESS")
else:
print(" ⚠ Task status updates: Skipped (no tasks created)")
print("\n✓ All tests completed")
print("=" * 70)
if __name__ == "__main__":
main()