Commit dc583365 authored by Eric Pershey

adding tests and a few new functions.

parent ee75083b
@@ -11,9 +11,23 @@ V = TypeVar("V")
def merge_dictionaries(dct_a: Dict[K, V], dct_b: Dict[K, V], default: object, operation: Callable) -> Dict[K, V]:
    """Given two dictionaries, merge them. If a key is in both, use
    ``operation`` to combine the two values; keys that only appear in
    ``dct_b`` are combined with ``default``."""
    dct_c = OrderedDict()
    for key_a, value_a in dct_a.items():
        dct_c[key_a] = value_a
    for key_b, value_b in dct_b.items():
        dct_c[key_b] = operation(dct_c.get(key_b, default), value_b)
    return dct_c
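
A quick usage sketch of merge_dictionaries (hypothetical values, not part of the commit), showing how ``default`` feeds keys that only appear in the second dictionary:

    import operator

    # 'b' appears in both dicts  -> operation(2, 10) = 12
    # 'c' only appears in dct_b  -> operation(0, 3) = 3, using the default
    merged = merge_dictionaries({"a": 1, "b": 2}, {"b": 10, "c": 3}, 0, operator.add)
    # -> OrderedDict([('a', 1), ('b', 12), ('c', 3)])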
class partial_arg_kw:
    """A helper class, similar to functools.partial, that applies positional
    arguments and kwargs to a function. The partial's positional args are
    applied before the call-time args; the partial's kwargs are unpacked after
    the call-time kwargs, so a key supplied in both raises a TypeError."""

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __call__(self, *args, **kwargs):
        return self.func(*self.args, *args, **kwargs, **self.kwargs)
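
A minimal usage sketch of partial_arg_kw (hypothetical function and values, not part of the commit) illustrating the argument ordering described in the docstring:

    def f(a, b, c=0):
        return a + b + c

    add_one = partial_arg_kw(f, 1, c=10)
    assert add_one(2) == 13  # expands to f(1, 2, c=10)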
@@ -82,6 +82,8 @@ def spark_hostnames_to_bitmask(hostnames):
def spark_join_bitmasks(bitmask_lst: list, operation="or") -> LiteBitmaskSlots:
    if operation == 'or':
        if type(bitmask_lst) == list:
            # FIXME: check len(bitmask_lst) > 0 first, or join_bitmasks will raise on an
            # empty list; also consider giving join_bitmasks a length argument so it can
            # return a zero bitmask.
            bitmask = join_bitmasks(bitmask_lst, operation=operation)
        else:
            raise NotImplementedError(f"type {type(bitmask_lst)} not supported")
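
One possible shape for the FIXME above, sketched under stated assumptions: the caller supplies the bit length, mirroring the zeros() constructors used elsewhere in this commit (eb.zeros, bitmask_class.zeros); spark_join_bitmasks_guarded is a hypothetical name, not part of the commit.

    def spark_join_bitmasks_guarded(bitmask_lst: list, bit_total: int, operation="or") -> LiteBitmaskSlots:
        # An empty list yields an all-zero bitmask instead of raising inside join_bitmasks.
        if not bitmask_lst:
            return LiteBitmask.zeros(bit_total)
        return spark_join_bitmasks(bitmask_lst, operation=operation)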
......
@@ -212,6 +212,23 @@ class IntervalRange:
            range_lst.append(x1)
        return range_lst

    def get_range_tuple(self, interval):
        """Return the range from start to end split at ``interval``, as a list of
        (interval_start, interval_end) tuples. When the range is not an exact
        multiple of ``interval``, the count is rounded up, so the final tuple
        ends past ``self.end``."""
        range_lst = []
        if type(interval) == timedelta:
            interval_count = int(timedelta_to_secondtime(self.delta) / timedelta_to_secondtime(interval))
            if (timedelta_to_secondtime(self.delta) % timedelta_to_secondtime(interval)) > 0:
                interval_count += 1
        else:
            interval_count = int(self.delta / interval)
            if (self.delta % interval) > 0:
                interval_count += 1
        for interval_index in range(interval_count):
            x1 = self.start + (interval_index * interval)
            x2 = self.start + ((interval_index + 1) * interval)
            range_lst.append((x1, x2))
        return range_lst
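
A short usage sketch (hypothetical values; assumes datetime and timedelta are in scope) showing the round-up behavior of get_range_tuple:

    # 50 minutes split into 15-minute tuples -> interval_count = ceil(50 / 15) = 4;
    # the final tuple is (00:45, 01:00), ending past the 00:50 range end.
    ir = IntervalRange(datetime(2020, 1, 1, 0, 0), datetime(2020, 1, 1, 0, 50))
    for x1, x2 in ir.get_range_tuple(timedelta(minutes=15)):
        print(x1, x2)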
    def in_range(self, point):
        """For something to be in the range it must be >= the start and < the end."""
        if self.start <= point < self.end:
......
@@ -16,7 +16,9 @@ from Octeres.forthwith import (
    epoch_to_datetime,
    FORMAT_DATE_FULL,
    get_daily_ranges,
    IntervalRange,
)
from Octeres.util import iterate_chain
def input_datetimes():
@@ -219,3 +221,17 @@ def test_get_daily_ranges():
("2020-01-05T00:00:00", "2020-01-06T00:00:00"),
("2020-01-06T00:00:00", "2020-01-07T00:00:00"),
]
def test_IntervalRange_00():
    range_start = datetime.datetime.fromisoformat("2020-01-01")
    range_end = datetime.datetime.fromisoformat("2020-01-07")
    ir = IntervalRange(range_start, range_end)
    interval = datetime.timedelta(minutes=15)
    for tb, ts, ta in iterate_chain(ir.get_range(interval)):
        if tb and ts:
            assert ts - tb == interval
    intervals = ir.get_range_tuple(interval)
    for ts, te in intervals:
        assert te - ts == interval
    assert (range_end - range_start).total_seconds() / (15 * 60) == len(intervals)
@@ -5,18 +5,30 @@
"""
Unit tests for timeline.
"""
import cProfile
import datetime
import pstats
from decimal import Underflow, Overflow
from pprint import pformat
import random
from pstats import SortKey
from typing import List
import numpy as np
import numpy.testing as npt
import pandas as pd
from dateutil.parser import parse as date_parse
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, TimestampType, StringType
from Ocean.schema_lookup import schema_bitmask
from Octeres.bitmask import BASE_DTYPE_MAX
from Octeres.bitmask import eb
-from Octeres.timeline import Dependency_Funcs, TLCollisionOverlap, TimelinePit
from Octeres.bitmask.bitmask_lite import LiteBitmask, LiteBitmaskSlots
from Octeres.data_generation import EventGeneration
from Octeres.forthwith import FORMAT_DATE_DAY
+from Octeres.timeline import Dependency_Funcs, TLCollisionOverlap, TimelinePit, TimelineParallel
from Octeres.timeline import Event_Handler
from Octeres.timeline import (
    Timeline,
@@ -27,9 +39,20 @@ from Octeres.timeline import (
)
from Octeres.timeline import TimelineDict, TimelineMask
from Octeres.timeline import reduce_holes, Point_in_Time
-from Octeres.util import superprint
+from Octeres.util import superprint, df_to_lstofdct
import pytest
try:
    import dask
except ImportError:
    dask = None
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)
pd.set_option("display.width", 512)
pd.set_option("display.max_colwidth", None)
pd.set_option("display.float_format", "{:.6f}".format)
class Test_Timeline:
    @classmethod
@@ -40,9 +63,9 @@ class Test_Timeline:
        base_mask = eb.zeros(8)
        cls.tl = Timeline(machine_name, range_start, range_end, base_mask)
-    def test_get_possible_unit_seconds(self):
+    def test_possible_unit_seconds(self):
        tl = self.tl
-        possible_unit_seconds = tl.get_possible_unit_seconds()
+        possible_unit_seconds = tl.possible_unit_seconds
        correct_unit_seconds = (tl.range_end - tl.range_start).total_seconds() * len(tl.base_mask)
        assert possible_unit_seconds == correct_unit_seconds
@@ -180,7 +203,7 @@ class Test_Timeline:
        base_mask = self.tl.base_mask
        tl = self.tl
-        possible_unit_seconds = tl.get_possible_unit_seconds()
+        possible_unit_seconds = tl.possible_unit_seconds
        event_handler = Event_Handler()
        event_lst = list()
@@ -753,122 +776,112 @@ def test_get_mask_sum_03c():
    value = pit.get_mask_sum()
# def test_get_mask_sum_04():
#     pit = Point_in_Time()
#
#     bitmasks = [
#         eb.array([0, 0, 0, 1, 1, 0, 0, 0]),
#         eb.array([0, 0, 0, 1, 1, 0, 1, 0]),
#         eb.array([0, 1, 1, 1, 1, 0, 0, 0]),
#         eb.array([1, 0, 0, 1, 1, 0, 0, 0]),
#     ]
#     for idx, bitmask in enumerate(bitmasks):
#         dct: TLPointInTime = TLPointInTime(
#             pk=str(idx),
#             name=str(idx),
#             category="a",
#             bitmask=bitmask,
#             ts=date_parse('2020-01-01'),
#             direction=Event_Direction.POSITIVE,
#         )
#         pit.positive_add(dct)
#     value = pit.get_mask_sum()
#     npt.assert_array_equal(value, eb.array([1, 1, 1, 4, 4, 0, 1, 0]))
#
# def test_get_mask_sum_05():
#     pit = Point_in_Time()
#
#     bitmasks = [
#         eb.array([0, 0, 0, 1, 1, 0, 0, 0]),
#         eb.array([0, 0, 0, 1, 1, 0, 1, 0]),
#         eb.array([0, 1, 1, 1, 1, 0, 0, 0]),
#         eb.array([1, 0, 0, 1, 1, 0, 0, 0]),
#     ]
#     for idx, bitmask in enumerate(bitmasks):
#         dct: TLPointInTime = TLPointInTime(
#             pk=str(idx),
#             name=str(idx),
#             category="a",
#             bitmask=bitmask,
#             ts=date_parse('2020-01-01'),
#             direction=Event_Direction.POSITIVE,
#         )
#         pit.positive_add(dct)
#     value = pit.get_mask_sum()
#     npt.assert_array_equal(value, eb.array([1, 1, 1, 4, 4, 0, 1, 0]))
#
#     bitmasks = [
#         eb.array([0, 0, 1, 1, 0, 0, 0, 0]),
#     ]
#     for idx, bitmask in enumerate(bitmasks):
#         dct: TLPointInTime = TLPointInTime(
#             pk=str(idx),
#             name=str(idx),
#             category="a",
#             bitmask=bitmask,
#             ts=date_parse('2020-01-01'),
#             direction=Event_Direction.NEGATIVE,
#         )
#         pit.negative_add(dct)
#     value = pit.get_mask_sum()
#     npt.assert_array_equal(value, eb.array([1, 1, 0, 3, 4, 0, 1, 0]))
#
# def test_get_mask_sum_06():
#     pit = Point_in_Time()
#
#     bitmasks = [
#         eb.array([0, 0, 0, 1, 1, 0, 0, 0]),
#         eb.array([0, 0, 0, 1, 1, 0, 1, 0]),
#         eb.array([0, 1, 1, 1, 1, 0, 0, 0]),
#         eb.array([1, 0, 0, 1, 1, 0, 0, 0]),
#     ]
#     for idx, bitmask in enumerate(bitmasks):
#         dct: TLPointInTime = TLPointInTime(
#             pk=str(idx),
#             name=str(idx),
#             category="a",
#             bitmask=bitmask,
#             ts=date_parse('2020-01-01'),
#             direction=Event_Direction.POSITIVE,
#         )
#         pit.positive_add(dct)
#     value = pit.get_mask_sum()
#     npt.assert_array_equal(value, eb.array([1, 1, 1, 4, 4, 0, 1, 0]))
#
#     bitmasks = [
#         eb.array([0, 0, 1, 1, 0, 0, 0, 0]),
#         eb.array([0, 0, 0, 0, 0, 0, 1, 0]),
#     ]
#     for idx, bitmask in enumerate(bitmasks):
#         dct: TLPointInTime = TLPointInTime(
#             pk=str(idx),
#             name=str(idx),
#             category="a",
#             bitmask=bitmask,
#             ts=date_parse('2020-01-01'),
#             direction=Event_Direction.NEGATIVE,
#         )
#         pit.negative_add(dct)
#     value = pit.get_mask_sum()
#     npt.assert_array_equal(value, eb.array([1, 1, 0, 3, 4, 0, 0, 0]))
#
# def test_get_mask_sum_07():
#     pit = Point_in_Time()
#
#     bitmasks = [
#         eb.array([0, 0, 1, 1, 0, 0, 0, 0]),
#         eb.array([0, 0, 0, 0, 0, 0, 1, 0]),
#     ]
#     for idx, bitmask in enumerate(bitmasks):
#         dct: TLPointInTime = TLPointInTime(
#             pk=str(idx),
#             name=str(idx),
#             category="a",
#             bitmask=bitmask,
#             ts=date_parse('2020-01-01'),
#             direction=Event_Direction.NEGATIVE,
#         )
#         pit.negative_add(dct)
#     value = pit.get_mask_sum()
#     # underflow!!
#     npt.assert_array_equal(value, eb.array([0, 0, 255, 255, 0, 0, 255, 0]))
@pytest.mark.skipif(dask is None, reason="could not import dask")
def test_the_gauntlet():
    # requires dask, dask[distributed]
    machine_name = "test_machine"
    dtype = "U2"
    empty_value = "  "
    test_value = ".."
    bit_total = 4000  # 100000
    bitmask_class = LiteBitmask
    time_seconds = 3600  # 86400
    range_start = datetime.datetime.strptime("2024-01-01", FORMAT_DATE_DAY)
    range_end = range_start + datetime.timedelta(seconds=time_seconds)
    eg = EventGeneration(
        time_seconds,
        bit_total,
        dtype=dtype,
        empty_value=empty_value,
        test_value=test_value,
        visualize=False,
        bitmask_class=bitmask_class,
        seed=42,
    )
    characters = []
    characters.extend(list(range(97, 122 + 1)))  # a-z
    characters.extend(list(range(65, 90 + 1)))  # A-Z
    characters.extend(list(range(48, 57 + 1)))  # 0-9
    characters = list(map(chr, characters))
    event_names = eg.generate_names_n_level(characters, 3)
    events = eg.generate_non_overlapping_scattered_events_v0(event_names)
    event_lst = events.event_lst
    correct_unit_seconds = time_seconds * bit_total
    correct_possible_unit_seconds = (range_end - range_start).total_seconds() * bit_total

    import dask.dataframe as dd

    pdf = pd.DataFrame(event_lst)
    ddf = dd.from_pandas(pdf, npartitions=8)
    ddf = ddf.drop(['x', 'y'], axis=1)
    ddf['bitmask'] = ddf['bitmask'].apply(lambda b: b.to_dict(), meta=('bitmask', 'object'))
    ddf['time_start'] = ddf['ts'].apply(
        lambda tsi: pd.to_datetime(range_start + datetime.timedelta(seconds=tsi, microseconds=int(random.uniform(0, 1) * 1000000)), utc=False),
        meta=('ts', 'object'),
    )
    ddf['time_end'] = ddf['te'].apply(
        lambda tsi: pd.to_datetime(range_start + datetime.timedelta(seconds=tsi, microseconds=int(random.uniform(0, 1) * 1000000)), utc=False),
        meta=('te', 'object'),
    )
    ddf['pk'] = ddf['name']
    ddf['event_type_name'] = 'job'
    pdf2 = ddf.compute()
    event_lst = df_to_lstofdct(pdf2)
    for event_dct in event_lst:
        event_dct['bitmask'] = LiteBitmask.from_dict(event_dct['bitmask'])
    # base_mask = bitmask_class.zeros(bit_total)
    # tl = Timeline(machine_name, range_start, range_end, base_mask)
    # possible_unit_seconds = tl.possible_unit_seconds
    # with cProfile.Profile() as pr:
    #     timeline_lst = tl.prepare_event_timeline(event_lst)
    #     timeline_dct = tl.group_timeline(timeline_lst)
    #     timeline_sorted = tl.sort_timeline(timeline_dct)
    #     mask_timeline = tl.normalize_timeline_pit(timeline_sorted)
    #     mask_timeline = tl.sum_timeline(mask_timeline, test_negative=False)
    #     # tl.print_timeline(mask_timeline, binary=True)
    #     mask_timeline = tl.flatten_timeline(mask_timeline)
    #     mask_timeline = tl.isolate_timeline_range(mask_timeline)
    #     # tl.print_timeline(mask_timeline)
    #     mag_timeline = tl.mask_timeline_to_mag_timeline(mask_timeline)
    #     consumed_unit_seconds = tl.calculate_area(mag_timeline)
    # assert round(abs(consumed_unit_seconds - correct_unit_seconds), 4) == 0
    # assert possible_unit_seconds == correct_possible_unit_seconds
    # profile_result = pstats.Stats(pr)
    # profile_result.sort_stats(SortKey.CUMULATIVE).print_stats(4)
    # print(f"{len(event_lst)=} {len(timeline_lst)=}")
    base_mask = bitmask_class.zeros(bit_total)
    tl = TimelineParallel(machine_name, range_start, range_end, base_mask, processes=16)
    possible_unit_seconds = tl.possible_unit_seconds
    with cProfile.Profile() as pr:
        tl.load(event_lst)
        tl.run()
        consumed_unit_seconds = tl.calculate_area()
    # assert round(abs(consumed_unit_seconds - correct_unit_seconds), 4) == 0
    # assert possible_unit_seconds == correct_possible_unit_seconds
    profile_result = pstats.Stats(pr)
    profile_result.sort_stats(SortKey.CUMULATIVE).print_stats(16)
    # print(f"{len(event_lst)=} {len(timeline_lst)=}")

    import ipdb; ipdb.set_trace()  # FIXME: leftover debugging breakpoint

    from dask.distributed import Client, Queue

    client = Client()

    spark: SparkSession = SparkSession.builder.appName("test_the_gauntlet").getOrCreate()
    spark.conf.set("spark.sql.session.timeZone", "UTC")
    schema = StructType(
        [
            StructField("name", StringType(), False),
            StructField("ts", TimestampType(), False),  # aurora_crayex_alerts, obtain from query
            StructField("te", TimestampType(), False),  # goes back to level1 -> table
            schema_bitmask,
        ]
    )
    # if you don't provide a schema, the intervals in the bitmask won't cast right.
    sdf = spark.createDataFrame(pdf2, schema=schema)
    sdf.show(32, truncate=False)

    pandas_partitions = Queue()

    def convert_to_pandas(iterator):
        for partition in iterator:
            print(partition)
            yield pd.DataFrame(list(partition))

    print(f"{sdf.rdd.getNumPartitions()=}")
    pdf3 = sdf.rdd.mapPartitions(convert_to_pandas).collect()

    def spark_to_multi_df(df: pd.DataFrame, address):
        with Client(address) as client:
            [future] = client.scatter([df])
            pandas_partitions.put(future)