tests: add optional cursor to log_contains + fix truthiness issues in callers (#6960)

Extracted from #6953

Part of #5899

Core Change
-----------

In #6953, we need the ability to scan the log _after_ a specific line
and ignore anything before that line.

This PR changes `log_contains` to return a tuple of `(matching line, cursor)`.
Hand that cursor to a subsequent `log_contains` call to search the log
for the next occurrence of the pattern.
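For illustration, a minimal usage sketch (the pattern string is made up; `env` stands in for the usual `NeonEnv`-style test fixture, and `assert_log_contains` is the asserting wrapper this PR adds):

```python
# First match, scanning pageserver.log from the top:
line, cursor = env.pageserver.assert_log_contains(".*some pattern.*")

# Passing the cursor back in resumes the scan after the matched line,
# so everything at or before that line is ignored:
line, cursor = env.pageserver.assert_log_contains(".*some pattern.*", offset=cursor)
```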

Other Changes
-------------

- Inspected all call sites of `log_contains` and updated them to handle the
  new tuple return type.
- That inspection revealed that many callers weren't using `assert
  log_contains(...) is not None` but some weaker check that breaks if
  `log_contains` ever returns a non-None but falsy value. Fixed that.
- Those fixes in turn revealed that `test_remote_storage_upload_queue_retries`
  was using `wait_until` incorrectly (see the sketch after this list); after
  fixing the usage, I had to raise the `wait_until` timeout. So, maybe this
  will fix its flakiness.
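To make the `wait_until` fix concrete, here is the shape of the bug and of the fix, condensed from the `test_remote_storage.py` hunk below. Per its docstring in `fixtures/utils.py`, `wait_until` retries only while `func` raises; any non-raising return, even a falsy one, counts as success:

```python
# WRONG: the lambda returns False instead of raising, so wait_until
# accepts the very first call as success and never actually waits.
wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)

# RIGHT: assert_eq raises AssertionError until the condition holds,
# so wait_until keeps polling up to the iteration limit.
wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="upload"), 0))
```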
problame authored Mar 1, 2024
1 parent ee93700 commit e9e77ee
Showing 17 changed files with 119 additions and 74 deletions.
27 changes: 24 additions & 3 deletions test_runner/fixtures/neon_fixtures.py
@@ -2180,6 +2180,11 @@ def __exit__(
self.stop(immediate=True)


@dataclass
class LogCursor:
_line_no: int


class NeonPageserver(PgProtocol):
"""
An object representing a running pageserver.
@@ -2343,7 +2348,18 @@ def assert_no_metric_errors(self):
value = self.http_client().get_metric_value(metric)
assert value == 0, f"Nonzero {metric} == {value}"

def log_contains(self, pattern: str) -> Optional[str]:
def assert_log_contains(
self, pattern: str, offset: None | LogCursor = None
) -> Tuple[str, LogCursor]:
"""Convenient for use inside wait_until()"""

res = self.log_contains(pattern, offset=offset)
assert res is not None
return res

def log_contains(
self, pattern: str, offset: None | LogCursor = None
) -> Optional[Tuple[str, LogCursor]]:
"""Check that the pageserver log contains a line that matches the given regex"""
logfile = self.workdir / "pageserver.log"
if not logfile.exists():
@@ -2357,12 +2373,17 @@ def log_contains(self, pattern: str) -> Optional[str]:
# no guarantee it is already present in the log file. This hasn't
# been a problem in practice, our python tests are not fast enough
# to hit that race condition.
skip_until_line_no = 0 if offset is None else offset._line_no
cur_line_no = 0
with logfile.open("r") as f:
for line in f:
if cur_line_no < skip_until_line_no:
cur_line_no += 1
continue
if contains_re.search(line):
# found it!
return line

cur_line_no += 1
return (line, LogCursor(cur_line_no))
return None

def tenant_attach(
6 changes: 3 additions & 3 deletions test_runner/fixtures/pageserver/utils.py
@@ -20,7 +20,7 @@ def assert_tenant_state(
tenant: TenantId,
expected_state: str,
message: Optional[str] = None,
):
) -> None:
tenant_status = pageserver_http.tenant_status(tenant)
log.info(f"tenant_status: {tenant_status}")
assert tenant_status["state"]["slug"] == expected_state, message or tenant_status
@@ -292,7 +292,7 @@ def timeline_delete_wait_completed(
iterations: int = 20,
interval: Optional[float] = None,
**delete_args,
):
) -> None:
pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args)
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations, interval)

@@ -302,7 +302,7 @@ def assert_prefix_empty(
remote_storage: Optional[RemoteStorage],
prefix: Optional[str] = None,
allowed_postfix: Optional[str] = None,
):
) -> None:
assert remote_storage is not None
response = list_prefix(remote_storage, prefix)
keys = response["KeyCount"]
19 changes: 18 additions & 1 deletion test_runner/fixtures/utils.py
@@ -369,7 +369,12 @@ def start_in_background(
return spawned_process


def wait_until(number_of_iterations: int, interval: float, func: Fn):
WaitUntilRet = TypeVar("WaitUntilRet")


def wait_until(
number_of_iterations: int, interval: float, func: Callable[[], WaitUntilRet]
) -> WaitUntilRet:
"""
Wait until 'func' returns successfully, without exception. Returns the
last return value from the function.
@@ -387,6 +392,18 @@ def wait_until(number_of_iterations: int, interval: float, func: Fn):
raise Exception("timed out while waiting for %s" % func) from last_exception


def assert_eq(a, b) -> None:
assert a == b


def assert_gt(a, b) -> None:
assert a > b


def assert_ge(a, b) -> None:
assert a >= b


def run_pg_bench_small(pg_bin: "PgBin", connstr: str):
"""
Fast way to populate data.
9 changes: 5 additions & 4 deletions test_runner/regress/test_attach_tenant_config.py
@@ -63,10 +63,11 @@ def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, N
]
)

def log_contains_bad_request():
env.pageserver.log_contains(".*Error processing HTTP request: Bad request")

wait_until(50, 0.1, log_contains_bad_request)
wait_until(
50,
0.1,
lambda: env.pageserver.assert_log_contains(".*Error processing HTTP request: Bad request"),
)


def test_null_body(negative_env: NegativeTests):
20 changes: 9 additions & 11 deletions test_runner/regress/test_disk_usage_eviction.py
@@ -200,7 +200,7 @@ def pageserver_start_with_disk_usage_eviction(
tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id)

def statvfs_called():
assert pageserver.log_contains(".*running mocked statvfs.*")
pageserver.assert_log_contains(".*running mocked statvfs.*")

# we most likely have already completed multiple runs
wait_until(10, 1, statvfs_called)
@@ -533,7 +533,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E
assert actual_change >= target, "eviction must always evict more than target"

time.sleep(1) # give log time to flush
assert env.neon_env.pageserver.log_contains(GLOBAL_LRU_LOG_LINE)
env.neon_env.pageserver.assert_log_contains(GLOBAL_LRU_LOG_LINE)
env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE)


@@ -767,7 +767,7 @@ def test_statvfs_error_handling(eviction_env: EvictionEnv):
eviction_order=EvictionOrder.ABSOLUTE_ORDER,
)

assert env.neon_env.pageserver.log_contains(".*statvfs failed.*EIO")
env.neon_env.pageserver.assert_log_contains(".*statvfs failed.*EIO")
env.neon_env.pageserver.allowed_errors.append(".*statvfs failed.*EIO")


@@ -801,10 +801,9 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
eviction_order=EvictionOrder.ABSOLUTE_ORDER,
)

def relieved_log_message():
assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved")

wait_until(10, 1, relieved_log_message)
wait_until(
10, 1, lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved")
)

def less_than_max_usage_pct():
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
@@ -845,10 +844,9 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
eviction_order=EvictionOrder.ABSOLUTE_ORDER,
)

def relieved_log_message():
assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved")

wait_until(10, 1, relieved_log_message)
wait_until(
10, 1, lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved")
)

def more_than_min_avail_bytes_freed():
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
2 changes: 1 addition & 1 deletion test_runner/regress/test_duplicate_layers.py
@@ -36,7 +36,7 @@ def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
pg_bin.run_capture(["pgbench", "-i", "-s1", connstr])

time.sleep(10) # let compaction to be performed
assert env.pageserver.log_contains("compact-level0-phase1-return-same")
env.pageserver.assert_log_contains("compact-level0-phase1-return-same")


def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
11 changes: 7 additions & 4 deletions test_runner/regress/test_layers_from_future.py
@@ -184,10 +184,13 @@ def future_layer_is_gone_from_index_part():

# NB: the layer file is unlinked index part now, but, because we made the delete
# operation stuck, the layer file itself is still in the remote_storage
def delete_at_pause_point():
assert env.pageserver.log_contains(f".*{tenant_id}.*at failpoint.*{failpoint_name}")

wait_until(10, 0.5, delete_at_pause_point)
wait_until(
10,
0.5,
lambda: env.pageserver.assert_log_contains(
f".*{tenant_id}.*at failpoint.*{failpoint_name}"
),
)
future_layer_path = env.pageserver_remote_storage.remote_layer_path(
tenant_id, timeline_id, future_layer.to_str(), generation=generation_before_detach
)
2 changes: 1 addition & 1 deletion test_runner/regress/test_logging.py
@@ -34,7 +34,7 @@ def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str):
def assert_logged():
if not log_expected:
return
assert env.pageserver.log_contains(f".*{msg_id}.*")
env.pageserver.assert_log_contains(f".*{msg_id}.*")

wait_until(10, 0.5, assert_logged)

2 changes: 1 addition & 1 deletion test_runner/regress/test_pageserver_generations.py
@@ -432,7 +432,7 @@ def assert_header_written():

main_pageserver.start()

def assert_deletions_submitted(n: int):
def assert_deletions_submitted(n: int) -> None:
assert ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") == n

# After restart, issue a flush to kick the deletion frontend to do recovery.
52 changes: 28 additions & 24 deletions test_runner/regress/test_remote_storage.py
@@ -28,7 +28,7 @@
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import print_gc_result, query_scalar, wait_until
from fixtures.utils import (
assert_eq,
assert_ge,
assert_gt,
print_gc_result,
query_scalar,
wait_until,
)
from requests import ReadTimeout


@@ -120,10 +127,10 @@ def test_remote_storage_backup_and_restore(
log.info(f"upload of checkpoint {checkpoint_number} is done")

# Check that we had to retry the uploads
assert env.pageserver.log_contains(
env.pageserver.assert_log_contains(
".*failed to perform remote task UploadLayer.*, will retry.*"
)
assert env.pageserver.log_contains(
env.pageserver.assert_log_contains(
".*failed to perform remote task UploadMetadata.*, will retry.*"
)

@@ -292,9 +299,9 @@ def get_queued_count(file_kind, op_kind):
print_gc_result(gc_result)
assert gc_result["layers_removed"] > 0

wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
wait_until(2, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)
wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="upload"), 0))
wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="index", op_kind="upload"), 0))
wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0))

# let all future operations queue up
configure_storage_sync_failpoints("return")
@@ -322,17 +329,17 @@ def churn_while_failpoints_active(result):
churn_while_failpoints_active_thread.start()

# wait for churn thread's data to get stuck in the upload queue
wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="upload") > 0)
wait_until(10, 0.1, lambda: get_queued_count(file_kind="index", op_kind="upload") >= 2)
wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="delete") > 0)
wait_until(10, 0.5, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="upload"), 0))
wait_until(10, 0.5, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 2))
wait_until(10, 0.5, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="delete"), 0))

# unblock churn operations
configure_storage_sync_failpoints("off")

# ... and wait for them to finish. Exponential back-off in upload queue, so, gracious timeouts.
wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
wait_until(30, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)
wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="upload"), 0))
wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="index", op_kind="upload"), 0))
wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0))

# The churn thread doesn't make progress once it blocks on the first wait_completion() call,
# so, give it some time to wrap up.
@@ -884,26 +891,23 @@ def wait_upload_queue_empty(
wait_until(
2,
1,
lambda: get_queued_count(
client, tenant_id, timeline_id, file_kind="layer", op_kind="upload"
)
== 0,
lambda: assert_eq(
get_queued_count(client, tenant_id, timeline_id, file_kind="layer", op_kind="upload"), 0
),
)
wait_until(
2,
1,
lambda: get_queued_count(
client, tenant_id, timeline_id, file_kind="index", op_kind="upload"
)
== 0,
lambda: assert_eq(
get_queued_count(client, tenant_id, timeline_id, file_kind="index", op_kind="upload"), 0
),
)
wait_until(
2,
1,
lambda: get_queued_count(
client, tenant_id, timeline_id, file_kind="layer", op_kind="delete"
)
== 0,
lambda: assert_eq(
get_queued_count(client, tenant_id, timeline_id, file_kind="layer", op_kind="delete"), 0
),
)


4 changes: 2 additions & 2 deletions test_runner/regress/test_sharding_service.py
@@ -116,7 +116,7 @@ def test_sharding_service_smoke(
# Marking a pageserver offline should migrate tenants away from it.
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})

def node_evacuated(node_id: int):
def node_evacuated(node_id: int) -> None:
counts = get_node_shard_counts(env, tenant_ids)
assert counts[node_id] == 0

@@ -405,7 +405,7 @@ def handler(request: Request):

env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})

def node_evacuated(node_id: int):
def node_evacuated(node_id: int) -> None:
counts = get_node_shard_counts(env, [env.initial_tenant])
assert counts[node_id] == 0

12 changes: 6 additions & 6 deletions test_runner/regress/test_tenant_delete.py
@@ -505,10 +505,10 @@ def delete_tenant():
return ps_http.tenant_delete(tenant_id)

def hit_remove_failpoint():
assert env.pageserver.log_contains(f"at failpoint {BEFORE_REMOVE_FAILPOINT}")
env.pageserver.assert_log_contains(f"at failpoint {BEFORE_REMOVE_FAILPOINT}")

def hit_run_failpoint():
assert env.pageserver.log_contains(f"at failpoint {BEFORE_RUN_FAILPOINT}")
env.pageserver.assert_log_contains(f"at failpoint {BEFORE_RUN_FAILPOINT}")

with concurrent.futures.ThreadPoolExecutor() as executor:
background_200_req = executor.submit(delete_tenant)
@@ -612,12 +612,12 @@ def timeline_create():
Thread(target=timeline_create).start()

def hit_initdb_upload_failpoint():
assert env.pageserver.log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}")
env.pageserver.assert_log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}")

wait_until(100, 0.1, hit_initdb_upload_failpoint)

def creation_connection_timed_out():
assert env.pageserver.log_contains(
env.pageserver.assert_log_contains(
"POST.*/timeline.* request was dropped before completing"
)

@@ -636,7 +636,7 @@ def tenant_delete_inner():
Thread(target=tenant_delete).start()

def deletion_arrived():
assert env.pageserver.log_contains(
env.pageserver.assert_log_contains(
f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause"
)

@@ -663,7 +663,7 @@ def deletion_arrived():
)

# Ensure that creation cancelled and deletion didn't end up in broken state or encountered the leftover temp file
assert env.pageserver.log_contains(CANCELLED_ERROR)
env.pageserver.assert_log_contains(CANCELLED_ERROR)
assert not env.pageserver.log_contains(
".*ERROR.*delete_tenant.*Timelines directory is not empty after all timelines deletion"
)

1 comment on commit e9e77ee

@github-actions

2525 tests run: 2389 passed, 1 failed, 135 skipped (full report)


Failures on Postgres 14

  • test_heavy_write_workload[neon_on-github-actions-selfhosted-10-5-5]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_heavy_write_workload[neon_on-release-pg14-github-actions-selfhosted-10-5-5]"
Flaky tests (4)

Postgres 16

  • test_crafted_wal_end[last_wal_record_crossing_segment]: debug

Postgres 15

  • test_secondary_downloads: debug
  • test_remote_storage_upload_queue_retries: debug

Postgres 14

  • test_delete_timeline_client_hangup: release

Code coverage* (full report)

  • functions: 28.8% (6930 of 24084 functions)
  • lines: 47.4% (42555 of 89817 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
e9e77ee at 2024-03-01T10:43:31.653Z :recycle:
