diff --git a/test_runner/performance/test_logical_replication.py b/test_runner/performance/test_logical_replication.py index 91d7e3446e3d..050c09c1e549 100644 --- a/test_runner/performance/test_logical_replication.py +++ b/test_runner/performance/test_logical_replication.py @@ -149,12 +149,16 @@ def test_subscriber_lag( check_pgbench_still_running(pub_workload, "pub") check_pgbench_still_running(sub_workload, "sub") - with ( - psycopg2.connect(pub_connstr) as pub_conn, - psycopg2.connect(sub_connstr) as sub_conn, - ): - with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur: - lag = measure_logical_replication_lag(sub_cur, pub_cur) + pub_conn = psycopg2.connect(pub_connstr) + sub_conn = psycopg2.connect(sub_connstr) + pub_conn.autocommit = True + sub_conn.autocommit = True + + with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur: + lag = measure_logical_replication_lag(sub_cur, pub_cur) + + pub_conn.close() + sub_conn.close() log.info(f"Replica lagged behind master by {lag} seconds") zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER) @@ -206,6 +210,7 @@ def test_publisher_restart( sub_conn = psycopg2.connect(sub_connstr) pub_conn.autocommit = True sub_conn.autocommit = True + with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur: pub_cur.execute("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = 'pub1'") pub_exists = len(pub_cur.fetchall()) != 0 @@ -222,6 +227,7 @@ def test_publisher_restart( sub_cur.execute(f"create subscription sub1 connection '{pub_connstr}' publication pub1") initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur) + pub_conn.close() sub_conn.close() @@ -248,12 +254,17 @@ def test_publisher_restart( ["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=pub_env, ) - with ( - psycopg2.connect(pub_connstr) as pub_conn, - psycopg2.connect(sub_connstr) as sub_conn, - ): - with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur: - lag = measure_logical_replication_lag(sub_cur, pub_cur) + + pub_conn = psycopg2.connect(pub_connstr) + sub_conn = psycopg2.connect(sub_connstr) + pub_conn.autocommit = True + sub_conn.autocommit = True + + with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur: + lag = measure_logical_replication_lag(sub_cur, pub_cur) + + pub_conn.close() + sub_conn.close() log.info(f"Replica lagged behind master by {lag} seconds") zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER) @@ -288,58 +299,56 @@ def test_snap_files( env = benchmark_project_pub.pgbench_env connstr = benchmark_project_pub.connstr - with psycopg2.connect(connstr) as conn: - conn.autocommit = True - with conn.cursor() as cur: - cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'") - is_super = cast("bool", cur.fetchall()[0][0]) - assert is_super, "This benchmark won't work if we don't have superuser" + conn = psycopg2.connect(connstr) + conn.autocommit = True + + with conn.cursor() as cur: + cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'") + is_super = cast("bool", cur.fetchall()[0][0]) + assert is_super, "This benchmark won't work if we don't have superuser" + + conn.close() pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=env) conn = psycopg2.connect(connstr) conn.autocommit = True - cur = conn.cursor() - cur.execute("ALTER SYSTEM SET neon.logical_replication_max_snap_files = -1") - - with psycopg2.connect(connstr) as conn: - conn.autocommit = True - with conn.cursor() as cur: - cur.execute("SELECT pg_reload_conf()") - - with psycopg2.connect(connstr) as conn: - conn.autocommit = True - with conn.cursor() as cur: - cur.execute( - """ - DO $$ - BEGIN - IF EXISTS ( - SELECT 1 - FROM pg_replication_slots - WHERE slot_name = 'slotter' - ) THEN - PERFORM pg_drop_replication_slot('slotter'); - END IF; - END $$; + + with conn.cursor() as cur: + cur.execute( """ - ) - cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')") + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 + FROM pg_replication_slots + WHERE slot_name = 'slotter' + ) THEN + PERFORM pg_drop_replication_slot('slotter'); + END IF; + END $$; + """ + ) + cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')") + + conn.close() workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env) try: start = time.time() prev_measurement = time.time() while time.time() - start < test_duration_min * 60: - with psycopg2.connect(connstr) as conn: - with conn.cursor() as cur: - cur.execute( - "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s" - ) - check_pgbench_still_running(workload) - cur.execute( - "SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())" - ) + conn = psycopg2.connect(connstr) + conn.autocommit = True + + with conn.cursor() as cur: + cur.execute( + "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s" + ) + check_pgbench_still_running(workload) + cur.execute("SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())") + + conn.close() # Measure storage if time.time() - prev_measurement > test_interval_min * 60: diff --git a/test_runner/performance/test_physical_replication.py b/test_runner/performance/test_physical_replication.py index 8b368977df04..d56f6dce09b0 100644 --- a/test_runner/performance/test_physical_replication.py +++ b/test_runner/performance/test_physical_replication.py @@ -102,15 +102,21 @@ def test_ro_replica_lag( check_pgbench_still_running(master_workload) check_pgbench_still_running(replica_workload) time.sleep(sync_interval_min * 60) + + conn_master = psycopg2.connect(master_connstr) + conn_replica = psycopg2.connect(replica_connstr) + conn_master.autocommit = True + conn_replica.autocommit = True + with ( - psycopg2.connect(master_connstr) as conn_master, - psycopg2.connect(replica_connstr) as conn_replica, + conn_master.cursor() as cur_master, + conn_replica.cursor() as cur_replica, ): - with ( - conn_master.cursor() as cur_master, - conn_replica.cursor() as cur_replica, - ): - lag = measure_replication_lag(cur_master, cur_replica) + lag = measure_replication_lag(cur_master, cur_replica) + + conn_master.close() + conn_replica.close() + log.info(f"Replica lagged behind master by {lag} seconds") zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER) finally: @@ -219,11 +225,15 @@ def test_replication_start_stop( pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s10"], env=master_env) # Sync replicas - with psycopg2.connect(master_connstr) as conn_master: - with conn_master.cursor() as cur_master: - for i in range(num_replicas): - conn_replica = psycopg2.connect(replica_connstr[i]) - measure_replication_lag(cur_master, conn_replica.cursor()) + conn_master = psycopg2.connect(master_connstr) + conn_master.autocommit = True + + with conn_master.cursor() as cur_master: + for i in range(num_replicas): + conn_replica = psycopg2.connect(replica_connstr[i]) + measure_replication_lag(cur_master, conn_replica.cursor()) + + conn_master.close() master_pgbench = pg_bin.run_nonblocking( [ @@ -277,17 +287,22 @@ def replica_enabled(iconfig: int = iconfig): time.sleep(configuration_test_time_sec) - with psycopg2.connect(master_connstr) as conn_master: - with conn_master.cursor() as cur_master: - for ireplica in range(num_replicas): - replica_conn = psycopg2.connect(replica_connstr[ireplica]) - lag = measure_replication_lag(cur_master, replica_conn.cursor()) - zenbenchmark.record( - f"Replica {ireplica} lag", lag, "s", MetricReport.LOWER_IS_BETTER - ) - log.info( - f"Replica {ireplica} lagging behind master by {lag} seconds after configuration {iconfig:>b}" - ) + conn_master = psycopg2.connect(master_connstr) + conn_master.autocommit = True + + with conn_master.cursor() as cur_master: + for ireplica in range(num_replicas): + replica_conn = psycopg2.connect(replica_connstr[ireplica]) + lag = measure_replication_lag(cur_master, replica_conn.cursor()) + zenbenchmark.record( + f"Replica {ireplica} lag", lag, "s", MetricReport.LOWER_IS_BETTER + ) + log.info( + f"Replica {ireplica} lagging behind master by {lag} seconds after configuration {iconfig:>b}" + ) + + conn_master.close() + master_pgbench.terminate() except Exception as e: error_occurred = True