diff --git a/tests/e2e/test_read_only_run.py b/tests/e2e/test_read_only_run.py index be19316..6e9fa24 100644 --- a/tests/e2e/test_read_only_run.py +++ b/tests/e2e/test_read_only_run.py @@ -7,6 +7,8 @@ timezone, ) +from neptune_fetcher import ReadOnlyRun + NEPTUNE_PROJECT = os.getenv("NEPTUNE_E2E_PROJECT") @@ -14,6 +16,12 @@ def unique_path(prefix): return f"{prefix}__{datetime.now(timezone.utc).isoformat('-', 'seconds')}__{str(uuid.uuid4())[-4:]}" +def refresh(ro_run: ReadOnlyRun): + """Create a new ReadOnlyRun instance with the same project and custom_id, + which is basically a "refresh" operation""" + return ReadOnlyRun(read_only_project=ro_run.project, custom_id=ro_run["sys/custom_run_id"].fetch()) + + def random_series(length=10, start_step=0): """Return a 2-tuple of step and value lists, both of length `length`""" assert length > 0 @@ -85,7 +93,7 @@ def test_series_no_prefetch(run, ro_run): assert df["value"].tolist() == values -def test_series_with_prefetch(run, ro_run): +def test_single_series_with_prefetch(run, ro_run): path = unique_path("test_series/series_with_prefetch") steps, values = random_series() @@ -95,13 +103,31 @@ def test_series_with_prefetch(run, ro_run): run.wait_for_processing() - ro_run.prefetch_series_values([path]) + ro_run.prefetch_series_values([path], use_threads=True) df = ro_run[path].fetch_values() assert df["step"].tolist() == steps assert df["value"].tolist() == values +def test_multiple_series_with_prefetch(run, ro_run): + path_base = unique_path("test_series/many_series_with_prefetch") + data = {f"{path_base}-{i}": i for i in range(20)} + + run.log_metrics(data, step=1) + run.wait_for_processing() + + ro_run = refresh(ro_run) + paths = [p for p in ro_run.field_names if p.startswith(path_base)] + assert len(paths) == len(data), "Not all data was logged" + + ro_run.prefetch_series_values(paths, use_threads=True) + for path in paths: + df = ro_run[path].fetch_values() + assert df["step"].tolist() == [1] + assert df["value"].tolist() == [data[path]] + + def test_series_fetch_and_append(run, ro_run): """Fetch a series, then append, then fetch again -- the new data points should be there"""