Newer
Older
# Copyright (C) 2024, UChicago Argonne, LLC
# Licensed under the 3-clause BSD license. See accompanying LICENSE.txt file
# in the top-level directory.
import datetime

import pytest
from dateutil.parser import parse as date_parse

from Octeres.forthwith import (
    split_event,
    timedelta_str_to_td,
    timedelta_to_str,
    get_now,
)
# NOTE(review): IntervalRange is used by test_IntervalRange_00 but was not
# imported anywhere in the visible file; it is assumed to live in
# Octeres.forthwith alongside the other helpers under test -- confirm.
from Octeres.forthwith import (
    datetime_to_epoch,
    epoch_to_datetime,
    FORMAT_DATE_FULL,
    get_daily_ranges,
    IntervalRange,
)
from Octeres.util import iterate_chain
def input_datetimes():
    """Build the parameter rows for ``test_datetime_to_from_epoch``.

    Each row is a ``(datetime object, formatted string, epoch)`` triple. The
    datetime object is obtained by parsing the text in the first column of
    the table below with ``date_parse``.
    """
    # (text handed to date_parse, expected formatted string, expected epoch)
    table = (
        ("2023-01-01", "2023-01-01 00:00:00.000000", 1672531200),
        ("2023-01-01", "2023-01-01 00:00:00.000000", 1672531200.0),
        (
            "2023-01-01 00:00:00.000001",
            "2023-01-01 00:00:00.000001",
            1672531200.000001,
        ),
        (
            "1970-01-01 00:00:00.000001",
            "1970-01-01 00:00:00.000001",
            0.000001,
        ),
    )
    return [
        (date_parse(text), formatted, epoch)
        for text, formatted, epoch in table
    ]
@pytest.mark.parametrize("dt_obj,dt_str,epoch", input_datetimes())
def test_datetime_to_from_epoch(dt_obj, dt_str, epoch):
    """Round-trip one fixture row between datetime, epoch and string forms.

    Checks ``datetime_to_epoch`` / ``epoch_to_datetime`` against the row's
    epoch and datetime, and checks that ``FORMAT_DATE_FULL`` formats and
    parses the row's string representation.
    """
    # Perform every conversion first, then assert, so a conversion error
    # surfaces before any comparison is attempted.
    epoch_from_dt = datetime_to_epoch(dt_obj)
    dt_from_epoch = epoch_to_datetime(epoch)
    formatted = dt_obj.strftime(FORMAT_DATE_FULL)
    parsed = datetime.datetime.strptime(dt_str, FORMAT_DATE_FULL)

    assert epoch_from_dt == epoch
    assert dt_from_epoch == dt_obj
    assert formatted == dt_str
    assert parsed == dt_obj
    print(epoch_from_dt)
class Test_split_event:
    """Tests for ``split_event(range_start, range_end, time_start, time_end)``.

    Every expectation is a 3-tuple ``(start, end, timedelta)``. In these
    tests the first two elements are the event clipped to the range, or
    ``None`` when the event does not overlap the range. The meaning of the
    third element is defined by ``split_event`` itself; the expected values
    here are taken as given.
    """

    def test_split_event_00(self):
        """
        Test an event within a range.
        """
        rs = date_parse("2016-01-01")
        re = date_parse("2016-02-01")
        ts = date_parse("2016-01-01")
        te = date_parse("2016-01-02")
        correct = (ts, te, datetime.timedelta(seconds=0))
        result = split_event(rs, re, ts, te)
        assert result == correct

    def test_split_event_01(self):
        """
        Test an event crossing a range.
        """
        rs = date_parse("2016-01-01")
        re = date_parse("2016-02-01")
        ts = date_parse("2016-01-15")
        te = date_parse("2016-02-05")
        # The event ends four days after the range does.
        correct = (ts, re, datetime.timedelta(days=4))
        result = split_event(rs, re, ts, te)
        assert result == correct

    def test_split_event_all(self):
        """
        Test events in every position relative to the range
        2015-01-03 .. 2015-01-06.
        """
        range_start = date_parse("2015-01-03")
        range_end = date_parse("2015-01-06")
        range_full_remaining = range_end - range_start
        cases = []

        # A outer left touching
        a_time_start = date_parse("2015-01-01")
        a_time_end = date_parse("2015-01-03")
        a_correct = (None, None, range_full_remaining)
        cases.append(("a", a_time_start, a_time_end, a_correct))

        # B exactly the range
        b_time_start = date_parse("2015-01-03")
        b_time_end = date_parse("2015-01-06")
        b_correct = (b_time_start, b_time_end, datetime.timedelta(seconds=0))
        cases.append(("b", b_time_start, b_time_end, b_correct))

        # C outer right touching
        c_time_start = date_parse("2015-01-06")
        c_time_end = date_parse("2015-01-08")
        c_correct = (None, None, range_full_remaining)
        cases.append(("c", c_time_start, c_time_end, c_correct))

        # D inner not touching
        d_time_start = date_parse("2015-01-04")
        d_time_end = date_parse("2015-01-05")
        d_correct = (d_time_start, d_time_end, datetime.timedelta(seconds=0))
        cases.append(("d", d_time_start, d_time_end, d_correct))

        # E covers the range and extends past both ends
        e_time_start = date_parse("2015-01-02")
        e_time_end = date_parse("2015-01-07")
        e_correct = (range_start, range_end, datetime.timedelta(days=2))
        cases.append(("e", e_time_start, e_time_end, e_correct))

        # F crosses the range start
        f_time_start = date_parse("2015-01-01")
        f_time_end = date_parse("2015-01-04")
        f_correct = (range_start, f_time_end, datetime.timedelta(days=2))
        cases.append(("f", f_time_start, f_time_end, f_correct))

        # G crosses the range end
        g_time_start = date_parse("2015-01-05")
        g_time_end = date_parse("2015-01-08")
        g_correct = (g_time_start, range_end, datetime.timedelta(days=2))
        cases.append(("g", g_time_start, g_time_end, g_correct))

        # H inner left touching
        h_time_start = date_parse("2015-01-03")
        h_time_end = date_parse("2015-01-04")
        h_correct = (h_time_start, h_time_end, datetime.timedelta(seconds=0))
        cases.append(("h", h_time_start, h_time_end, h_correct))

        # I inner right touching
        i_time_start = date_parse("2015-01-05")
        i_time_end = date_parse("2015-01-06")
        i_correct = (i_time_start, i_time_end, datetime.timedelta(seconds=0))
        cases.append(("i", i_time_start, i_time_end, i_correct))

        # J outer left not touching
        j_time_start = date_parse("2015-01-01")
        j_time_end = date_parse("2015-01-02")
        j_correct = (None, None, range_full_remaining)
        cases.append(("j", j_time_start, j_time_end, j_correct))

        # K outer right not touching
        k_time_start = date_parse("2015-01-07")
        k_time_end = date_parse("2015-01-08")
        k_correct = (None, None, range_full_remaining)
        cases.append(("k", k_time_start, k_time_end, k_correct))

        for case_name, ts, te, correct in cases:
            result = split_event(range_start, range_end, ts, te)
            # Name the case in the message so a failure identifies which
            # scenario broke without re-running under a debugger.
            assert result == correct, (
                f"case {case_name!r}: split_event({range_start}, {range_end}, "
                f"{ts}, {te}) returned {result!r}, expected {correct!r}"
            )
# (timedelta, string form) pairs fed to test_timedelta_str_to_td_00, which
# checks both directions: timedelta_to_str(td) must equal the string, and
# timedelta_str_to_td(string) must equal the timedelta.
TIME_FIXTURES = (
    (datetime.timedelta(minutes=2), "00:02:00"),
    (datetime.timedelta(days=1, minutes=2), "1 day, 00:02:00"),
    (datetime.timedelta(days=2, minutes=2), "2 days, 00:02:00"),
    (
        datetime.timedelta(days=1, hours=2, minutes=3, seconds=5, microseconds=700000),
        "1 day, 02:03:05.700000",
    ),
    (datetime.timedelta(days=-1, minutes=2), "-1 day, 00:02:00"),
    (datetime.timedelta(days=-1, minutes=2, microseconds=1), "-1 day, 00:02:00.000001"),
    (datetime.timedelta(days=-1, minutes=-2), "-2 days, 23:58:00"),
    # The last two entries are the same duration spelled two ways: timedelta
    # normalizes negative microseconds into days/seconds/microseconds.
    (datetime.timedelta(days=-1, microseconds=-1), "-2 days, 23:59:59.999999"),
    (
        datetime.timedelta(days=-2, seconds=86399, microseconds=999999),
        "-2 days, 23:59:59.999999",
    ),
)
@pytest.mark.parametrize("td_a,td_b_str", TIME_FIXTURES)
def test_timedelta_str_to_td_00(self, td_a, td_b_str):
    """Check timedelta <-> string conversion in both directions.

    For each ``TIME_FIXTURES`` pair, ``timedelta_to_str(td_a)`` must produce
    ``td_b_str`` and ``timedelta_str_to_td(td_b_str)`` must produce ``td_a``.
    """
    # NOTE(review): this takes ``self``, so it is presumably a method of a
    # test class whose header is not visible in this part of the file --
    # confirm it sits inside that class.
    td_a_str = timedelta_to_str(td_a)
    td_b = timedelta_str_to_td(td_b_str)
    # Diagnostic output; pytest shows captured stdout when the test fails.
    print()
    print(f"{td_a=} {td_b=}")
    print(f"{str(td_a)=} {str(td_b)=}")
    print(td_a.total_seconds(), td_b.total_seconds())
    print(f"{td_a_str=} {td_b_str=}")
    assert td_a.total_seconds() == td_b.total_seconds()
    assert td_a == td_b
    assert td_a_str == td_b_str
def test_get_now_00():
    """Check ``get_now`` with no argument, ``notz=True`` and ``notz=False``.

    Two consecutive calls must be less than one second apart in every
    variant. With ``notz=True`` the results must carry no ``tzinfo``; with
    ``notz=False`` they must carry one.
    """
    # (keyword arguments for get_now, whether tzinfo must be None);
    # None in the second slot means tzinfo is not inspected for that variant.
    variants = (
        ({}, None),
        ({"notz": True}, True),
        ({"notz": False}, False),
    )
    for kwargs, expect_naive in variants:
        earlier = get_now(**kwargs)
        later = get_now(**kwargs)
        assert (later - earlier).total_seconds() < 1
        if expect_naive is None:
            continue
        for stamp in (earlier, later):
            if expect_naive:
                assert stamp.tzinfo is None
            else:
                assert stamp.tzinfo is not None
def test_get_daily_ranges():
    """Check the day-by-day ranges produced for 2020-01-01 .. 2020-01-07.

    ``get_daily_ranges`` is called with the range end moved back by one day
    and is expected to yield six consecutive one-day ``(start, end)`` pairs.
    """
    first_day = datetime.datetime.fromisoformat("2020-01-01")
    last_day = datetime.datetime.fromisoformat("2020-01-07")
    expected = [
        ("2020-01-01T00:00:00", "2020-01-02T00:00:00"),
        ("2020-01-02T00:00:00", "2020-01-03T00:00:00"),
        ("2020-01-03T00:00:00", "2020-01-04T00:00:00"),
        ("2020-01-04T00:00:00", "2020-01-05T00:00:00"),
        ("2020-01-05T00:00:00", "2020-01-06T00:00:00"),
        ("2020-01-06T00:00:00", "2020-01-07T00:00:00"),
    ]

    produced = get_daily_ranges(first_day, last_day - datetime.timedelta(days=1))
    # Compare ISO strings rather than datetime objects.
    observed = []
    for day_start, day_end in produced:
        observed.append((day_start.isoformat(), day_end.isoformat()))

    assert observed == expected
def test_IntervalRange_00():
    """Check that ``IntervalRange`` steps through a range in fixed intervals.

    Over 2020-01-01 .. 2020-01-07 with a 15-minute interval:
    consecutive values from ``get_range`` must be one interval apart, every
    ``(start, end)`` pair from ``get_range_tuple`` must span one interval,
    and the number of pairs must equal the range length divided by the
    interval.
    """
    range_start = datetime.datetime.fromisoformat("2020-01-01")
    range_end = datetime.datetime.fromisoformat("2020-01-07")
    ir = IntervalRange(range_start, range_end)
    interval = datetime.timedelta(minutes=15)
    # NOTE(review): iterate_chain presumably yields (previous, current, next)
    # triples with a falsy placeholder at the ends -- confirm against
    # Octeres.util. The guard skips triples with no previous element.
    for tb, ts, ta in iterate_chain(ir.get_range(interval)):
        if tb and ts:
            assert ts - tb == interval
    intervals = ir.get_range_tuple(interval)
    for ts, te in intervals:
        assert te - ts == interval
    # Derive the expected count from ``interval`` itself so the check stays
    # correct if the interval above is changed.
    assert (range_end - range_start) / interval == len(intervals)