-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Bugfix for BufferedTokenizer to completely consume lines in case of l…
…ines bigger then sizeLimit (#16482) (#16580) Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met. Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter. Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array. Furthermore it translates the `buftok_spec` tests into JUnit tests. (cherry picked from commit 85493ce) Co-authored-by: Andrea Selva <[email protected]>
- Loading branch information
1 parent
15cdf5c
commit 2c98211
Showing
4 changed files
with
322 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
91 changes: 91 additions & 0 deletions
91
logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
* Licensed to Elasticsearch B.V. under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch B.V. licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.logstash.common; | ||
|
||
import org.jruby.RubyArray; | ||
import org.jruby.RubyString; | ||
import org.jruby.runtime.ThreadContext; | ||
import org.jruby.runtime.builtin.IRubyObject; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
import org.logstash.RubyTestBase; | ||
import org.logstash.RubyUtil; | ||
|
||
import java.util.List; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.logstash.RubyUtil.RUBY; | ||
|
||
@SuppressWarnings("unchecked") | ||
public final class BufferedTokenizerExtTest extends RubyTestBase { | ||
|
||
private BufferedTokenizerExt sut; | ||
private ThreadContext context; | ||
|
||
@Before | ||
public void setUp() { | ||
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); | ||
context = RUBY.getCurrentContext(); | ||
IRubyObject[] args = {}; | ||
sut.init(context, args); | ||
} | ||
|
||
@Test | ||
public void shouldTokenizeASingleToken() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\n")); | ||
|
||
assertEquals(List.of("foo"), tokens); | ||
} | ||
|
||
@Test | ||
public void shouldMergeMultipleToken() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo")); | ||
assertTrue(tokens.isEmpty()); | ||
|
||
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("bar\n")); | ||
assertEquals(List.of("foobar"), tokens); | ||
} | ||
|
||
@Test | ||
public void shouldTokenizeMultipleToken() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); | ||
|
||
assertEquals(List.of("foo", "bar"), tokens); | ||
} | ||
|
||
@Test | ||
public void shouldIgnoreEmptyPayload() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("")); | ||
assertTrue(tokens.isEmpty()); | ||
|
||
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar")); | ||
assertEquals(List.of("foo"), tokens); | ||
} | ||
|
||
@Test | ||
public void shouldTokenizeEmptyPayloadWithNewline() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n")); | ||
assertEquals(List.of(""), tokens); | ||
|
||
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n")); | ||
assertEquals(List.of("", "", ""), tokens); | ||
} | ||
} |
66 changes: 66 additions & 0 deletions
66
logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
/* | ||
* Licensed to Elasticsearch B.V. under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch B.V. licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.logstash.common; | ||
|
||
import org.jruby.RubyArray; | ||
import org.jruby.RubyString; | ||
import org.jruby.runtime.ThreadContext; | ||
import org.jruby.runtime.builtin.IRubyObject; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
import org.logstash.RubyTestBase; | ||
import org.logstash.RubyUtil; | ||
|
||
import java.util.List; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.logstash.RubyUtil.RUBY; | ||
|
||
@SuppressWarnings("unchecked") | ||
public final class BufferedTokenizerExtWithDelimiterTest extends RubyTestBase { | ||
|
||
private BufferedTokenizerExt sut; | ||
private ThreadContext context; | ||
|
||
@Before | ||
public void setUp() { | ||
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); | ||
context = RUBY.getCurrentContext(); | ||
IRubyObject[] args = {RubyUtil.RUBY.newString("||")}; | ||
sut.init(context, args); | ||
} | ||
|
||
@Test | ||
public void shouldTokenizeMultipleToken() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||")); | ||
|
||
assertEquals(List.of("foo", "b|r"), tokens); | ||
} | ||
|
||
@Test | ||
public void shouldIgnoreEmptyPayload() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("")); | ||
assertTrue(tokens.isEmpty()); | ||
|
||
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||bar")); | ||
assertEquals(List.of("foo"), tokens); | ||
} | ||
} |
110 changes: 110 additions & 0 deletions
110
logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package org.logstash.common; | ||
/* | ||
* Licensed to Elasticsearch B.V. under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch B.V. licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
import org.jruby.RubyArray; | ||
import org.jruby.RubyString; | ||
import org.jruby.runtime.ThreadContext; | ||
import org.jruby.runtime.builtin.IRubyObject; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
import org.logstash.RubyTestBase; | ||
import org.logstash.RubyUtil; | ||
|
||
import java.util.List; | ||
|
||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.containsString; | ||
import static org.junit.Assert.*; | ||
import static org.logstash.RubyUtil.RUBY; | ||
|
||
@SuppressWarnings("unchecked") | ||
public final class BufferedTokenizerExtWithSizeLimitTest extends RubyTestBase { | ||
|
||
private BufferedTokenizerExt sut; | ||
private ThreadContext context; | ||
|
||
@Before | ||
public void setUp() { | ||
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); | ||
context = RUBY.getCurrentContext(); | ||
IRubyObject[] args = {RubyUtil.RUBY.newString("\n"), RubyUtil.RUBY.newFixnum(10)}; | ||
sut.init(context, args); | ||
} | ||
|
||
@Test | ||
public void givenTokenWithinSizeLimitWhenExtractedThenReturnTokens() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); | ||
|
||
assertEquals(List.of("foo", "bar"), tokens); | ||
} | ||
|
||
@Test | ||
public void givenTokenExceedingSizeLimitWhenExtractedThenThrowsAnError() { | ||
Exception thrownException = assertThrows(IllegalStateException.class, () -> { | ||
sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); | ||
}); | ||
assertThat(thrownException.getMessage(), containsString("input buffer full")); | ||
} | ||
|
||
@Test | ||
public void givenExtractedThrownLimitErrorWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { | ||
Exception thrownException = assertThrows(IllegalStateException.class, () -> { | ||
sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); | ||
}); | ||
assertThat(thrownException.getMessage(), containsString("input buffer full")); | ||
|
||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\nanother")); | ||
assertEquals("After buffer full error should resume from the end of line", List.of("kaboom"), tokens); | ||
} | ||
|
||
@Test | ||
public void givenExtractInvokedWithDifferentFramingAfterBufferFullErrorTWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { | ||
sut.extract(context, RubyUtil.RUBY.newString("aaaa")); | ||
|
||
Exception thrownException = assertThrows(IllegalStateException.class, () -> { | ||
sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); | ||
}); | ||
assertThat(thrownException.getMessage(), containsString("input buffer full")); | ||
|
||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbb\nccc")); | ||
assertEquals(List.of("bbbb"), tokens); | ||
} | ||
|
||
@Test | ||
public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleToRecoverTokenization() { | ||
sut.extract(context, RubyUtil.RUBY.newString("aaaa")); | ||
|
||
//first buffer full on 13 "a" letters | ||
Exception thrownException = assertThrows(IllegalStateException.class, () -> { | ||
sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); | ||
}); | ||
assertThat(thrownException.getMessage(), containsString("input buffer full")); | ||
|
||
// second buffer full on 11 "b" letters | ||
Exception secondThrownException = assertThrows(IllegalStateException.class, () -> { | ||
sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbbbbbbbbb\ncc")); | ||
}); | ||
assertThat(secondThrownException.getMessage(), containsString("input buffer full")); | ||
|
||
// now should resemble processing on c and d | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("ccc\nddd\n")); | ||
assertEquals(List.of("ccccc", "ddd"), tokens); | ||
} | ||
} |