Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: fix flaky test_allocation_csv tests #9953

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 41 additions & 16 deletions e2e_tests/tests/experiment/test_allocation_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import datetime
import io
import re
import sys
import uuid
from typing import Optional

Expand All @@ -18,6 +19,11 @@
API_URL = "/resources/allocation/allocations-csv?"


def timestamp_with_offset(delta: int) -> str:
dt = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=delta)
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


def validate_trial_csv_rows(
raw_text: str, experiment_id: int, workspace_name: Optional[str]
) -> None:
Expand Down Expand Up @@ -51,7 +57,8 @@ def test_experiment_capture() -> None:
workspaceId=w1.id,
).project

start_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# Avoid any rounding or inclusion errors.
start_time = timestamp_with_offset(-2)

experiment_id = exp.create_experiment(
sess,
Expand All @@ -61,8 +68,9 @@ def test_experiment_capture() -> None:
)
exp.wait_for_experiment_state(sess, experiment_id, bindings.experimentv1State.COMPLETED)

# Avoid any rounding or inclusion errors.
end_time = timestamp_with_offset(+2)
# Check if an entry exists for experiment that just ran
end_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
r = sess.get(f"{API_URL}timestamp_after={start_time}&timestamp_before={end_time}")
assert r.status_code == requests.codes.ok, r.text
validate_trial_csv_rows(r.text, experiment_id, w1.name)
Expand Down Expand Up @@ -104,7 +112,8 @@ def test_experiment_capture() -> None:
@pytest.mark.e2e_cpu
def test_notebook_capture() -> None:
sess = api_utils.admin_session()
start_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# Avoid any rounding or inclusion errors.
start_time = timestamp_with_offset(-2)

task_id = None
with cmd.interactive_command(sess, ["notebook", "start"]) as notebook:
Expand All @@ -116,12 +125,18 @@ def test_notebook_capture() -> None:

assert task_id is not None

# Avoid any rounding or inclusion errors.
end_time = timestamp_with_offset(+2)
end_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
sess = api_utils.admin_session()
r = sess.get(f"{API_URL}timestamp_after={start_time}&timestamp_before={end_time}")
url = f"{API_URL}timestamp_after={start_time}&timestamp_before={end_time}"
r = sess.get(url)
assert r.status_code == requests.codes.ok, r.text

assert re.search(f"{task_id}.*,NOTEBOOK", r.text) is not None
if re.search(f"{task_id}.*,NOTEBOOK", r.text) is None:
msg = f"did not find task_id={task_id} @ {url} in output:\n{r.text}"
print(msg, file=sys.stderr)
raise ValueError(msg)

workspace = cluster_utils.get_task_info(sess, "notebook", task_id).get("workspaceName", None)
assert workspace is not None
Expand All @@ -132,7 +147,8 @@ def test_notebook_capture() -> None:
@pytest.mark.e2e_cpu
def test_tensorboard_experiment_capture() -> None:
sess = api_utils.admin_session()
start_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# Avoid any rounding or inclusion errors.
start_time = timestamp_with_offset(-2)

experiment_id = exp.create_experiment(
sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op")
Expand All @@ -148,11 +164,10 @@ def test_tensorboard_experiment_capture() -> None:
cluster_utils.wait_for_task_state(sess, "tensorboard", tb.task_id, "RUNNING")
cluster_utils.wait_for_task_state(sess, "tensorboard", tb.task_id, "TERMINATED")

# Ensure that end_time captures tensorboard
end_time = (
datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=1)
).strftime("%Y-%m-%dT%H:%M:%SZ")
r = sess.get(f"{API_URL}timestamp_after={start_time}&timestamp_before={end_time}")
# Avoid any rounding or inclusion errors.
end_time = timestamp_with_offset(+2)
url = f"{API_URL}timestamp_after={start_time}&timestamp_before={end_time}"
r = sess.get(url)
assert r.status_code == requests.codes.ok, r.text

# Confirm Experiment is captured and valid
Expand All @@ -161,7 +176,10 @@ def test_tensorboard_experiment_capture() -> None:
assert len(matches) >= 1

# Confirm Tensorboard task is captured
assert re.search(f"{tb.task_id}.*,TENSORBOARD", r.text) is not None
if re.search(f"{tb.task_id}.*,TENSORBOARD", r.text) is None:
msg = f"did not find task_id={tb.task_id} @ {url} in output:\n{r.text}"
print(msg, file=sys.stderr)
raise ValueError(msg)

workspace = cluster_utils.get_task_info(sess, "tensorboard", tb.task_id).get(
"workspaceName", None
Expand All @@ -174,7 +192,8 @@ def test_tensorboard_experiment_capture() -> None:
@pytest.mark.e2e_cpu
def test_cmd_capture() -> None:
sess = api_utils.admin_session()
start_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# Avoid any rounding or inclusion errors.
start_time = timestamp_with_offset(-2)

task_id = None
with cmd.interactive_command(sess, ["cmd", "run", "sleep 10s"]) as sleep_cmd:
Expand All @@ -186,12 +205,18 @@ def test_cmd_capture() -> None:

assert task_id is not None

end_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# Avoid any rounding or inclusion errors.
end_time = timestamp_with_offset(+2)

sess = api_utils.admin_session()
r = sess.get(f"{API_URL}timestamp_after={start_time}&timestamp_before={end_time}")
url = f"{API_URL}timestamp_after={start_time}&timestamp_before={end_time}"
r = sess.get(url)
assert r.status_code == requests.codes.ok, r.text

assert re.search(f"{task_id}.*,COMMAND", r.text) is not None
if re.search(f"{task_id}.*,COMMAND", r.text) is None:
msg = f"did not find task_id={task_id} @ {url} in output:\n{r.text}"
print(msg, file=sys.stderr)
raise ValueError(msg)

workspace = cluster_utils.get_task_info(sess, "command", task_id).get("workspaceName", None)
assert workspace is not None
Expand Down
Loading