Commit: test_runner: remove globals

This commit reverts 0c68972.

Co-authored-by: Matthias <[email protected]>
bayandin and MMeent committed Sep 13, 2023
1 parent 1697e7b · commit 3379161
Showing 6 changed files with 146 additions and 148 deletions.
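The change is the same in every file below: state that previously lived at module scope moves into the scope of a single test — an instance attribute, a local variable, or a closed-over variable. Module globals are hazardous in a test runner because they outlive one test and leak into whichever test the process runs next. A deliberately broken sketch of the hazard (hypothetical code, not from this repository):

```python
# Module-level state survives between tests in the same process.
counter = 0


def test_a() -> None:
    global counter
    counter += 1
    assert counter == 1  # holds only if test_a runs first, exactly once


def test_b() -> None:
    # Fails whenever test_a ran earlier in the same process: the global
    # leaked. Per-test state (fixtures, locals, closures) avoids this.
    assert counter == 0
```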
test_runner/regress/test_ddl_forwarding.py (19 changes: 11 additions & 8 deletions)

```diff
@@ -42,12 +42,11 @@ def handle_role(dbs, roles, operation):
         raise ValueError("Invalid op")
 
 
-fail = False
-
-
-def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
+def ddl_forward_handler(
+    request: Request, dbs: Dict[str, str], roles: Dict[str, str], ddl: "DdlForwardingContext"
+) -> Response:
     log.info(f"Received request with data {request.get_data(as_text=True)}")
-    if fail:
+    if ddl.fail:
         log.info("FAILING")
         return Response(status=500, response="Failed just cuz")
     if request.json is None:
@@ -72,6 +71,7 @@ def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: st
         self.port = port
         self.dbs: Dict[str, str] = {}
         self.roles: Dict[str, str] = {}
+        self.fail = False
         endpoint = "/management/api/v2/roles_and_databases"
         ddl_url = f"http://{host}:{port}{endpoint}"
         self.pg.configure(
@@ -82,7 +82,7 @@ def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: st
         )
         log.info(f"Listening on {ddl_url}")
         self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
-            lambda request: ddl_forward_handler(request, self.dbs, self.roles)
+            lambda request: ddl_forward_handler(request, self.dbs, self.roles, self)
         )
 
     def __enter__(self):
@@ -103,6 +103,9 @@ def send(self, query: str) -> List[Tuple[Any, ...]]:
     def wait(self, timeout=3):
         self.server.wait(timeout=timeout)
 
+    def failures(self, bool):
+        self.fail = bool
+
     def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
         res = self.send(query)
         self.wait(timeout=timeout)
@@ -203,9 +206,9 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
     assert ddl.dbs == {"stork": "cork"}
 
     with pytest.raises(psycopg2.InternalError):
-        global fail
-        fail = True
+        ddl.failures(True)
         cur.execute("CREATE DATABASE failure WITH OWNER=cork")
         ddl.wait()
 
+    ddl.failures(False)
     conn.close()
```
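With the revert, the failure flag is per-`DdlForwardingContext` rather than a module global: the handler consults the context it was constructed with, and the test resets it via `failures(False)`. A minimal sketch of how a test drives such a toggle (the names are hypothetical stand-ins, not the fixtures above):

```python
import pytest


class FakeDdlContext:
    """Stand-in for DdlForwardingContext: the failure flag lives on the instance."""

    def __init__(self) -> None:
        self.fail = False

    def failures(self, value: bool) -> None:
        self.fail = value

    def forward(self) -> None:
        # Stands in for the HTTP round-trip that errors out on a 500.
        if self.fail:
            raise RuntimeError("Failed just cuz")


def test_failure_injection() -> None:
    ddl = FakeDdlContext()
    ddl.forward()  # happy path succeeds

    ddl.failures(True)
    with pytest.raises(RuntimeError):
        ddl.forward()

    ddl.failures(False)  # reset, mirroring the cleanup in the real test
    ddl.forward()
```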
test_runner/regress/test_gc_aggressive.py (67 changes: 34 additions & 33 deletions)

```diff
@@ -15,45 +15,45 @@
 
 # Test configuration
 #
-# Create a table with {num_rows} rows, and perform {updates_to_perform} random
-# UPDATEs on it, using {num_connections} separate connections.
-num_connections = 10
-num_rows = 100000
-updates_to_perform = 10000
+# Create a table with {NUM_ROWS} rows, and perform {UPDATES_TO_PERFORM} random
+# UPDATEs on it, using {NUM_CONNECTIONS} separate connections.
+NUM_CONNECTIONS = 10
+NUM_ROWS = 100000
+UPDATES_TO_PERFORM = 10000
 
-updates_performed = 0
 
+# At the same time, run UPDATEs and GC
+async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
+    workers = []
+    updates_performed = 0
 
-# Run random UPDATEs on test table
-async def update_table(endpoint: Endpoint):
-    global updates_performed
-    pg_conn = await endpoint.connect_async()
-
-    while updates_performed < updates_to_perform:
-        updates_performed += 1
-        id = random.randrange(1, num_rows)
-        await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")
+    # Perform aggressive GC with 0 horizon
+    async def gc(env: NeonEnv, timeline: TimelineId):
+        pageserver_http = env.pageserver.http_client()
+        nonlocal updates_performed
+        global UPDATES_TO_PERFORM
 
+        loop = asyncio.get_running_loop()
 
-# Perform aggressive GC with 0 horizon
-async def gc(env: NeonEnv, timeline: TimelineId):
-    pageserver_http = env.pageserver.http_client()
+        def do_gc():
+            pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
+            pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
 
-    loop = asyncio.get_running_loop()
+        with concurrent.futures.ThreadPoolExecutor() as pool:
+            while updates_performed < UPDATES_TO_PERFORM:
+                await loop.run_in_executor(pool, do_gc)
 
-    def do_gc():
-        pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
-        pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
+    # Run random UPDATEs on test table
+    async def update_table(endpoint: Endpoint):
+        pg_conn = await endpoint.connect_async()
+        nonlocal updates_performed
 
-    with concurrent.futures.ThreadPoolExecutor() as pool:
-        while updates_performed < updates_to_perform:
-            await loop.run_in_executor(pool, do_gc)
+        while updates_performed < UPDATES_TO_PERFORM:
+            updates_performed += 1
+            id = random.randrange(1, NUM_ROWS)
+            await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")
 
 
-# At the same time, run UPDATEs and GC
-async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
-    workers = []
-    for _ in range(num_connections):
+    for _ in range(NUM_CONNECTIONS):
         workers.append(asyncio.create_task(update_table(endpoint)))
     workers.append(asyncio.create_task(gc(env, timeline)))
 
@@ -81,7 +81,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
         f"""
         INSERT INTO foo
         SELECT g, 0, 'long string to consume some space' || g
-        FROM generate_series(1, {num_rows}) g
+        FROM generate_series(1, {NUM_ROWS}) g
         """
     )
    cur.execute("CREATE INDEX ON foo(id)")
@@ -91,14 +91,15 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
     cur.execute("SELECT COUNT(*), SUM(counter) FROM foo")
     r = cur.fetchone()
     assert r is not None
-    assert r == (num_rows, updates_to_perform)
+    assert r == (NUM_ROWS, UPDATES_TO_PERFORM)
 
 
 #
 @pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
 def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
     # Disable time-based pitr, we will use LSN-based thresholds in the manual GC calls
     neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
+    num_index_uploads = 0
 
     neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
 
@@ -160,5 +161,5 @@ def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
         log.info(f"{num_index_uploads} index uploads after GC iteration {i}")
 
     after = num_index_uploads
-    log.info(f"{after-before} new index uploads during test")
+    log.info(f"{after - before} new index uploads during test")
     assert after - before < 5
```
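Here the shared progress counter becomes a local of `update_and_gc`, and both worker coroutines rebind it through `nonlocal`; because every task runs on the same event loop, the check-and-increment is never interrupted mid-step. A toy version of that closure pattern (illustrative only, no database involved):

```python
import asyncio


async def run_updates(total: int, num_workers: int) -> int:
    performed = 0  # shared via the closure, not a module global

    async def worker() -> None:
        nonlocal performed  # rebind the enclosing variable
        while performed < total:
            performed += 1
            await asyncio.sleep(0)  # yield, as a real DB round-trip would

    await asyncio.gather(*(worker() for _ in range(num_workers)))
    return performed


assert asyncio.run(run_updates(100, 10)) == 100
```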
test_runner/regress/test_metric_collection.py (88 changes: 41 additions & 47 deletions)

```diff
@@ -18,56 +18,15 @@
 )
 from fixtures.port_distributor import PortDistributor
 from fixtures.remote_storage import RemoteStorageKind
-from fixtures.types import TenantId
 from pytest_httpserver import HTTPServer
 from werkzeug.wrappers.request import Request
 from werkzeug.wrappers.response import Response
 
 
 # ==============================================================================
 # Storage metrics tests
 # ==============================================================================
 
-initial_tenant = TenantId.generate()
-remote_uploaded = 0
-checks = {
-    "written_size": lambda value: value > 0,
-    "resident_size": lambda value: value >= 0,
-    # >= 0 check here is to avoid race condition when we receive metrics before
-    # remote_uploaded is updated
-    "remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value >= 0,
-    # logical size may lag behind the actual size, so allow 0 here
-    "timeline_logical_size": lambda value: value >= 0,
-}
-
-metric_kinds_checked = set([])
-
-
-#
-# verify that metrics look minilally sane
-#
-def metrics_handler(request: Request) -> Response:
-    if request.json is None:
-        return Response(status=400)
-
-    events = request.json["events"]
-    log.info("received events:")
-    log.info(events)
-
-    for event in events:
-        assert event["tenant_id"] == str(
-            initial_tenant
-        ), "Expecting metrics only from the initial tenant"
-        metric_name = event["metric"]
-
-        check = checks.get(metric_name)
-        # calm down mypy
-        if check is not None:
-            assert check(event["value"]), f"{metric_name} isn't valid"
-            global metric_kinds_checked
-            metric_kinds_checked.add(metric_name)
-
-    return Response(status=200)
-
-
 @pytest.mark.parametrize(
     "remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
@@ -81,6 +40,43 @@ def test_metric_collection(
     (host, port) = httpserver_listen_address
     metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
 
+    metric_kinds_checked = set([])
+    remote_uploaded = 0
+    checks = {
+        "written_size": lambda value: value > 0,
+        "resident_size": lambda value: value >= 0,
+        # >= 0 check here is to avoid race condition when we receive metrics before
+        # remote_uploaded is updated
+        "remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value >= 0,
+        # logical size may lag behind the actual size, so allow 0 here
+        "timeline_logical_size": lambda value: value >= 0,
+    }
+
+    #
+    # verify that metrics look minimally sane
+    #
+    def metrics_handler(request: Request) -> Response:
+        if request.json is None:
+            return Response(status=400)
+
+        events = request.json["events"]
+        log.info("received events:")
+        log.info(events)
+
+        for event in events:
+            assert event["tenant_id"] == str(
+                neon_env_builder.initial_tenant
+            ), "Expecting metrics only from the initial tenant"
+            metric_name = event["metric"]
+
+            check = checks.get(metric_name)
+            # calm down mypy
+            if check is not None:
+                assert check(event["value"]), f"{metric_name} isn't valid"
+                metric_kinds_checked.add(metric_name)
+
+        return Response(status=200)
+
     # Require collecting metrics frequently, since we change
     # the timeline and want something to be logged about it.
     #
@@ -98,9 +94,6 @@ def test_metric_collection(
 
     log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
 
-    # Set initial tenant of the test, that we expect the logs from
-    global initial_tenant
-    initial_tenant = neon_env_builder.initial_tenant
     # mock http server that returns OK for the metrics
     httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
         metrics_handler
@@ -147,14 +140,15 @@ def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
         pageserver_http = env.pageserver.http_client()
         pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
         pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
-        global remote_uploaded
+
         remote_uploaded = get_num_remote_ops("index", "upload")
         assert remote_uploaded > 0
     else:
         assert remote_uploaded == 0
 
     # wait longer than collecting interval and check that all requests are served
     time.sleep(3)
     httpserver.check()
-    global metric_kinds_checked, checks
     expected_checks = set(checks.keys())
     assert len(metric_kinds_checked) == len(
         checks
```
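The metrics test keeps the same shape: the handler and its `checks` table are now defined inside the test, so the closures see later rebindings of `remote_uploaded` without any `global`, and mutating the captured `metric_kinds_checked` set needs no declaration at all. A compact sketch of those two closure behaviors (hypothetical names, not this test's fixtures):

```python
from typing import Callable, Dict


def demo() -> None:
    seen: set = set()
    remote_uploaded = 0  # rebound later; the lambda reads it at call time

    checks: Dict[str, Callable[[int], bool]] = {
        "written_size": lambda v: v > 0,
        "remote_storage_size": lambda v: v > 0 if remote_uploaded > 0 else v >= 0,
    }

    def handler(metric: str, value: int) -> bool:
        check = checks.get(metric)
        if check is not None and check(value):
            seen.add(metric)  # mutating a captured object needs no nonlocal
            return True
        return False

    assert handler("remote_storage_size", 0)  # remote_uploaded == 0: >= 0 passes
    remote_uploaded = 1  # later rebinding, visible to the lambda
    assert not handler("remote_storage_size", 0)  # now requires > 0
    assert handler("written_size", 5)
    assert seen == {"remote_storage_size", "written_size"}


demo()
```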
test_runner/regress/test_ondemand_download.py (1 change: 1 addition & 0 deletions)

```diff
@@ -301,6 +301,7 @@ def get_resident_physical_size():
     # they are present only in the remote storage, only locally, or both.
     # It should not change.
     assert filled_current_physical == get_api_current_physical_size()
+    endpoint_old.stop()
 
 
 #
```