diff --git a/docs/static/troubleshoot/health-pipeline-flow-worker-utilization.asciidoc b/docs/static/troubleshoot/health-pipeline-flow-worker-utilization.asciidoc new file mode 100644 index 00000000000..f778c43fbf4 --- /dev/null +++ b/docs/static/troubleshoot/health-pipeline-flow-worker-utilization.asciidoc @@ -0,0 +1,44 @@ +[[health-report-pipeline-flow-worker-utilization]] +=== Health Report Pipeline Flow: Worker Utilization + +The Pipeline indicator has a `flow:worker_utilization` probe that is capable of producing one of several diagnoses about blockages in the pipeline. + +A pipeline is considered "blocked" when its workers are fully-utilized, because if they are consistently spending 100% of their time processing events, they are unable to pick up new events from the queue. +This can cause back-pressure to cascade to upstream services, which can result in data loss or duplicate processing depending on upstream configuration. + +The issue typically stems from one or more causes: + +* a downstream resource being blocked, +* a plugin consuming more resources than expected, and/or +* insufficient resources being allocated to the pipeline. + +To address the issue, observe the <> from the <>, and identify which plugins have the highest `worker_utilization`. +This will tell you which plugins are consuming the most of the pipeline's worker resources. + +* If the offending plugin connects to a downstream service or another pipeline that is exerting back-pressure, the issue needs to be addressed in the downstream service or pipeline. +* If the offending plugin connects to a downstream service with high network latency, throughput for the pipeline may be improved by <>. +* If the offending plugin is a computation-heavy filter such as `grok` or `kv`, its configuration may need to be tuned to eliminate wasted computation. 
+ +[[health-report-pipeline-flow-worker-utilization-diagnosis-blocked-5m]] +==== [[blocked-5m]]Blocked Pipeline (5 minutes) + +A pipeline that has been completely blocked for five minutes or more represents a critical blockage to the flow of events through your pipeline that needs to be addressed immediately to avoid or limit data loss. +See above for troubleshooting steps. + +[[health-report-pipeline-flow-worker-utilization-diagnosis-nearly-blocked-5m]] +==== [[nearly-blocked-5m]]Nearly Blocked Pipeline (5 minutes) + +A pipeline that has been nearly blocked for five minutes or more may be creating intermittent blockage to the flow of events through your pipeline, which can create a risk of data loss. +See above for troubleshooting steps. + +[[health-report-pipeline-flow-worker-utilization-diagnosis-blocked-1m]] +==== [[blocked-1m]]Blocked Pipeline (1 minute) + +A pipeline that has been completely blocked for one minute or more represents a high risk of an upcoming blockage to the flow of events through your pipeline that likely needs to be addressed soon to avoid or limit data loss. +See above for troubleshooting steps. + +[[health-report-pipeline-flow-worker-utilization-diagnosis-nearly-blocked-1m]] +==== [[nearly-blocked-1m]]Nearly Blocked Pipeline (1 minute) + +A pipeline that has been nearly blocked for one minute or more may be creating intermittent blockage to the flow of events through your pipeline, which can create a risk of data loss. +See above for troubleshooting steps. 
diff --git a/docs/static/troubleshoot/troubleshooting.asciidoc b/docs/static/troubleshoot/troubleshooting.asciidoc index 66bb60f45e5..e8bd68ce0c0 100644 --- a/docs/static/troubleshoot/troubleshooting.asciidoc +++ b/docs/static/troubleshoot/troubleshooting.asciidoc @@ -29,3 +29,4 @@ include::ts-plugins-general.asciidoc[] include::ts-plugins.asciidoc[] include::ts-other-issues.asciidoc[] include::health-pipeline-status.asciidoc[] +include::health-pipeline-flow-worker-utilization.asciidoc[] diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index d1f8e006995..4ab6c684603 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -168,17 +168,17 @@ def pipeline_details(pipeline_id) return PipelineIndicator::Details.new(PipelineIndicator::Status::UNKNOWN) end - status = pipeline_state.synchronize do |sync_state| - case - when sync_state.loading? then PipelineIndicator::Status::LOADING - when sync_state.crashed? then PipelineIndicator::Status::TERMINATED - when sync_state.running? then PipelineIndicator::Status::RUNNING - when sync_state.finished? then PipelineIndicator::Status::FINISHED - else PipelineIndicator::Status::UNKNOWN - end + pipeline_state.synchronize do |sync_state| + status = case + when sync_state.loading? then PipelineIndicator::Status::LOADING + when sync_state.crashed? then PipelineIndicator::Status::TERMINATED + when sync_state.running? then PipelineIndicator::Status::RUNNING + when sync_state.finished? then PipelineIndicator::Status::FINISHED + else PipelineIndicator::Status::UNKNOWN + end + + PipelineIndicator::Details.new(status, sync_state.pipeline&.to_java.collectWorkerUtilizationFlowObservation) end - - return PipelineIndicator::Details.new(status) end def auto_reload? 
diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 068e529ef4f..c169ee05380 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -35,12 +35,16 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; @@ -82,6 +86,7 @@ import org.logstash.execution.queue.QueueWriter; import org.logstash.ext.JRubyAbstractQueueWriteClientExt; import org.logstash.ext.JRubyWrappedWriteClientExt; +import org.logstash.health.PipelineIndicator; import org.logstash.instrument.metrics.AbstractMetricExt; import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; import org.logstash.instrument.metrics.FlowMetric; @@ -161,7 +166,7 @@ public class AbstractPipelineExt extends RubyBasicObject { private QueueReadClientBase filterQueueClient; - private ArrayList flowMetrics = new ArrayList<>(); + private transient final ScopedFlowMetrics scopedFlowMetrics = new ScopedFlowMetrics(); private @SuppressWarnings("rawtypes") RubyArray inputs; private @SuppressWarnings("rawtypes") RubyArray filters; private @SuppressWarnings("rawtypes") RubyArray outputs; @@ -563,34 +568,34 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { final LongCounter eventsInCounter = initOrGetCounterMetric(context, eventsNamespace, IN_KEY); final FlowMetric inputThroughput = createFlowMetric(INPUT_THROUGHPUT_KEY, eventsInCounter, 
uptimeInPreciseSeconds); - this.flowMetrics.add(inputThroughput); + this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, inputThroughput); storeMetric(context, flowNamespace, inputThroughput); final LongCounter eventsFilteredCounter = initOrGetCounterMetric(context, eventsNamespace, FILTERED_KEY); final FlowMetric filterThroughput = createFlowMetric(FILTER_THROUGHPUT_KEY, eventsFilteredCounter, uptimeInPreciseSeconds); - this.flowMetrics.add(filterThroughput); + this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, filterThroughput); storeMetric(context, flowNamespace, filterThroughput); final LongCounter eventsOutCounter = initOrGetCounterMetric(context, eventsNamespace, OUT_KEY); final FlowMetric outputThroughput = createFlowMetric(OUTPUT_THROUGHPUT_KEY, eventsOutCounter, uptimeInPreciseSeconds); - this.flowMetrics.add(outputThroughput); + this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, outputThroughput); storeMetric(context, flowNamespace, outputThroughput); final TimerMetric queuePushWaitInMillis = initOrGetTimerMetric(context, eventsNamespace, PUSH_DURATION_KEY); final FlowMetric backpressureFlow = createFlowMetric(QUEUE_BACKPRESSURE_KEY, queuePushWaitInMillis, uptimeInPreciseMillis); - this.flowMetrics.add(backpressureFlow); + this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, backpressureFlow); storeMetric(context, flowNamespace, backpressureFlow); final TimerMetric durationInMillis = initOrGetTimerMetric(context, eventsNamespace, DURATION_IN_MILLIS_KEY); final FlowMetric concurrencyFlow = createFlowMetric(WORKER_CONCURRENCY_KEY, durationInMillis, uptimeInPreciseMillis); - this.flowMetrics.add(concurrencyFlow); + this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, concurrencyFlow); storeMetric(context, flowNamespace, concurrencyFlow); final int workerCount = getSetting(context, SettingKeyDefinitions.PIPELINE_WORKERS).convertToInteger().getIntValue(); final UpScaledMetric 
percentScaledDurationInMillis = new UpScaledMetric(durationInMillis, 100); final UpScaledMetric availableWorkerTimeInMillis = new UpScaledMetric(uptimeInPreciseMillis, workerCount); final FlowMetric utilizationFlow = createFlowMetric(WORKER_UTILIZATION_KEY, percentScaledDurationInMillis, availableWorkerTimeInMillis); - this.flowMetrics.add(utilizationFlow); + this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, utilizationFlow); storeMetric(context, flowNamespace, utilizationFlow); initializePqFlowMetrics(context, flowNamespace, uptimeMetric); @@ -600,10 +605,22 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { @JRubyMethod(name = "collect_flow_metrics") public final IRubyObject collectFlowMetrics(final ThreadContext context) { - this.flowMetrics.forEach(FlowMetric::capture); + this.scopedFlowMetrics.captureAll(); return context.nil; } + // short-term limits the scope of what is included in the flow observations + public final PipelineIndicator.FlowObservation collectWorkerUtilizationFlowObservation() { + return this.collectFlowObservation(WORKER_UTILIZATION_KEY.asJavaString()::equals); + } + + public final PipelineIndicator.FlowObservation collectFlowObservation(final Predicate filter) { + Map> collect = this.scopedFlowMetrics.getFlowMetrics(ScopedFlowMetrics.Scope.WORKER).stream() + .filter(fm -> filter.test(fm.getName())) + .collect(Collectors.toUnmodifiableMap(FlowMetric::getName, FlowMetric::getValue)); + return new PipelineIndicator.FlowObservation(collect); + } + private static FlowMetric createFlowMetric(final RubySymbol name, final Metric numeratorMetric, final Metric denominatorMetric) { @@ -671,12 +688,13 @@ private void initializePqFlowMetrics(final ThreadContext context, final RubySymb final Supplier eventsGaugeMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueNamespace, EVENTS_KEY).orElse(null); final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_PERSISTED_GROWTH_EVENTS_KEY, 
eventsGaugeMetricSupplier, () -> uptimeInPreciseSeconds); - this.flowMetrics.add(growthEventsFlow); + this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, growthEventsFlow); storeMetric(context, flowNamespace, growthEventsFlow); final Supplier queueSizeInBytesMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueCapacityNamespace, QUEUE_SIZE_IN_BYTES_KEY).orElse(null); final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_PERSISTED_GROWTH_BYTES_KEY, queueSizeInBytesMetricSupplier, () -> uptimeInPreciseSeconds); - this.flowMetrics.add(growthBytesFlow); + this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, growthBytesFlow); + storeMetric(context, flowNamespace, growthBytesFlow); } } @@ -705,7 +723,7 @@ private void initializePluginThroughputFlowMetric(final ThreadContext context, f final LongCounter eventsOut = initOrGetCounterMetric(context, eventsNamespace, OUT_KEY); final FlowMetric throughputFlow = createFlowMetric(PLUGIN_THROUGHPUT_KEY, eventsOut, uptimeInPreciseSeconds); - this.flowMetrics.add(throughputFlow); + this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.PLUGIN, throughputFlow); final RubySymbol[] flowNamespace = buildNamespace(PLUGINS_KEY, INPUTS_KEY, RubyUtil.RUBY.newString(id).intern(), FLOW_KEY); storeMetric(context, flowNamespace, throughputFlow); @@ -718,12 +736,12 @@ private void initializePluginWorkerFlowMetrics(final ThreadContext context, fina final TimerMetric durationInMillis = initOrGetTimerMetric(context, eventsNamespace, DURATION_IN_MILLIS_KEY); final LongCounter counterEvents = initOrGetCounterMetric(context, eventsNamespace, IN_KEY); final FlowMetric workerCostPerEvent = createFlowMetric(WORKER_MILLIS_PER_EVENT_KEY, durationInMillis, counterEvents); - this.flowMetrics.add(workerCostPerEvent); + this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.PLUGIN, workerCostPerEvent); final UpScaledMetric percentScaledDurationInMillis = new UpScaledMetric(durationInMillis, 100); final UpScaledMetric 
availableWorkerTimeInMillis = new UpScaledMetric(uptimeInPreciseMillis, workerCount); final FlowMetric workerUtilization = createFlowMetric(WORKER_UTILIZATION_KEY, percentScaledDurationInMillis, availableWorkerTimeInMillis); - this.flowMetrics.add(workerUtilization); + this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.PLUGIN, workerUtilization); final RubySymbol[] flowNamespace = buildNamespace(PLUGINS_KEY, key, RubyUtil.RUBY.newString(id).intern(), FLOW_KEY); storeMetric(context, flowNamespace, workerCostPerEvent); @@ -884,4 +902,33 @@ public IRubyObject isShutdownRequested(final ThreadContext context) { public final RubyString getLastErrorEvaluationReceived(final ThreadContext context) { return RubyString.newString(context.runtime, lastErrorEvaluationReceived); } + + private static class ScopedFlowMetrics { + enum Scope { + WORKER, + PLUGIN + } + private final Map> flowsByScope = new ConcurrentHashMap<>(); + + void register(final Scope scope, final FlowMetric metric) { + flowsByScope.compute(scope, (s, scopedFlows) -> { + if (scopedFlows == null) { + return List.of(metric); + } else { + final ArrayList mutable = new ArrayList<>(scopedFlows.size() + 1); + mutable.addAll(scopedFlows); + mutable.add(metric); + return List.copyOf(mutable); + } + }); + } + + void captureAll() { + flowsByScope.values().stream().flatMap(List::stream).forEach(FlowMetric::capture); + } + + List getFlowMetrics(final Scope scope) { + return flowsByScope.getOrDefault(scope, List.of()); + } + } } diff --git a/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java b/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java index 3f6821fec4f..ce6a5ed10bc 100644 --- a/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java +++ b/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java @@ -21,9 +21,13 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; import 
com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.logstash.instrument.metrics.MetricKeys; import java.io.IOException; +import java.util.Map; import java.util.Objects; +import java.util.OptionalDouble; +import java.util.stream.Collectors; import static org.logstash.health.Status.*; @@ -37,6 +41,7 @@ public static PipelineIndicator forPipeline(final String pipelineId, final PipelineDetailsProvider pipelineDetailsProvider) { PipelineIndicator pipelineIndicator = new PipelineIndicator(new DetailsSupplier(pipelineId, pipelineDetailsProvider)); pipelineIndicator.attachProbe("status", new StatusProbe()); + pipelineIndicator.attachProbe("flow:worker_utilization", new FlowWorkerUtilizationProbe()); return pipelineIndicator; } @@ -81,13 +86,21 @@ public void serialize(Status value, JsonGenerator gen, SerializerProvider serial @JsonSerialize(using = Details.JsonSerializer.class) public static class Details implements Observation { private final Status status; + private final FlowObservation flow; - public Details(final Status status) { + public Details(Status status, FlowObservation flow) { this.status = Objects.requireNonNull(status, "status cannot be null"); + this.flow = Objects.requireNonNullElse(flow, FlowObservation.EMPTY); + } + + public Details(final Status status) { + this(status, null); } + public Status getStatus() { return this.status; } + public FlowObservation getFlow() { return this.flow; } public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer
{ @Override @@ -96,11 +109,52 @@ public void serialize(final Details details, final SerializerProvider serializerProvider) throws IOException { jsonGenerator.writeStartObject(); jsonGenerator.writeObjectField("status", details.getStatus()); + FlowObservation flow = details.getFlow(); + if (flow != null && !flow.isEmpty()) { + jsonGenerator.writeObjectField("flow", flow); + } jsonGenerator.writeEndObject(); } } } + @JsonSerialize(using = FlowObservation.JsonSerializer.class) + public static class FlowObservation { + private static FlowObservation EMPTY = new FlowObservation(Map.of()); + final Map> capture; + + public FlowObservation(final Map> capture) { + this.capture = Objects.requireNonNull(capture, "capture cannot be null").entrySet() + .stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, (e) -> Map.copyOf(e.getValue()))); + } + + public OptionalDouble getRate(final String flowMetricName, final String flowMetricWindow) { + final Map flowMetrics = capture.get(flowMetricName); + if (flowMetrics == null) { + return OptionalDouble.empty(); + } + final Double rate = flowMetrics.get(flowMetricWindow); + if (rate == null) { + return OptionalDouble.empty(); + } else { + return OptionalDouble.of(rate); + } + } + + public boolean isEmpty() { + return capture.isEmpty() || capture.values().stream().allMatch(Map::isEmpty); + } + + static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(FlowObservation value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + gen.writeObject(value.capture); + } + } + } + + /** * This interface is implemented by the ruby-Agent */ @@ -171,7 +225,7 @@ public Analysis analyze(final Details details) { case UNKNOWN: default: return Analysis.builder() - .withStatus(YELLOW) + .withStatus(UNKNOWN) .withDiagnosis(db -> db .withId(diagnosisId("unknown")) .withCause("pipeline is not known; it may have been recently deleted or failed to start") @@ -192,4 
+246,91 @@ static String impactId(final String state) { } + + static class FlowWorkerUtilizationProbe implements Probe
{ + static final String LAST_1_MINUTE = "last_1_minute"; + static final String LAST_5_MINUTES = "last_5_minutes"; + + static final String WORKER_UTILIZATION = MetricKeys.WORKER_UTILIZATION_KEY.asJavaString(); + + static final Impact.Builder BLOCKED_PROCESSING = Impact.builder() + .withId(impactId("blocked_processing")) + .withDescription("the pipeline is blocked") + .withAdditionalImpactArea(ImpactArea.PIPELINE_EXECUTION); + + static final HelpUrl HELP_URL = new HelpUrl("health-report-pipeline-flow-worker-utilization"); + + @Override + public Analysis analyze(Details observation) { + + final OptionalDouble lastFiveMinutes = observation.flow.getRate(WORKER_UTILIZATION, LAST_5_MINUTES); + final OptionalDouble lastOneMinute = observation.flow.getRate(WORKER_UTILIZATION, LAST_1_MINUTE); + + if (lastFiveMinutes.isPresent()) { + if (lastFiveMinutes.getAsDouble() > 99.999) { + return Analysis.builder() + .withStatus(RED) + .withDiagnosis(db -> db + .withId(diagnosisId("5m-blocked")) + .withCause(diagnosisCause("completely blocked", "five minutes", false)) + .withAction("address bottleneck or add resources") + .withHelpUrl(HELP_URL.withAnchor("blocked-5m").toString())) + .withImpact(BLOCKED_PROCESSING.withSeverity(1).build()) + .build(); + } else if (lastFiveMinutes.getAsDouble() >= 95.00) { + final boolean isRecovering = lastOneMinute.isPresent() && lastOneMinute.getAsDouble() <= 80.00; + return Analysis.builder() + .withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("5m-nearly-blocked")) + .withCause(diagnosisCause("nearly blocked", "five minutes", isRecovering)) + .withAction(isRecovering ? 
"continue to monitor" : "address bottleneck or add resources") + .withHelpUrl(HELP_URL.withAnchor("nearly-blocked-5m").toString())) + .withImpact(BLOCKED_PROCESSING.withSeverity(1).build()) + .build(); + } + } + + if (lastOneMinute.isPresent()) { + if (lastOneMinute.getAsDouble() > 99.999) { + return Analysis.builder().withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("1m-blocked")) + .withCause(diagnosisCause("completely blocked", "one minute", false)) + .withAction("address bottleneck or add resources") + .withHelpUrl(HELP_URL.withAnchor("blocked-1m").toString())) + .withImpact(BLOCKED_PROCESSING.withSeverity(2).build()) + .build(); + } else if (lastOneMinute.getAsDouble() >= 95.00) { + return Analysis.builder().withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("1m-nearly-blocked")) + .withCause(diagnosisCause("nearly blocked", "one minute", false)) + .withAction("address bottleneck or add resources") + .withHelpUrl(HELP_URL.withAnchor("nearly-blocked-1m").toString())) + .withImpact(BLOCKED_PROCESSING.withSeverity(2).build()) + .build(); + } + } + + + return Analysis.builder().build(); + } + + static String diagnosisCause(String condition, String period, boolean isRecovering) { + final StringBuilder cause = new StringBuilder("pipeline workers have been ").append(condition).append(" for at least ").append(period); + if (isRecovering) { + cause.append(", but they appear to be recovering"); + } + return cause.toString(); + } + + static String diagnosisId(final String state) { + return String.format("logstash:health:pipeline:flow:worker_utilization:diagnosis:%s", state); + } + + static String impactId(final String state) { + return String.format("logstash:health:pipeline:flow:impact:%s", state); + } + } } diff --git a/logstash-core/src/main/java/org/logstash/health/Probe.java b/logstash-core/src/main/java/org/logstash/health/Probe.java index ec9cf71d2b4..32d0b0bf808 100644 --- 
a/logstash-core/src/main/java/org/logstash/health/Probe.java +++ b/logstash-core/src/main/java/org/logstash/health/Probe.java @@ -45,7 +45,7 @@ public static class Builder { private final Impact impact; public Builder() { - this(Status.UNKNOWN, null, null); + this(Status.GREEN, null, null); } public Builder(final Status status, diff --git a/logstash-core/src/test/java/org/logstash/health/PipelineIndicatorTest.java b/logstash-core/src/test/java/org/logstash/health/PipelineIndicatorTest.java new file mode 100644 index 00000000000..3b242dee7c9 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/health/PipelineIndicatorTest.java @@ -0,0 +1,182 @@ +package org.logstash.health; + +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; + +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +@RunWith(Enclosed.class) +public class PipelineIndicatorTest { + public static class StatusProbeTest { + private static final PipelineIndicator.StatusProbe statusProbe = new PipelineIndicator.StatusProbe(); + + @Test + public void testRunning() { + final PipelineIndicator.Details details = detailsForStatus(PipelineIndicator.Status.RUNNING); + + final Probe.Analysis analysis = statusProbe.analyze(details); + assertThat(analysis.status, is(Status.GREEN)); + assertThat(analysis.diagnosis, is(nullValue())); + assertThat(analysis.impact, is(nullValue())); + } + + @Test + public void testLoading() { + final PipelineIndicator.Details details = detailsForStatus(PipelineIndicator.Status.LOADING); + + final Probe.Analysis analysis = statusProbe.analyze(details); + assertThat(analysis.status, is(Status.YELLOW)); + 
assertThat(analysis.diagnosis, is(notNullValue())); + assertThat(analysis.diagnosis.cause, containsString("loading")); + assertThat(analysis.diagnosis.helpUrl, containsString("/health-report-pipeline-status.html#loading")); + assertThat(analysis.impact, is(notNullValue())); + assertThat(analysis.impact.id, containsString("not_processing")); + assertThat(analysis.impact.impactAreas, contains(ImpactArea.PIPELINE_EXECUTION)); + } + + @Test + public void testFinished() { + final PipelineIndicator.Details details = detailsForStatus(PipelineIndicator.Status.FINISHED); + + final Probe.Analysis analysis = statusProbe.analyze(details); + assertThat(analysis.status, is(Status.YELLOW)); + assertThat(analysis.diagnosis, is(notNullValue())); + assertThat(analysis.diagnosis.cause, containsString("finished")); + assertThat(analysis.diagnosis.helpUrl, containsString("/health-report-pipeline-status.html#finished")); + assertThat(analysis.impact, is(notNullValue())); + assertThat(analysis.impact.id, containsString("not_processing")); + assertThat(analysis.impact.impactAreas, contains(ImpactArea.PIPELINE_EXECUTION)); + } + + @Test + public void testTerminated() { + final PipelineIndicator.Details details = detailsForStatus(PipelineIndicator.Status.TERMINATED); + + final Probe.Analysis analysis = statusProbe.analyze(details); + assertThat(analysis.status, is(Status.RED)); + assertThat(analysis.diagnosis, is(notNullValue())); + assertThat(analysis.diagnosis.cause, containsString("error")); + assertThat(analysis.diagnosis.helpUrl, containsString("/health-report-pipeline-status.html#terminated")); + assertThat(analysis.impact, is(notNullValue())); + assertThat(analysis.impact.id, containsString("not_processing")); + assertThat(analysis.impact.impactAreas, contains(ImpactArea.PIPELINE_EXECUTION)); + } + + @Test + public void testUnknown() { + final PipelineIndicator.Details details = detailsForStatus(PipelineIndicator.Status.UNKNOWN); + + final Probe.Analysis analysis = 
statusProbe.analyze(details); + assertThat(analysis.status, is(Status.UNKNOWN)); + assertThat(analysis.diagnosis, is(notNullValue())); + assertThat(analysis.diagnosis.cause, containsString("not known")); + assertThat(analysis.diagnosis.helpUrl, containsString("/health-report-pipeline-status.html#unknown")); + assertThat(analysis.impact, is(notNullValue())); + assertThat(analysis.impact.id, containsString("not_processing")); + assertThat(analysis.impact.impactAreas, contains(ImpactArea.PIPELINE_EXECUTION)); + } + + static PipelineIndicator.Details detailsForStatus(PipelineIndicator.Status status) { + return new PipelineIndicator.Details(status); + } + } + + public static class FlowWorkerUtilizationProbeTest { + + private static final String WORKER_UTILIZATION = "worker_utilization"; + private static final PipelineIndicator.FlowWorkerUtilizationProbe flowWorkerUtilizationProbe = new PipelineIndicator.FlowWorkerUtilizationProbe(); + + @Test + public void testFlowWorkerUtilizationNew() throws Exception { + final PipelineIndicator.Details details = detailsForFlow(Map.of(WORKER_UTILIZATION, flowAnalysis(30.2, 87.0))); + + Probe.Analysis analysis = flowWorkerUtilizationProbe.analyze(details); + assertThat(analysis.status, is(Status.GREEN)); + } + + @Test + public void testFlowWorkerUtilizationOK() throws Exception { + final PipelineIndicator.Details details = detailsForFlow(Map.of(WORKER_UTILIZATION, flowAnalysis(30.2, 38.4, 87.0))); + + Probe.Analysis analysis = flowWorkerUtilizationProbe.analyze(details); + assertThat(analysis.status, is(Status.GREEN)); + } + + @Test + public void testFlowWorkerUtilizationNearlyBlockedOneMinute() throws Exception { + final PipelineIndicator.Details details = detailsForFlow(Map.of(WORKER_UTILIZATION, flowAnalysis(30.2, 97.1, 87.0))); + + Probe.Analysis analysis = flowWorkerUtilizationProbe.analyze(details); + assertThat(analysis.status, is(Status.YELLOW)); + assertThat(analysis.diagnosis.cause, containsString("nearly blocked")); + 
assertThat(analysis.diagnosis.helpUrl, containsString("/health-report-pipeline-flow-worker-utilization.html#nearly-blocked-1m")); + } + + @Test + public void testFlowWorkerUtilizationCompletelyBlockedOneMinute() throws Exception { + final PipelineIndicator.Details details = detailsForFlow(Map.of(WORKER_UTILIZATION, flowAnalysis(30.2, 100, 87.0))); + + Probe.Analysis analysis = flowWorkerUtilizationProbe.analyze(details); + assertThat(analysis.status, is(Status.YELLOW)); + assertThat(analysis.diagnosis.cause, containsString("completely blocked")); + assertThat(analysis.diagnosis.helpUrl, containsString("/health-report-pipeline-flow-worker-utilization.html#blocked-1m")); + } + + @Test + public void testFlowWorkerUtilizationNearlyBlockedFiveMinutes() throws Exception { + final PipelineIndicator.Details details = detailsForFlow(Map.of(WORKER_UTILIZATION, flowAnalysis(30.2, 97.1, 96.1, 87.0))); + + Probe.Analysis analysis = flowWorkerUtilizationProbe.analyze(details); + assertThat(analysis.status, is(Status.YELLOW)); + assertThat(analysis.diagnosis.cause, containsString("nearly blocked")); + assertThat(analysis.diagnosis.helpUrl, containsString("/health-report-pipeline-flow-worker-utilization.html#nearly-blocked-5m")); + } + + @Test + public void testFlowWorkerUtilizationCompletelyBlockedFiveMinutes() throws Exception { + final PipelineIndicator.Details details = detailsForFlow(Map.of(WORKER_UTILIZATION, flowAnalysis(30.2, 100, 100, 87.0))); + + Probe.Analysis analysis = flowWorkerUtilizationProbe.analyze(details); + assertThat(analysis.status, is(Status.RED)); + assertThat(analysis.diagnosis.cause, containsString("completely blocked")); + assertThat(analysis.diagnosis.helpUrl, containsString("/health-report-pipeline-flow-worker-utilization.html#blocked-5m")); + } + + @Test + public void testFlowWorkerUtilizationNearlyBlockedFiveMinutesRecovering() throws Exception { + final PipelineIndicator.Details details = detailsForFlow(Map.of(WORKER_UTILIZATION, flowAnalysis(30.2, 
79, 97, 87.0))); + + Probe.Analysis analysis = flowWorkerUtilizationProbe.analyze(details); + assertThat(analysis.status, is(Status.YELLOW)); + assertThat(analysis.diagnosis.cause, both(containsString("nearly blocked")).and(containsString("recovering"))); + assertThat(analysis.diagnosis.helpUrl, containsString("/health-report-pipeline-flow-worker-utilization.html#nearly-blocked-5m")); + } + + private static PipelineIndicator.Details detailsForFlow(final Map> flow) { + return detailsForFlow(PipelineIndicator.Status.RUNNING, flow); + } + + private static PipelineIndicator.Details detailsForFlow(final PipelineIndicator.Status status, final Map> flow) { + return new PipelineIndicator.Details(status, new PipelineIndicator.FlowObservation(flow)); + } + + private static Map flowAnalysis(final double current, final double lifetime) { + return Map.of("current", current, "lifetime", lifetime); + } + private static Map flowAnalysis(final double current, final double lastOneMinute, final double lifetime) { + return Map.of("current", current, "last_1_minute", lastOneMinute, "lifetime", lifetime); + } + private static Map flowAnalysis(final double current, final double lastOneMinute, final double lastFiveMinutes, final double lifetime) { + return Map.of("current", current, "last_1_minute", lastOneMinute, "last_5_minutes", lastFiveMinutes, "lifetime", lifetime); + } + } +} \ No newline at end of file