From ea930861ef414718096b12f481547704581da90c Mon Sep 17 00:00:00 2001
From: Ry Biesemeyer
Date: Wed, 22 May 2024 08:23:18 -0700
Subject: [PATCH] PQ: avoid blocking writer when precisely full (#16176)

* pq: avoid blocking writer when queue is precisely full

A PQ is considered full (and therefore needs to block before releasing
the writer) when its persisted size on disk _exceeds_ its
`queue.max_bytes` capacity. This removes an edge-case preemptive block
when the persisted size after writing an event _meets_ its
`queue.max_bytes` precisely AND its current head page has insufficient
room to also accept a hypothetical future event.

Fixes: elastic/logstash#16172

* docs: PQ `queue.max_bytes` cannot be less than `queue.page_capacity`
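
A minimal before/after sketch of the size check, paraphrasing the
Queue#isMaxBytesReached() change in the diff below; the standalone
class, method names, and parameters are illustrative stand-ins for the
queue's own fields, not the actual implementation:

    class FullCheckSketch {
        // Before: also treated the queue as full when the persisted size
        // landed exactly on queue.max_bytes and the head page could not fit
        // one more hypothetical event.
        static boolean wasConsideredFull(long persistedByteSize, long maxBytes,
                                         boolean headPageHasSpaceForOneEvent) {
            return persistedByteSize > maxBytes
                    || (persistedByteSize == maxBytes && !headPageHasSpaceForOneEvent);
        }

        // After: the queue only counts as full once the persisted size
        // actually exceeds queue.max_bytes.
        static boolean isConsideredFull(long persistedByteSize, long maxBytes) {
            return persistedByteSize > maxBytes;
        }
    }

With the new check, a write that lands the queue exactly on
`queue.max_bytes` no longer blocks the writer; only a subsequent write
that would exceed the limit does.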

---
 docs/static/persistent-queues.asciidoc      |  2 +-
 .../java/org/logstash/ackedqueue/Queue.java | 20 ++++-------
 .../org/logstash/ackedqueue/QueueTest.java  | 35 ++++++++++++++++++-
 3 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/docs/static/persistent-queues.asciidoc b/docs/static/persistent-queues.asciidoc
index 51630f9421d..eb6676f3331 100644
--- a/docs/static/persistent-queues.asciidoc
+++ b/docs/static/persistent-queues.asciidoc
@@ -61,7 +61,7 @@ The total of `queue.max_bytes` for _all_ queues should be lower than the
 capacity of your disk.
 +
 TIP: If you are using persistent queues to protect against data loss, but don't
-require much buffering, you can set `queue.max_bytes` to a smaller value.
+require much buffering, you can set `queue.max_bytes` to a smaller value as long as it is not less than the value of `queue.page_capacity`.
 A smaller value produces smaller queues and improves queue performance.
 
 `queue.checkpoint.acks`:: Sets the number of acked events before forcing a checkpoint.
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
index 0a7e4a04ccb..cb3c4e29701 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
@@ -506,18 +506,13 @@ private void behead() throws IOException {
     }
 
     /**
-     * <p>Checks if the Queue is full, with "full" defined as either of:</p>
-     * <p>Assuming a maximum size of the queue larger than 0 is defined:</p>
-     * <ul>
-     *     <li>The sum of the size of all allocated pages is more than the allowed maximum Queue
-     *     size</li>
-     *     <li>The sum of the size of all allocated pages is equal to the allowed maximum Queue
-     *     size and the current head page has no remaining capacity.</li>
+     * <p>Checks if the Queue is full, with "full" defined as either of:</p>
+     * <ul>
+     *     <li>The persisted size on disk of all allocated pages exceeds the allowed maximum
+     *     Queue size</li>
+     *     <li>The current number of unread events is greater than or equal to the configured
+     *     maximum number of allowed unread events</li>
      * </ul>
-     * <p>or assuming a max unread count larger than 0, is defined "full" is also defined as:</p>
-     * <ul>
-     *     <li>The current number of unread events is greater than or equal to the configured maximum number of allowed unread events.</li>
-     * </ul>
      * @return True iff the queue is full
      */
     public boolean isFull() {
@@ -535,8 +530,7 @@ private boolean isMaxBytesReached() {
             return false;
         }
 
-        final long persistedByteSize = getPersistedByteSize();
-        return ((persistedByteSize > this.maxBytes) || (persistedByteSize == this.maxBytes && !this.headPage.hasSpace(1)));
+        return getPersistedByteSize() > this.maxBytes;
     }
 
     private boolean isMaxUnreadReached() {
diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
index fd075f9ca5e..1b69fb1bec5 100644
--- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
+++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
@@ -519,9 +519,12 @@ public void reachMaxUnreadWithAcking() throws IOException, InterruptedException,
     public void reachMaxSizeTest() throws IOException, InterruptedException {
         Queueable element = new StringElement("0123456789"); // 10 bytes
 
+        int pageSize = computeCapacityForMmapPageIO(element, 10);
+        int queueMaxSize = (pageSize * 10) - 1; // 100th will overflow max capacity while still on 10th page
+
         // allow 10 elements per page but only 100 events in total
         Settings settings = TestSettings.persistedQueueSettings(
-            computeCapacityForMmapPageIO(element, 10), computeCapacityForMmapPageIO(element, 100), dataPath
+            pageSize, queueMaxSize, dataPath
         );
         try (Queue q = new Queue(settings)) {
             q.open();
@@ -542,6 +545,36 @@ public void reachMaxSizeTest() throws IOException, InterruptedException {
         }
     }
 
+    @Test(timeout = 50_000)
+    public void preciselyMaxSizeTest() throws IOException, InterruptedException {
+        Queueable element = new StringElement("0123456789"); // 10 bytes
+
+        int pageSize = computeCapacityForMmapPageIO(element, 10);
+        int queueMaxSize = (pageSize * 10); // 100th will precisely fit max capacity
+
+        // allow 10 elements per page but only 100 events in total
+        Settings settings = TestSettings.persistedQueueSettings(
+            pageSize, queueMaxSize, dataPath
+        );
+        try (Queue q = new Queue(settings)) {
+            q.open();
+
+            int elementCount = 100; // should be able to write 100 events before getting full
+            for (int i = 0; i < elementCount; i++) {
+                q.write(element);
+            }
+
+            assertThat(q.isFull(), is(false));
+
+            // we expect this next write call to block so let's wrap it in a Future
+            executor.submit(() -> q.write(element));
+            while (!q.isFull()) {
+                Thread.sleep(10);
+            }
+            assertThat(q.isFull(), is(true));
+        }
+    }
+
     @Test(timeout = 50_000)
     public void ackingMakesQueueNotFullAgainTest() throws IOException, InterruptedException, ExecutionException {