Skip to content

Commit

Permalink
Revert "Bugfix for BufferedTokenizer to completely consume lines in c…
Browse files Browse the repository at this point in the history
…ase of lines bigger then sizeLimit (elastic#16482) (elastic#16580)"

This reverts commit 2c98211.
  • Loading branch information
yaauie committed Nov 18, 2024
1 parent 913d705 commit 526e6e6
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.RubyString;
Expand All @@ -41,12 +40,10 @@ public class BufferedTokenizerExt extends RubyObject {
freeze(RubyUtil.RUBY.getCurrentContext());

private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray();
private StringBuilder headToken = new StringBuilder();
private RubyString delimiter = NEW_LINE;
private int sizeLimit;
private boolean hasSizeLimit;
private int inputSize;
private boolean bufferFullErrorNotified = false;

public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
Expand All @@ -69,6 +66,7 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
* Extract takes an arbitrary string of input data and returns an array of
* tokenized entities, provided there were any available to extract. This
* makes for easy processing of datagrams using a pattern like:
*
* {@code tokenizer.extract(data).map { |entity| Decode(entity) }.each do}
*
* @param context ThreadContext
Expand All @@ -79,63 +77,22 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
@SuppressWarnings("rawtypes")
public RubyArray extract(final ThreadContext context, IRubyObject data) {
final RubyArray entities = data.convertToString().split(delimiter, -1);
if (!bufferFullErrorNotified) {
input.clear();
input.addAll(entities);
} else {
// after a full buffer signal
if (input.isEmpty()) {
// after a buffer full error, the remaining part of the line, till next delimiter,
// has to be consumed, unless the input buffer doesn't still contain fragments of
// subsequent tokens.
entities.shift(context);
input.addAll(entities);
} else {
// merge last of the input with first of incoming data segment
if (!entities.isEmpty()) {
RubyString last = ((RubyString) input.pop(context));
RubyString nextFirst = ((RubyString) entities.shift(context));
entities.unshift(last.concat(nextFirst));
input.addAll(entities);
}
}
}

if (hasSizeLimit) {
if (bufferFullErrorNotified) {
bufferFullErrorNotified = false;
if (input.isEmpty()) {
return RubyUtil.RUBY.newArray();
}
}
final int entitiesSize = ((RubyString) input.first()).size();
final int entitiesSize = ((RubyString) entities.first()).size();
if (inputSize + entitiesSize > sizeLimit) {
bufferFullErrorNotified = true;
headToken = new StringBuilder();
inputSize = 0;
input.shift(context); // consume the token fragment that generates the buffer full
throw new IllegalStateException("input buffer full");
}
this.inputSize = inputSize + entitiesSize;
}

if (input.getLength() < 2) {
// this is a specialization case which avoid adding and removing from input accumulator
// when it contains just one element
headToken.append(input.shift(context)); // remove head
input.append(entities.shift(context));
if (entities.isEmpty()) {
return RubyUtil.RUBY.newArray();
}

if (headToken.length() > 0) {
// if there is a pending token part, merge it with the first token segment present
// in the accumulator, and clean the pending token part.
headToken.append(input.shift(context)); // append buffer to first element and
input.unshift(RubyUtil.toRubyObject(headToken.toString())); // reinsert it into the array
headToken = new StringBuilder();
}
headToken.append(input.pop(context)); // put the leftovers in headToken for later
inputSize = headToken.length();
return input;
entities.unshift(input.join(context));
input.clear();
input.append(entities.pop(context));
inputSize = ((RubyString) input.first()).size();
return entities;
}

/**
Expand All @@ -147,14 +104,14 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) {
*/
@JRubyMethod
public IRubyObject flush(final ThreadContext context) {
final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString());
headToken = new StringBuilder();
final IRubyObject buffer = input.join(context);
input.clear();
return buffer;
}

@JRubyMethod(name = "empty?")
public IRubyObject isEmpty(final ThreadContext context) {
return RubyBoolean.newBoolean(context.runtime, headToken.toString().isEmpty());
return input.empty_p();
}

}

This file was deleted.

This file was deleted.

This file was deleted.

0 comments on commit 526e6e6

Please sign in to comment.