Skip to content
Snippets Groups Projects
forthwith.py 7.98 KiB
Newer Older
#!/usr/bin/env python3

# Copyright (C) 2024, UChicago Argonne, LLC
# Licensed under the 3-clause BSD license.  See accompanying LICENSE.txt file
# in the top-level directory.

from typing import Union
from datetime import datetime, timezone, timedelta


FORMAT_DATE_FULL_ISO8601 = '%Y-%m-%dT%H:%M:%S.%f'
FORMAT_DATE_FULL = '%Y-%m-%d %H:%M:%S.%f'
FORMAT_DATE_FULL_T = '%Y-%m-%dT%H:%M:%S.%f'
FORMAT_DATE_FULL_TZ = '%Y-%m-%d %H:%M:%S.%f %Z'
FORMAT_DATE_SECOND = '%Y-%m-%d %H:%M:%S'
FORMAT_DATE_SECOND_T = '%Y-%m-%dT%H:%M:%S'
FORMAT_DATE_HOUR = '%Y-%m-%d %H'
FORMAT_DATE_DAY = '%Y-%m-%d'

utc_tz = timezone(timedelta(seconds=0), name='UTC')
ONE_DAY = timedelta(days=1)
EPOCH_START_UTC = datetime(1970, 1, 1).replace(tzinfo=utc_tz)
EPOCH_END_UTC = datetime(2038, 1, 1).replace(tzinfo=utc_tz)
EPOCH_START_NOTZ = datetime(1970, 1, 1)
EPOCH_END_NOTZ = datetime(2038, 1, 1)


def timedelta_to_str(td: timedelta):
    """Convert a timedelta into a string."""
    days = td.days
    seconds = td.seconds
    hours, seconds = divmod(seconds, 3600)
    minutes, seconds = divmod(seconds, 60)
    microseconds = td.microseconds
    days = int(days)
    hours = int(hours)
    minutes = int(minutes)
    seconds = int(seconds)
    microseconds = round(microseconds)
    if days:
        if days > 1:
            day_str = "days" if days > 1 else "day"
        else:
            day_str = "days" if days < -1 else "day"
        result = f"{days} {day_str}, {hours:0>2}:{minutes:0>2}:{seconds:0>2}"
    else:
        result = f"{hours:0>2}:{minutes:0>2}:{seconds:0>2}"
    if microseconds:
        result = f"{result}.{microseconds:0>6}"
    return result


def timedelta_to_daytime(timedeltaobj):
    """Convert a timedelta object to hours"""
    return (timedeltaobj.microseconds / 1000000.0) + (timedeltaobj.seconds / 3600.0 / 24 + timedeltaobj.days)


def timedelta_to_microtime(timedeltaobj):
    """Convert a timedelta object to microseconds"""
    return timedeltaobj.microseconds + (timedeltaobj.seconds + timedeltaobj.days * 86400) * 1000000


def timedelta_to_minutetime(timedeltaobj):
    """Convert a timedelta object to seconds"""
    return (timedeltaobj.microseconds / 1000000.0 / 60.0) + (timedeltaobj.seconds / 60.0 + timedeltaobj.days * 24 * 60)


def timedelta_to_secondtime(td):
    """Convert a timedelta object to seconds"""
    return td.total_seconds()


def timedelta_to_hourtime(timedeltaobj):
    """Convert a timedelta object to hours"""
    return (timedeltaobj.microseconds / 1000000.0 / 3600.0) + (timedeltaobj.seconds / 3600.0 + timedeltaobj.days * 24)


def timedelta_str_to_td(td_str: str):
    """Convert a timedelta string, such as a walltime into a timedelta."""
    if td_str.startswith("-"):
        negative = True
        td_str = td_str.lstrip("-")
    else:
        negative = False
    if '.' in td_str:
        dhms, us = td_str.split(".", 1)
    else:
        us = 0.0
        dhms = td_str

    if "days" in dhms:
        d, hms = dhms.split(" days, ", 1)
    elif "day" in dhms:
        d, hms = dhms.split(" day, ", 1)
    else:
        d = "0"
        hms = dhms
    h, m, s = hms.split(':', 2)
    d = int(d)
    h = int(h)
    m = int(m)
    s = int(s)
    us = int(us)
    if negative:
        d = -d
    td = timedelta(days=int(d), hours=int(h), minutes=int(m), seconds=int(s), microseconds=us)
    return td


def split_event(rs: datetime, re: datetime, ts: datetime, te: datetime) -> tuple:
    """
    Given a range, split the ts->te to fit within the range.
    """
    _time_start = None
    _time_end = None
    _time_remaining = None

    if ts >= rs:
        if ts < re:
            _time_start = ts
        else:  # ts >= re:
            _time_start = None
    else:  # ts < rs
        if te > rs:  # we need to check if it's still in range
            _time_start = rs
        else:
            _time_start = None

    if te < re:
        if te >= rs:
            _time_end = te
        else:
            _time_end = None
    else:  # te >= re
        if ts <= re:
            _time_end = re
        else:
            _time_end = None

    # if _time_start == _time_end:
    #    _???

    if _time_start is not None and _time_end is not None:
        _time_remaining = (te - ts) - (_time_end - _time_start)
    else:
        _time_start = None
        _time_end = None
        _time_remaining = re - rs

    return _time_start, _time_end, _time_remaining


def get_now(notz=False) -> datetime:
    """
    Return a datetime object of now as the local server time then convert it to utc
    """
    if notz:
        result = datetime.utcnow()
    else:
        result = datetime.now(utc_tz)
    return result


class DurationException(Exception):
    pass


class IntervalRange:
    """Simple class to assist with handling ranges.
    extracted from trireme.duration
    """

    def __init__(self, start, end):
        """set the start and compute the delta"""
        self.start = start
        self.end = end
        self.delta = end - start
        if self.delta < timedelta(0):
            raise DurationException("Time Range is negative!")

    def set_start(self, start):
        """set the start and compute the delta again"""
        self.start = start
        self.delta = self.end - self.start

    def set_end(self, end):
        """set the end and compute the delta again"""
        self.end = end
        self.delta = self.end - self.start

    def get_range(self, interval):
        """Return a range from the start to end at an interval"""
        range_lst = []
        if type(interval) == timedelta:
            interval_count = int(timedelta_to_secondtime(self.delta) / timedelta_to_secondtime(interval))
            if (timedelta_to_secondtime(self.delta) % timedelta_to_secondtime(interval)) > 0:
                interval_count += 1
        else:
            interval_count = int(self.delta / interval)
            if (self.delta % interval) > 0:
                interval_count += 1
        for interval_index in range(interval_count):
            x1 = self.start + (interval_index * interval)
            range_lst.append(x1)
        return range_lst

    def in_range(self, point):
        """For something to be in a range it must be >= start and < the end"""
        if self.start <= point < self.end:
            result = True
        else:
            result = False
        return result


def epoch_to_datetime(epoch_time: Union[float, int]) -> datetime:
    return datetime.utcfromtimestamp(epoch_time)


def datetime_to_epoch(datetime_obj: datetime) -> float:
    """Given a datetime object with timezone,
    return the number of seconds in epoch"""
    if datetime_obj.tzinfo is None:
        datetime_obj = datetime_obj.replace(tzinfo=utc_tz)
    delta = (datetime_obj - EPOCH_START_UTC)
    return timedelta_to_microtime(delta) / 1000000


def get_daily_ranges(time_start_obj, time_end_obj):
    """Given a range, return a new set of ranges for each day that the first
    range is inside.
    extracted from trireme.duration
    - Deprecated -
    """
    start_obj = datetime.fromisoformat(time_start_obj.strftime(FORMAT_DATE_DAY))
    end_obj = datetime.fromisoformat(time_end_obj.strftime(FORMAT_DATE_DAY)) + ONE_DAY
    time_list = []
    for x in range(int(timedelta_to_daytime(end_obj - start_obj))):
        start = start_obj + (x * ONE_DAY)
        end = start_obj + ((x + 1) * ONE_DAY)
        time_list.append((start, end,))
    return time_list


def get_ts_delta_iterator(start_obj, enden_obj, delta):
    """given a start and end and a timedelta, return an interator for that time range.
    - Deprecated -
    """
    class TimeIterator(object):
        def __init__(self, start_obj, enden_obj, delta):
            self.current_obj = start_obj
            self.enden_obj = enden_obj
            self.delta = delta

        def __iter__(self):
            return self

        def __next__(self):
            if self.current_obj >= self.enden_obj:
                raise StopIteration
            else:
                result = self.current_obj
                self.current_obj += self.delta
                return result

    return TimeIterator(start_obj, enden_obj, delta)