Skip to content

Commit

Permalink
Improved V2Batch: iterators don't change V2Batch internal state any m…
Browse files Browse the repository at this point in the history
…ore. It now implement Closable, for auto-release.
  • Loading branch information
fbacchella committed Jun 24, 2018
1 parent 95b94d3 commit c2fed53
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 48 deletions.
31 changes: 23 additions & 8 deletions src/main/java/org/logstash/beats/V2Batch.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.logstash.beats;

import java.io.Closeable;
import java.util.Iterator;
import java.util.NoSuchElementException;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.util.Iterator;

/**
* Implementation of {@link Batch} for the v2 protocol backed by ByteBuf. *must* be released after use.
*/
Expand All @@ -26,20 +28,27 @@ public byte getProtocol() {
return Protocol.VERSION_2;
}

public Iterator<Message> iterator(){
internalBuffer.resetReaderIndex();
public Iterator<Message> iterator() {
return new Iterator<Message>() {
private int read = 0;
private ByteBuf readerBuffer = internalBuffer.asReadOnly();
{
readerBuffer.resetReaderIndex();
}
@Override
public boolean hasNext() {
return read < written;
}

@Override
public Message next() {
int sequenceNumber = internalBuffer.readInt();
int readableBytes = internalBuffer.readInt();
Message message = new Message(sequenceNumber, internalBuffer.slice(internalBuffer.readerIndex(), readableBytes));
internalBuffer.readerIndex(internalBuffer.readerIndex() + readableBytes);
if (read >= written) {
throw new NoSuchElementException();
}
int sequenceNumber = readerBuffer.readInt();
int readableBytes = readerBuffer.readInt();
Message message = new Message(sequenceNumber, readerBuffer.slice(readerBuffer.readerIndex(), readableBytes));
readerBuffer.readerIndex(readerBuffer.readerIndex() + readableBytes);
message.setBatch(V2Batch.this);
read++;
return message;
Expand Down Expand Up @@ -92,4 +101,10 @@ void addMessage(int sequenceNumber, ByteBuf buffer, int size) {
public void release() {
internalBuffer.release();
}

@Override
public void close() {
release();
}

}
78 changes: 38 additions & 40 deletions src/test/java/org/logstash/beats/V2BatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,49 @@ public class V2BatchTest {

@Test
public void testIsEmpty() {
V2Batch batch = new V2Batch();
assertTrue(batch.isEmpty());
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertFalse(batch.isEmpty());
try (V2Batch batch = new V2Batch()){
assertTrue(batch.isEmpty());
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertFalse(batch.isEmpty());
}
}

@Test
public void testSize() {
V2Batch batch = new V2Batch();
assertEquals(0, batch.size());
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertEquals(1, batch.size());
try (V2Batch batch = new V2Batch()) {
assertEquals(0, batch.size());
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertEquals(1, batch.size());
}
}

@Test
public void TestGetProtocol() {
assertEquals(Protocol.VERSION_2, new V2Batch().getProtocol());
public void testGetProtocol() {
try (V2Batch batch = new V2Batch()) {
assertEquals(Protocol.VERSION_2, batch.getProtocol());
}
}

@Test
public void TestCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() {
V2Batch batch = new V2Batch();
int numberOfEvent = 2;

batch.setBatchSize(numberOfEvent);

for(int i = 1; i <= numberOfEvent; i++) {
ByteBuf content = messageContents();
batch.addMessage(i, content, content.readableBytes());
public void testCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() {
try (V2Batch batch = new V2Batch()) {
int numberOfEvent = 2;
batch.setBatchSize(numberOfEvent);
for (int i = 1; i <= numberOfEvent; i++) {
ByteBuf content = messageContents();
batch.addMessage(i, content, content.readableBytes());
}
assertTrue(batch.isComplete());
}

assertTrue(batch.isComplete());
}

@Test
public void testBigBatch() {
V2Batch batch = new V2Batch();
int size = 4096;
assertEquals(0, batch.size());
try {
try (V2Batch batch = new V2Batch()) {
int size = 4096;
assertEquals(0, batch.size());
ByteBuf content = messageContents();
for (int i = 0; i < size; i++) {
batch.addMessage(i, content, content.readableBytes());
Expand All @@ -70,22 +71,19 @@ public void testBigBatch() {
for (Message message : batch) {
assertEquals(message.getSequence(), i++);
}
}finally {
batch.release();
}
}

}

@Test
public void TestCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() {
V2Batch batch = new V2Batch();
int numberOfEvent = 2;

batch.setBatchSize(numberOfEvent);
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());

assertFalse(batch.isComplete());
public void testCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() {
try (V2Batch batch = new V2Batch()) {
int numberOfEvent = 2;
batch.setBatchSize(numberOfEvent);
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertFalse(batch.isComplete());
}
}

public static ByteBuf messageContents() {
Expand All @@ -98,4 +96,4 @@ public static ByteBuf messageContents() {
throw new RuntimeException(e);
}
}
}
}

0 comments on commit c2fed53

Please sign in to comment.