import time
from datetime import UTC, datetime, timedelta, timezone
from typing import Optional
from zoneinfo import ZoneInfo
from dateutil import parser
from dateutil.relativedelta import relativedelta
def current_datetime() -> datetime:
return datetime.now(UTC)
def truncate_minute(dt: datetime) -> datetime:
return dt.replace(second=0, microsecond=0)
def truncate_hour(dt: datetime) -> datetime:
return dt.replace(minute=0, second=0, microsecond=0)
def truncate_day(dt: datetime) -> datetime:
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
def truncate_month(dt: datetime) -> datetime:
return dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
def truncate_year(dt: datetime) -> datetime:
return dt.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
def current_minute() -> datetime:
return truncate_minute(current_datetime())
def current_hour() -> datetime:
return truncate_hour(current_datetime())
def current_day() -> datetime:
return truncate_day(current_datetime())
def yesterday() -> datetime:
return current_day() - timedelta(days=1)
def current_month() -> datetime:
return truncate_month(current_datetime())
def current_year() -> datetime:
return truncate_year(current_datetime())
def set_timezone(dt: datetime, tz: str = 'UTC') -> datetime:
tzinfo = ZoneInfo(tz)
if dt.tzinfo is None:
return dt.replace(tzinfo=tzinfo)
else:
return dt.astimezone(tzinfo)
def dt_to_unix(value: datetime, tz: str = 'UTC') -> Optional[int]:
if not value:
return None
if value.tzinfo:
value = value.astimezone(ZoneInfo(tz))
else:
value = value.replace(tzinfo=ZoneInfo(tz))
return int(value.timestamp() * 1_000_000)
def unix_to_dt(value: int, tz: str = 'UTC') -> Optional[datetime]:
if value is None:
return None
ts = value // 1_000_000
microseconds = value % 1_000_000
dt = datetime.fromtimestamp(ts, ZoneInfo(tz))
return dt.replace(microsecond=microseconds)
def to_iso_format(dt: datetime | None, default: str | None = None) -> str | None:
if not dt:
return default
return dt.strftime('%Y-%m-%dT%H:%M:%S') + iso_timezone(dt)
def from_iso_format(iso_str: str) -> datetime:
dt = parser.parse(iso_str)
# do not touch!
str(dt)
# Simple workaround for https://github.com/dateutil/dateutil/issues/188
result = to_utc(dt)
assert result is not None
return result
def iso_timezone(dt: datetime) -> str:
off = dt.utcoffset()
if off is not None:
if off.days < 0:
sign = "-"
off = -off
else:
sign = "+"
hh_delta, mm_delta = divmod(off, timedelta(hours=1))
hh = int(hh_delta.total_seconds() / 3600) if isinstance(hh_delta, timedelta) else hh_delta
mm = int(mm_delta.total_seconds() / 60)
return "%s%02d:%02d" % (sign, hh, mm)
else:
return ''
def ms() -> float:
return time.time() * 1000
def format_sec(ts: int, r: int = 2) -> str:
seconds = round((ms() - ts) / 1000, r)
return f'seconds: {seconds}'
def get_last_day_of_month(dt: datetime) -> datetime:
curr_month = truncate_month(dt)
return curr_month + relativedelta(months=1) - timedelta(days=1)
def to_utc(dt: datetime) -> datetime | None:
if dt is None:
return None
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
else:
return dt.astimezone(timezone.utc)
def pretty_day(dt: datetime) -> str:
return dt.strftime('%Y-%m-%d')
def pretty_hour(dt: datetime) -> str:
return dt.strftime('%Y-%m-%dT%H:00')
def pretty_time(dt: datetime) -> str:
return dt.strftime('%Y-%m-%dT%H:%M:%S')
def pretty_timedelta(value: timedelta | relativedelta) -> str:
"""
Covert timedelta to the human-readable string
"""
sign = '-' if value.total_seconds() < 0 else ''
total_seconds = abs(int(value.total_seconds()))
days, remainder = divmod(total_seconds, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
msgs = []
if days:
if days == 1:
msgs.append(f'{sign}{days} day')
else:
msgs.append(f'{sign}{days} days')
if hours:
if hours == 1:
msgs.append(f'{sign}{hours} hour')
else:
msgs.append(f'{sign}{hours} hours')
if minutes:
if minutes == 1:
msgs.append(f'{sign}{minutes} min')
else:
msgs.append(f'{sign}{minutes} minutes')
if seconds:
if seconds == 1:
msgs.append(f'{sign}{seconds} sec')
else:
msgs.append(f'{sign}{seconds} seconds')
return ', '.join(msgs)