#!/usr/bin/env python3
# Copyright (C) 2024, UChicago Argonne, LLC
# Licensed under the 3-clause BSD license.  See accompanying LICENSE.txt file
# in the top-level directory.
"""Helpers for converting, formatting and slicing datetimes and timedeltas."""

from typing import Union
from datetime import datetime, timezone, timedelta

# strftime/strptime patterns, from most to least precise.
FORMAT_DATE_FULL_ISO8601 = "%Y-%m-%dT%H:%M:%S.%f"
FORMAT_DATE_FULL = "%Y-%m-%d %H:%M:%S.%f"
FORMAT_DATE_FULL_T = "%Y-%m-%dT%H:%M:%S.%f"
FORMAT_DATE_FULL_TZ = "%Y-%m-%d %H:%M:%S.%f %Z"
FORMAT_DATE_FULL_T_NOCOLON = "%Y-%m-%dT%H%M%S.%f"
FORMAT_DATE_SECOND = "%Y-%m-%d %H:%M:%S"
FORMAT_DATE_SECOND_T = "%Y-%m-%dT%H:%M:%S"
FORMAT_DATE_SECOND_T_NOCOLON = "%Y-%m-%dT%H%M%S"
FORMAT_DATE_MINUTE_T_NOCOLON = "%Y-%m-%dT%H%M"
FORMAT_DATE_HOUR = "%Y-%m-%d %H"
FORMAT_DATE_HOUR_T = "%Y-%m-%dT%H"
FORMAT_DATE_DAY = "%Y-%m-%d"
FORMAT_DATE_MONTH = "%Y-%m"

utc_tz = timezone(timedelta(seconds=0), name="UTC")
ONE_DAY = timedelta(days=1)

EPOCH_START_UTC = datetime(1970, 1, 1).replace(tzinfo=utc_tz)
EPOCH_END_UTC = datetime(2038, 1, 1).replace(tzinfo=utc_tz)
EPOCH_START_NOTZ = datetime(1970, 1, 1)
EPOCH_END_NOTZ = datetime(2038, 1, 1)


def timedelta_to_str(td: timedelta) -> str:
    """Convert a timedelta into a string.

    The layout is ``[D day[s], ]HH:MM:SS[.ffffff]``.  The day part is only
    present when ``td.days`` is non-zero and the fraction only when
    ``td.microseconds`` is non-zero.  Negative values keep timedelta's own
    normalisation (negative days, non-negative time of day), e.g.
    ``timedelta(seconds=-1)`` -> ``"-1 day, 23:59:59"``.
    """
    days = td.days
    hours, remainder = divmod(td.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    result = f"{hours:0>2}:{minutes:0>2}:{seconds:0>2}"
    if days:
        # Singular only for exactly +1 / -1 day.
        day_str = "day" if abs(days) == 1 else "days"
        result = f"{days} {day_str}, {result}"
    if td.microseconds:
        result = f"{result}.{td.microseconds:0>6}"
    return result


def timedelta_to_daytime(timedeltaobj):
    """Convert a timedelta object to a (fractional) number of days."""
    # Every component is scaled to days; the microsecond term must be divided
    # down through seconds -> hours -> days as well.
    return (timedeltaobj.microseconds / 1000000.0 / 3600.0 / 24) + (
        timedeltaobj.seconds / 3600.0 / 24 + timedeltaobj.days
    )


def timedelta_to_microtime(timedeltaobj):
    """Convert a timedelta object to a whole number of microseconds."""
    return timedeltaobj.microseconds + (timedeltaobj.seconds + timedeltaobj.days * 86400) * 1000000


def timedelta_to_minutetime(timedeltaobj):
    """Convert a timedelta object to a (fractional) number of minutes."""
    return (timedeltaobj.microseconds / 1000000.0 / 60.0) + (
        timedeltaobj.seconds / 60.0 + timedeltaobj.days * 24 * 60
    )


def timedelta_to_secondtime(td):
    """Convert a timedelta object to a (fractional) number of seconds."""
    return td.total_seconds()


def timedelta_to_hourtime(timedeltaobj):
    """Convert a timedelta object to a (fractional) number of hours."""
    return (timedeltaobj.microseconds / 1000000.0 / 3600.0) + (
        timedeltaobj.seconds / 3600.0 + timedeltaobj.days * 24
    )


def timedelta_str_to_td(td_str: str) -> timedelta:
    """Convert a timedelta string, such as a walltime, into a timedelta.

    Accepts the layout produced by ``timedelta_to_str`` and ``str(timedelta)``:
    ``[-][D day[s], ]H:M:S[.fraction]``.  A leading ``-`` negates the day count
    only, matching timedelta's normalised representation.

    Raises:
        ValueError: if the string does not match the expected layout.
    """
    negative = td_str.startswith("-")
    if negative:
        td_str = td_str.lstrip("-")

    dhms, _, fraction = td_str.partition(".")
    # The text after the dot is a decimal fraction of a second, so ".5" means
    # 500000 microseconds.  Pad/truncate to exactly six digits before parsing.
    us = int(fraction.ljust(6, "0")[:6]) if fraction else 0

    if "days" in dhms:
        d, hms = dhms.split(" days, ", 1)
    elif "day" in dhms:
        d, hms = dhms.split(" day, ", 1)
    else:
        d = "0"
        hms = dhms
    h, m, s = hms.split(":", 2)

    days = int(d)
    if negative:
        days = -days
    return timedelta(days=days, hours=int(h), minutes=int(m), seconds=int(s), microseconds=us)


def split_event(rs: datetime, re: datetime, ts: datetime, te: datetime) -> tuple:
    """Given a range, split the ts->te to fit within the range.

    Args:
        rs: range start.
        re: range end.
        ts: event start.
        te: event end.

    Returns:
        ``(start, end, remaining)``.  When the event overlaps the range,
        ``start``/``end`` are the event bounds clipped to the range and
        ``remaining`` is the part of the event's duration that fell outside
        the clipped span.  Otherwise ``start`` and ``end`` are ``None`` and
        ``remaining`` is ``re - rs``.
    """
    _time_start = None
    _time_end = None
    _time_remaining = None

    # Clip the start.
    if ts >= rs:
        if ts < re:
            _time_start = ts
        else:  # ts >= re: event starts after the range is over
            _time_start = None
    else:  # ts < rs
        if te > rs:  # event began earlier but is still running inside the range
            _time_start = rs
        else:
            _time_start = None

    # Clip the end.
    if te < re:
        if te >= rs:
            _time_end = te
        else:
            _time_end = None
    else:  # te >= re
        if ts <= re:
            _time_end = re
        else:
            _time_end = None

    if _time_start is not None and _time_end is not None:
        _time_remaining = (te - ts) - (_time_end - _time_start)
    else:
        _time_start = None
        _time_end = None
        # NOTE(review): the no-overlap case reports the range length rather
        # than the event length; kept as-is - confirm this is what callers expect.
        _time_remaining = re - rs
    return _time_start, _time_end, _time_remaining


def get_now(notz=False) -> datetime:
    """Return the current time in UTC.

    Args:
        notz: when true, return a naive datetime (``tzinfo`` is ``None``)
            holding the UTC wall-clock time; otherwise return an aware
            datetime carrying ``utc_tz``.
    """
    now = datetime.now(utc_tz)
    # Dropping tzinfo from an aware UTC value gives the same naive result as
    # the deprecated datetime.utcnow().
    return now.replace(tzinfo=None) if notz else now


class DurationException(Exception):
    """Raised when an IntervalRange is constructed with end before start."""


class IntervalRange:
    """Simple class to assist with handling ranges."""

    def __init__(self, start, end):
        """Set the start and end and compute the delta.

        Raises:
            DurationException: if ``end - start`` is negative.
        """
        self.start = start
        self.end = end
        self.delta = end - start
        if self.delta < timedelta(0):
            raise DurationException("Time Range is negative!")

    def set_start(self, start):
        """Set the start and compute the delta again."""
        self.start = start
        self.delta = self.end - self.start

    def set_end(self, end):
        """Set the end and compute the delta again."""
        self.end = end
        self.delta = self.end - self.start

    def get_range(self, interval):
        """Return the points from start up to (not including) end, ``interval`` apart.

        A trailing partial interval still contributes its starting point, so
        the number of points is ``ceil(delta / interval)``.
        """
        if isinstance(interval, timedelta):
            # Exact integer arithmetic on timedeltas; going through float
            # seconds over-counts (e.g. 1.0 % 0.1 is not 0).
            interval_count, leftover = divmod(self.delta, interval)
            if leftover > timedelta(0):
                interval_count += 1
        else:
            interval_count = int(self.delta / interval)
            if (self.delta % interval) > 0:
                interval_count += 1
        return [self.start + (index * interval) for index in range(interval_count)]

    def in_range(self, point):
        """For something to be in a range it must be >= start and < the end."""
        return bool(self.start <= point < self.end)


def epoch_to_datetime(epoch_time: Union[float, int]) -> datetime:
    """Convert seconds since the Unix epoch into a naive datetime in UTC."""
    # Same result as the deprecated datetime.utcfromtimestamp().
    return datetime.fromtimestamp(epoch_time, utc_tz).replace(tzinfo=None)


def datetime_to_epoch(datetime_obj: datetime) -> float:
    """Given a datetime object, return the number of seconds since the epoch.

    A naive datetime is treated as already being in UTC.
    """
    if datetime_obj.tzinfo is None:
        datetime_obj = datetime_obj.replace(tzinfo=utc_tz)
    delta = datetime_obj - EPOCH_START_UTC
    return timedelta_to_microtime(delta) / 1000000


def get_daily_ranges(time_start_obj, time_end_obj):
    """Given a range, return a new set of ranges for each day that the first
    range is inside.  - Deprecated -

    Returns:
        A list of ``(day_start, next_day_start)`` tuples of naive datetimes at
        midnight, covering the calendar day of ``time_start_obj`` through the
        calendar day of ``time_end_obj`` inclusive.
    """
    # Truncate both ends to midnight (time of day and tzinfo are discarded).
    start_obj = datetime(time_start_obj.year, time_start_obj.month, time_start_obj.day)
    end_obj = datetime(time_end_obj.year, time_end_obj.month, time_end_obj.day) + ONE_DAY
    day_count = (end_obj - start_obj).days
    return [
        (start_obj + (x * ONE_DAY), start_obj + ((x + 1) * ONE_DAY))
        for x in range(day_count)
    ]


class _TimeIterator(object):
    """Iterator yielding start, start + delta, ... while the value is < end."""

    def __init__(self, start_obj, enden_obj, delta):
        self.current_obj = start_obj
        self.enden_obj = enden_obj
        self.delta = delta

    def __iter__(self):
        return self

    def __next__(self):
        if self.current_obj >= self.enden_obj:
            raise StopIteration
        result = self.current_obj
        self.current_obj += self.delta
        return result


def get_ts_delta_iterator(start_obj, enden_obj, delta):
    """Given a start and end and a timedelta, return an iterator for that time
    range.  - Deprecated -

    The iterator yields ``start_obj`` and then keeps adding ``delta`` until the
    value reaches or passes ``enden_obj`` (which is never yielded).
    """
    return _TimeIterator(start_obj, enden_obj, delta)