Skip to content
Snippets Groups Projects
test_forthwith.py 7.57 KiB
Newer Older
# Copyright (C) 2024, UChicago Argonne, LLC
# Licensed under the 3-clause BSD license.  See accompanying LICENSE.txt file
# in the top-level directory.

import datetime
import pytest
from dateutil.parser import parse as date_parse
from Octeres.forthwith import split_event, timedelta_str_to_td, timedelta_to_str, get_now
from Octeres.forthwith import datetime_to_epoch, epoch_to_datetime, FORMAT_DATE_FULL, get_daily_ranges


def input_datetimes():
    lst = [
        (date_parse('2023-01-01'), '2023-01-01 00:00:00.000000', 1672531200),
        (date_parse('2023-01-01'), '2023-01-01 00:00:00.000000', 1672531200.0),
        (date_parse('2023-01-01 00:00:00.000001'), '2023-01-01 00:00:00.000001', 1672531200.000001),
        (date_parse('1970-01-01 00:00:00.000001'), '1970-01-01 00:00:00.000001', 0.000001),
    ]
    return lst


@pytest.mark.parametrize("dt_obj,dt_str,epoch", input_datetimes())
def test_datetime_to_from_epoch(dt_obj, dt_str, epoch):
    result_epoch = datetime_to_epoch(dt_obj)
    result_dt = epoch_to_datetime(epoch)
    result_dt_str = dt_obj.strftime(FORMAT_DATE_FULL)
    result_dt_obj = datetime.datetime.strptime(dt_str, FORMAT_DATE_FULL)

    assert result_epoch == epoch
    assert result_dt == dt_obj
    assert result_dt_str == dt_str
    assert result_dt_obj == dt_obj
    print(result_epoch)


class Test_split_event:
    def test_split_event_00(self):
        """
        Test a event within a range.
        """
        rs = date_parse('2016-01-01')
        re = date_parse('2016-02-01')
        ts = date_parse('2016-01-01')
        te = date_parse('2016-01-02')
        correct = (ts, te, datetime.timedelta(seconds=0))
        result = split_event(rs, re, ts, te)
        assert result == correct

    def test_split_event_01(self):
        """
        Test a event crossing a range.
        """
        rs = date_parse('2016-01-01')
        re = date_parse('2016-02-01')
        ts = date_parse('2016-01-15')
        te = date_parse('2016-02-05')
        correct = (ts, re, datetime.timedelta(days=4))
        result = split_event(rs, re, ts, te)
        assert result == correct

    def test_split_event_all(self):
        range_start = date_parse('2015-01-03')
        range_end = date_parse('2015-01-06')
        range_full_remaining = range_end - range_start

        cases = []

        # A outer left touching
        a_time_start = date_parse('2015-01-01')
        a_time_end = date_parse('2015-01-03')
        a_correct = (None, None, range_full_remaining)
        cases.append(('a', a_time_start, a_time_end, a_correct))

        # B exact left, right
        b_time_start = date_parse('2015-01-03')
        b_time_end = date_parse('2015-01-06')
        b_correct = (b_time_start, b_time_end, datetime.timedelta(seconds=0))
        cases.append(('b', b_time_start, b_time_end, b_correct))

        # C outer right touching
        c_time_start = date_parse('2015-01-06')
        c_time_end = date_parse('2015-01-08')
        c_correct = (None, None, range_full_remaining)
        cases.append(('c', c_time_start, c_time_end, c_correct))

        # D inside
        d_time_start = date_parse('2015-01-04')
        d_time_end = date_parse('2015-01-05')
        d_correct = (d_time_start, d_time_end, datetime.timedelta(seconds=0))
        cases.append(('d', d_time_start, d_time_end, d_correct))

        # E span
        e_time_start = date_parse('2015-01-02')
        e_time_end = date_parse('2015-01-07')
        e_correct = (range_start, range_end, datetime.timedelta(days=2))
        cases.append(('e', e_time_start, e_time_end, e_correct))

        # F left into middle
        f_time_start = date_parse('2015-01-01')
        f_time_end = date_parse('2015-01-04')
        f_correct = (range_start, f_time_end, datetime.timedelta(days=2))
        cases.append(('f', f_time_start, f_time_end, f_correct))

        # G right into middle
        g_time_start = date_parse('2015-01-05')
        g_time_end = date_parse('2015-01-08')
        g_correct = (g_time_start, range_end, datetime.timedelta(days=2))
        cases.append(('g', g_time_start, g_time_end, g_correct))

        # H inner left touching
        h_time_start = date_parse('2015-01-03')
        h_time_end = date_parse('2015-01-04')
        h_correct = (h_time_start, h_time_end, datetime.timedelta(seconds=0))
        cases.append(('h', h_time_start, h_time_end, h_correct))

        # I inner right touching
        i_time_start = date_parse('2015-01-05')
        i_time_end = date_parse('2015-01-06')
        i_correct = (i_time_start, i_time_end, datetime.timedelta(seconds=0))
        cases.append(('i', i_time_start, i_time_end, i_correct))

        # J outer left not touching
        j_time_start = date_parse('2015-01-01')
        j_time_end = date_parse('2015-01-02')
        j_correct = (None, None, range_full_remaining)
        cases.append(('j', j_time_start, j_time_end, j_correct))

        # K outer right not touching
        k_time_start = date_parse('2015-01-07')
        k_time_end = date_parse('2015-01-08')
        k_correct = (None, None, range_full_remaining)
        cases.append(('k', k_time_start, k_time_end, k_correct))

        for case_name, ts, te, correct in cases:
            result = split_event(range_start, range_end, ts, te)
            assert result == correct


TIME_FIXTURES = (
    (datetime.timedelta(minutes=2), '00:02:00'),
    (datetime.timedelta(days=1, minutes=2), '1 day, 00:02:00'),
    (datetime.timedelta(days=2, minutes=2), '2 days, 00:02:00'),
    (datetime.timedelta(days=1, hours=2, minutes=3, seconds=5, microseconds=700000), '1 day, 02:03:05.700000'),
    (datetime.timedelta(days=-1, minutes=2), '-1 day, 00:02:00'),
    (datetime.timedelta(days=-1, minutes=2, microseconds=1), '-1 day, 00:02:00.000001'),
    (datetime.timedelta(days=-1, minutes=-2), '-2 days, 23:58:00'),
    (datetime.timedelta(days=-1, microseconds=-1), '-2 days, 23:59:59.999999'),
    (datetime.timedelta(days=-2, seconds=86399, microseconds=999999), '-2 days, 23:59:59.999999'),
)


class Test:
    @pytest.mark.parametrize('td_a,td_b_str', TIME_FIXTURES)
    def test_timedelta_str_to_td_00(self, td_a, td_b_str):
        td_a_str = timedelta_to_str(td_a)
        td_b = timedelta_str_to_td(td_b_str)
        print()
        print(f"{td_a=} {td_b=}")
        print(f"{str(td_a)=} {str(td_b)=}")
        print(td_a.total_seconds(), td_b.total_seconds())
        print(f"{td_a_str=} {td_b_str=}")
        assert td_a.total_seconds() == td_b.total_seconds()
        assert td_a == td_b
        assert td_a_str == td_b_str


def test_get_now_00():
    now = get_now()
    now2 = get_now()
    assert (now2 - now).total_seconds() < 1

    now = get_now(notz=True)
    now2 = get_now(notz=True)
    assert (now2 - now).total_seconds() < 1
    assert now.tzinfo is None
    assert now2.tzinfo is None

    now = get_now(notz=False)
    now2 = get_now(notz=False)
    assert (now2 - now).total_seconds() < 1
    assert now.tzinfo is not None
    assert now2.tzinfo is not None


def test_get_daily_ranges():
    range_start = datetime.datetime.fromisoformat('2020-01-01')
    range_end = datetime.datetime.fromisoformat('2020-01-07')
    ranges = get_daily_ranges(range_start, range_end-datetime.timedelta(days=1))
    ranges = [(ts.isoformat(), te.isoformat()) for ts, te in ranges]
    assert ranges == [('2020-01-01T00:00:00', '2020-01-02T00:00:00'),
                      ('2020-01-02T00:00:00', '2020-01-03T00:00:00'),
                      ('2020-01-03T00:00:00', '2020-01-04T00:00:00'),
                      ('2020-01-04T00:00:00', '2020-01-05T00:00:00'),
                      ('2020-01-05T00:00:00', '2020-01-06T00:00:00'),
                      ('2020-01-06T00:00:00', '2020-01-07T00:00:00')]