diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index abad4b44b802..1bc01d83776d 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -585,6 +585,22 @@ pub(crate) static CIRCUIT_BREAKERS_UNBROKEN: Lazy<IntCounter> = Lazy::new(|| {
     .expect("failed to define a metric")
 });
 
+pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "pageserver_compression_image_in_bytes_total",
+        "Size of uncompressed data written into image layers"
+    )
+    .expect("failed to define a metric")
+});
+
+pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "pageserver_compression_image_out_bytes_total",
+        "Size of compressed image layer data written"
+    )
+    .expect("failed to define a metric")
+});
+
 pub(crate) mod initial_logical_size {
     use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
     use once_cell::sync::Lazy;
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index a88a1e642958..74050510b01f 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -737,6 +737,9 @@ struct ImageLayerWriterInner {
     key_range: Range<Key>,
     lsn: Lsn,
 
+    // Total uncompressed bytes passed into put_image
+    uncompressed_bytes: u64,
+
     blob_writer: BlobWriter<false>,
     tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
 }
@@ -792,6 +795,7 @@ impl ImageLayerWriterInner {
             lsn,
             tree: tree_builder,
             blob_writer,
+            uncompressed_bytes: 0,
         };
 
         Ok(writer)
@@ -810,6 +814,7 @@
     ) -> anyhow::Result<()> {
         ensure!(self.key_range.contains(&key));
         let compression = self.conf.image_compression;
+        self.uncompressed_bytes += img.len() as u64;
         let (_img, res) = self
             .blob_writer
             .write_blob_maybe_compressed(img, ctx, compression)
@@ -835,6 +840,11 @@
         let index_start_blk =
             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
 
+        // Record the sizes from which the compression ratio can be derived
+        let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for the header
+        crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
+        crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
+
         let mut file = self.blob_writer.into_inner();
 
         // Write out the index
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 2765ff916e63..70d54e0895eb 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1135,6 +1135,7 @@ def __init__(self, config: NeonEnvBuilder):
                 "listen_http_addr": f"localhost:{pageserver_port.http}",
                 "pg_auth_type": pg_auth_type,
                 "http_auth_type": http_auth_type,
+                "image_compression": "zstd",
             }
             if self.pageserver_virtual_file_io_engine is not None:
                 ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
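Not part of the patch, but useful context for review: the two counters are meant to be divided to obtain the effective compression ratio. A minimal Python sketch that scrapes them from the pageserver's Prometheus text endpoint; the base URL and the /metrics path are assumptions about a local setup, not something this diff defines.

import urllib.request

def image_compression_ratio(base_url: str = "http://localhost:9898") -> float:
    # In the Prometheus text format, unlabeled counters appear as "name value".
    text = urllib.request.urlopen(f"{base_url}/metrics").read().decode()
    wanted = (
        "pageserver_compression_image_in_bytes_total",
        "pageserver_compression_image_out_bytes_total",
    )
    values = {}
    for line in text.splitlines():
        parts = line.split()
        if len(parts) == 2 and parts[0] in wanted:
            values[parts[0]] = float(parts[1])
    # Compressed bytes written per uncompressed byte ingested (lower is better).
    return values[wanted[1]] / values[wanted[0]]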
diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py
index f321c09b2729..be787e064262 100644
--- a/test_runner/regress/test_compaction.py
+++ b/test_runner/regress/test_compaction.py
@@ -6,7 +6,10 @@
 import pytest
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnvBuilder, generate_uploads_and_deletions
+from fixtures.neon_fixtures import (
+    NeonEnvBuilder,
+    generate_uploads_and_deletions,
+)
 from fixtures.pageserver.http import PageserverApiException
 from fixtures.utils import wait_until
 from fixtures.workload import Workload
@@ -142,6 +145,10 @@ def test_sharding_compaction(
         "image_layer_creation_check_threshold": 0,
     }
 
+    # Disable compression, as we can't estimate the size of layers with compression enabled
+    # TODO: implement eager layer cutting during compaction
+    neon_env_builder.pageserver_config_override = "image_compression='disabled'"
+
     neon_env_builder.num_pageservers = 1 if shard_count is None else shard_count
     env = neon_env_builder.init_start(
         initial_tenant_conf=TENANT_CONF,
@@ -320,3 +327,87 @@ def assert_broken():
             or 0
         ) == 0
     assert not env.pageserver.log_contains(".*Circuit breaker failure ended.*")
+
+
+@pytest.mark.parametrize("enabled", [True, False])
+def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool):
+    tenant_conf = {
+        # small checkpointing and compaction targets to ensure we generate many upload operations
+        "checkpoint_distance": f"{128 * 1024}",
+        "compaction_threshold": "1",
+        "compaction_target_size": f"{128 * 1024}",
+        # no PITR horizon, we specify the horizon when we request on-demand GC
+        "pitr_interval": "0s",
+        # disable background compaction and GC. We invoke it manually when we want it to happen.
+        "gc_period": "0s",
+        "compaction_period": "0s",
+        # create image layers as eagerly as possible
+        "image_creation_threshold": "1",
+        "image_layer_creation_check_threshold": "0",
+    }
+
+    # Explicitly enable/disable compression, rather than using the default
+    if enabled:
+        neon_env_builder.pageserver_config_override = "image_compression='zstd'"
+    else:
+        neon_env_builder.pageserver_config_override = "image_compression='disabled'"
+
+    env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+
+    pageserver = env.pageserver
+    ps_http = env.pageserver.http_client()
+    with env.endpoints.create_start(
+        "main", tenant_id=tenant_id, pageserver_id=pageserver.id
+    ) as endpoint:
+        endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
+        # Generate around 800k worth of easily compressible data to store
+        for v in range(100):
+            endpoint.safe_psql(
+                f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))"
+            )
+        # run compaction to create image layers
+        ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
+
+    layer_map = ps_http.layer_map_info(tenant_id, timeline_id)
+    image_layer_count = 0
+    delta_layer_count = 0
+    for layer in layer_map.historic_layers:
+        if layer.kind == "Image":
+            image_layer_count += 1
+        elif layer.kind == "Delta":
+            delta_layer_count += 1
+    assert image_layer_count > 0
+    assert delta_layer_count > 0
+
+    log.info(f"images: {image_layer_count}, deltas: {delta_layer_count}")
+
+    bytes_in = pageserver.http_client().get_metric_value(
+        "pageserver_compression_image_in_bytes_total"
+    )
+    bytes_out = pageserver.http_client().get_metric_value(
+        "pageserver_compression_image_out_bytes_total"
+    )
+    assert bytes_in is not None
+    assert bytes_out is not None
+    log.info(f"Compression ratio: {bytes_out/bytes_in} ({bytes_in} in, {bytes_out} out)")
+
+    if enabled:
+        # We are writing highly compressible, repetitive plain text, expect excellent compression
+        EXPECT_RATIO = 0.2
+        assert bytes_out / bytes_in < EXPECT_RATIO
+    else:
+        # Nothing should be compressed if we disabled it
+        assert bytes_out >= bytes_in
+
+    # Destroy the endpoint and create a new one to reset the caches
+    with env.endpoints.create_start(
+        "main", tenant_id=tenant_id, pageserver_id=pageserver.id
+    ) as endpoint:
+        for v in range(100):
+            res = endpoint.safe_psql(
+                f"SELECT count(*) FROM foo WHERE id={v} and val=repeat('abcde{v:0>3}', 500)"
+            )
+            assert res[0][0] == 1
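The EXPECT_RATIO = 0.2 bound in the test above can be sanity-checked outside the pageserver. A rough sketch using the third-party zstandard Python package (an assumption; the pageserver compresses with its own Rust-side zstd, so the exact ratio will differ, but this repetitive pattern lands far below 0.2 either way):

import zstandard  # assumed third-party binding; not used by the pageserver itself

# Same synthetic payload as the test's repeat('abcde{v:0>3}', 500) rows.
data = b"".join(f"abcde{v:0>3}".encode() * 500 for v in range(100))
ratio = len(zstandard.ZstdCompressor().compress(data)) / len(data)
print(f"standalone zstd ratio: {ratio:.4f}")  # comfortably below 0.2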
diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py
index fb8b7b22fa71..3c834f430b08 100644
--- a/test_runner/regress/test_disk_usage_eviction.py
+++ b/test_runner/regress/test_disk_usage_eviction.py
@@ -230,6 +230,9 @@ def _eviction_env(
     neon_env_builder.num_pageservers = num_pageservers
     neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
 
+    # Disable compression support for EvictionEnv to get larger layer sizes
+    neon_env_builder.pageserver_config_override = "image_compression='disabled'"
+
     # initial tenant will not be present on this pageserver
     env = neon_env_builder.init_configs()
     env.start()
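If a reviewer wants to confirm that this override actually yields larger layers, a hypothetical helper along these lines could be dropped into a test. It assumes historic layer entries expose a layer_file_size field; treat that field name as an assumption rather than something this diff guarantees.

def total_layer_bytes(ps_http, tenant_id, timeline_id) -> int:
    # With image_compression='disabled', this sum should come out larger
    # for the same workload than with zstd enabled.
    info = ps_http.layer_map_info(tenant_id, timeline_id)
    return sum(layer.layer_file_size or 0 for layer in info.historic_layers)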