Skip to content

Commit

Permalink
Adding Spring Cloud Stream Version To Message Headers For Easier Debu…
Browse files Browse the repository at this point in the history
…gging Of Issues.

Fix checkstyles
Resolves #3027
  • Loading branch information
omercelikceng authored and olegz committed Nov 5, 2024
1 parent 17ac635 commit a8fb34d
Show file tree
Hide file tree
Showing 7 changed files with 441 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dump.rdb
.apt_generated
artifacts
**/dependency-reduced-pom.xml
core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/utils/GeneratedBuildProperties.java

node
node_modules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package org.springframework.cloud.stream.function;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

import org.junit.jupiter.api.BeforeAll;
Expand All @@ -26,9 +29,14 @@
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.test.EnableTestBinder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.cloud.stream.utils.BuildInformationProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -43,7 +51,6 @@
/**
* @author Omer Celik
*/

public class HeaderTests {

@BeforeAll
Expand All @@ -63,10 +70,8 @@ void checkWithEmptyPojo() {

OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> messageReceived = outputDestination.receive(1000, "emptyConfigurationDestination");
MessageHeaders headers = messageReceived.getHeaders();
assertThat(headers).isNotNull();
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");

checkCommonHeaders(messageReceived.getHeaders());
}
}

Expand All @@ -75,20 +80,20 @@ void checkIfHeaderProvidedInData() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class))
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false")) {

StreamBridge streamBridge = context.getBean(StreamBridge.class);
String jsonPayload = "{\"name\":\"Omer\"}";
streamBridge.send("myBinding-out-0",
MessageBuilder.withPayload(jsonPayload.getBytes())
.setHeader("anyHeader", "anyValue")
.build(),
MimeTypeUtils.APPLICATION_JSON);

OutputDestination output = context.getBean(OutputDestination.class);
Message<byte[]> result = output.receive(1000, "myBinding-out-0");
MessageHeaders headers = result.getHeaders();
assertThat(headers).isNotNull();
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
assertThat(headers.get("anyHeader")).isEqualTo("anyValue");

checkCommonHeaders(result.getHeaders());
assertThat(result.getHeaders().get("anyHeader")).isEqualTo("anyValue");
}
}

Expand All @@ -99,16 +104,35 @@ void checkGenericMessageSent() {
.web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercase")) {

String jsonPayload = "{\"surname\":\"Celik\"}";
InputDestination input = context.getBean(InputDestination.class);
input.send(new GenericMessage<>(jsonPayload.getBytes()), "uppercase-in-0");

OutputDestination output = context.getBean(OutputDestination.class);
Message<byte[]> result = output.receive(1000, "uppercase-out-0");

checkCommonHeaders(result.getHeaders());
}
}

@Test
void checkGenericMessageSentUsingStreamBridge() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(FunctionUpperCaseConfiguration.class))
.web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercase")) {

String jsonPayload = "{\"anyFieldName\":\"anyValue\"}";
final StreamBridge streamBridge = context.getBean(StreamBridge.class);
GenericMessage<String> message = new GenericMessage<>(jsonPayload);
streamBridge.send("uppercase-in-0", message);

OutputDestination output = context.getBean(OutputDestination.class);
Message<byte[]> result = output.receive(1000, "uppercase-out-0");
MessageHeaders headers = result.getHeaders();
assertThat(headers).isNotNull();
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");

checkCommonHeaders(result.getHeaders());
}
}

Expand All @@ -127,11 +151,96 @@ void checkMessageWrappedFunctionalConsumer() {

OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> message = target.receive(5, "uppercase-out-0");
MessageHeaders headers = message.getHeaders();
assertThat(headers).isNotNull();

checkCommonHeaders(message.getHeaders());
}

@Test
void checkStringToMapMessageStreamListener() {
ApplicationContext context = new SpringApplicationBuilder(
StringToMapMessageConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
String jsonPayload = "{\"name\":\"Omer\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkPojoToPojo() {
ApplicationContext context = new SpringApplicationBuilder(
PojoToPojoConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
String jsonPayload = "{\"name\":\"Omer\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkPojoToString() {
ApplicationContext context = new SpringApplicationBuilder(
PojoToStringConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
String jsonPayload = "{\"name\":\"Neso\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkPojoToByteArray() {
ApplicationContext context = new SpringApplicationBuilder(
PojoToByteArrayConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
String jsonPayload = "{\"name\":\"Neptune\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkStringToPojoInboundContentTypeHeader() {
ApplicationContext context = new SpringApplicationBuilder(
StringToPojoConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
String jsonPayload = "{\"name\":\"Mercury\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes(),
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.APPLICATION_JSON_VALUE))));
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkPojoMessageToStringMessage() {
ApplicationContext context = new SpringApplicationBuilder(
PojoMessageToStringMessageConfiguration.class)
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
String jsonPayload = "{\"name\":\"Earth\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
Message<byte[]> outputMessage = target.receive();
MessageHeaders headers = outputMessage.getHeaders();
assertThat(BuildInformationProvider.isVersionValid((String) headers.get(BinderHeaders.SCST_VERSION))).isTrue();
}

private void checkCommonHeaders(MessageHeaders headers) {
assertThat(headers).isNotNull();
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
assertThat(BuildInformationProvider.isVersionValid((String) headers.get(BinderHeaders.SCST_VERSION))).isTrue();
}

@EnableAutoConfiguration
Expand All @@ -156,6 +265,97 @@ public Function<String, String> uppercase() {
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class StringToMapMessageConfiguration {
@Bean
public Function<Message<Map<?, ?>>, String> echo() {
return value -> {
assertThat(value.getPayload() instanceof Map).isTrue();
return (String) value.getPayload().get("name");
};
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class PojoToPojoConfiguration {

@Bean
public Function<Planet, Planet> echo() {
return value -> value;
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class PojoToStringConfiguration {

@Bean
public Function<Planet, String> echo() {
return Planet::toString;
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class PojoToByteArrayConfiguration {

@Bean
public Function<Planet, byte[]> echo() {
return value -> value.toString().getBytes(StandardCharsets.UTF_8);
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class StringToPojoConfiguration {

@Bean
public Function<String, Planet> echo(JsonMapper mapper) {
return value -> mapper.fromJson(value, Planet.class);
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class PojoMessageToStringMessageConfiguration {

@Bean
public Function<Message<Planet>, Message<String>> echo() {
return value -> MessageBuilder.withPayload(value.getPayload().toString())
.setHeader("expected-content-type", MimeTypeUtils.TEXT_PLAIN_VALUE)
.build();
}
}

public static class Planet {

private String name;

Planet() {
this(null);
}

Planet(String name) {
this.name = name;
}

public String getName() {
return this.name;
}

public void setName(String name) {
this.name = name;
}

@Override
public String toString() {
return this.name;
}

}

public static class EmptyPojo {

}
Expand Down
Loading

0 comments on commit a8fb34d

Please sign in to comment.