Skip to content

Commit

Permalink
test_runner: improve wait_until (#9936)
Browse files Browse the repository at this point in the history
Improves `wait_until` by:

* Use `timeout` instead of `iterations`. This allows changing the
timeout/interval parameters independently.
* Make `timeout` and `interval` optional (default 20s and 0.5s). Most
callers don't care.
* Only output status every 1s by default, and add optional
`status_interval` parameter.
* Remove `show_intermediate_error`, this was always emitted anyway.

Most callers have been updated to use the defaults, except where they
had good reason otherwise.
  • Loading branch information
erikgrinaker authored Dec 2, 2024
1 parent 45658cc commit 5330122
Show file tree
Hide file tree
Showing 46 changed files with 234 additions and 326 deletions.
14 changes: 6 additions & 8 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1736,7 +1736,7 @@ def wait_until_ready(self):
def storage_controller_ready():
assert self.ready() is True

wait_until(30, 1, storage_controller_ready)
wait_until(storage_controller_ready)
return time.time() - t1

def attach_hook_issue(
Expand Down Expand Up @@ -2574,7 +2574,7 @@ def complete():
log.info(f"any_unstable={any_unstable}")
assert not any_unstable

wait_until(20, 0.5, complete)
wait_until(complete)

def __enter__(self) -> Self:
return self
Expand Down Expand Up @@ -3973,7 +3973,7 @@ def check_migrations_done():
migration_id: int = cur.fetchall()[0][0]
assert migration_id >= num_migrations

wait_until(20, 0.5, check_migrations_done)
wait_until(check_migrations_done)

# Mock the extension part of spec passed from control plane for local testing
# endpooint.rs adds content of this file as a part of the spec.json
Expand Down Expand Up @@ -4489,12 +4489,10 @@ def are_lsns_advanced():
)
assert stat.remote_consistent_lsn >= lsn and stat.backup_lsn >= lsn.segment_lsn()

# xxx: max wait is long because we might be waiting for reconnection from
# pageserver to this safekeeper
wait_until(30, 1, are_lsns_advanced)
wait_until(are_lsns_advanced)
client.checkpoint(tenant_id, timeline_id)
if wait_wal_removal:
wait_until(30, 1, are_segments_removed)
wait_until(are_segments_removed)

def wait_until_paused(self, failpoint: str):
msg = f"at failpoint {failpoint}"
Expand All @@ -4503,7 +4501,7 @@ def paused():
log.info(f"waiting for hitting failpoint {failpoint}")
self.assert_log_contains(msg)

wait_until(20, 0.5, paused)
wait_until(paused)


class NeonBroker(LogUtils):
Expand Down
17 changes: 3 additions & 14 deletions test_runner/fixtures/pageserver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.remote_storage import RemoteStorage, RemoteStorageKind, S3Storage
from fixtures.remote_storage import RemoteStorage, S3Storage
from fixtures.utils import wait_until

if TYPE_CHECKING:
Expand Down Expand Up @@ -269,12 +269,7 @@ def wait_timeline_detail_404(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId | TenantShardId,
timeline_id: TimelineId,
iterations: int,
interval: float | None = None,
):
if interval is None:
interval = 0.25

def timeline_is_missing():
data = {}
try:
Expand All @@ -287,19 +282,17 @@ def timeline_is_missing():

raise RuntimeError(f"Timeline exists state {data.get('state')}")

wait_until(iterations, interval, func=timeline_is_missing)
wait_until(timeline_is_missing)


def timeline_delete_wait_completed(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId | TenantShardId,
timeline_id: TimelineId,
iterations: int = 20,
interval: float | None = None,
**delete_args,
) -> None:
pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args)
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations, interval)
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id)


# remote_storage must not be None, but that's easier for callers to make mypy happy
Expand Down Expand Up @@ -453,7 +446,3 @@ def many_small_layers_tenant_config() -> dict[str, Any]:
"checkpoint_distance": 1024**2,
"image_creation_threshold": 100,
}


def poll_for_remote_storage_iterations(remote_storage_kind: RemoteStorageKind) -> int:
return 40 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 15
2 changes: 1 addition & 1 deletion test_runner/fixtures/safekeeper/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def timeline_start_lsn_non_zero() -> Lsn:
assert s > Lsn(0)
return s

return wait_until(30, 1, timeline_start_lsn_non_zero)
return wait_until(timeline_start_lsn_non_zero)

def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
return self.timeline_status(tenant_id, timeline_id).commit_lsn
Expand Down
2 changes: 1 addition & 1 deletion test_runner/fixtures/safekeeper/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ def walreceivers_absent():
log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}")
assert len(status.walreceivers) == 0

wait_until(30, 0.5, walreceivers_absent)
wait_until(walreceivers_absent)
35 changes: 17 additions & 18 deletions test_runner/fixtures/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import threading
import time
from collections.abc import Callable, Iterable
from datetime import datetime, timedelta
from hashlib import sha256
from pathlib import Path
from typing import TYPE_CHECKING, Any, TypeVar
Expand Down Expand Up @@ -380,15 +381,10 @@ def start_in_background(
if return_code is not None:
error = f"expected subprocess to run but it exited with code {return_code}"
else:
attempts = 10
try:
wait_until(
number_of_iterations=attempts,
interval=1,
func=is_started,
)
wait_until(is_started, timeout=10)
except Exception:
error = f"Failed to get correct status from subprocess in {attempts} attempts"
error = "Failed to get correct status from subprocess"
except Exception as e:
error = f"expected subprocess to start but it failed with exception: {e}"

Expand All @@ -402,28 +398,31 @@ def start_in_background(


def wait_until(
number_of_iterations: int,
interval: float,
func: Callable[[], WaitUntilRet],
show_intermediate_error: bool = False,
name: str | None = None,
timeout: float = 20.0, # seconds
interval: float = 0.5, # seconds
status_interval: float = 1.0, # seconds
) -> WaitUntilRet:
"""
Wait until 'func' returns successfully, without exception. Returns the
last return value from the function.
"""
if name is None:
name = getattr(func, "__name__", repr(func))
deadline = datetime.now() + timedelta(seconds=timeout)
next_status = datetime.now()
last_exception = None
for i in range(number_of_iterations):
while datetime.now() <= deadline:
try:
res = func()
return func()
except Exception as e:
log.info("waiting for %s iteration %s failed: %s", func, i + 1, e)
if datetime.now() >= next_status:
log.info("waiting for %s: %s", name, e)
next_status = datetime.now() + timedelta(seconds=status_interval)
last_exception = e
if show_intermediate_error:
log.info(e)
time.sleep(interval)
continue
return res
raise Exception(f"timed out while waiting for {func}") from last_exception
raise Exception(f"timed out while waiting for {name}") from last_exception


def assert_eq(a, b) -> None:
Expand Down
6 changes: 2 additions & 4 deletions test_runner/logical_repl/test_clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,22 @@ def test_clickhouse(remote_pg: RemotePostgres):
"SETTINGS materialized_postgresql_tables_list = 'table1';"
)
wait_until(
120,
0.5,
lambda: query_clickhouse(
client,
"select * from db1_postgres.table1 order by 1",
"ee600d8f7cd05bd0b169fa81f44300a9dd10085a",
),
timeout=60,
)
cur.execute("INSERT INTO table1 (id, column1) VALUES (3, 'ghi'), (4, 'jkl');")
conn.commit()
wait_until(
120,
0.5,
lambda: query_clickhouse(
client,
"select * from db1_postgres.table1 order by 1",
"9eba2daaf7e4d7d27ac849525f68b562ab53947d",
),
timeout=60,
)
log.debug("Sleeping before final checking if Neon is still alive")
time.sleep(3)
Expand Down
12 changes: 3 additions & 9 deletions test_runner/logical_repl/test_debezium.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,12 @@ def test_debezium(debezium):
)
conn.commit()
wait_until(
100,
0.5,
lambda: get_kafka_msg(
consumer,
ts_ms,
after={"first_name": "John", "last_name": "Dow", "email": "[email protected]"},
),
show_intermediate_error=True,
timeout=60,
)
ts_ms = time.time() * 1000
log.info("Insert 2 ts_ms: %s", ts_ms)
Expand All @@ -165,28 +163,24 @@ def test_debezium(debezium):
)
conn.commit()
wait_until(
100,
0.5,
lambda: get_kafka_msg(
consumer,
ts_ms,
after={"first_name": "Alex", "last_name": "Row", "email": "[email protected]"},
),
show_intermediate_error=True,
timeout=60,
)
ts_ms = time.time() * 1000
log.info("Update ts_ms: %s", ts_ms)
cur.execute("update inventory.customers set first_name = 'Alexander' where id = 2")
conn.commit()
wait_until(
100,
0.5,
lambda: get_kafka_msg(
consumer,
ts_ms,
after={"first_name": "Alexander"},
),
show_intermediate_error=True,
timeout=60,
)
time.sleep(3)
cur.execute("select 1")
8 changes: 3 additions & 5 deletions test_runner/performance/test_branch_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int, shape:
startup_line = "INFO version: git(-env)?:"

# find the first line of the log file so we can find the next start later
_, first_start = wait_until(5, 1, lambda: env.pageserver.assert_log_contains(startup_line))
_, first_start = wait_until(lambda: env.pageserver.assert_log_contains(startup_line))

# start without gc so we can time compaction with less noise; use shorter
# period for compaction so it starts earlier
Expand All @@ -156,16 +156,14 @@ def patch_default_tenant_config(config):
)

_, second_start = wait_until(
5, 1, lambda: env.pageserver.assert_log_contains(startup_line, first_start)
lambda: env.pageserver.assert_log_contains(startup_line, first_start),
)
env.pageserver.quiesce_tenants()

wait_and_record_startup_metrics(env.pageserver, neon_compare.zenbenchmark, "restart_after")

# wait for compaction to complete, which most likely has already done so multiple times
msg, _ = wait_until(
30,
1,
lambda: env.pageserver.assert_log_contains(
f".*tenant_id={env.initial_tenant}.*: compaction iteration complete.*", second_start
),
Expand Down Expand Up @@ -205,7 +203,7 @@ def metrics_are_filled() -> list[Sample]:
assert len(matching) == len(expected_labels)
return matching

samples = wait_until(10, 1, metrics_are_filled)
samples = wait_until(metrics_are_filled)

for sample in samples:
phase = sample.labels["phase"]
Expand Down
2 changes: 0 additions & 2 deletions test_runner/regress/test_attach_tenant_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, N
)

wait_until(
50,
0.1,
lambda: env.pageserver.assert_log_contains(".*Error processing HTTP request: Bad request"),
)

Expand Down
2 changes: 1 addition & 1 deletion test_runner/regress/test_compaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ def assert_broken():

# Wait for enough failures to break the circuit breaker
# This wait is fairly long because we back off on compaction failures, so 5 retries takes ~30s
wait_until(60, 1, assert_broken)
wait_until(assert_broken, timeout=60)

# Sleep for a while, during which time we expect that compaction will _not_ be retried
time.sleep(10)
Expand Down
10 changes: 5 additions & 5 deletions test_runner/regress/test_disk_usage_eviction.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def statvfs_called():
pageserver.assert_log_contains(".*running mocked statvfs.*")

# we most likely have already completed multiple runs
wait_until(10, 1, statvfs_called)
wait_until(statvfs_called)


def count_layers_per_tenant(
Expand Down Expand Up @@ -772,14 +772,14 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
)

wait_until(
10, 1, lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved")
lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved")
)

def less_than_max_usage_pct():
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
assert post_eviction_total_size < 0.33 * total_size, "we requested max 33% usage"

wait_until(2, 2, less_than_max_usage_pct)
wait_until(less_than_max_usage_pct, timeout=5)

# Disk usage candidate collection only takes into account active tenants.
# However, the statvfs call takes into account the entire tenants directory,
Expand Down Expand Up @@ -825,7 +825,7 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
)

wait_until(
10, 1, lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved")
lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved"),
)

def more_than_min_avail_bytes_freed():
Expand All @@ -834,7 +834,7 @@ def more_than_min_avail_bytes_freed():
total_size - post_eviction_total_size >= min_avail_bytes
), f"we requested at least {min_avail_bytes} worth of free space"

wait_until(2, 2, more_than_min_avail_bytes_freed)
wait_until(more_than_min_avail_bytes_freed, timeout=5)


def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):
Expand Down
6 changes: 3 additions & 3 deletions test_runner/regress/test_hot_standby.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
# Wait until we see that the pgbench_accounts is created + filled on replica *and*
# index is created. Otherwise index creation would conflict with
# read queries and hs feedback won't save us.
wait_until(60, 1.0, partial(pgbench_accounts_initialized, secondary))
wait_until(partial(pgbench_accounts_initialized, secondary), timeout=60)

# Test should fail if hs feedback is disabled anyway, but cross
# check that walproposer sets some xmin.
Expand All @@ -269,7 +269,7 @@ def xmin_is_not_null():
log.info(f"xmin is {slot_xmin}")
assert int(slot_xmin) > 0

wait_until(10, 1.0, xmin_is_not_null)
wait_until(xmin_is_not_null)
for _ in range(1, 5):
# in debug mode takes about 5-7s
balance = secondary.safe_psql_scalar("select sum(abalance) from pgbench_accounts")
Expand All @@ -286,7 +286,7 @@ def xmin_is_null():
log.info(f"xmin is {slot_xmin}")
assert slot_xmin is None

wait_until(10, 1.0, xmin_is_null)
wait_until(xmin_is_null)


# Test race condition between WAL replay and backends performing queries
Expand Down
2 changes: 1 addition & 1 deletion test_runner/regress/test_layers_from_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def future_layer_is_gone_from_index_part():
future_layers = set(get_future_layers())
assert future_layer not in future_layers

wait_until(10, 0.5, future_layer_is_gone_from_index_part)
wait_until(future_layer_is_gone_from_index_part)

# We already make deletion stuck here, but we don't necessarily hit the failpoint
# because deletions are batched.
Expand Down
Loading

1 comment on commit 5330122

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7144 tests run: 6825 passed, 1 failed, 318 skipped (full report)


Failures on Postgres 16

  • test_sharded_ingest[github-actions-selfhosted-vanilla-1]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_sharded_ingest[release-pg16-github-actions-selfhosted-vanilla-1]"
Flaky tests (2)

Postgres 15

Postgres 14

Code coverage* (full report)

  • functions: 30.4% (8272 of 27226 functions)
  • lines: 47.8% (65236 of 136512 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
5330122 at 2024-12-02T12:50:41.591Z :recycle:

Please sign in to comment.