# toggl_service.py
"""Toggl API integration service."""
import asyncio
import logging
from base64 import b64encode
from typing import Any, Dict, List, Optional

import httpx

from ..utils import log_info, log_error
logger = logging.getLogger(__name__)
class TogglService:
"""Handles Toggl API integration with rate limiting and pagination."""
BASE_URL = "https://api.track.toggl.com"
REPORTS_URL = f"{BASE_URL}/reports/api/v3"
def __init__(self, api_token: str, workspace_id: str):
"""
Initialize Toggl service.
Args:
api_token: Toggl API token
workspace_id: Toggl workspace ID
"""
self.api_token = api_token
self.workspace_id = workspace_id
self._auth_header = self._build_auth_header()
self._client: Optional[httpx.AsyncClient] = None
logger.debug(f"TogglService initialized with workspace_id={workspace_id}")
def _build_auth_header(self) -> str:
"""Build HTTP Basic Auth header."""
credentials = b64encode(f"{self.api_token}:api_token".encode()).decode()
return f"Basic {credentials}"
def _get_headers(self) -> Dict[str, str]:
"""Get standard request headers."""
return {
"Content-Type": "application/json",
"Authorization": self._auth_header,
}
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create async HTTP client."""
if self._client is None:
self._client = httpx.AsyncClient()
return self._client
async def _fetch_with_backoff(
self,
url: str,
method: str = "GET",
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None,
max_retries: int = 3,
) -> httpx.Response:
"""
Fetch URL with exponential backoff on 429 (rate limit).
Args:
url: URL to fetch
method: HTTP method (GET or POST)
params: Query parameters (for GET requests)
json_data: JSON body (for POST requests)
max_retries: Maximum number of retries
Returns:
Response object
"""
client = await self._get_client()
for attempt in range(max_retries):
try:
if method == "POST":
response = await client.post(
url, json=json_data, headers=self._get_headers(), timeout=10.0
)
else:
response = await client.get(
url, params=params, headers=self._get_headers(), timeout=10.0
)
if response.status_code == 429:
                    wait_time = 2 ** attempt  # exponential backoff: 1s, 2s, 4s
                    log_info(
                        f"Rate limited (429). Waiting {wait_time}s before retry "
                        f"{attempt + 1}/{max_retries}"
                    )
await asyncio.sleep(wait_time)
continue
if response.status_code >= 400:
log_error(f"API error: {response.status_code} - {response.text}")
response.raise_for_status()
return response
except httpx.TimeoutException:
log_error(f"Request timeout on attempt {attempt + 1}")
if attempt == max_retries - 1:
raise
await asyncio.sleep(5)
        raise RuntimeError(f"Max retries ({max_retries}) exceeded for {url}")
async def close(self) -> None:
"""Close the async client connection."""
if self._client is not None:
await self._client.aclose()
self._client = None
async def get_workspace_users(self) -> List[Dict[str, Any]]:
"""
Fetch all users in the workspace.
Returns:
List of user dicts with id, email, name
"""
url = f"{self.BASE_URL}/api/v9/workspaces/{self.workspace_id}/workspace_users"
        response = await self._fetch_with_backoff(url)
users = response.json()
# Transform to expected format
return [
{"id": str(user["id"]), "email": user["email"], "name": user["name"]}
for user in users
]
async def get_time_entries(
self, start_date: str, end_date: str, user_id: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Fetch time entries for a date range using Reports API.
Args:
start_date: Start date in ISO 8601 format (YYYY-MM-DD)
end_date: End date in ISO 8601 format (YYYY-MM-DD)
user_id: Optional user ID to filter by
Returns:
List of time entry dicts
"""
url = f"{self.REPORTS_URL}/workspace/{self.workspace_id}/search/time_entries"
all_entries = []
page_size = 50
first_id = None
first_row_number = None
first_timestamp = None
page_count = 0
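        # The Reports API v3 search endpoint uses cursor-based pagination: each
        # response returns X-Next-Id / X-Next-Row-Number / X-Next-Timestamp headers,
        # which are sent back as first_id / first_row_number / first_timestamp.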
while True:
page_count += 1
# Safety check: prevent infinite loops (max 100 pages per date range)
if page_count > 100:
log_error(f"Pagination exceeded 100 pages, stopping to prevent infinite loop")
break
# Build request body
body = {
"start_date": start_date,
"end_date": end_date,
"page_size": page_size,
}
if user_id:
body["user_ids"] = [int(user_id)]
# Add pagination markers if present (only after first page)
if first_id is not None:
body["first_id"] = first_id
body["first_row_number"] = first_row_number
body["first_timestamp"] = first_timestamp
log_info(f"Fetching time entries page {page_count}: start={start_date}, end={end_date}, page_size={page_size}")
response = await self._fetch_with_backoff(
url, method="POST", json_data=body
)
data = response.json()
# Extract entries from response
if isinstance(data, list):
entries = data
elif isinstance(data, dict) and "data" in data:
entries = data["data"]
else:
entries = []
log_info(f"Received {len(entries)} entries on page {page_count}")
all_entries.extend(entries)
# Check if there are more pages (if we got fewer than page_size, this is the last page)
if len(entries) < page_size:
log_info(f"Pagination complete: got {len(entries)} entries (< {page_size}), stopping")
break # No more pages
# Set pagination markers for next request using response headers
# The Toggl API returns pagination info in response headers, not in the body
next_id = response.headers.get("X-Next-Id")
next_row_number = response.headers.get("X-Next-Row-Number")
next_timestamp = response.headers.get("X-Next-Timestamp")
if next_id:
                # Headers are strings; the API expects integers for these cursor fields
                first_id = int(next_id)
first_row_number = int(next_row_number) if next_row_number else len(all_entries)
first_timestamp = int(next_timestamp) if next_timestamp else None
log_info(f"Set next page markers from headers: first_id={first_id}, row={first_row_number}, ts={first_timestamp}")
            else:
                log_info("No X-Next-Id header in response; treating pagination as complete")
                break
log_info(f"Finished pagination: fetched {len(all_entries)} total entries from {page_count} pages")
return all_entries
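
# Example usage (illustrative sketch only; the token, workspace ID, and date range
# below are placeholders, and the import path assumes this module lives in a
# package that also provides the `utils` helpers):
#
#     import asyncio
#     from myapp.services.toggl_service import TogglService
#
#     async def main() -> None:
#         service = TogglService(api_token="<your API token>", workspace_id="1234567")
#         try:
#             users = await service.get_workspace_users()
#             entries = await service.get_time_entries("2024-01-01", "2024-01-31")
#             print(f"Fetched {len(users)} users and {len(entries)} time entries")
#         finally:
#             await service.close()
#
#     asyncio.run(main())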