Skip to content

Commit

Permalink
Simplify test_wal_page_boundary_start test (#8214)
Browse files Browse the repository at this point in the history
All the code to ensure the WAL record lands at a page boundary was
unnecessary for reproducing the original problem. In fact, it's a pretty
basic test that checks that outbound replication (= neon as publisher)
still works after restarting the endpoint. It just used to be very
broken before commit 5ceccdc, which also added this test.

To verify that:

1. Check out commit f3af5f4 (because the next commit, 7dd58e1,
fixed the same bug in a different way, making it infeasible to revert
the bug fix in an easy way)
2. Revert the bug fix from commit 5ceccdc with this:

```
diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c
index 7debb6325..9f03bbd99 100644
--- a/pgxn/neon/walproposer_pg.c
+++ b/pgxn/neon/walproposer_pg.c
@@ -1437,8 +1437,10 @@ XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr)
 	 *
 	 * #5749
 	 */
+#if 0
 	if (!wp->config->syncSafekeepers)
 		XLogUpdateWalBuffers(buf, recptr, nbytes);
+#endif

 	while (nbytes > 0)
 	{
```

3. Run the test_wal_page_boundary_start regression test. It fails, as
expected

4. Apply this commit to the test, and run it again. It still fails, with
the same error mentioned in issue #5749:

```
PG:2024-06-30 20:49:08.805 GMT [1248196] STATEMENT:  START_REPLICATION SLOT "sub1" LOGICAL 0/0 (proto_version '4', origin 'any', publication_names '"pub1"')
PG:2024-06-30 21:37:52.567 GMT [1467972] LOG:  starting logical decoding for slot "sub1"
PG:2024-06-30 21:37:52.567 GMT [1467972] DETAIL:  Streaming transactions committing after 0/1532330, reading WAL from 0/1531C78.
PG:2024-06-30 21:37:52.567 GMT [1467972] STATEMENT:  START_REPLICATION SLOT "sub1" LOGICAL 0/0 (proto_version '4', origin 'any', publication_names '"pub1"')
PG:2024-06-30 21:37:52.567 GMT [1467972] LOG:  logical decoding found consistent point at 0/1531C78
PG:2024-06-30 21:37:52.567 GMT [1467972] DETAIL:  There are no running transactions.
PG:2024-06-30 21:37:52.567 GMT [1467972] STATEMENT:  START_REPLICATION SLOT "sub1" LOGICAL 0/0 (proto_version '4', origin 'any', publication_names '"pub1"')
PG:2024-06-30 21:37:52.568 GMT [1467972] ERROR:  could not find record while sending logically-decoded data: invalid contrecord length 312 (expected 6) at 0/1533FD8
```
  • Loading branch information
hlinnaka authored and VladLazar committed Jul 8, 2024
1 parent 2cc73ba commit a3728bd
Showing 1 changed file with 9 additions and 51 deletions.
60 changes: 9 additions & 51 deletions test_runner/regress/test_logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from string import ascii_lowercase

import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
AuxFileStore,
Expand All @@ -13,7 +12,7 @@
logical_replication_sync,
wait_for_last_flush_lsn,
)
from fixtures.utils import query_scalar, wait_until
from fixtures.utils import wait_until


def random_string(n: int):
Expand Down Expand Up @@ -326,12 +325,17 @@ def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg):
assert "could not receive data from WAL stream" not in logs


# Test compute start at LSN page of which starts with contrecord
# https://github.com/neondatabase/neon/issues/5749
# Test replication of WAL record spanning page boundary (with contrecord) after
# compute restart and WAL write of the page.
#
# See https://github.com/neondatabase/neon/issues/5749
#
# Most pages start with a contrecord, so we don't do anything special
# to ensure that.
@pytest.mark.parametrize(
"pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env

env.neon_cli.create_branch("init")
Expand All @@ -356,52 +360,6 @@ def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
logical_replication_sync(vanilla_pg, endpoint)
vanilla_pg.stop()

with endpoint.cursor() as cur:
# measure how much space logical message takes. Sometimes first attempt
# creates huge message and then it stabilizes, have no idea why.
for _ in range(3):
lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
log.info(f"current_lsn={lsn_before}")
# Non-transactional logical message doesn't write WAL, only XLogInsert's
# it, so use transactional. Which is a bit problematic as transactional
# necessitates commit record. Alternatively we can do smth like
# select neon_xlogflush(pg_current_wal_insert_lsn());
# but isn't much better + that particular call complains on 'xlog flush
# request 0/282C018 is not satisfied' as pg_current_wal_insert_lsn skips
# page headers.
payload = "blahblah"
cur.execute(f"select pg_logical_emit_message(true, 'pref', '{payload}')")
lsn_after_by_curr_wal_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
lsn_diff = lsn_after_by_curr_wal_lsn - lsn_before
logical_message_base = lsn_after_by_curr_wal_lsn - lsn_before - len(payload)
log.info(
f"before {lsn_before}, after {lsn_after_by_curr_wal_lsn}, lsn diff is {lsn_diff}, base {logical_message_base}"
)

# and write logical message spanning exactly as we want
lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
log.info(f"current_lsn={lsn_before}")
curr_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
offs = int(curr_lsn) % 8192
till_page = 8192 - offs
payload_len = (
till_page - logical_message_base - 8
) # not sure why 8 is here, it is deduced from experiments
log.info(f"current_lsn={curr_lsn}, offs {offs}, till_page {till_page}")

# payload_len above would go exactly till the page boundary; but we want contrecord, so make it slightly longer
payload_len += 8

cur.execute(f"select pg_logical_emit_message(true, 'pref', 'f{'a' * payload_len}')")
supposedly_contrecord_end = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
log.info(f"supposedly_page_boundary={supposedly_contrecord_end}")
# The calculations to hit the page boundary are very fuzzy, so just
# ignore test if we fail to reach it.
if not (int(supposedly_contrecord_end) % 8192 == 32):
pytest.skip("missed page boundary, bad luck")

cur.execute("insert into replication_example values (2, 3)")

wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
endpoint.stop().start()

Expand Down

0 comments on commit a3728bd

Please sign in to comment.