Add test_uploads_and_deletions test #7758

Merged
merged 11 commits on May 15, 2024
78 changes: 78 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
@@ -59,6 +59,7 @@
from fixtures.pageserver.utils import (
    wait_for_last_record_lsn,
    wait_for_upload,
    wait_for_upload_queue_empty,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
@@ -79,6 +80,7 @@
    allure_attach_from_dir,
    assert_no_errors,
    get_self_dir,
    print_gc_result,
    subprocess_capture,
    wait_until,
)
@@ -4401,3 +4403,79 @@ def parse_project_git_version_output(s: str) -> str:
        return commit

    raise ValueError(f"unable to parse --version output: '{s}'")


def generate_uploads_and_deletions(
    env: NeonEnv,
    *,
    init: bool = True,
    tenant_id: Optional[TenantId] = None,
    timeline_id: Optional[TimelineId] = None,
    data: Optional[str] = None,
    pageserver: NeonPageserver,
):
    """
    Using the given tenant and timeline (by default the environment's initial
    tenant and timeline), generate a load pattern that results in some uploads
    and some deletions to remote storage.
    """

    if tenant_id is None:
        tenant_id = env.initial_tenant
    assert tenant_id is not None

    if timeline_id is None:
        timeline_id = env.initial_timeline
    assert timeline_id is not None

    ps_http = pageserver.http_client()

    with env.endpoints.create_start(
        "main", tenant_id=tenant_id, pageserver_id=pageserver.id
    ) as endpoint:
        if init:
            endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
            last_flush_lsn_upload(
                env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
            )

        def churn(data):
            endpoint.safe_psql_many(
                [
                    f"""
                    INSERT INTO foo (id, val)
                    SELECT g, '{data}'
                    FROM generate_series(1, 200) g
                    ON CONFLICT (id) DO UPDATE
                    SET val = EXCLUDED.val
                    """,
                    # to ensure that GC can actually remove some layers
                    "VACUUM foo",
                ]
            )
            assert tenant_id is not None
            assert timeline_id is not None
            # We wait for uploads as well as the local flush, in order to avoid leaving the
            # system in a state where there are "future layers" in remote storage that would
            # generate deletions after a restart.
            last_flush_lsn_upload(
                env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
            )

        # Compaction should generate some GC-eligible layers
        for i in range(0, 2):
            churn(f"{i if data is None else data}")

        gc_result = ps_http.timeline_gc(tenant_id, timeline_id, 0)
        print_gc_result(gc_result)
        assert gc_result["layers_removed"] > 0

        # Stop the endpoint and flush all data to the pageserver, then checkpoint it: this
        # ensures that the pageserver is in a fully idle state: there will be no more
        # background ingest, no more uploads pending, and therefore no non-determinism
        # in subsequent actions like pageserver restarts.
        final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
        ps_http.timeline_checkpoint(tenant_id, timeline_id)
        # Finish uploads
        wait_for_upload(ps_http, tenant_id, timeline_id, final_lsn)
        # Finish all remote writes (including deletions)
        wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
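
For reference, a minimal sketch of how a caller can use the new helper while passing the tenant and timeline explicitly instead of relying on the defaults. The test name and body below are illustrative only and are not part of this change; the caller actually added by this PR is test_uploads_and_deletions in test_compaction.py.

def test_example_uploads_and_deletions(neon_env_builder: NeonEnvBuilder):
    # Hypothetical caller, for illustration only: use the environment's initial
    # tenant/timeline, but pass them explicitly to the helper.
    env = neon_env_builder.init_start()
    generate_uploads_and_deletions(
        env,
        tenant_id=env.initial_tenant,
        timeline_id=env.initial_timeline,
        pageserver=env.pageserver,
    )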
62 changes: 61 additions & 1 deletion test_runner/regress/test_compaction.py
@@ -1,10 +1,12 @@
import enum
import json
import os
from typing import Optional

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnvBuilder, generate_uploads_and_deletions
from fixtures.pageserver.http import PageserverApiException
from fixtures.workload import Workload

AGGRESIVE_COMPACTION_TENANT_CONF = {
@@ -190,3 +192,61 @@ def test_sharding_compaction(

    # Assert that everything is still readable
    workload.validate()


class CompactionAlgorithm(str, enum.Enum):
    LEGACY = "Legacy"
    TIERED = "Tiered"


@pytest.mark.parametrize(
    "compaction_algorithm", [CompactionAlgorithm.LEGACY, CompactionAlgorithm.TIERED]
)
def test_uploads_and_deletions(
    neon_env_builder: NeonEnvBuilder,
    compaction_algorithm: CompactionAlgorithm,
):
    """
    :param compaction_algorithm: the compaction algorithm to use.
    """

    tenant_conf = {
        # small checkpointing and compaction targets to ensure we generate many upload operations
        "checkpoint_distance": f"{128 * 1024}",
        "compaction_threshold": "1",
        "compaction_target_size": f"{128 * 1024}",
        # no PITR horizon, we specify the horizon when we request on-demand GC
        "pitr_interval": "0s",
        # disable background compaction and GC. We invoke it manually when we want it to happen.
        "gc_period": "0s",
        "compaction_period": "0s",
        # create image layers eagerly, so that GC can remove some layers
        "image_creation_threshold": "1",
        "image_layer_creation_check_threshold": "0",
        "compaction_algorithm": json.dumps({"kind": compaction_algorithm.value}),
    }
    env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

    # TODO remove these allowed errors
    # https://github.com/neondatabase/neon/issues/7707
    # https://github.com/neondatabase/neon/issues/7759
    allowed_errors = [
        ".*duplicated L1 layer.*",
        ".*delta layer created with.*duplicate values.*",
        ".*assertion failed: self.lsn_range.start <= lsn.*",
        ".*HTTP request handler task panicked: task.*panicked.*",
    ]
    if compaction_algorithm == CompactionAlgorithm.TIERED:
        env.pageserver.allowed_errors.extend(allowed_errors)

    try:
        generate_uploads_and_deletions(env, pageserver=env.pageserver)
    except PageserverApiException as e:
        log.info(f"Obtained PageserverApiException: {e}")

    # These failures occur flakily, and no particular one of them is guaranteed to show up,
    # but with tiered compaction we expect at least one of them to appear in the log.
    if compaction_algorithm == CompactionAlgorithm.TIERED:
        found_allowed_error = any(env.pageserver.log_contains(e) for e in allowed_errors)
        if not found_allowed_error:
            raise Exception("None of the allowed_errors occurred in the log")
83 changes: 2 additions & 81 deletions test_runner/regress/test_pageserver_generations.py
@@ -21,24 +21,21 @@
from fixtures.neon_fixtures import (
    NeonEnv,
    NeonEnvBuilder,
    NeonPageserver,
    PgBin,
    S3Scrubber,
    flush_ep_to_pageserver,
    last_flush_lsn_upload,
    generate_uploads_and_deletions,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
    assert_tenant_state,
    list_prefix,
    wait_for_last_record_lsn,
    wait_for_upload,
    wait_for_upload_queue_empty,
)
from fixtures.remote_storage import (
    RemoteStorageKind,
)
from fixtures.utils import print_gc_result, wait_until
from fixtures.utils import wait_until
from fixtures.workload import Workload

# A tenant configuration that is convenient for generating uploads and deletions
@@ -59,82 +56,6 @@
}


(Removed from this file: the generate_uploads_and_deletions helper, moved verbatim to test_runner/fixtures/neon_fixtures.py as shown above.)


def read_all(
    env: NeonEnv, tenant_id: Optional[TenantId] = None, timeline_id: Optional[TimelineId] = None
):