Skip to content

Commit

Permalink
deprecate pipeline bus v1
Browse files Browse the repository at this point in the history
  • Loading branch information
yaauie committed Nov 5, 2024
1 parent c51a7d5 commit e18e466
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package org.logstash.plugins.pipeline;

import co.elastic.logstash.api.DeprecationLogger;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.log.DefaultDeprecationLogger;

import java.util.Collection;
import java.util.Optional;
Expand All @@ -37,6 +39,7 @@
public interface PipelineBus {

Logger LOGGER = LogManager.getLogger(PipelineBus.class);
DeprecationLogger DEPRECATION_LOGGER = new DefaultDeprecationLogger(LOGGER);

/**
* API-stable entry-point for creating a {@link PipelineBus}
Expand All @@ -45,7 +48,9 @@ public interface PipelineBus {
static PipelineBus create() {
final String pipelineBusImplementation = System.getProperty("logstash.pipelinebus.implementation", "v2");
switch (pipelineBusImplementation) {
case "v1": return new PipelineBusV1();
case "v1":
DEPRECATION_LOGGER.deprecated("The legacy pipeline bus selected with `logstash.pipelinebus.implementation=v1` is deprecated, and will be removed in Logstash 9.0");
return new PipelineBusV1();
case "v2": return new PipelineBusV2();
default:
LOGGER.warn("unknown pipeline-bus implementation: {}", pipelineBusImplementation);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.logstash.log;

import org.apache.logging.log4j.core.*;
import org.apache.logging.log4j.test.appender.ListAppender;
import org.junit.rules.ExternalResource;

import java.util.List;

public class LoggingSpyResource extends ExternalResource {

private static final String APPENDER_NAME = "spyAppender";

private final Logger loggerToSpyOn;
private final ListAppender appender = new ListAppender(APPENDER_NAME);

public LoggingSpyResource(final org.apache.logging.log4j.Logger loggerToSpyOn) {
this.loggerToSpyOn = (Logger) loggerToSpyOn;
}

@Override
protected void before() throws Throwable {
appender.start();
loggerToSpyOn.addAppender(appender);
}

@Override
protected void after() {
loggerToSpyOn.removeAppender(appender);
}

public List<LogEvent> getLogEvents() {
return List.copyOf(appender.getEvents());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.logstash.plugins.pipeline;

import org.apache.logging.log4j.LogManager;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -29,6 +31,7 @@
import org.junit.runners.Parameterized;
import org.logstash.RubyUtil;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.log.LoggingSpyResource;

import java.time.Duration;
import java.util.*;
Expand Down Expand Up @@ -410,10 +413,16 @@ int getLastBatchSize() {
}

public static class SelectionTest {
@Rule
public LoggingSpyResource loggingSpyResource = new LoggingSpyResource(LogManager.getLogger("org.logstash.deprecation.plugins.pipeline.PipelineBus"));

@Test
public void implementationExplicitV1() {
withSystemProperty("logstash.pipelinebus.implementation", "v1", () -> {
assertThat(PipelineBus.create()).isInstanceOf(PipelineBusV1.class);
assertThat(loggingSpyResource.getLogEvents()).anySatisfy(logEvent -> {
assertThat(logEvent.getMessage().getFormattedMessage()).contains("legacy pipeline bus");
});
});
}

Expand Down

0 comments on commit e18e466

Please sign in to comment.