"""
Tests for task-related functionality using dependency injection.
"""
import pytest
import json
def test_search_tasks(tasks_service):
"""Test searching tasks by description."""
# Search for a broader term to find tasks
results = tasks_service.search_tasks("the", limit=20)
# Should have results since we're searching the actual database
if len(results) > 0:
# At least some tasks should have valid content
tasks_with_content = [r for r in results if r["content"] is not None]
if len(tasks_with_content) > 0:
# Verify content structure is correct for parsed tasks
for task in tasks_with_content:
plain_text = task["content"].to_plain_text()
assert isinstance(plain_text, str), "Plain text should be a string"
else:
# If no tasks found, the search still worked (empty result is valid)
assert isinstance(results, list)
def test_search_tasks_no_results(tasks_service):
"""Test task search with no matching results."""
results = tasks_service.search_tasks("nonexistent_task_xyz", limit=10)
assert len(results) == 0
def test_list_tasks(tasks_service):
"""Test listing tasks with default filters."""
results = tasks_service.list_tasks(limit=10)
assert len(results) > 0
assert all("uuid" in r for r in results)
assert all("content" in r for r in results)
assert all("deleted" in r for r in results)
def test_list_tasks_filter_by_priority(tasks_service):
"""Test filtering tasks by priority level."""
# Note: Priority filtering is not implemented in the new schema
# This test needs to be updated or removed based on actual schema
results = tasks_service.list_tasks(limit=10)
assert len(results) >= 0
def test_list_tasks_exclude_deleted(tasks_service):
"""Test that deleted tasks are excluded by default."""
results = tasks_service.list_tasks(include_deleted=False)
assert all(r["deleted"] == False for r in results)
def test_get_tasks_by_note(tasks_service):
"""Test retrieving tasks associated with a note UUID."""
# This test requires tasks that reference notes in their body
# For now, just verify the function works
results = tasks_service.get_tasks_by_note("some-note-uuid")
# Should return empty list if no tasks reference this note
assert isinstance(results, list)
def test_get_tasks_by_note_ordering(tasks_service):
"""Test that tasks are ordered by due date."""
results = tasks_service.get_tasks_by_note("task")
# If we have results, verify ordering by due date
if len(results) > 1:
for i in range(len(results) - 1):
# Earlier due dates should come first
if results[i]["due"] and results[i + 1]["due"]:
assert results[i]["due"] <= results[i + 1]["due"]
def test_tasks_service_with_mock_db():
"""Test that TasksService can work with a mocked DatabaseConnection."""
from unittest.mock import Mock
import sqlite3
# Create an in-memory database with test data
mock_db_connection = Mock()
conn = sqlite3.connect(":memory:")
# Set up complete schema for testing
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
# Use proper ProseMirror JSON format for content
test_content = json.dumps([{
"type": "paragraph",
"content": [{"type": "text", "text": "Test task description"}]
}])
cursor.execute("""
INSERT INTO tasks (uuid, local_uuid, remote_uuid, deleted, calendar_sync_required, notify_at, attrs, content, due, done, is_scheduled_bullet, parent_uuid)
VALUES ('test-task-uuid', 'local-uuid', 'remote-uuid', 0, 0, NULL, '{}', ?, NULL, 0, 0, NULL)
""", (test_content,))
conn.commit()
# Mock returns the in-memory connection
mock_db_connection.get_readonly_connection.return_value = conn
# Import after mocking to avoid import-time issues
from app.tasks import TasksService
service = TasksService(mock_db_connection)
# Test that service uses the mocked connection
# We need to search for something in the content
results = service.search_tasks("Test", limit=10)
assert len(results) > 0
# Content is now a TaskContent model
assert results[0]["content"] is not None
assert results[0]["content"].to_plain_text() == "Test task description"
assert results[0]["uuid"] == "test-task-uuid"
conn.close()
def test_tasks_service_dependency_injection():
"""Test that TasksService properly receives injected dependencies."""
from unittest.mock import Mock
from app.tasks import TasksService
mock_db_connection = Mock()
service = TasksService(mock_db_connection)
# Verify the dependency was injected
assert service.db_connection is mock_db_connection
def test_task_content_parsing(tasks_service):
"""Test that task content JSON is properly parsed into TaskContent model."""
# Use list_tasks with include filters to get all tasks
results = tasks_service.list_tasks(limit=5, include_deleted=False, include_done=False)
if len(results) > 0:
# Check that content is parsed
for task in results:
if task["content"] is not None:
# Should be TaskContent model, not string
assert hasattr(task["content"], "root"), "Content should be a TaskContent model"
assert isinstance(task["content"].root, list), "Content should have a list of paragraphs"
# Test to_plain_text method
plain_text = task["content"].to_plain_text()
assert isinstance(plain_text, str), "to_plain_text should return a string"
def test_task_attrs_parsing(tasks_service):
"""Test that task attrs JSON is properly parsed into TaskAttrs model."""
# Use list_tasks with include filters to get all tasks
results = tasks_service.list_tasks(limit=5, include_deleted=False, include_done=False)
if len(results) > 0:
# Check that attrs is parsed
for task in results:
if task["attrs"] is not None:
# Should be TaskAttrs model, not string
assert hasattr(task["attrs"], "created_at"), "Attrs should be a TaskAttrs model"
assert hasattr(task["attrs"], "victory_value"), "Attrs should have victory_value field"
# Verify field access works
if task["attrs"].created_at:
assert isinstance(task["attrs"].created_at, int), "created_at should be an int"
def test_task_content_with_links():
"""Test parsing task content that contains links."""
from app.tasks import TasksService
content_json = json.dumps([
{
"type": "paragraph",
"content": [
{"type": "text", "text": "Review the "},
{
"type": "link",
"attrs": {"href": "https://example.com/ticket/123"},
"content": [{"type": "text", "text": "TICKET-123"}]
},
{"type": "text", "text": " today"}
]
}
])
parsed = TasksService._parse_task_content(content_json)
assert parsed is not None
assert len(parsed.root) == 1
assert len(parsed.root[0].content) == 3
# Verify plain text extraction
plain_text = parsed.to_plain_text()
assert "Review the" in plain_text
assert "TICKET-123" in plain_text
assert "today" in plain_text
def test_task_content_with_formatting():
"""Test parsing task content with text formatting marks."""
from app.tasks import TasksService
content_json = json.dumps([
{
"type": "paragraph",
"content": [
{"type": "text", "text": "This is "},
{"type": "text", "marks": [{"type": "em"}], "text": "important"},
{"type": "text", "text": " and "},
{"type": "text", "marks": [{"type": "strong"}], "text": "urgent"}
]
}
])
parsed = TasksService._parse_task_content(content_json)
assert parsed is not None
assert len(parsed.root) == 1
assert len(parsed.root[0].content) == 4
# Check marks
assert parsed.root[0].content[1].marks is not None
assert parsed.root[0].content[1].marks[0].type == "em"
assert parsed.root[0].content[3].marks is not None
assert parsed.root[0].content[3].marks[0].type == "strong"
def test_task_attrs_with_recurring():
"""Test parsing task attrs with recurring task data."""
from app.tasks import TasksService
attrs_json = json.dumps({
"createdAt": 1758663212,
"repeat": "DTSTART:20250421T090000\nRRULE:FREQ=WEEKLY;BYDAY=MO",
"startAt": 1759042800,
"startRule": "P1DT1H",
"streakCount": 4,
"duration": "PT15M",
"flags": "IU",
"points": 15.1,
"pointsUpdatedAt": 1761162794,
"victoryValue": 9.7,
"completedAt": 1761163795
})
parsed = TasksService._parse_task_attrs(attrs_json)
assert parsed is not None
assert parsed.created_at == 1758663212
assert parsed.repeat == "DTSTART:20250421T090000\nRRULE:FREQ=WEEKLY;BYDAY=MO"
assert parsed.streak_count == 4
assert parsed.duration == "PT15M"
assert parsed.flags == "IU"
assert parsed.points == 15.1
assert parsed.victory_value == 9.7
def test_task_attrs_with_references():
"""Test parsing task attrs with note/task references."""
from app.tasks import TasksService
attrs_json = json.dumps({
"createdAt": 1758664814,
"references": ["db177700-018f-11ef-8a67-7e9dd6e19a06", "another-uuid"]
})
parsed = TasksService._parse_task_attrs(attrs_json)
assert parsed is not None
assert parsed.references is not None
assert len(parsed.references) == 2
assert "db177700-018f-11ef-8a67-7e9dd6e19a06" in parsed.references
def test_parse_invalid_json():
"""Test that invalid JSON doesn't crash the parser."""
from app.tasks import TasksService
# Test invalid content JSON
invalid_content = "not valid json"
parsed_content = TasksService._parse_task_content(invalid_content)
assert parsed_content is None
# Test invalid attrs JSON
invalid_attrs = "not valid json"
parsed_attrs = TasksService._parse_task_attrs(invalid_attrs)
assert parsed_attrs is None
# Test None values
assert TasksService._parse_task_content(None) is None
assert TasksService._parse_task_attrs(None) is None
# ============================================================================
# Tests for new task filtering and ordering features
# ============================================================================
def test_get_tasks_by_created_date():
"""Test filtering tasks by creation date."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
# Helper function to create a fresh connection with test data
def create_test_db():
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
test_content = json.dumps([{
"type": "paragraph",
"content": [{"type": "text", "text": "Task 1"}]
}])
# Task created on Jan 1, 2025
attrs1 = json.dumps({"createdAt": 1735689600})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-1', ?, ?, 0, 0)
""", (attrs1, test_content))
# Task created on Jan 15, 2025
attrs2 = json.dumps({"createdAt": 1736899200})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-2', ?, ?, 0, 0)
""", (attrs2, test_content))
# Task created on Feb 1, 2025
attrs3 = json.dumps({"createdAt": 1738368000})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-3', ?, ?, 0, 0)
""", (attrs3, test_content))
conn.commit()
return conn
# Create mock that returns a fresh connection each time
mock_db_connection = Mock()
mock_db_connection.get_readonly_connection.side_effect = lambda: create_test_db()
service = TasksService(mock_db_connection)
# Test: Get all tasks (no date filter)
results = service.get_tasks_by_created_date(limit=10)
assert len(results) == 3
# Test: Get tasks created after Jan 10, 2025
results = service.get_tasks_by_created_date(start_date=1736467200, limit=10) # 2025-01-10
assert len(results) == 2
assert all(r["attrs"].created_at >= 1736467200 for r in results)
# Test: Get tasks created before Jan 20, 2025
results = service.get_tasks_by_created_date(end_date=1737331200, limit=10) # 2025-01-20
assert len(results) == 2
assert all(r["attrs"].created_at <= 1737331200 for r in results)
# Test: Get tasks in a specific range
results = service.get_tasks_by_created_date(
start_date=1735689600, # 2025-01-01
end_date=1737331200, # 2025-01-20
limit=10
)
assert len(results) == 2
# Test: Verify sorting (most recent first)
results = service.get_tasks_by_created_date(limit=10)
for i in range(len(results) - 1):
assert results[i]["attrs"].created_at >= results[i + 1]["attrs"].created_at
def test_get_tasks_by_created_date_exclude_deleted_done():
"""Test that creation date filtering respects deleted/done filters."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
def create_test_db():
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
test_content = json.dumps([{
"type": "paragraph",
"content": [{"type": "text", "text": "Task"}]
}])
attrs = json.dumps({"createdAt": 1735689600})
# Insert normal task
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-1', ?, ?, 0, 0)
""", (attrs, test_content))
# Insert deleted task
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-2', ?, ?, 1, 0)
""", (attrs, test_content))
# Insert done task
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-3', ?, ?, 0, 1)
""", (attrs, test_content))
conn.commit()
return conn
mock_db_connection = Mock()
mock_db_connection.get_readonly_connection.side_effect = lambda: create_test_db()
service = TasksService(mock_db_connection)
# Test: Exclude deleted and done (default)
results = service.get_tasks_by_created_date(limit=10)
assert len(results) == 1
assert results[0]["uuid"] == "task-1"
# Test: Include deleted
results = service.get_tasks_by_created_date(include_deleted=True, limit=10)
assert len(results) == 2
# Test: Include done
results = service.get_tasks_by_created_date(include_done=True, limit=10)
assert len(results) == 2
def test_get_tasks_ordered_by_points():
"""Test ordering tasks by points."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
def create_test_db():
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
test_content = json.dumps([{
"type": "paragraph",
"content": [{"type": "text", "text": "Task"}]
}])
# Insert tasks with different points
attrs1 = json.dumps({"createdAt": 1735689600, "points": 5.0})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-1', ?, ?, 0, 0)
""", (attrs1, test_content))
attrs2 = json.dumps({"createdAt": 1735689600, "points": 15.5})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-2', ?, ?, 0, 0)
""", (attrs2, test_content))
attrs3 = json.dumps({"createdAt": 1735689600, "points": 10.0})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-3', ?, ?, 0, 0)
""", (attrs3, test_content))
# Task without points (should be excluded)
attrs4 = json.dumps({"createdAt": 1735689600})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-4', ?, ?, 0, 0)
""", (attrs4, test_content))
conn.commit()
return conn
mock_db_connection = Mock()
mock_db_connection.get_readonly_connection.side_effect = lambda: create_test_db()
service = TasksService(mock_db_connection)
# Test: Get tasks ordered by points (descending - default)
results = service.get_tasks_ordered_by_points(limit=10)
assert len(results) == 3 # task-4 excluded because no points
assert results[0]["attrs"].points == 15.5
assert results[1]["attrs"].points == 10.0
assert results[2]["attrs"].points == 5.0
# Test: Get tasks ordered by points (ascending)
results = service.get_tasks_ordered_by_points(limit=10, ascending=True)
assert len(results) == 3
assert results[0]["attrs"].points == 5.0
assert results[1]["attrs"].points == 10.0
assert results[2]["attrs"].points == 15.5
# Test: Limit works correctly
results = service.get_tasks_ordered_by_points(limit=2)
assert len(results) == 2
def test_get_tasks_ordered_by_points_exclude_deleted_done():
"""Test that points ordering respects deleted/done filters."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
def create_test_db():
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
test_content = json.dumps([{
"type": "paragraph",
"content": [{"type": "text", "text": "Task"}]
}])
attrs = json.dumps({"createdAt": 1735689600, "points": 10.0})
# Normal task
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-1', ?, ?, 0, 0)
""", (attrs, test_content))
# Deleted task
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-2', ?, ?, 1, 0)
""", (attrs, test_content))
# Done task
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-3', ?, ?, 0, 1)
""", (attrs, test_content))
conn.commit()
return conn
mock_db_connection = Mock()
mock_db_connection.get_readonly_connection.side_effect = lambda: create_test_db()
service = TasksService(mock_db_connection)
# Default excludes deleted and done
results = service.get_tasks_ordered_by_points(limit=10)
assert len(results) == 1
# Include deleted
results = service.get_tasks_ordered_by_points(include_deleted=True, limit=10)
assert len(results) == 2
# Include done
results = service.get_tasks_ordered_by_points(include_done=True, limit=10)
assert len(results) == 2
def test_get_tasks_by_priority_flags_urgent_only():
"""Test filtering tasks with urgent flag only."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
mock_db_connection = Mock()
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
test_content = json.dumps([{
"type": "paragraph",
"content": [{"type": "text", "text": "Task"}]
}])
# Urgent only
attrs_u = json.dumps({"createdAt": 1735689600, "flags": "U"})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-urgent', ?, ?, 0, 0)
""", (attrs_u, test_content))
# Important only
attrs_i = json.dumps({"createdAt": 1735689600, "flags": "I"})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-important', ?, ?, 0, 0)
""", (attrs_i, test_content))
# Both
attrs_both = json.dumps({"createdAt": 1735689600, "flags": "IU"})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-both', ?, ?, 0, 0)
""", (attrs_both, test_content))
# None
attrs_none = json.dumps({"createdAt": 1735689600})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-none', ?, ?, 0, 0)
""", (attrs_none, test_content))
conn.commit()
mock_db_connection.get_readonly_connection.return_value = conn
service = TasksService(mock_db_connection)
# Test: Urgent only
results = service.get_tasks_by_priority_flags("urgent", limit=10)
assert len(results) == 1
assert results[0]["uuid"] == "task-urgent"
conn.close()
def test_get_tasks_by_priority_flags_important_only():
"""Test filtering tasks with important flag only."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
mock_db_connection = Mock()
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
test_content = json.dumps([{
"type": "paragraph",
"content": [{"type": "text", "text": "Task"}]
}])
# Urgent only
attrs_u = json.dumps({"createdAt": 1735689600, "flags": "U"})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-urgent', ?, ?, 0, 0)
""", (attrs_u, test_content))
# Important only
attrs_i = json.dumps({"createdAt": 1735689600, "flags": "I"})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-important', ?, ?, 0, 0)
""", (attrs_i, test_content))
# Both
attrs_both = json.dumps({"createdAt": 1735689600, "flags": "IU"})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-both', ?, ?, 0, 0)
""", (attrs_both, test_content))
conn.commit()
mock_db_connection.get_readonly_connection.return_value = conn
service = TasksService(mock_db_connection)
# Test: Important only
results = service.get_tasks_by_priority_flags("important", limit=10)
assert len(results) == 1
assert results[0]["uuid"] == "task-important"
conn.close()
def test_get_tasks_by_priority_flags_both():
"""Test filtering tasks with both urgent and important flags."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
mock_db_connection = Mock()
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
test_content = json.dumps([{
"type": "paragraph",
"content": [{"type": "text", "text": "Task"}]
}])
# Urgent only
attrs_u = json.dumps({"createdAt": 1735689600, "flags": "U"})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-urgent', ?, ?, 0, 0)
""", (attrs_u, test_content))
# Both (IU)
attrs_both1 = json.dumps({"createdAt": 1735689600, "flags": "IU"})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-both-1', ?, ?, 0, 0)
""", (attrs_both1, test_content))
# Both (UI - different order)
attrs_both2 = json.dumps({"createdAt": 1735689600, "flags": "UI"})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-both-2', ?, ?, 0, 0)
""", (attrs_both2, test_content))
conn.commit()
mock_db_connection.get_readonly_connection.return_value = conn
service = TasksService(mock_db_connection)
# Test: Both flags
results = service.get_tasks_by_priority_flags("both", limit=10)
assert len(results) == 2
assert set(r["uuid"] for r in results) == {"task-both-1", "task-both-2"}
conn.close()
def test_get_tasks_by_priority_flags_none():
"""Test filtering tasks with no priority flags."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
mock_db_connection = Mock()
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
test_content = json.dumps([{
"type": "paragraph",
"content": [{"type": "text", "text": "Task"}]
}])
# Urgent only
attrs_u = json.dumps({"createdAt": 1735689600, "flags": "U"})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-urgent', ?, ?, 0, 0)
""", (attrs_u, test_content))
# No flags
attrs_none1 = json.dumps({"createdAt": 1735689600})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-none-1', ?, ?, 0, 0)
""", (attrs_none1, test_content))
# Empty flags string
attrs_none2 = json.dumps({"createdAt": 1735689600, "flags": ""})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-none-2', ?, ?, 0, 0)
""", (attrs_none2, test_content))
# Different flag (D for delegated)
attrs_d = json.dumps({"createdAt": 1735689600, "flags": "D"})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-delegated', ?, ?, 0, 0)
""", (attrs_d, test_content))
conn.commit()
mock_db_connection.get_readonly_connection.return_value = conn
service = TasksService(mock_db_connection)
# Test: No priority flags
results = service.get_tasks_by_priority_flags("none", limit=10)
assert len(results) == 3 # none-1, none-2, and delegated
assert set(r["uuid"] for r in results) == {"task-none-1", "task-none-2", "task-delegated"}
conn.close()
def test_get_tasks_by_priority_flags_exclude_deleted_done():
"""Test that priority flag filtering respects deleted/done filters."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
def create_test_db():
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
test_content = json.dumps([{
"type": "paragraph",
"content": [{"type": "text", "text": "Task"}]
}])
attrs = json.dumps({"createdAt": 1735689600, "flags": "U"})
# Normal task
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-1', ?, ?, 0, 0)
""", (attrs, test_content))
# Deleted task
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-2', ?, ?, 1, 0)
""", (attrs, test_content))
# Done task
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES ('task-3', ?, ?, 0, 1)
""", (attrs, test_content))
conn.commit()
return conn
mock_db_connection = Mock()
mock_db_connection.get_readonly_connection.side_effect = lambda: create_test_db()
service = TasksService(mock_db_connection)
# Default excludes deleted and done
results = service.get_tasks_by_priority_flags("urgent", limit=10)
assert len(results) == 1
assert results[0]["uuid"] == "task-1"
# Include deleted
results = service.get_tasks_by_priority_flags("urgent", include_deleted=True, limit=10)
assert len(results) == 2
# Include done
results = service.get_tasks_by_priority_flags("urgent", include_done=True, limit=10)
assert len(results) == 2
# ============================================================================
# Tests for TaskQuery model - comprehensive query scenarios
# ============================================================================
def test_query_tasks_basic():
"""Test basic TaskQuery functionality."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
from app.models import TaskQuery
def create_test_db():
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
# Create test tasks
for i in range(5):
content = json.dumps([{"type": "paragraph", "content": [{"type": "text", "text": f"Task {i}"}]}])
attrs = json.dumps({"createdAt": 1735689600 + i * 86400, "points": float(i + 1) * 5})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES (?, ?, ?, 0, 0)
""", (f"task-{i}", attrs, content))
conn.commit()
return conn
mock_db_connection = Mock()
mock_db_connection.get_readonly_connection.side_effect = lambda: create_test_db()
service = TasksService(mock_db_connection)
# Test basic query - get all tasks
query = TaskQuery(limit=10)
results = service.query_tasks(query)
assert len(results) == 5
# Test content search
query = TaskQuery(content_search="Task 2", limit=10)
results = service.query_tasks(query)
assert len(results) == 1
assert results[0]["uuid"] == "task-2"
def test_query_tasks_points_range():
"""Test TaskQuery with points range filtering."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
from app.models import TaskQuery
def create_test_db():
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
# Create tasks with different point values
test_data = [
("task-1", 5.0),
("task-2", 10.0),
("task-3", 15.0),
("task-4", 20.0),
("task-5", 25.0),
]
for uuid, points in test_data:
content = json.dumps([{"type": "paragraph", "content": [{"type": "text", "text": "Task"}]}])
attrs = json.dumps({"createdAt": 1735689600, "points": points})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES (?, ?, ?, 0, 0)
""", (uuid, attrs, content))
conn.commit()
return conn
mock_db_connection = Mock()
mock_db_connection.get_readonly_connection.side_effect = lambda: create_test_db()
service = TasksService(mock_db_connection)
# Test min_points only
query = TaskQuery(min_points=15.0, limit=10)
results = service.query_tasks(query)
assert len(results) == 3
assert all(r["attrs"].points >= 15.0 for r in results)
# Test max_points only
query = TaskQuery(max_points=15.0, limit=10)
results = service.query_tasks(query)
assert len(results) == 3
assert all(r["attrs"].points <= 15.0 for r in results)
# Test both min and max
query = TaskQuery(min_points=10.0, max_points=20.0, limit=10)
results = service.query_tasks(query)
assert len(results) == 3
assert all(10.0 <= r["attrs"].points <= 20.0 for r in results)
def test_query_tasks_timestamp_ranges():
"""Test TaskQuery with timestamp range filtering."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
from app.models import TaskQuery
def create_test_db():
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
# Create tasks with different timestamps
test_data = [
("task-1", 1735689600, None), # Created Jan 1, not completed
("task-2", 1736899200, 1737331200), # Created Jan 15, completed Jan 20
("task-3", 1738368000, 1738454400), # Created Feb 1, completed Feb 2
]
for uuid, created, completed in test_data:
content = json.dumps([{"type": "paragraph", "content": [{"type": "text", "text": "Task"}]}])
attrs_data = {"createdAt": created}
if completed:
attrs_data["completedAt"] = completed
attrs = json.dumps(attrs_data)
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES (?, ?, ?, 0, 0)
""", (uuid, attrs, content))
conn.commit()
return conn
mock_db_connection = Mock()
mock_db_connection.get_readonly_connection.side_effect = lambda: create_test_db()
service = TasksService(mock_db_connection)
# Test created_after
query = TaskQuery(created_after=1736000000, limit=10) # After Jan 5
results = service.query_tasks(query)
assert len(results) == 2
assert all(r["attrs"].created_at >= 1736000000 for r in results)
# Test created_before
query = TaskQuery(created_before=1737000000, limit=10) # Before Jan 16
results = service.query_tasks(query)
assert len(results) == 2
assert all(r["attrs"].created_at <= 1737000000 for r in results)
# Test completed_after (only completed tasks)
query = TaskQuery(completed_after=1737500000, limit=10)
results = service.query_tasks(query)
assert len(results) == 1
assert results[0]["uuid"] == "task-3"
def test_query_tasks_flags_filtering():
"""Test TaskQuery with comprehensive flags filtering."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
from app.models import TaskQuery
def create_test_db():
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
# Create tasks with different flag combinations
test_data = [
("task-urgent", "U"),
("task-important", "I"),
("task-both", "IU"),
("task-delegated", "D"),
("task-none", ""),
("task-all", "IUD"),
]
for uuid, flags in test_data:
content = json.dumps([{"type": "paragraph", "content": [{"type": "text", "text": "Task"}]}])
attrs = json.dumps({"createdAt": 1735689600, "flags": flags})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES (?, ?, ?, 0, 0)
""", (uuid, attrs, content))
conn.commit()
return conn
mock_db_connection = Mock()
mock_db_connection.get_readonly_connection.side_effect = lambda: create_test_db()
service = TasksService(mock_db_connection)
# Test flags_filter: "any" - has at least one I or U
query = TaskQuery(flags_filter="any", limit=10)
results = service.query_tasks(query)
assert len(results) == 4 # urgent, important, both, all
uuids = {r["uuid"] for r in results}
assert uuids == {"task-urgent", "task-important", "task-both", "task-all"}
# Test has_flags: must have all specified flags
query = TaskQuery(has_flags="IU", limit=10)
results = service.query_tasks(query)
assert len(results) == 2 # both and all
uuids = {r["uuid"] for r in results}
assert uuids == {"task-both", "task-all"}
def test_query_tasks_duration_and_recurring():
"""Test TaskQuery with duration and recurring filters."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
from app.models import TaskQuery
def create_test_db():
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
# Create tasks with different durations and repeat settings
test_data = [
("task-1", "PT30M", None), # Has duration, not recurring
("task-2", "PT1H", "P1D"), # Has duration, recurring
("task-3", None, "P1W"), # No duration, recurring
("task-4", None, None), # No duration, not recurring
]
for uuid, duration, repeat in test_data:
content = json.dumps([{"type": "paragraph", "content": [{"type": "text", "text": "Task"}]}])
attrs_data = {"createdAt": 1735689600}
if duration:
attrs_data["duration"] = duration
if repeat:
attrs_data["repeat"] = repeat
attrs = json.dumps(attrs_data)
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES (?, ?, ?, 0, 0)
""", (uuid, attrs, content))
conn.commit()
return conn
mock_db_connection = Mock()
mock_db_connection.get_readonly_connection.side_effect = lambda: create_test_db()
service = TasksService(mock_db_connection)
# Test has_duration filter
query = TaskQuery(has_duration=True, limit=10)
results = service.query_tasks(query)
assert len(results) == 2
assert all(r["attrs"].duration is not None for r in results)
# Test duration_equals
query = TaskQuery(duration_equals="PT30M", limit=10)
results = service.query_tasks(query)
assert len(results) == 1
assert results[0]["uuid"] == "task-1"
# Test is_recurring
query = TaskQuery(is_recurring=True, limit=10)
results = service.query_tasks(query)
assert len(results) == 2
uuids = {r["uuid"] for r in results}
assert uuids == {"task-2", "task-3"}
def test_query_tasks_sorting_and_pagination():
"""Test TaskQuery sorting and pagination."""
from unittest.mock import Mock
import sqlite3
from app.tasks import TasksService
from app.models import TaskQuery
def create_test_db():
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
uuid CHARACTER(36),
local_uuid CHARACTER(36),
remote_uuid CHARACTER(36),
deleted INTEGER DEFAULT 0,
calendar_sync_required INTEGER DEFAULT 0,
notify_at INTEGER,
attrs TEXT,
content TEXT,
due INTEGER,
done INTEGER DEFAULT 0,
is_scheduled_bullet INTEGER DEFAULT 0,
parent_uuid CHARACTER(36)
)
""")
# Create tasks with varying attributes
for i in range(10):
content = json.dumps([{"type": "paragraph", "content": [{"type": "text", "text": f"Task {i}"}]}])
attrs = json.dumps({
"createdAt": 1735689600 + i * 86400,
"points": float(i + 1),
"victoryValue": float(10 - i),
"streakCount": i % 3
})
cursor.execute("""
INSERT INTO tasks (uuid, attrs, content, deleted, done)
VALUES (?, ?, ?, 0, 0)
""", (f"task-{i}", attrs, content))
conn.commit()
return conn
mock_db_connection = Mock()
mock_db_connection.get_readonly_connection.side_effect = lambda: create_test_db()
service = TasksService(mock_db_connection)
# Test sorting by points ascending
query = TaskQuery(sort_by="points", sort_descending=False, limit=10)
results = service.query_tasks(query)
assert len(results) == 10
for i in range(9):
assert results[i]["attrs"].points <= results[i + 1]["attrs"].points
# Test sorting by victory_value descending
query = TaskQuery(sort_by="victory_value", sort_descending=True, limit=10)
results = service.query_tasks(query)
for i in range(9):
assert results[i]["attrs"].victory_value >= results[i + 1]["attrs"].victory_value
# Test pagination
query = TaskQuery(limit=3, offset=0, sort_by="created")
page1 = service.query_tasks(query)
assert len(page1) == 3
query = TaskQuery(limit=3, offset=3, sort_by="created")
page2 = service.query_tasks(query)
assert len(page2) == 3
# Pages should have different tasks
page1_uuids = {t["uuid"] for t in page1}
page2_uuids = {t["uuid"] for t in page2}
assert len(page1_uuids & page2_uuids) == 0