Commit

Improve typing related to regress/test_logical_replication.py (#9725)
Signed-off-by: Tristan Partin <[email protected]>
tristan957 authored Nov 11, 2024
1 parent b018bc7 commit 5be6b07
Showing 2 changed files with 33 additions and 21 deletions.
4 changes: 2 additions & 2 deletions test_runner/fixtures/neon_fixtures.py
@@ -286,7 +286,7 @@ def safe_psql(self, query: str, **kwargs: Any) -> list[tuple[Any, ...]]:
return self.safe_psql_many([query], **kwargs)[0]

def safe_psql_many(
-self, queries: Iterable[str], log_query=True, **kwargs: Any
+self, queries: Iterable[str], log_query: bool = True, **kwargs: Any
) -> list[list[tuple[Any, ...]]]:
"""
Execute queries against the node and return all rows.
@@ -306,7 +306,7 @@ def safe_psql_many(
result.append(cur.fetchall())
return result

-def safe_psql_scalar(self, query, log_query=True) -> Any:
+def safe_psql_scalar(self, query: str, log_query: bool = True) -> Any:
"""
Execute query returning single row with single column.
"""
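Aside (not part of the commit): a minimal sketch of why the explicit `bool` annotation is worth adding. Type checkers disagree about parameters that only carry a default value, so spelling the type out makes misuse an error under every configuration. The function below is a hypothetical stand-in, not repository code.

from typing import Any

def run_query(query: str, log_query: bool = True) -> list[tuple[Any, ...]]:
    # With a bare `log_query=True`, mypy treats the parameter as implicit Any
    # while pyright infers bool from the default; the explicit `bool` makes a
    # call such as run_query("select 1", log_query="yes") a reported type
    # error in both.
    if log_query:
        print(f"running: {query}")
    return []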
50 changes: 31 additions & 19 deletions test_runner/regress/test_logical_replication.py
@@ -4,24 +4,31 @@
from functools import partial
from random import choice
from string import ascii_lowercase
+from typing import TYPE_CHECKING, cast

-from fixtures.common_types import Lsn
+from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
-NeonEnv,
-NeonEnvBuilder,
-PgProtocol,
logical_replication_sync,
wait_for_last_flush_lsn,
)
from fixtures.utils import wait_until

+if TYPE_CHECKING:
+    from fixtures.neon_fixtures import (
+        Endpoint,
+        NeonEnv,
+        NeonEnvBuilder,
+        PgProtocol,
+        VanillaPostgres,
+    )


def random_string(n: int):
return "".join([choice(ascii_lowercase) for _ in range(n)])


-def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
+def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres):
env = neon_simple_env

tenant_id = env.initial_tenant
@@ -160,10 +167,10 @@ def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):


# Test that neon.logical_replication_max_snap_files works
-def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
-def slot_removed(ep):
+def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres):
+def slot_removed(ep: Endpoint):
assert (
-endpoint.safe_psql(
+ep.safe_psql(
"select count(*) from pg_replication_slots where slot_name = 'stale_slot'"
)[0][0]
== 0
@@ -254,7 +261,7 @@ def test_ondemand_wal_download_in_replication_slot_funcs(neon_env_builder: NeonE


# Tests that walsender correctly blocks until WAL is downloaded from safekeepers
-def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg):
+def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()

@@ -336,13 +343,13 @@ def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg):
#
# Most pages start with a contrecord, so we don't do anything special
# to ensure that.
-def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg):
+def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres):
env = neon_simple_env

env.create_branch("init")
endpoint = env.endpoints.create_start("init")
-tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
-timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
+tenant_id = TenantId(cast("str", endpoint.safe_psql("show neon.tenant_id")[0][0]))
+timeline_id = TimelineId(cast("str", endpoint.safe_psql("show neon.timeline_id")[0][0]))

cur = endpoint.connect().cursor()
cur.execute("create table t(key int, value text)")
@@ -380,7 +387,7 @@ def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg):
# logical replication bug as such, but without logical replication,
# records passed ot the WAL redo process are never large enough to hit
# the bug.
-def test_large_records(neon_simple_env: NeonEnv, vanilla_pg):
+def test_large_records(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres):
env = neon_simple_env

env.create_branch("init")
@@ -522,15 +529,20 @@ def logical_replication_wait_flush_lsn_sync(publisher: PgProtocol) -> Lsn:
because for some WAL records like vacuum subscriber won't get any data at
all.
"""
-publisher_flush_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
+publisher_flush_lsn = Lsn(
+cast("str", publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
+)

def check_caughtup():
-res = publisher.safe_psql(
-"""
+res = cast(
+"tuple[str, str, str]",
+publisher.safe_psql(
+"""
select sent_lsn, flush_lsn, pg_current_wal_flush_lsn() from pg_stat_replication sr, pg_replication_slots s
where s.active_pid = sr.pid and s.slot_type = 'logical';
"""
-)[0]
+)[0],
+)
sent_lsn, flush_lsn, curr_publisher_flush_lsn = Lsn(res[0]), Lsn(res[1]), Lsn(res[2])
log.info(
f"sent_lsn={sent_lsn}, flush_lsn={flush_lsn}, publisher_flush_lsn={curr_publisher_flush_lsn}, waiting flush_lsn to reach {publisher_flush_lsn}"
@@ -545,7 +557,7 @@ def check_caughtup():
# flush_lsn reporting to publisher. Without this, subscriber may ack too far,
# losing data on restart because publisher implicitly advances positition given
# in START_REPLICATION to the confirmed_flush_lsn of the slot.
-def test_subscriber_synchronous_commit(neon_simple_env: NeonEnv, vanilla_pg):
+def test_subscriber_synchronous_commit(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres):
env = neon_simple_env
# use vanilla as publisher to allow writes on it when safekeeper is down
vanilla_pg.configure(
@@ -593,7 +605,7 @@ def test_subscriber_synchronous_commit(neon_simple_env: NeonEnv, vanilla_pg):
# logical_replication_wait_flush_lsn_sync is expected to hang while
# safekeeper is down.
vanilla_pg.safe_psql("checkpoint;")
-assert sub.safe_psql_scalar("SELECT count(*) FROM t") == 1000
+assert cast("int", sub.safe_psql_scalar("SELECT count(*) FROM t")) == 1000

# restart subscriber and ensure it can catch up lost tail again
sub.stop(mode="immediate")
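For readers who have not seen these patterns before, here is a minimal, self-contained sketch of the two techniques the commit applies: the `TYPE_CHECKING`-guarded import and the quoted `cast()` that narrows the `Any` values coming back from `safe_psql`. It is hypothetical code, not taken from the test suite; only `VanillaPostgres` and `safe_psql` appear in the diff above, and the helper names below are made up.

from __future__ import annotations

from typing import TYPE_CHECKING, Any, cast

if TYPE_CHECKING:
    # Evaluated only by the type checker; at runtime TYPE_CHECKING is False,
    # so the import has no startup cost and cannot create a runtime import cycle.
    from fixtures.neon_fixtures import VanillaPostgres


def first_column(pg: VanillaPostgres, query: str) -> Any:
    # Illustrative helper: safe_psql returns Any-typed rows, which is why the
    # tests add casts at the call sites.
    return pg.safe_psql(query)[0][0]


def current_flush_lsn(pg: VanillaPostgres) -> str:
    # cast() returns its argument unchanged at runtime; it only tells the
    # checker what type to assume. Quoting "str" keeps the type expression out
    # of runtime evaluation entirely, matching the style used in the diff.
    return cast("str", first_column(pg, "SELECT pg_current_wal_flush_lsn()"))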

1 comment on commit 5be6b07

@github-actions


5473 tests run: 5240 passed, 3 failed, 230 skipped (full report)


Failures on Postgres 16

  • test_sharded_ingest[github-actions-selfhosted-1]: release-x86-64
  • test_storage_controller_many_tenants[github-actions-selfhosted]: release-x86-64
  • test_compaction_l0_memory[github-actions-selfhosted]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_sharded_ingest[release-pg16-github-actions-selfhosted-1] or test_storage_controller_many_tenants[release-pg16-github-actions-selfhosted] or test_compaction_l0_memory[release-pg16-github-actions-selfhosted]"

Code coverage* (full report)

  • functions: 31.7% (7880 of 24824 functions)
  • lines: 49.4% (62405 of 126225 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
5be6b07 at 2024-11-12T01:24:22.543Z :recycle:
