PQ: avoid blocking writer when precisely full (#16176)
* pq: avoid blocking writer when queue is precisely full

A PQ is considered full (and therefore needs to block before releasing the
writer) when its persisted size on disk _exceeds_ its `queue.max_bytes`
capacity.

This removes an edge-case preemptive block when the persisted size after
writing an event _meets_ its `queue.max_bytes` precisely AND its current
head page has insufficient room to also accept a hypothetical future event.

Fixes: #16172

* docs: PQ `queue.max_bytes` cannot be less than `queue.page_capacity`
yaauie authored May 22, 2024
1 parent d0bdc33 commit ea93086
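
To make the behavior change concrete, below is a minimal sketch of the fullness condition before and after this commit. It is illustrative only: the standalone helper form and parameter names are assumptions; the real change is a one-line simplification of Queue#isMaxBytesReached(), shown in the diff further down.

    // Sketch only: a simplified, standalone restatement of the condition.
    // The actual logic lives in org.logstash.ackedqueue.Queue#isMaxBytesReached().
    final class FullnessConditionSketch {
        // Before: landing exactly on queue.max_bytes already counted as full
        // whenever the head page could not fit one more hypothetical event.
        static boolean isFullBefore(long persistedBytes, long maxBytes, boolean headPageHasRoomForOneMore) {
            return persistedBytes > maxBytes
                    || (persistedBytes == maxBytes && !headPageHasRoomForOneMore);
        }

        // After: the queue only counts as full once the persisted size actually exceeds queue.max_bytes.
        static boolean isFullAfter(long persistedBytes, long maxBytes) {
            return persistedBytes > maxBytes;
        }
    }

Under the old condition a queue that happened to land exactly on queue.max_bytes could block its writer even though nothing had overflowed, which is the edge case reported in #16172.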
Showing 3 changed files with 42 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docs/static/persistent-queues.asciidoc
@@ -61,7 +61,7 @@ The total of `queue.max_bytes` for _all_ queues should be
lower than the capacity of your disk.
+
TIP: If you are using persistent queues to protect against data loss, but don't
- require much buffering, you can set `queue.max_bytes` to a smaller value.
+ require much buffering, you can set `queue.max_bytes` to a smaller value as long as it is not less than the value of `queue.page_capacity`.
A smaller value produces smaller queues and improves queue performance.

`queue.checkpoint.acks`:: Sets the number of acked events before forcing a checkpoint.
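The documentation tweak above encodes a hard constraint: queue.max_bytes may be reduced, but never below queue.page_capacity. As a hypothetical illustration only (this guard and its name are not part of this commit):

    // Hypothetical sketch of the documented constraint; not code from this commit.
    final class QueueSettingsCheck {
        static void checkQueueSizeSettings(long maxBytes, long pageCapacity) {
            if (maxBytes < pageCapacity) {
                throw new IllegalArgumentException(
                        "queue.max_bytes (" + maxBytes + ") must not be less than queue.page_capacity (" + pageCapacity + ")");
            }
        }
    }
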
20 changes: 7 additions & 13 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
@@ -506,18 +506,13 @@ private void behead() throws IOException {
}

/**
- * <p>Checks if the Queue is full, with "full" defined as either of:</p>
- * <p>Assuming a maximum size of the queue larger than 0 is defined:</p>
+ * <p>Checks if the Queue is full, with "full" defined as either of:
* <ul>
- * <li>The sum of the size of all allocated pages is more than the allowed maximum Queue
- * size</li>
- * <li>The sum of the size of all allocated pages equal to the allowed maximum Queue size
- * and the current head page has no remaining capacity.</li>
- * </ul>
- * <p>or assuming a max unread count larger than 0, is defined "full" is also defined as:</p>
- * <ul>
- * <li>The current number of unread events exceeds or is equal to the configured maximum
- * number of allowed unread events.</li>
+ * <li>{@code maxUnread} is non-zero and is met or exceeded by the total
+ * number of unread events on all pages ({@code unreadCount})</li>
+ * <li>{@code maxBytes} is non-zero and is exceeded by the sum of the sizes
+ * of all allocated tail pages plus the portion of the current head page
+ * containing events</li>
* </ul>
* @return True iff the queue is full
*/
@@ -535,8 +530,7 @@ private boolean isMaxBytesReached() {
return false;
}

- final long persistedByteSize = getPersistedByteSize();
- return ((persistedByteSize > this.maxBytes) || (persistedByteSize == this.maxBytes && !this.headPage.hasSpace(1)));
+ return getPersistedByteSize() > this.maxBytes;
}

private boolean isMaxUnreadReached() {
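The rewritten javadoc above names two independent "full" conditions, but the body of isFull() itself is collapsed in this view. A plausible shape, assuming it simply ORs the two private checks (synchronization details omitted):

    // Assumed shape only; the actual isFull() body is not shown in this diff.
    public boolean isFull() {
        return isMaxBytesReached() || isMaxUnreadReached();
    }
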
35 changes: 34 additions & 1 deletion logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
@@ -519,9 +519,12 @@ public void reachMaxUnreadWithAcking() throws IOException, InterruptedException,
public void reachMaxSizeTest() throws IOException, InterruptedException {
Queueable element = new StringElement("0123456789"); // 10 bytes

+ int pageSize = computeCapacityForMmapPageIO(element, 10);
+ int queueMaxSize = (pageSize * 10) - 1; // 100th will overflow max capacity while still on 10th page
+
// allow 10 elements per page but only 100 events in total
Settings settings = TestSettings.persistedQueueSettings(
- computeCapacityForMmapPageIO(element, 10), computeCapacityForMmapPageIO(element, 100), dataPath
+ pageSize, queueMaxSize, dataPath
);
try (Queue q = new Queue(settings)) {
q.open();
@@ -542,6 +545,36 @@ public void reachMaxSizeTest() throws IOException, InterruptedException {
}
}

+ @Test(timeout = 50_000)
+ public void preciselyMaxSizeTest() throws IOException, InterruptedException {
+ Queueable element = new StringElement("0123456789"); // 10 bytes
+
+ int pageSize = computeCapacityForMmapPageIO(element, 10);
+ int queueMaxSize = (pageSize * 10); // 100th will precisely fit max capacity
+
+ // allow 10 elements per page but only 100 events in total
+ Settings settings = TestSettings.persistedQueueSettings(
+ pageSize, queueMaxSize, dataPath
+ );
+ try (Queue q = new Queue(settings)) {
+ q.open();
+
+ int elementCount = 100; // should be able to write 100 events before getting full
+ for (int i = 0; i < elementCount; i++) {
+ q.write(element);
+ }
+
+ assertThat(q.isFull(), is(false));
+
+ // we expect this next write call to block so let's wrap it in a Future
+ executor.submit(() -> q.write(element));
+ while (!q.isFull()) {
+ Thread.sleep(10);
+ }
+ assertThat(q.isFull(), is(true));
+ }
+ }

@Test(timeout = 50_000)
public void ackingMakesQueueNotFullAgainTest() throws IOException, InterruptedException, ExecutionException {

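The two size-limit tests differ by a single byte of queue.max_bytes. Here is a small standalone sketch of that arithmetic, using an invented page size because the real value comes from the test helper computeCapacityForMmapPageIO(element, 10):

    // Standalone illustration of the test arithmetic; 1_000 is an invented page size.
    public class PqMaxBytesArithmetic {
        public static void main(String[] args) {
            int pageSize = 1_000; // stand-in for computeCapacityForMmapPageIO(element, 10)

            int overflowingMax = (pageSize * 10) - 1; // reachMaxSizeTest: the 100th event pushes past queue.max_bytes
            int exactMax = pageSize * 10;             // preciselyMaxSizeTest: 100 events land exactly on queue.max_bytes

            System.out.println("overflows on the 100th write: " + overflowingMax); // 9999
            System.out.println("met exactly by 100 writes:    " + exactMax);       // 10000
        }
    }

With the old check the exact-fit queue would already report full after the 100th write; with this commit it only reports full once the 101st write arrives, which is exactly what preciselyMaxSizeTest asserts.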
