Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved V2Batch #332

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions src/main/java/org/logstash/beats/V2Batch.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package org.logstash.beats;

import java.io.Closeable;
import java.util.Iterator;
import java.util.NoSuchElementException;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.util.Iterator;

/**
* Implementation of {@link Batch} for the v2 protocol backed by ByteBuf. *must* be released after use.
*/
public class V2Batch implements Batch {
public class V2Batch implements Batch, Closeable {
private ByteBuf internalBuffer = PooledByteBufAllocator.DEFAULT.buffer();
private int written = 0;
private int read = 0;
private static final int SIZE_OF_INT = 4;
private int batchSize;
private int highestSequence = -1;
Expand All @@ -27,20 +28,27 @@ public byte getProtocol() {
return Protocol.VERSION_2;
}

public Iterator<Message> iterator(){
internalBuffer.resetReaderIndex();
public Iterator<Message> iterator() {
return new Iterator<Message>() {
private int read = 0;
private ByteBuf readerBuffer = internalBuffer.asReadOnly();
{
readerBuffer.resetReaderIndex();
}
@Override
public boolean hasNext() {
return read < written;
}

@Override
public Message next() {
int sequenceNumber = internalBuffer.readInt();
int readableBytes = internalBuffer.readInt();
Message message = new Message(sequenceNumber, internalBuffer.slice(internalBuffer.readerIndex(), readableBytes));
internalBuffer.readerIndex(internalBuffer.readerIndex() + readableBytes);
if (read >= written) {
throw new NoSuchElementException();
}
int sequenceNumber = readerBuffer.readInt();
int readableBytes = readerBuffer.readInt();
Message message = new Message(sequenceNumber, readerBuffer.slice(readerBuffer.readerIndex(), readableBytes));
readerBuffer.readerIndex(readerBuffer.readerIndex() + readableBytes);
message.setBatch(V2Batch.this);
read++;
return message;
Expand Down Expand Up @@ -101,4 +109,10 @@ void addMessage(int sequenceNumber, ByteBuf buffer, int size) {
public void release() {
internalBuffer.release();
}

@Override
public void close() {
release();
}

}
77 changes: 37 additions & 40 deletions src/test/java/org/logstash/beats/V2BatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,48 +20,49 @@ public class V2BatchTest {

@Test
public void testIsEmpty() {
V2Batch batch = new V2Batch();
assertTrue(batch.isEmpty());
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertFalse(batch.isEmpty());
try (V2Batch batch = new V2Batch()){
assertTrue(batch.isEmpty());
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertFalse(batch.isEmpty());
}
}

@Test
public void testSize() {
V2Batch batch = new V2Batch();
assertEquals(0, batch.size());
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertEquals(1, batch.size());
try (V2Batch batch = new V2Batch()) {
assertEquals(0, batch.size());
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertEquals(1, batch.size());
}
}

@Test
public void TestGetProtocol() {
assertEquals(Protocol.VERSION_2, new V2Batch().getProtocol());
public void testGetProtocol() {
try (V2Batch batch = new V2Batch()) {
assertEquals(Protocol.VERSION_2, batch.getProtocol());
}
}

@Test
public void TestCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() {
V2Batch batch = new V2Batch();
int numberOfEvent = 2;

batch.setBatchSize(numberOfEvent);

for(int i = 1; i <= numberOfEvent; i++) {
ByteBuf content = messageContents();
batch.addMessage(i, content, content.readableBytes());
public void testCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() {
try (V2Batch batch = new V2Batch()) {
int numberOfEvent = 2;
batch.setBatchSize(numberOfEvent);
for (int i = 1; i <= numberOfEvent; i++) {
ByteBuf content = messageContents();
batch.addMessage(i, content, content.readableBytes());
}
assertTrue(batch.isComplete());
}

assertTrue(batch.isComplete());
}

@Test
public void testBigBatch() {
V2Batch batch = new V2Batch();
int size = 4096;
assertEquals(0, batch.size());
try {
try (V2Batch batch = new V2Batch()) {
int size = 4096;
assertEquals(0, batch.size());
ByteBuf content = messageContents();
for (int i = 0; i < size; i++) {
batch.addMessage(i, content, content.readableBytes());
Expand All @@ -71,8 +72,6 @@ public void testBigBatch() {
for (Message message : batch) {
assertEquals(message.getSequence(), i++);
}
}finally {
batch.release();
}
}

Expand All @@ -91,17 +90,15 @@ public void testHighSequence(){
assertEquals(startSequenceNumber + numberOfEvent, batch.getHighestSequence());
}


@Test
public void TestCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() {
V2Batch batch = new V2Batch();
int numberOfEvent = 2;

batch.setBatchSize(numberOfEvent);
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());

assertFalse(batch.isComplete());
public void testCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() {
try (V2Batch batch = new V2Batch()) {
int numberOfEvent = 2;
batch.setBatchSize(numberOfEvent);
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertFalse(batch.isComplete());
}
}

public static ByteBuf messageContents() {
Expand All @@ -114,4 +111,4 @@ public static ByteBuf messageContents() {
throw new RuntimeException(e);
}
}
}
}