"""
Batch todo creation with auto-categorization
Handles creating multiple todos at once with smart keyword detection.
"""
import re
from datetime import datetime
from typing import Any, Optional
import aiosqlite
async def add_todos_batch(
todos_batch: list[dict[str, Any]],
db: aiosqlite.Connection,
auto_categorize: bool = True,
) -> dict[str, Any]:
"""
Create multiple todos at once with auto-categorization
Args:
todos_batch: List of todo dicts with title + optional fields
db: Database connection
auto_categorize: Apply keyword-based auto-categorization
Returns:
Dict with created todos and suggestions
"""
created_todos = []
suggestions = []
for todo_data in todos_batch:
# Apply auto-categorization if enabled
if auto_categorize:
enhanced_data = auto_categorize_todo(todo_data)
categorization_notes = enhanced_data.pop("_categorization_notes", [])
if categorization_notes:
suggestions.extend([f"{todo_data['title']}: {note}" for note in categorization_notes])
else:
enhanced_data = todo_data.copy()
# Set defaults
enhanced_data.setdefault("priority", "medium")
enhanced_data.setdefault("energy_required", "medium")
enhanced_data.setdefault("status", "active")
enhanced_data.setdefault("created_at", datetime.now().isoformat())
# Insert into database
cursor = await db.execute(
"""
INSERT INTO todos (
title, priority, notes, quick, timeframe, energy_required,
theme_tag, task_context, blocked_by, linked_goal_id,
parent_todo_id, task_order, time_estimate, status, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
enhanced_data["title"],
enhanced_data["priority"],
enhanced_data.get("notes"),
enhanced_data.get("quick", False),
enhanced_data.get("timeframe"),
enhanced_data["energy_required"],
enhanced_data.get("theme_tag"),
enhanced_data.get("task_context"),
enhanced_data.get("blocked_by"),
enhanced_data.get("linked_goal_id"),
enhanced_data.get("parent_todo_id"),
enhanced_data.get("task_order", 0),
enhanced_data.get("time_estimate"),
enhanced_data["status"],
enhanced_data["created_at"],
),
)
todo_id = cursor.lastrowid
created_todos.append({"id": todo_id, **enhanced_data})
await db.commit()
return {
"created_count": len(created_todos),
"created_todos": created_todos,
"suggestions": suggestions,
}
def auto_categorize_todo(todo_data: dict[str, Any]) -> dict[str, Any]:
"""
Apply keyword-based auto-categorization to a todo
Detects:
- Theme tags from keywords
- Time estimates from notes
- Quick win status
- Suggested timeframe
- Suggested energy level
Returns enhanced todo_data with _categorization_notes
"""
enhanced = todo_data.copy()
notes = []
title = todo_data["title"].lower()
description = (todo_data.get("notes") or "").lower()
combined_text = f"{title} {description}"
# 1. Detect theme tag
if not todo_data.get("theme_tag"):
theme = detect_theme(combined_text)
if theme:
enhanced["theme_tag"] = theme
notes.append(f"Auto-tagged as '{theme}'")
# 2. Extract time estimate
if not todo_data.get("time_estimate"):
time_est = extract_time_estimate(combined_text)
if time_est:
enhanced["time_estimate"] = time_est
notes.append(f"Detected time estimate: {time_est}min")
# Auto-mark as quick if <= 30min
if time_est <= 30 and not todo_data.get("quick"):
enhanced["quick"] = True
notes.append("Marked as quick win")
# 3. Suggest timeframe
if not todo_data.get("timeframe"):
from .temporal import suggest_timeframe_from_keywords
suggested_timeframe = suggest_timeframe_from_keywords(
combined_text, todo_data.get("priority", "medium")
)
if suggested_timeframe:
enhanced["timeframe"] = suggested_timeframe
notes.append(f"Suggested timeframe: {suggested_timeframe}")
# 4. Suggest energy level
if not todo_data.get("energy_required") or enhanced["energy_required"] == "medium":
suggested_energy = suggest_energy_level(combined_text)
if suggested_energy and suggested_energy != "medium":
enhanced["energy_required"] = suggested_energy
notes.append(f"Suggested energy: {suggested_energy}")
# 5. Detect blockers
blocker_match = re.search(r"blocked by:?\s*(.+?)(?:\n|$)", description, re.IGNORECASE)
if blocker_match and not todo_data.get("blocked_by"):
enhanced["blocked_by"] = blocker_match.group(1).strip()
notes.append(f"Detected blocker: {enhanced['blocked_by']}")
enhanced["_categorization_notes"] = notes
return enhanced
def detect_theme(text: str) -> Optional[str]:
"""Detect theme tag from keywords"""
# Sprint work
sprint_keywords = [
"sprint",
"deployment",
"deploy",
"smoke test",
"endpoint",
"test",
"bug fix",
"hotfix",
"production",
]
if any(kw in text for kw in sprint_keywords):
return "sprint_work"
# Strategic work
strategic_keywords = [
"pitch",
"deck",
"platform",
"strategy",
"planning",
"roadmap",
"architecture",
"vision",
"proposal",
]
if any(kw in text for kw in strategic_keywords):
return "strategic"
# Admin work
admin_keywords = [
"meeting",
"email",
"calendar",
"schedule",
"ado board",
"update board",
"standup",
"1:1",
"sync",
"check-in",
]
if any(kw in text for kw in admin_keywords):
return "admin"
# Learning
learning_keywords = [
"learn",
"research",
"study",
"explore",
"investigate",
"read about",
"tutorial",
"course",
]
if any(kw in text for kw in learning_keywords):
return "learning"
return None
def extract_time_estimate(text: str) -> Optional[int]:
"""Extract time estimate in minutes from text"""
# Patterns: "30min", "2h", "1.5hr", "(45 minutes)", "~2 hours"
patterns = [
(r"(\d+(?:\.\d+)?)\s*h(?:ou)?rs?", lambda m: int(float(m.group(1)) * 60)),
(r"(\d+)\s*min(?:ute)?s?", lambda m: int(m.group(1))),
(r"(\d+(?:\.\d+)?)\s*h\b", lambda m: int(float(m.group(1)) * 60)),
(r"(\d+)m\b", lambda m: int(m.group(1))),
]
for pattern, converter in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
return converter(match)
# Quick keyword = 15min default
if re.search(r"\bquick\b", text, re.IGNORECASE):
return 15
return None
def suggest_energy_level(text: str) -> Optional[str]:
"""Suggest energy level from keywords"""
# High energy markers
high_keywords = [
"creative",
"design",
"architecture",
"planning",
"strategy",
"complex",
"deep work",
"difficult",
"challenging",
]
if any(kw in text for kw in high_keywords):
return "high"
# Low energy markers
low_keywords = [
"review",
"check",
"update",
"email",
"admin",
"routine",
"simple",
"quick",
"easy",
"scan",
]
if any(kw in text for kw in low_keywords):
return "low"
return "medium" # Default