Skip to content

Commit

Permalink
refactor: globals in tests (#5298)
Browse files Browse the repository at this point in the history
Refactor tests to have fewer globals.

This will hopefully allow us to write more complex tests for our new metric
collection requirements in #5297. Includes reverted work from #4761
related to test globals.

Co-authored-by: Alexander Bayandin <[email protected]>
Co-authored-by: MMeent <[email protected]>
  • Loading branch information
3 people authored Sep 13, 2023
1 parent 1697e7b commit ffd146c
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 172 deletions.
1 change: 1 addition & 0 deletions test_runner/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pytest_plugins = (
"fixtures.pg_version",
"fixtures.parametrize",
"fixtures.httpserver",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",
Expand Down
45 changes: 45 additions & 0 deletions test_runner/fixtures/httpserver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import Tuple

import pytest
from pytest_httpserver import HTTPServer

# TODO: mypy fails with:
# Module "fixtures.neon_fixtures" does not explicitly export attribute "PortDistributor" [attr-defined]
# from fixtures.neon_fixtures import PortDistributor

# compared to the fixtures from pytest_httpserver with same names, these are
# always function scoped, so you can check and stop the server in tests.


@pytest.fixture(scope="function")
def httpserver_ssl_context():
    # Override pytest_httpserver's session-scoped fixture with a function-scoped
    # one. Returning None disables TLS, so the test server speaks plain HTTP.
    return None


@pytest.fixture(scope="function")
def make_httpserver(httpserver_listen_address, httpserver_ssl_context):
    """Start a fresh HTTPServer for the current test and tear it down afterwards.

    Falsy host/port values from ``httpserver_listen_address`` fall back to the
    pytest_httpserver defaults.
    """
    host, port = httpserver_listen_address
    host = host or HTTPServer.DEFAULT_LISTEN_HOST
    port = port or HTTPServer.DEFAULT_LISTEN_PORT

    server = HTTPServer(host=host, port=port, ssl_context=httpserver_ssl_context)
    server.start()
    yield server
    # Teardown: drop recorded requests/handlers, then stop the server if the
    # test did not already stop it itself.
    server.clear()
    if server.is_running():
        server.stop()


@pytest.fixture(scope="function")
def httpserver(make_httpserver):
    """Function-scoped httpserver: yields the per-test server and clears its
    recorded requests and handlers once the test finishes."""
    yield make_httpserver
    make_httpserver.clear()


@pytest.fixture(scope="function")
def httpserver_listen_address(port_distributor) -> Tuple[str, int]:
    """Pick a fresh localhost port from the shared port distributor for each test."""
    return ("localhost", port_distributor.get_port())
6 changes: 0 additions & 6 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,6 @@ def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistrib
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)


@pytest.fixture(scope="session")
def httpserver_listen_address(port_distributor: PortDistributor):
port = port_distributor.get_port()
return ("localhost", port)


@pytest.fixture(scope="function")
def default_broker(
port_distributor: PortDistributor,
Expand Down
19 changes: 11 additions & 8 deletions test_runner/regress/test_ddl_forwarding.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@ def handle_role(dbs, roles, operation):
raise ValueError("Invalid op")


fail = False


def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
def ddl_forward_handler(
request: Request, dbs: Dict[str, str], roles: Dict[str, str], ddl: "DdlForwardingContext"
) -> Response:
log.info(f"Received request with data {request.get_data(as_text=True)}")
if fail:
if ddl.fail:
log.info("FAILING")
return Response(status=500, response="Failed just cuz")
if request.json is None:
Expand All @@ -72,6 +71,7 @@ def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: st
self.port = port
self.dbs: Dict[str, str] = {}
self.roles: Dict[str, str] = {}
self.fail = False
endpoint = "/management/api/v2/roles_and_databases"
ddl_url = f"http://{host}:{port}{endpoint}"
self.pg.configure(
Expand All @@ -82,7 +82,7 @@ def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: st
)
log.info(f"Listening on {ddl_url}")
self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
lambda request: ddl_forward_handler(request, self.dbs, self.roles)
lambda request: ddl_forward_handler(request, self.dbs, self.roles, self)
)

def __enter__(self):
Expand All @@ -103,6 +103,9 @@ def send(self, query: str) -> List[Tuple[Any, ...]]:
def wait(self, timeout=3):
self.server.wait(timeout=timeout)

def failures(self, bool):
self.fail = bool

def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
res = self.send(query)
self.wait(timeout=timeout)
Expand Down Expand Up @@ -203,9 +206,9 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
assert ddl.dbs == {"stork": "cork"}

with pytest.raises(psycopg2.InternalError):
global fail
fail = True
ddl.failures(True)
cur.execute("CREATE DATABASE failure WITH OWNER=cork")
ddl.wait()

ddl.failures(False)
conn.close()
67 changes: 34 additions & 33 deletions test_runner/regress/test_gc_aggressive.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,45 @@

# Test configuration
#
# Create a table with {num_rows} rows, and perform {updates_to_perform} random
# UPDATEs on it, using {num_connections} separate connections.
num_connections = 10
num_rows = 100000
updates_to_perform = 10000
# Create a table with {NUM_ROWS} rows, and perform {UPDATES_TO_PERFORM} random
# UPDATEs on it, using {NUM_CONNECTIONS} separate connections.
NUM_CONNECTIONS = 10
NUM_ROWS = 100000
UPDATES_TO_PERFORM = 10000

updates_performed = 0

# At the same time, run UPDATEs and GC
async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
workers = []
updates_performed = 0

# Run random UPDATEs on test table
async def update_table(endpoint: Endpoint):
global updates_performed
pg_conn = await endpoint.connect_async()

while updates_performed < updates_to_perform:
updates_performed += 1
id = random.randrange(1, num_rows)
await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")
# Perform aggressive GC with 0 horizon
async def gc(env: NeonEnv, timeline: TimelineId):
pageserver_http = env.pageserver.http_client()
nonlocal updates_performed
global UPDATES_TO_PERFORM

loop = asyncio.get_running_loop()

# Perform aggressive GC with 0 horizon
async def gc(env: NeonEnv, timeline: TimelineId):
pageserver_http = env.pageserver.http_client()
def do_gc():
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)

loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
while updates_performed < UPDATES_TO_PERFORM:
await loop.run_in_executor(pool, do_gc)

def do_gc():
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
# Run random UPDATEs on test table
async def update_table(endpoint: Endpoint):
pg_conn = await endpoint.connect_async()
nonlocal updates_performed

with concurrent.futures.ThreadPoolExecutor() as pool:
while updates_performed < updates_to_perform:
await loop.run_in_executor(pool, do_gc)
while updates_performed < UPDATES_TO_PERFORM:
updates_performed += 1
id = random.randrange(1, NUM_ROWS)
await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")


# At the same time, run UPDATEs and GC
async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
workers = []
for _ in range(num_connections):
for _ in range(NUM_CONNECTIONS):
workers.append(asyncio.create_task(update_table(endpoint)))
workers.append(asyncio.create_task(gc(env, timeline)))

Expand Down Expand Up @@ -81,7 +81,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
f"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, {num_rows}) g
FROM generate_series(1, {NUM_ROWS}) g
"""
)
cur.execute("CREATE INDEX ON foo(id)")
Expand All @@ -91,14 +91,15 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
cur.execute("SELECT COUNT(*), SUM(counter) FROM foo")
r = cur.fetchone()
assert r is not None
assert r == (num_rows, updates_to_perform)
assert r == (NUM_ROWS, UPDATES_TO_PERFORM)


#
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
# Disable time-based pitr, we will use LSN-based thresholds in the manual GC calls
neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
num_index_uploads = 0

neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)

Expand Down Expand Up @@ -160,5 +161,5 @@ def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
log.info(f"{num_index_uploads} index uploads after GC iteration {i}")

after = num_index_uploads
log.info(f"{after-before} new index uploads during test")
log.info(f"{after - before} new index uploads during test")
assert after - before < 5
Loading

1 comment on commit ffd146c

@github-actions
Copy link

@github-actions github-actions bot commented on ffd146c Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2540 tests run: 2414 passed, 0 failed, 126 skipped (full report)


Flaky tests (2)

Postgres 16

  • test_partial_evict_tenant: debug

Postgres 14

  • test_download_remote_layers_api[local_fs]: debug

Code coverage (full report)

  • functions: 53.0% (7667 of 14453 functions)
  • lines: 81.0% (44773 of 55281 lines)

The comment gets automatically updated with the latest test results
ffd146c at 2023-09-14T09:56:26.449Z :recycle:

Please sign in to comment.