diff --git a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java index 4a05bd80a7a..487793d62bf 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java +++ b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java @@ -19,10 +19,12 @@ package org.logstash.plugins.pipeline; +import co.elastic.logstash.api.DeprecationLogger; import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.logstash.ext.JrubyEventExtLibrary; +import org.logstash.log.DefaultDeprecationLogger; import java.util.Collection; import java.util.Optional; @@ -37,6 +39,7 @@ public interface PipelineBus { Logger LOGGER = LogManager.getLogger(PipelineBus.class); + DeprecationLogger DEPRECATION_LOGGER = new DefaultDeprecationLogger(LOGGER); /** * API-stable entry-point for creating a {@link PipelineBus} @@ -45,7 +48,9 @@ public interface PipelineBus { static PipelineBus create() { final String pipelineBusImplementation = System.getProperty("logstash.pipelinebus.implementation", "v2"); switch (pipelineBusImplementation) { - case "v1": return new PipelineBusV1(); + case "v1": + DEPRECATION_LOGGER.deprecated("The legacy pipeline bus selected with `logstash.pipelinebus.implementation=v1` is deprecated, and will be removed in a future release of Logstash"); + return new PipelineBusV1(); case "v2": return new PipelineBusV2(); default: LOGGER.warn("unknown pipeline-bus implementation: {}", pipelineBusImplementation); diff --git a/logstash-core/src/test/java/org/logstash/log/LoggingSpyResource.java b/logstash-core/src/test/java/org/logstash/log/LoggingSpyResource.java new file mode 100644 index 00000000000..d169460cc7e --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/log/LoggingSpyResource.java @@ -0,0 +1,34 @@ +package org.logstash.log; + +import org.apache.logging.log4j.core.*; +import org.apache.logging.log4j.test.appender.ListAppender; +import org.junit.rules.ExternalResource; + +import java.util.List; + +public class LoggingSpyResource extends ExternalResource { + + private static final String APPENDER_NAME = "spyAppender"; + + private final Logger loggerToSpyOn; + private final ListAppender appender = new ListAppender(APPENDER_NAME); + + public LoggingSpyResource(final org.apache.logging.log4j.Logger loggerToSpyOn) { + this.loggerToSpyOn = (Logger) loggerToSpyOn; + } + + @Override + protected void before() throws Throwable { + appender.start(); + loggerToSpyOn.addAppender(appender); + } + + @Override + protected void after() { + loggerToSpyOn.removeAppender(appender); + } + + public List getLogEvents() { + return List.copyOf(appender.getEvents()); + } +} diff --git a/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java b/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java index 268ed8d0949..38bad307e81 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java +++ b/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java @@ -19,16 +19,20 @@ package org.logstash.plugins.pipeline; +import org.apache.logging.log4j.LogManager; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; +import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.logstash.RubyUtil; import org.logstash.ext.JrubyEventExtLibrary; +import org.logstash.log.LoggingSpyResource; import java.time.Duration; import java.util.ArrayList; @@ -36,6 +40,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -45,364 +50,368 @@ import java.util.concurrent.atomic.LongAdder; import java.util.stream.Stream; -@RunWith(Parameterized.class) +@RunWith(Enclosed.class) public class PipelineBusTest { - static String address = "fooAddr"; - static String otherAddress = "fooAddr"; - static Collection addresses = Arrays.asList(address, otherAddress); - @Parameterized.Parameters(name = "{0}") - public static Collection> data() { - return Set.of(PipelineBusV1.Testable.class, PipelineBusV2.Testable.class); - } - - @Parameterized.Parameter - public Class busClass; + @RunWith(Parameterized.class) + public static class ImplementationTest { - PipelineBus.Testable bus; - TestPipelineInput input; - TestPipelineOutput output; + static String address = "fooAddr"; + static String otherAddress = "fooAddr"; + static Collection addresses = Arrays.asList(address, otherAddress); - @Before - public void setup() throws ReflectiveOperationException { - bus = busClass.getDeclaredConstructor().newInstance(); - input = new TestPipelineInput(); - output = new TestPipelineOutput(); - } + @Parameterized.Parameters(name = "{0}") + public static Collection> data() { + return Set.of(PipelineBusV1.Testable.class, PipelineBusV2.Testable.class); + } - @Test - public void subscribeUnsubscribe() throws InterruptedException { - assertThat(bus.listen(input, address)).isTrue(); - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { - assertThat(addressState.getInput()).isSameAs(input); - })); + @Parameterized.Parameter + public Class busClass; - bus.unlisten(input, address); + PipelineBus.Testable bus; + TestPipelineInput input; + TestPipelineOutput output; - // Key should have been pruned - assertThat(bus.getAddressState(address)).isNotPresent(); - } + @Before + public void setup() throws ReflectiveOperationException { + bus = busClass.getDeclaredConstructor().newInstance(); + input = new TestPipelineInput(); + output = new TestPipelineOutput(); + } - @Test - public void senderRegisterUnregister() { - bus.registerSender(output, addresses); + @Test + public void subscribeUnsubscribe() throws InterruptedException { + assertThat(bus.listen(input, address)).isTrue(); + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { + assertThat(addressState.getInput()).isSameAs(input); + })); - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState) -> { - assertThat(addressState.getOutputs()).contains(output); - }); + bus.unlisten(input, address); - bus.unregisterSender(output, addresses); + // Key should have been pruned + assertThat(bus.getAddressState(address)).isNotPresent(); + } - // We should have pruned this address - assertThat(bus.getAddressState(address)).isNotPresent(); - } + @Test + public void senderRegisterUnregister() { + bus.registerSender(output, addresses); - @Test - public void activeSenderPreventsPrune() throws InterruptedException { - bus.registerSender(output, addresses); - bus.listen(input, address); + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState) -> { + assertThat(addressState.getOutputs()).contains(output); + }); - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { - assertThat(addressState.getInput()).isSameAs(input); - assertThat(addressState.getOutputs()).contains(output); - })); + bus.unregisterSender(output, addresses); - bus.setBlockOnUnlisten(false); - bus.unlisten(input, address); + // We should have pruned this address + assertThat(bus.getAddressState(address)).isNotPresent(); + } - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { - assertThat(addressState.getInput()).isNull(); - assertThat(addressState.getOutputs()).contains(output); - })); + @Test + public void activeSenderPreventsPrune() throws InterruptedException { + bus.registerSender(output, addresses); + bus.listen(input, address); - bus.unregisterSender(output, addresses); + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { + assertThat(addressState.getInput()).isSameAs(input); + assertThat(addressState.getOutputs()).contains(output); + })); - assertThat(bus.getAddressState(address)).isNotPresent(); - } + bus.setBlockOnUnlisten(false); + bus.unlisten(input, address); - @Test - public void multipleSendersPreventPrune() throws InterruptedException { - // begin with 1:1 single output to input - bus.registerSender(output, Collections.singleton(address)); - bus.listen(input, address); - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { - assertThat(addressState.getInput()).isSameAs(input); - assertThat(addressState.getOutputs()).contains(output); - })); - bus.sendEvents(output, Collections.singletonList(rubyEvent()), false); - assertThat(input.eventCount.longValue()).isEqualTo(1L); - - // attach another output2 as a sender - final TestPipelineOutput output2 = new TestPipelineOutput(); - bus.registerSender(output2, Collections.singleton(address)); - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { - assertThat(addressState.getInput()).isSameAs(input); - assertThat(addressState.getOutputs()).contains(output, output2); - })); - bus.sendEvents(output, Collections.singletonList(rubyEvent()), false); - bus.sendEvents(output2, Collections.singletonList(rubyEvent()), false); - assertThat(input.eventCount.longValue()).isEqualTo(3L); - - // unlisten with first input, simulating a pipeline restart - assertThat(bus.isBlockOnUnlisten()).isFalse(); - bus.unlisten(input, address); - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { - assertThat(addressState.getInput()).isNull(); - assertThat(addressState.getOutputs()).contains(output, output2); - })); - - // unregister one of the two senders, ensuring that the address state remains in-tact - bus.unregisterSender(output, Collections.singleton(address)); - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { - assertThat(addressState.getInput()).isNull(); - assertThat(addressState.getOutputs()).contains(output2); - })); - - // listen with a new input, emulating the completion of a pipeline restart - final TestPipelineInput input2 = new TestPipelineInput(); - assertThat(bus.listen(input2, address)).isTrue(); - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { - assertThat(addressState.getInput()).isSameAs(input2); - assertThat(addressState.getOutputs()).contains(output2); - })); - bus.sendEvents(output2, Collections.singletonList(rubyEvent()), false); - assertThat(input2.eventCount.longValue()).isEqualTo(1L); - - // shut down our remaining sender, ensuring address state remains in-tact - bus.unregisterSender(output2, Collections.singleton(address)); - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { - assertThat(addressState.getInput()).isSameAs(input2); - assertThat(addressState.getOutputs()).isEmpty(); - })); - - // upon unlistening, ensure orphan address state has been cleaned up - bus.unlisten(input2, address); - assertThat(bus.getAddressState(address)).isNotPresent(); - } + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { + assertThat(addressState.getInput()).isNull(); + assertThat(addressState.getOutputs()).contains(output); + })); + bus.unregisterSender(output, addresses); - @Test - public void activeListenerPreventsPrune() throws InterruptedException { - bus.registerSender(output, addresses); - bus.listen(input, address); - bus.unregisterSender(output, addresses); + assertThat(bus.getAddressState(address)).isNotPresent(); + } - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { - assertThat(addressState.getInput()).isSameAs(input); - assertThat(addressState.getOutputs()).isEmpty(); - })); + @Test + public void multipleSendersPreventPrune() throws InterruptedException { + // begin with 1:1 single output to input + bus.registerSender(output, Collections.singleton(address)); + bus.listen(input, address); + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { + assertThat(addressState.getInput()).isSameAs(input); + assertThat(addressState.getOutputs()).contains(output); + })); + bus.sendEvents(output, Collections.singletonList(rubyEvent()), false); + assertThat(input.eventCount.longValue()).isEqualTo(1L); + + // attach another output2 as a sender + final TestPipelineOutput output2 = new TestPipelineOutput(); + bus.registerSender(output2, Collections.singleton(address)); + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { + assertThat(addressState.getInput()).isSameAs(input); + assertThat(addressState.getOutputs()).contains(output, output2); + })); + bus.sendEvents(output, Collections.singletonList(rubyEvent()), false); + bus.sendEvents(output2, Collections.singletonList(rubyEvent()), false); + assertThat(input.eventCount.longValue()).isEqualTo(3L); + + // unlisten with first input, simulating a pipeline restart + assertThat(bus.isBlockOnUnlisten()).isFalse(); + bus.unlisten(input, address); + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { + assertThat(addressState.getInput()).isNull(); + assertThat(addressState.getOutputs()).contains(output, output2); + })); + + // unregister one of the two senders, ensuring that the address state remains in-tact + bus.unregisterSender(output, Collections.singleton(address)); + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { + assertThat(addressState.getInput()).isNull(); + assertThat(addressState.getOutputs()).contains(output2); + })); + + // listen with a new input, emulating the completion of a pipeline restart + final TestPipelineInput input2 = new TestPipelineInput(); + assertThat(bus.listen(input2, address)).isTrue(); + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { + assertThat(addressState.getInput()).isSameAs(input2); + assertThat(addressState.getOutputs()).contains(output2); + })); + bus.sendEvents(output2, Collections.singletonList(rubyEvent()), false); + assertThat(input2.eventCount.longValue()).isEqualTo(1L); + + // shut down our remaining sender, ensuring address state remains in-tact + bus.unregisterSender(output2, Collections.singleton(address)); + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { + assertThat(addressState.getInput()).isSameAs(input2); + assertThat(addressState.getOutputs()).isEmpty(); + })); + + // upon unlistening, ensure orphan address state has been cleaned up + bus.unlisten(input2, address); + assertThat(bus.getAddressState(address)).isNotPresent(); + } - bus.setBlockOnUnlisten(false); - bus.unlisten(input, address); - assertThat(bus.getAddressState(address)).isNotPresent(); - } + @Test + public void activeListenerPreventsPrune() throws InterruptedException { + bus.registerSender(output, addresses); + bus.listen(input, address); + bus.unregisterSender(output, addresses); - @Test - public void registerUnregisterListenerUpdatesOutputs() { - bus.registerSender(output, addresses); - bus.listen(input, address); - - assertThat(bus.getAddressStates(output)).hasValueSatisfying((addressStates) -> { - assertThat(addressStates).hasSize(1); - }); - - bus.unregisterSender(output, addresses); - assertThat(bus.getAddressStates(output)).isNotPresent(); - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState) -> { - assertThat(addressState.getInput()).isSameAs(input); - assertThat(addressState.getOutputs()).isEmpty(); - }); - - bus.registerSender(output, addresses); - assertThat(bus.getAddressStates(output)).hasValueSatisfying((addressStates) -> { - assertThat(addressStates).hasSize(1); - }); - assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState) -> { - assertThat(addressState.getInput()).isSameAs(input); - assertThat(addressState.getOutputs()).contains(output); - }); - } + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState -> { + assertThat(addressState.getInput()).isSameAs(input); + assertThat(addressState.getOutputs()).isEmpty(); + })); - @Test - public void listenUnlistenUpdatesOutputReceivers() throws InterruptedException { - bus.registerSender(output, addresses); - bus.listen(input, address); + bus.setBlockOnUnlisten(false); + bus.unlisten(input, address); - bus.sendEvents(output, Collections.singletonList(rubyEvent()), false); - assertThat(input.eventCount.longValue()).isEqualTo(1L); + assertThat(bus.getAddressState(address)).isNotPresent(); + } - bus.unlisten(input, address); + @Test + public void registerUnregisterListenerUpdatesOutputs() { + bus.registerSender(output, addresses); + bus.listen(input, address); + + assertThat(bus.getAddressStates(output)).hasValueSatisfying((addressStates) -> { + assertThat(addressStates).hasSize(1); + }); + + bus.unregisterSender(output, addresses); + assertThat(bus.getAddressStates(output)).isNotPresent(); + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState) -> { + assertThat(addressState.getInput()).isSameAs(input); + assertThat(addressState.getOutputs()).isEmpty(); + }); + + bus.registerSender(output, addresses); + assertThat(bus.getAddressStates(output)).hasValueSatisfying((addressStates) -> { + assertThat(addressStates).hasSize(1); + }); + assertThat(bus.getAddressState(address)).hasValueSatisfying((addressState) -> { + assertThat(addressState.getInput()).isSameAs(input); + assertThat(addressState.getOutputs()).contains(output); + }); + } - TestPipelineInput newInput = new TestPipelineInput(); - bus.listen(newInput, address); + @Test + public void listenUnlistenUpdatesOutputReceivers() throws InterruptedException { + bus.registerSender(output, addresses); + bus.listen(input, address); - bus.sendEvents(output, Collections.singletonList(rubyEvent()), false); + bus.sendEvents(output, Collections.singletonList(rubyEvent()), false); + assertThat(input.eventCount.longValue()).isEqualTo(1L); - // The new event went to the new input, not the old one - assertThat(newInput.eventCount.longValue()).isEqualTo(1L); - assertThat(input.eventCount.longValue()).isEqualTo(1L); - } + bus.unlisten(input, address); - @Test - public void sendingEmptyListToNowhereStillReturns() { - bus.registerSender(output, List.of("not_an_address")); - bus.sendEvents(output, Collections.emptyList(), true); - } + TestPipelineInput newInput = new TestPipelineInput(); + bus.listen(newInput, address); - @Test - public void missingInputEventuallySucceeds() throws InterruptedException { - bus.registerSender(output, addresses); - - // bus.sendEvent should block at this point since there is no attached listener - // For this test we want to make sure that the background thread has had time to actually block - // since if we start the input too soon we aren't testing anything - // The below code attempts to make sure this happens, though it's hard to be deterministic - // without making sendEvent take weird arguments the non-test code really doesn't need - CountDownLatch sendLatch = new CountDownLatch(1); - Thread sendThread = new Thread(() -> { - sendLatch.countDown(); - bus.sendEvents(output, Collections.singleton(rubyEvent()), true); - }); - sendThread.start(); - - // Try to ensure that the send actually happened. The latch gets us close, - // the sleep takes us the full way (hopefully) - sendLatch.await(); - Thread.sleep(1000); - - bus.listen(input, address); - - // This would block if there's an error in the code - sendThread.join(); - - assertThat(input.eventCount.longValue()).isEqualTo(1L); - } + bus.sendEvents(output, Collections.singletonList(rubyEvent()), false); - @Test - public void whenInDefaultNonBlockingModeInputsShutdownInstantly() throws InterruptedException { - // Test confirms the default. If we decide to change the default we should change this test - assertThat(bus.isBlockOnUnlisten()).isFalse(); + // The new event went to the new input, not the old one + assertThat(newInput.eventCount.longValue()).isEqualTo(1L); + assertThat(input.eventCount.longValue()).isEqualTo(1L); + } - bus.registerSender(output, addresses); - bus.listen(input, address); + @Test + public void sendingEmptyListToNowhereStillReturns() { + bus.registerSender(output, List.of("not_an_address")); + bus.sendEvents(output, Collections.emptyList(), true); + } - bus.unlisten(input, address); // This test would block forever if this is not non-block - bus.unregisterSender(output, addresses); - } + @Test + public void missingInputEventuallySucceeds() throws InterruptedException { + bus.registerSender(output, addresses); + + // bus.sendEvent should block at this point since there is no attached listener + // For this test we want to make sure that the background thread has had time to actually block + // since if we start the input too soon we aren't testing anything + // The below code attempts to make sure this happens, though it's hard to be deterministic + // without making sendEvent take weird arguments the non-test code really doesn't need + CountDownLatch sendLatch = new CountDownLatch(1); + Thread sendThread = new Thread(() -> { + sendLatch.countDown(); + bus.sendEvents(output, Collections.singleton(rubyEvent()), true); + }); + sendThread.start(); + + // Try to ensure that the send actually happened. The latch gets us close, + // the sleep takes us the full way (hopefully) + sendLatch.await(); + Thread.sleep(1000); + + bus.listen(input, address); + + // This would block if there's an error in the code + sendThread.join(); + + assertThat(input.eventCount.longValue()).isEqualTo(1L); + } - @Test - public void whenInBlockingModeInputsShutdownLast() throws InterruptedException { - bus.registerSender(output, addresses); - bus.listen(input, address); + @Test + public void whenInDefaultNonBlockingModeInputsShutdownInstantly() throws InterruptedException { + // Test confirms the default. If we decide to change the default we should change this test + assertThat(bus.isBlockOnUnlisten()).isFalse(); - bus.setBlockOnUnlisten(true); + bus.registerSender(output, addresses); + bus.listen(input, address); - Thread unlistenThread = new Thread( () -> { - try { - bus.unlisten(input, address); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }); - unlistenThread.start(); + bus.unlisten(input, address); // This test would block forever if this is not non-block + bus.unregisterSender(output, addresses); + } - // This should unblock the listener thread - bus.unregisterSender(output, addresses); - TimeUnit.SECONDS.toMillis(30); - unlistenThread.join(Duration.ofSeconds(30).toMillis()); - assertThat(unlistenThread.getState()).isEqualTo(Thread.State.TERMINATED); + @Test + public void whenInBlockingModeInputsShutdownLast() throws InterruptedException { + bus.registerSender(output, addresses); + bus.listen(input, address); - assertThat(bus.getAddressState(address)).isNotPresent(); - } + bus.setBlockOnUnlisten(true); - @Test - public void blockingShutdownDeadlock() throws InterruptedException { - final ExecutorService executor = Executors.newFixedThreadPool(10); - try { - for (int i = 0; i < 100; i++) { - bus.registerSender(output, addresses); - bus.listen(input, address); - bus.setBlockOnUnlisten(true); - - // we use a CountDownLatch to increase the likelihood - // of simultaneous execution - final CountDownLatch startLatch = new CountDownLatch(2); - final CompletableFuture unlistenFuture = CompletableFuture.runAsync(asRunnable(() -> { - startLatch.countDown(); - startLatch.await(); + Thread unlistenThread = new Thread(() -> { + try { bus.unlisten(input, address); - }), executor); - final CompletableFuture unregisterFuture = CompletableFuture.runAsync(asRunnable(() -> { - startLatch.countDown(); - startLatch.await(); - bus.unregisterSender(output, addresses); - }), executor); - - // ensure that our tasks all exit successfully, quickly - assertThatCode(() -> CompletableFuture.allOf(unlistenFuture, unregisterFuture).get(1, TimeUnit.SECONDS)) - .withThreadDumpOnError() - .withFailMessage("Expected unlisten and unregisterSender to not deadlock, but they did not return in a reasonable amount of time in the <%s>th iteration", i) - .doesNotThrowAnyException(); - } - } finally { - executor.shutdownNow(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + unlistenThread.start(); + + // This should unblock the listener thread + bus.unregisterSender(output, addresses); + unlistenThread.join(Duration.ofSeconds(30).toMillis()); + assertThat(unlistenThread.getState()).isEqualTo(Thread.State.TERMINATED); + + assertThat(bus.getAddressState(address)).isNotPresent(); } - } - - @FunctionalInterface - interface ExceptionalRunnable { - void run() throws E; - } - private Runnable asRunnable(final ExceptionalRunnable exceptionalRunnable) { - return () -> { + @Test + public void blockingShutdownDeadlock() throws InterruptedException { + final ExecutorService executor = Executors.newFixedThreadPool(10); try { - exceptionalRunnable.run(); - } catch (Throwable e) { - throw new RuntimeException(e); + for (int i = 0; i < 100; i++) { + bus.registerSender(output, addresses); + bus.listen(input, address); + bus.setBlockOnUnlisten(true); + + // we use a CountDownLatch to increase the likelihood + // of simultaneous execution + final CountDownLatch startLatch = new CountDownLatch(2); + final CompletableFuture unlistenFuture = CompletableFuture.runAsync(asRunnable(() -> { + startLatch.countDown(); + startLatch.await(); + bus.unlisten(input, address); + }), executor); + final CompletableFuture unregisterFuture = CompletableFuture.runAsync(asRunnable(() -> { + startLatch.countDown(); + startLatch.await(); + bus.unregisterSender(output, addresses); + }), executor); + + // ensure that our tasks all exit successfully, quickly + assertThatCode(() -> CompletableFuture.allOf(unlistenFuture, unregisterFuture).get(1, TimeUnit.SECONDS)) + .withThreadDumpOnError() + .withFailMessage("Expected unlisten and unregisterSender to not deadlock, but they did not return in a reasonable amount of time in the <%s>th iteration", i) + .doesNotThrowAnyException(); + } + } finally { + executor.shutdownNow(); } - }; - } + } + + @FunctionalInterface + interface ExceptionalRunnable { + void run() throws E; + } + private Runnable asRunnable(final ExceptionalRunnable exceptionalRunnable) { + return () -> { + try { + exceptionalRunnable.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }; + } - @Test - public void whenInputFailsOutputRetryOnlyNotYetDelivered() throws InterruptedException { - bus.registerSender(output, addresses); - int expectedReceiveInvocations = 2; - CountDownLatch sendsCoupleOfCallsLatch = new CountDownLatch(expectedReceiveInvocations); - int positionOfFailure = 1; - input = new TestFailPipelineInput(sendsCoupleOfCallsLatch, positionOfFailure); - bus.listen(input, address); - final List events = new ArrayList<>(); - events.add(rubyEvent()); - events.add(rubyEvent()); - events.add(rubyEvent()); + @Test + public void whenInputFailsOutputRetryOnlyNotYetDelivered() throws InterruptedException { + bus.registerSender(output, addresses); + int expectedReceiveInvocations = 2; + CountDownLatch sendsCoupleOfCallsLatch = new CountDownLatch(expectedReceiveInvocations); + int positionOfFailure = 1; + input = new TestFailPipelineInput(sendsCoupleOfCallsLatch, positionOfFailure); + bus.listen(input, address); - CountDownLatch senderThreadStarted = new CountDownLatch(1); - Thread sendThread = new Thread(() -> { - senderThreadStarted.countDown(); + final List events = new ArrayList<>(); + events.add(rubyEvent()); + events.add(rubyEvent()); + events.add(rubyEvent()); - // Exercise - bus.sendEvents(output, events, true); - }); - sendThread.start(); + CountDownLatch senderThreadStarted = new CountDownLatch(1); + Thread sendThread = new Thread(() -> { + senderThreadStarted.countDown(); - senderThreadStarted.await(); // Ensure server thread is started + // Exercise + bus.sendEvents(output, events, true); + }); + sendThread.start(); - // Ensure that send actually happened a couple of times. - // Send method retry mechanism sleeps 1 second on each retry! - boolean coupleOfCallsDone = sendsCoupleOfCallsLatch.await(3, TimeUnit.SECONDS); - sendThread.join(); + senderThreadStarted.await(); // Ensure server thread is started - // Verify - assertThat(coupleOfCallsDone).isTrue(); - assertThat(((TestFailPipelineInput)input).getLastBatchSize()).isEqualTo(events.size() - positionOfFailure); - } + // Ensure that send actually happened a couple of times. + // Send method retry mechanism sleeps 1 second on each retry! + boolean coupleOfCallsDone = sendsCoupleOfCallsLatch.await(3, TimeUnit.SECONDS); + sendThread.join(); - private JrubyEventExtLibrary.RubyEvent rubyEvent() { - return JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY); + // Verify + assertThat(coupleOfCallsDone).isTrue(); + assertThat(((TestFailPipelineInput) input).getLastBatchSize()).isEqualTo(events.size() - positionOfFailure); + } + + private JrubyEventExtLibrary.RubyEvent rubyEvent() { + return JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY); + } } static class TestPipelineInput implements PipelineInput { @@ -462,4 +471,54 @@ int getLastBatchSize() { return lastBatchSize; } } -} \ No newline at end of file + + public static class SelectionTest { + @Rule + public LoggingSpyResource loggingSpyResource = new LoggingSpyResource(LogManager.getLogger("org.logstash.deprecation.plugins.pipeline.PipelineBus")); + + @Test + public void implementationExplicitV1() { + withSystemProperty("logstash.pipelinebus.implementation", "v1", () -> { + assertThat(PipelineBus.create()).isInstanceOf(PipelineBusV1.class); + assertThat(loggingSpyResource.getLogEvents()).anySatisfy(logEvent -> { + assertThat(logEvent.getMessage().getFormattedMessage()).contains("legacy pipeline bus"); + }); + }); + } + + @Test + public void implementationExplicitV2() { + withSystemProperty("logstash.pipelinebus.implementation", "v2", () -> { + assertThat(PipelineBus.create()).isInstanceOf(PipelineBusV2.class); + }); + } + + @Test + public void implementationImplicit() { + withSystemProperty("logstash.pipelinebus.implementation", null, () -> { + assertThat(PipelineBus.create()).isInstanceOf(PipelineBusV2.class); + }); + } + + + static synchronized void withSystemProperty(final String propertyName, + final String temporaryPropertyValue, + final Runnable runnable) { + final String previousPropertyValue; + if (Objects.nonNull(temporaryPropertyValue)) { + previousPropertyValue = System.setProperty(propertyName, temporaryPropertyValue); + } else { + previousPropertyValue = System.clearProperty(propertyName); + } + try { + runnable.run(); + } finally { + if (Objects.nonNull(previousPropertyValue)) { + System.setProperty(propertyName, previousPropertyValue); + } else if (Objects.nonNull(temporaryPropertyValue)) { + System.clearProperty(propertyName); + } + } + } + } +}