Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# Copyright (C) 2024, UChicago Argonne, LLC
# Licensed under the 3-clause BSD license. See accompanying LICENSE.txt file
# in the top-level directory.
import datetime
import pytest
from dateutil.parser import parse as date_parse
from Octeres.forthwith import split_event, timedelta_str_to_td, timedelta_to_str, get_now
from Octeres.forthwith import datetime_to_epoch, epoch_to_datetime, FORMAT_DATE_FULL, get_daily_ranges
def input_datetimes():
lst = [
(date_parse('2023-01-01'), '2023-01-01 00:00:00.000000', 1672531200),
(date_parse('2023-01-01'), '2023-01-01 00:00:00.000000', 1672531200.0),
(date_parse('2023-01-01 00:00:00.000001'), '2023-01-01 00:00:00.000001', 1672531200.000001),
(date_parse('1970-01-01 00:00:00.000001'), '1970-01-01 00:00:00.000001', 0.000001),
]
return lst
@pytest.mark.parametrize("dt_obj,dt_str,epoch", input_datetimes())
def test_datetime_to_from_epoch(dt_obj, dt_str, epoch):
result_epoch = datetime_to_epoch(dt_obj)
result_dt = epoch_to_datetime(epoch)
result_dt_str = dt_obj.strftime(FORMAT_DATE_FULL)
result_dt_obj = datetime.datetime.strptime(dt_str, FORMAT_DATE_FULL)
assert result_epoch == epoch
assert result_dt == dt_obj
assert result_dt_str == dt_str
assert result_dt_obj == dt_obj
print(result_epoch)
class Test_split_event:
def test_split_event_00(self):
"""
Test a event within a range.
"""
rs = date_parse('2016-01-01')
re = date_parse('2016-02-01')
ts = date_parse('2016-01-01')
te = date_parse('2016-01-02')
correct = (ts, te, datetime.timedelta(seconds=0))
result = split_event(rs, re, ts, te)
assert result == correct
def test_split_event_01(self):
"""
Test a event crossing a range.
"""
rs = date_parse('2016-01-01')
re = date_parse('2016-02-01')
ts = date_parse('2016-01-15')
te = date_parse('2016-02-05')
correct = (ts, re, datetime.timedelta(days=4))
result = split_event(rs, re, ts, te)
assert result == correct
def test_split_event_all(self):
range_start = date_parse('2015-01-03')
range_end = date_parse('2015-01-06')
range_full_remaining = range_end - range_start
cases = []
# A outer left touching
a_time_start = date_parse('2015-01-01')
a_time_end = date_parse('2015-01-03')
a_correct = (None, None, range_full_remaining)
cases.append(('a', a_time_start, a_time_end, a_correct))
# B exact left, right
b_time_start = date_parse('2015-01-03')
b_time_end = date_parse('2015-01-06')
b_correct = (b_time_start, b_time_end, datetime.timedelta(seconds=0))
cases.append(('b', b_time_start, b_time_end, b_correct))
# C outer right touching
c_time_start = date_parse('2015-01-06')
c_time_end = date_parse('2015-01-08')
c_correct = (None, None, range_full_remaining)
cases.append(('c', c_time_start, c_time_end, c_correct))
# D inside
d_time_start = date_parse('2015-01-04')
d_time_end = date_parse('2015-01-05')
d_correct = (d_time_start, d_time_end, datetime.timedelta(seconds=0))
cases.append(('d', d_time_start, d_time_end, d_correct))
# E span
e_time_start = date_parse('2015-01-02')
e_time_end = date_parse('2015-01-07')
e_correct = (range_start, range_end, datetime.timedelta(days=2))
cases.append(('e', e_time_start, e_time_end, e_correct))
# F left into middle
f_time_start = date_parse('2015-01-01')
f_time_end = date_parse('2015-01-04')
f_correct = (range_start, f_time_end, datetime.timedelta(days=2))
cases.append(('f', f_time_start, f_time_end, f_correct))
# G right into middle
g_time_start = date_parse('2015-01-05')
g_time_end = date_parse('2015-01-08')
g_correct = (g_time_start, range_end, datetime.timedelta(days=2))
cases.append(('g', g_time_start, g_time_end, g_correct))
# H inner left touching
h_time_start = date_parse('2015-01-03')
h_time_end = date_parse('2015-01-04')
h_correct = (h_time_start, h_time_end, datetime.timedelta(seconds=0))
cases.append(('h', h_time_start, h_time_end, h_correct))
# I inner right touching
i_time_start = date_parse('2015-01-05')
i_time_end = date_parse('2015-01-06')
i_correct = (i_time_start, i_time_end, datetime.timedelta(seconds=0))
cases.append(('i', i_time_start, i_time_end, i_correct))
# J outer left not touching
j_time_start = date_parse('2015-01-01')
j_time_end = date_parse('2015-01-02')
j_correct = (None, None, range_full_remaining)
cases.append(('j', j_time_start, j_time_end, j_correct))
# K outer right not touching
k_time_start = date_parse('2015-01-07')
k_time_end = date_parse('2015-01-08')
k_correct = (None, None, range_full_remaining)
cases.append(('k', k_time_start, k_time_end, k_correct))
for case_name, ts, te, correct in cases:
result = split_event(range_start, range_end, ts, te)
assert result == correct
TIME_FIXTURES = (
(datetime.timedelta(minutes=2), '00:02:00'),
(datetime.timedelta(days=1, minutes=2), '1 day, 00:02:00'),
(datetime.timedelta(days=2, minutes=2), '2 days, 00:02:00'),
(datetime.timedelta(days=1, hours=2, minutes=3, seconds=5, microseconds=700000), '1 day, 02:03:05.700000'),
(datetime.timedelta(days=-1, minutes=2), '-1 day, 00:02:00'),
(datetime.timedelta(days=-1, minutes=2, microseconds=1), '-1 day, 00:02:00.000001'),
(datetime.timedelta(days=-1, minutes=-2), '-2 days, 23:58:00'),
(datetime.timedelta(days=-1, microseconds=-1), '-2 days, 23:59:59.999999'),
(datetime.timedelta(days=-2, seconds=86399, microseconds=999999), '-2 days, 23:59:59.999999'),
)
class Test:
@pytest.mark.parametrize('td_a,td_b_str', TIME_FIXTURES)
def test_timedelta_str_to_td_00(self, td_a, td_b_str):
td_a_str = timedelta_to_str(td_a)
td_b = timedelta_str_to_td(td_b_str)
print()
print(f"{td_a=} {td_b=}")
print(f"{str(td_a)=} {str(td_b)=}")
print(td_a.total_seconds(), td_b.total_seconds())
print(f"{td_a_str=} {td_b_str=}")
assert td_a.total_seconds() == td_b.total_seconds()
assert td_a == td_b
assert td_a_str == td_b_str
def test_get_now_00():
now = get_now()
now2 = get_now()
assert (now2 - now).total_seconds() < 1
now = get_now(notz=True)
now2 = get_now(notz=True)
assert (now2 - now).total_seconds() < 1
assert now.tzinfo is None
assert now2.tzinfo is None
now = get_now(notz=False)
now2 = get_now(notz=False)
assert (now2 - now).total_seconds() < 1
assert now.tzinfo is not None
assert now2.tzinfo is not None
def test_get_daily_ranges():
range_start = datetime.datetime.fromisoformat('2020-01-01')
range_end = datetime.datetime.fromisoformat('2020-01-07')
ranges = get_daily_ranges(range_start, range_end-datetime.timedelta(days=1))
ranges = [(ts.isoformat(), te.isoformat()) for ts, te in ranges]
assert ranges == [('2020-01-01T00:00:00', '2020-01-02T00:00:00'),
('2020-01-02T00:00:00', '2020-01-03T00:00:00'),
('2020-01-03T00:00:00', '2020-01-04T00:00:00'),
('2020-01-04T00:00:00', '2020-01-05T00:00:00'),
('2020-01-05T00:00:00', '2020-01-06T00:00:00'),
('2020-01-06T00:00:00', '2020-01-07T00:00:00')]