diff --git a/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java b/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java index 6813f3e5877..b0f85a36b0e 100644 --- a/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java @@ -37,6 +37,7 @@ import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt; import java.util.Collection; +import java.util.Optional; /** * JRuby extension @@ -161,7 +162,7 @@ private RubyArray workerStates(final ThreadContext context, final RubyHash batch final RubyArray result = context.runtime.newArray(); ((Iterable) pipeline.callMethod(context, "worker_threads")) .forEach(thread -> { - final long nativeThreadId = ((RubyThread) thread).getNativeThread().getId(); + final RubyHash hash = RubyHash.newHash(context.runtime); IRubyObject status = thread.callMethod(context, "status"); if (status.isNil()) { @@ -170,8 +171,15 @@ private RubyArray workerStates(final ThreadContext context, final RubyHash batch hash.op_aset(context, STATUS_KEY, status); hash.op_aset(context, ALIVE_KEY, thread.callMethod(context, "alive?")); hash.op_aset(context, INDEX_KEY, context.runtime.newFixnum(result.size())); - final IRubyObject batch = batchMap.op_aref(context, context.runtime.newFixnum(nativeThreadId)); - hash.op_aset(context, INFLIGHT_COUNT_KEY, extractBatchSize(context, batch)); + + IRubyObject batchSize = Optional.of((RubyThread) thread) + .map(RubyThread::getNativeThread) + .map(Thread::getId) + .map(id -> batchMap.op_aref(context, context.runtime.newFixnum(id))) + .map(batch -> extractBatchSize(context, batch)) + .orElse(context.runtime.newFixnum(0L)); + + hash.op_aset(context, INFLIGHT_COUNT_KEY, batchSize); result.add(hash); }); return result;