Skip to content

Commit

Permalink
Add logical replication test to exercise snapfiles (#8364)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sasha Krassovsky authored Aug 15, 2024
1 parent 69cb1ee commit df086cd
Showing 1 changed file with 82 additions and 0 deletions.
82 changes: 82 additions & 0 deletions test_runner/performance/test_logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,85 @@ def test_publisher_restart(
sub_workload.terminate()
finally:
pub_workload.terminate()


@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_snap_files(
pg_bin: PgBin,
benchmark_project_pub: NeonApiEndpoint,
zenbenchmark: NeonBenchmarker,
):
"""
Creates a node with a replication slot. Generates pgbench into the replication slot,
then runs pgbench inserts while generating large numbers of snapfiles. Then restarts
the node and tries to peek the replication changes.
"""
test_duration_min = 60
test_interval_min = 5
pgbench_duration = f"-T{test_duration_min * 60 * 2}"

env = benchmark_project_pub.pgbench_env
connstr = benchmark_project_pub.connstr
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=env)

with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'")
is_super = cur.fetchall()[0]
assert is_super, "This benchmark won't work if we don't have superuser"

conn = psycopg2.connect(connstr)
conn.autocommit = True
cur = conn.cursor()
cur.execute("ALTER SYSTEM SET neon.logical_replication_max_snap_files = -1")

with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT pg_reload_conf()")

with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute(
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_replication_slots
WHERE slot_name = 'slotter'
) THEN
PERFORM pg_drop_replication_slot('slotter');
END IF;
END $$;
"""
)
cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')")

workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env)
try:
start = time.time()
prev_measurement = time.time()
while time.time() - start < test_duration_min * 60:
with psycopg2.connect(connstr) as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
)
check_pgbench_still_running(workload)
cur.execute(
"SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())"
)

# Measure storage
if time.time() - prev_measurement > test_interval_min * 60:
storage = benchmark_project_pub.get_synthetic_storage_size()
zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER)
prev_measurement = time.time()
time.sleep(test_interval_min * 60 / 3)

finally:
workload.terminate()

1 comment on commit df086cd

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2252 tests run: 2173 passed, 0 failed, 79 skipped (full report)


Code coverage* (full report)

  • functions: 32.4% (7219 of 22266 functions)
  • lines: 50.4% (58284 of 115732 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
df086cd at 2024-08-16T00:25:05.446Z :recycle:

Please sign in to comment.