Skip to content

Commit

Permalink
Add test_lagging_sk.
Browse files Browse the repository at this point in the history
  • Loading branch information
arssher committed Dec 8, 2023
1 parent b077ec1 commit 21464d9
Show file tree
Hide file tree
Showing 2 changed files with 266 additions and 50 deletions.
23 changes: 23 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,12 @@ def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[A
result.append(cur.fetchall())
return result

def safe_psql_scalar(self, query) -> Any:
"""
Execute query returning single row with single column.
"""
return self.safe_psql(query)[0][0]


@dataclass
class AuthKeys:
Expand Down Expand Up @@ -2681,6 +2687,13 @@ def __exit__(
):
self.stop()

# Checkpoints running endpoint and returns pg_wal size in MB.
def get_pg_wal_size(self):
log.info(f'checkpointing at LSN {self.safe_psql("select pg_current_wal_lsn()")[0][0]}')
self.safe_psql("checkpoint")
assert self.pgdata_dir is not None # please mypy
return get_dir_size(os.path.join(self.pgdata_dir, "pg_wal")) / 1024 / 1024


class EndpointFactory:
"""An object representing multiple compute endpoints."""
Expand Down Expand Up @@ -2877,6 +2890,13 @@ def list_segments(self, tenant_id, timeline_id) -> List[str]:
return segments


# Walreceiver as returned by sk's timeline status endpoint.
@dataclass
class Walreceiver:
conn_id: int
state: str


@dataclass
class SafekeeperTimelineStatus:
acceptor_epoch: int
Expand All @@ -2887,6 +2907,7 @@ class SafekeeperTimelineStatus:
backup_lsn: Lsn
peer_horizon_lsn: Lsn
remote_consistent_lsn: Lsn
walreceivers: List[Walreceiver]


@dataclass
Expand Down Expand Up @@ -2948,6 +2969,7 @@ def timeline_status(
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}")
res.raise_for_status()
resj = res.json()
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
return SafekeeperTimelineStatus(
acceptor_epoch=resj["acceptor_state"]["epoch"],
pg_version=resj["pg_info"]["pg_version"],
Expand All @@ -2957,6 +2979,7 @@ def timeline_status(
backup_lsn=Lsn(resj["backup_lsn"]),
peer_horizon_lsn=Lsn(resj["peer_horizon_lsn"]),
remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]),
walreceivers=walreceivers,
)

def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
Expand Down
Loading

0 comments on commit 21464d9

Please sign in to comment.