# Copyright (C) 2024, UChicago Argonne, LLC # Licensed under the 3-clause BSD license. See accompanying LICENSE.txt file # in the top-level directory. import datetime import pytest from dateutil.parser import parse as date_parse from Octeres.forthwith import ( split_event, timedelta_str_to_td, timedelta_to_str, get_now, ) from Octeres.forthwith import ( datetime_to_epoch, epoch_to_datetime, FORMAT_DATE_FULL, get_daily_ranges, IntervalRange, ) from Octeres.util import iterate_chain def input_datetimes(): lst = [ (date_parse("2023-01-01"), "2023-01-01 00:00:00.000000", 1672531200), (date_parse("2023-01-01"), "2023-01-01 00:00:00.000000", 1672531200.0), ( date_parse("2023-01-01 00:00:00.000001"), "2023-01-01 00:00:00.000001", 1672531200.000001, ), ( date_parse("1970-01-01 00:00:00.000001"), "1970-01-01 00:00:00.000001", 0.000001, ), ] return lst @pytest.mark.parametrize("dt_obj,dt_str,epoch", input_datetimes()) def test_datetime_to_from_epoch(dt_obj, dt_str, epoch): result_epoch = datetime_to_epoch(dt_obj) result_dt = epoch_to_datetime(epoch) result_dt_str = dt_obj.strftime(FORMAT_DATE_FULL) result_dt_obj = datetime.datetime.strptime(dt_str, FORMAT_DATE_FULL) assert result_epoch == epoch assert result_dt == dt_obj assert result_dt_str == dt_str assert result_dt_obj == dt_obj print(result_epoch) class Test_split_event: def test_split_event_00(self): """ Test a event within a range. """ rs = date_parse("2016-01-01") re = date_parse("2016-02-01") ts = date_parse("2016-01-01") te = date_parse("2016-01-02") correct = (ts, te, datetime.timedelta(seconds=0)) result = split_event(rs, re, ts, te) assert result == correct def test_split_event_01(self): """ Test a event crossing a range. """ rs = date_parse("2016-01-01") re = date_parse("2016-02-01") ts = date_parse("2016-01-15") te = date_parse("2016-02-05") correct = (ts, re, datetime.timedelta(days=4)) result = split_event(rs, re, ts, te) assert result == correct def test_split_event_all(self): range_start = date_parse("2015-01-03") range_end = date_parse("2015-01-06") range_full_remaining = range_end - range_start cases = [] # A outer left touching a_time_start = date_parse("2015-01-01") a_time_end = date_parse("2015-01-03") a_correct = (None, None, range_full_remaining) cases.append(("a", a_time_start, a_time_end, a_correct)) # B exact left, right b_time_start = date_parse("2015-01-03") b_time_end = date_parse("2015-01-06") b_correct = (b_time_start, b_time_end, datetime.timedelta(seconds=0)) cases.append(("b", b_time_start, b_time_end, b_correct)) # C outer right touching c_time_start = date_parse("2015-01-06") c_time_end = date_parse("2015-01-08") c_correct = (None, None, range_full_remaining) cases.append(("c", c_time_start, c_time_end, c_correct)) # D inside d_time_start = date_parse("2015-01-04") d_time_end = date_parse("2015-01-05") d_correct = (d_time_start, d_time_end, datetime.timedelta(seconds=0)) cases.append(("d", d_time_start, d_time_end, d_correct)) # E span e_time_start = date_parse("2015-01-02") e_time_end = date_parse("2015-01-07") e_correct = (range_start, range_end, datetime.timedelta(days=2)) cases.append(("e", e_time_start, e_time_end, e_correct)) # F left into middle f_time_start = date_parse("2015-01-01") f_time_end = date_parse("2015-01-04") f_correct = (range_start, f_time_end, datetime.timedelta(days=2)) cases.append(("f", f_time_start, f_time_end, f_correct)) # G right into middle g_time_start = date_parse("2015-01-05") g_time_end = date_parse("2015-01-08") g_correct = (g_time_start, range_end, datetime.timedelta(days=2)) cases.append(("g", g_time_start, g_time_end, g_correct)) # H inner left touching h_time_start = date_parse("2015-01-03") h_time_end = date_parse("2015-01-04") h_correct = (h_time_start, h_time_end, datetime.timedelta(seconds=0)) cases.append(("h", h_time_start, h_time_end, h_correct)) # I inner right touching i_time_start = date_parse("2015-01-05") i_time_end = date_parse("2015-01-06") i_correct = (i_time_start, i_time_end, datetime.timedelta(seconds=0)) cases.append(("i", i_time_start, i_time_end, i_correct)) # J outer left not touching j_time_start = date_parse("2015-01-01") j_time_end = date_parse("2015-01-02") j_correct = (None, None, range_full_remaining) cases.append(("j", j_time_start, j_time_end, j_correct)) # K outer right not touching k_time_start = date_parse("2015-01-07") k_time_end = date_parse("2015-01-08") k_correct = (None, None, range_full_remaining) cases.append(("k", k_time_start, k_time_end, k_correct)) for case_name, ts, te, correct in cases: result = split_event(range_start, range_end, ts, te) assert result == correct TIME_FIXTURES = ( (datetime.timedelta(minutes=2), "00:02:00"), (datetime.timedelta(days=1, minutes=2), "1 day, 00:02:00"), (datetime.timedelta(days=2, minutes=2), "2 days, 00:02:00"), ( datetime.timedelta(days=1, hours=2, minutes=3, seconds=5, microseconds=700000), "1 day, 02:03:05.700000", ), (datetime.timedelta(days=-1, minutes=2), "-1 day, 00:02:00"), (datetime.timedelta(days=-1, minutes=2, microseconds=1), "-1 day, 00:02:00.000001"), (datetime.timedelta(days=-1, minutes=-2), "-2 days, 23:58:00"), (datetime.timedelta(days=-1, microseconds=-1), "-2 days, 23:59:59.999999"), ( datetime.timedelta(days=-2, seconds=86399, microseconds=999999), "-2 days, 23:59:59.999999", ), ) class Test: @pytest.mark.parametrize("td_a,td_b_str", TIME_FIXTURES) def test_timedelta_str_to_td_00(self, td_a, td_b_str): td_a_str = timedelta_to_str(td_a) td_b = timedelta_str_to_td(td_b_str) print() print(f"{td_a=} {td_b=}") print(f"{str(td_a)=} {str(td_b)=}") print(td_a.total_seconds(), td_b.total_seconds()) print(f"{td_a_str=} {td_b_str=}") assert td_a.total_seconds() == td_b.total_seconds() assert td_a == td_b assert td_a_str == td_b_str def test_get_now_00(): now = get_now() now2 = get_now() assert (now2 - now).total_seconds() < 1 now = get_now(notz=True) now2 = get_now(notz=True) assert (now2 - now).total_seconds() < 1 assert now.tzinfo is None assert now2.tzinfo is None now = get_now(notz=False) now2 = get_now(notz=False) assert (now2 - now).total_seconds() < 1 assert now.tzinfo is not None assert now2.tzinfo is not None def test_get_daily_ranges(): range_start = datetime.datetime.fromisoformat("2020-01-01") range_end = datetime.datetime.fromisoformat("2020-01-07") ranges = get_daily_ranges(range_start, range_end - datetime.timedelta(days=1)) ranges = [(ts.isoformat(), te.isoformat()) for ts, te in ranges] assert ranges == [ ("2020-01-01T00:00:00", "2020-01-02T00:00:00"), ("2020-01-02T00:00:00", "2020-01-03T00:00:00"), ("2020-01-03T00:00:00", "2020-01-04T00:00:00"), ("2020-01-04T00:00:00", "2020-01-05T00:00:00"), ("2020-01-05T00:00:00", "2020-01-06T00:00:00"), ("2020-01-06T00:00:00", "2020-01-07T00:00:00"), ] def test_IntervalRange_00(): range_start = datetime.datetime.fromisoformat("2020-01-01") range_end = datetime.datetime.fromisoformat("2020-01-07") ir = IntervalRange(range_start, range_end) interval = datetime.timedelta(minutes=15) for tb, ts, ta in iterate_chain(ir.get_range(interval)): if tb and ts: assert ts - tb == interval intervals = ir.get_range_tuple(interval) for ts, te in intervals: assert te - ts == interval assert (range_end - range_start).total_seconds() / (15 * 60 ) == len(intervals)