From 85493ce864adb0ec22c5879b515bd2571cd83337 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Wed, 16 Oct 2024 16:48:25 +0200 Subject: [PATCH] Bugfix for BufferedTokenizer to completely consume lines in case of lines bigger then sizeLimit (#16482) Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met. Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter. Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array. Furthermore it translates the `buftok_spec` tests into JUnit tests. --- .../logstash/common/BufferedTokenizerExt.java | 67 +++++++++-- .../common/BufferedTokenizerExtTest.java | 91 +++++++++++++++ ...BufferedTokenizerExtWithDelimiterTest.java | 66 +++++++++++ ...BufferedTokenizerExtWithSizeLimitTest.java | 110 ++++++++++++++++++ 4 files changed, 322 insertions(+), 12 deletions(-) create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index 2d7b90bba7a..2c36370afb3 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -22,6 +22,7 @@ import org.jruby.Ruby; import org.jruby.RubyArray; +import org.jruby.RubyBoolean; import org.jruby.RubyClass; import org.jruby.RubyObject; import org.jruby.RubyString; @@ -40,10 +41,12 @@ public class BufferedTokenizerExt extends RubyObject { freeze(RubyUtil.RUBY.getCurrentContext()); private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray(); + private StringBuilder headToken = new StringBuilder(); private RubyString delimiter = NEW_LINE; private int sizeLimit; private boolean hasSizeLimit; private int inputSize; + private boolean bufferFullErrorNotified = false; public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); @@ -66,7 +69,6 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) { * Extract takes an arbitrary string of input data and returns an array of * tokenized entities, provided there were any available to extract. This * makes for easy processing of datagrams using a pattern like: - * * {@code tokenizer.extract(data).map { |entity| Decode(entity) }.each do} * * @param context ThreadContext @@ -77,22 +79,63 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) { @SuppressWarnings("rawtypes") public RubyArray extract(final ThreadContext context, IRubyObject data) { final RubyArray entities = data.convertToString().split(delimiter, -1); + if (!bufferFullErrorNotified) { + input.clear(); + input.addAll(entities); + } else { + // after a full buffer signal + if (input.isEmpty()) { + // after a buffer full error, the remaining part of the line, till next delimiter, + // has to be consumed, unless the input buffer doesn't still contain fragments of + // subsequent tokens. + entities.shift(context); + input.addAll(entities); + } else { + // merge last of the input with first of incoming data segment + if (!entities.isEmpty()) { + RubyString last = ((RubyString) input.pop(context)); + RubyString nextFirst = ((RubyString) entities.shift(context)); + entities.unshift(last.concat(nextFirst)); + input.addAll(entities); + } + } + } + if (hasSizeLimit) { - final int entitiesSize = ((RubyString) entities.first()).size(); + if (bufferFullErrorNotified) { + bufferFullErrorNotified = false; + if (input.isEmpty()) { + return RubyUtil.RUBY.newArray(); + } + } + final int entitiesSize = ((RubyString) input.first()).size(); if (inputSize + entitiesSize > sizeLimit) { + bufferFullErrorNotified = true; + headToken = new StringBuilder(); + inputSize = 0; + input.shift(context); // consume the token fragment that generates the buffer full throw new IllegalStateException("input buffer full"); } this.inputSize = inputSize + entitiesSize; } - input.append(entities.shift(context)); - if (entities.isEmpty()) { + + if (input.getLength() < 2) { + // this is a specialization case which avoid adding and removing from input accumulator + // when it contains just one element + headToken.append(input.shift(context)); // remove head return RubyUtil.RUBY.newArray(); } - entities.unshift(input.join(context)); - input.clear(); - input.append(entities.pop(context)); - inputSize = ((RubyString) input.first()).size(); - return entities; + + if (headToken.length() > 0) { + // if there is a pending token part, merge it with the first token segment present + // in the accumulator, and clean the pending token part. + headToken.append(input.shift(context)); // append buffer to first element and + input.unshift(RubyUtil.toRubyObject(headToken.toString())); // reinsert it into the array + headToken = new StringBuilder(); + } + headToken.append(input.pop(context)); // put the leftovers in headToken for later + inputSize = headToken.length(); + return input; } /** @@ -104,14 +147,14 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) { */ @JRubyMethod public IRubyObject flush(final ThreadContext context) { - final IRubyObject buffer = input.join(context); - input.clear(); + final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString()); + headToken = new StringBuilder(); return buffer; } @JRubyMethod(name = "empty?") public IRubyObject isEmpty(final ThreadContext context) { - return input.empty_p(); + return RubyBoolean.newBoolean(context.runtime, headToken.toString().isEmpty()); } } diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java new file mode 100644 index 00000000000..5638cffd83b --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {}; + sut.init(context, args); + } + + @Test + public void shouldTokenizeASingleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\n")); + + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldMergeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("bar\n")); + assertEquals(List.of("foobar"), tokens); + } + + @Test + public void shouldTokenizeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void shouldIgnoreEmptyPayload() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar")); + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldTokenizeEmptyPayloadWithNewline() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\n")); + assertEquals(List.of(""), tokens); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n")); + assertEquals(List.of("", "", ""), tokens); + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java new file mode 100644 index 00000000000..aa2d197638c --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtWithDelimiterTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {RubyUtil.RUBY.newString("||")}; + sut.init(context, args); + } + + @Test + public void shouldTokenizeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||")); + + assertEquals(List.of("foo", "b|r"), tokens); + } + + @Test + public void shouldIgnoreEmptyPayload() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo||bar")); + assertEquals(List.of("foo"), tokens); + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java new file mode 100644 index 00000000000..859bd35f701 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java @@ -0,0 +1,110 @@ +package org.logstash.common; +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.jruby.RubyArray; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.*; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtWithSizeLimitTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {RubyUtil.RUBY.newString("\n"), RubyUtil.RUBY.newFixnum(10)}; + sut.init(context, args); + } + + @Test + public void givenTokenWithinSizeLimitWhenExtractedThenReturnTokens() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void givenTokenExceedingSizeLimitWhenExtractedThenThrowsAnError() { + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + } + + @Test + public void givenExtractedThrownLimitErrorWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\nanother")); + assertEquals("After buffer full error should resume from the end of line", List.of("kaboom"), tokens); + } + + @Test + public void givenExtractInvokedWithDifferentFramingAfterBufferFullErrorTWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { + sut.extract(context, RubyUtil.RUBY.newString("aaaa")); + + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbb\nccc")); + assertEquals(List.of("bbbb"), tokens); + } + + @Test + public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleToRecoverTokenization() { + sut.extract(context, RubyUtil.RUBY.newString("aaaa")); + + //first buffer full on 13 "a" letters + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + // second buffer full on 11 "b" letters + Exception secondThrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbbbbbbbbb\ncc")); + }); + assertThat(secondThrownException.getMessage(), containsString("input buffer full")); + + // now should resemble processing on c and d + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("ccc\nddd\n")); + assertEquals(List.of("ccccc", "ddd"), tokens); + } +} \ No newline at end of file