From a89b78c0a9e94dd803ee395cfd371aa719dcfd82 Mon Sep 17 00:00:00 2001 From: Jeffrey Douangpaseuth <11084623+Nephery@users.noreply.github.com> Date: Tue, 19 Nov 2024 12:17:41 -0500 Subject: [PATCH 1/5] use ProducerFlowProperties constructor that supports full session pass-through --- .../pom.xml | 2 +- .../outbound/JCSMPOutboundMessageHandler.java | 8 ++++++-- .../provisioning/SolaceProvisioningUtil.java | 19 ++----------------- .../JCSMPOutboundMessageHandlerTest.java | 3 +++ .../binder/util/JmsCompatibilityIT.java | 5 ----- .../binder/SolaceMessageChannelBinder.java | 9 ++++++--- ...laceMessageChannelBinderConfiguration.java | 2 +- 7 files changed, 19 insertions(+), 29 deletions(-) diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/pom.xml b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/pom.xml index f9b63a69..b2e7edf2 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/pom.xml +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/pom.xml @@ -73,7 +73,7 @@ com.solacesystems - sol-jms + sol-jms-jakarta ${solace.jms.version} test diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java index 3296c181..4e27c4e4 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java @@ -16,6 +16,7 @@ import com.solacesystems.jcsmp.Destination; import com.solacesystems.jcsmp.JCSMPException; import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPProperties; import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler; import com.solacesystems.jcsmp.Topic; @@ -48,6 +49,7 @@ public class JCSMPOutboundMessageHandler implements MessageHandler, Lifecycle { private final DestinationType configDestinationType; private final Destination configDestination; private final JCSMPSession jcsmpSession; + private final JCSMPProperties jcsmpProperties; private final MessageChannel errorChannel; private final JCSMPSessionProducerManager producerManager; private final ExtendedProducerProperties properties; @@ -63,6 +65,7 @@ public class JCSMPOutboundMessageHandler implements MessageHandler, Lifecycle { public JCSMPOutboundMessageHandler(ProducerDestination destination, JCSMPSession jcsmpSession, + JCSMPProperties jcsmpProperties, MessageChannel errorChannel, JCSMPSessionProducerManager producerManager, ExtendedProducerProperties properties, @@ -72,6 +75,7 @@ public JCSMPOutboundMessageHandler(ProducerDestination destination, JCSMPFactory.onlyInstance().createTopic(destination.getName()) : JCSMPFactory.onlyInstance().createQueue(destination.getName()); this.jcsmpSession = jcsmpSession; + this.jcsmpProperties = jcsmpProperties; this.errorChannel = errorChannel; this.producerManager = producerManager; this.properties = properties; @@ -239,10 +243,10 @@ public void start() { if (properties.getExtension().isTransacted()) { LOGGER.info("Creating transacted session ", id); transactedSession = jcsmpSession.createTransactedSession(); - producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession), + producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpProperties), producerEventHandler); } else { - producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession), + producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpProperties), producerEventHandler); } } catch (Exception e) { diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceProvisioningUtil.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceProvisioningUtil.java index eaa202ee..6823688f 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceProvisioningUtil.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceProvisioningUtil.java @@ -8,7 +8,6 @@ import com.solacesystems.jcsmp.EndpointProperties; import com.solacesystems.jcsmp.JCSMPFactory; import com.solacesystems.jcsmp.JCSMPProperties; -import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.ProducerFlowProperties; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; @@ -51,22 +50,8 @@ public static EndpointProperties getErrorQueueEndpointProperties(SolaceConsumerP return endpointProperties; } - public static ProducerFlowProperties getProducerFlowProperties(JCSMPSession jcsmpSession) { - ProducerFlowProperties producerFlowProperties = new ProducerFlowProperties(); - - // SOL-118898: - // PUB_ACK_WINDOW_SIZE & ACK_EVENT_MODE aren't automatically used as default values for - // ProducerFlowProperties. - Integer pubAckWindowSize = (Integer) jcsmpSession.getProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE); - if (pubAckWindowSize != null) { - producerFlowProperties.setWindowSize(pubAckWindowSize); - } - String ackEventMode = (String) jcsmpSession.getProperty(JCSMPProperties.ACK_EVENT_MODE); - if (ackEventMode != null) { - producerFlowProperties.setAckEventMode(ackEventMode); - } - - return producerFlowProperties; + public static ProducerFlowProperties getProducerFlowProperties(JCSMPProperties jcsmpProperties) { + return new ProducerFlowProperties(jcsmpProperties); } public static ConsumerFlowProperties getConsumerFlowProperties( diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java index cd2e48da..5bf42fad 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java @@ -69,6 +69,7 @@ public class JCSMPOutboundMessageHandlerTest { private ArgumentCaptor producerFlowPropertiesCaptor; private ExtendedProducerProperties producerProperties; private JCSMPSessionProducerManager sessionProducerManager; + @Mock private JCSMPProperties jcsmpProperties; @Mock private JCSMPSession session; @Mock private TransactedSession transactedSession; @Mock private XMLMessageProducer messageProducer; @@ -105,6 +106,7 @@ public void init(@Mock MessageChannel errChannel, messageHandler = new JCSMPOutboundMessageHandler( dest, session, + jcsmpProperties, errChannel, sessionProducerManager, producerProperties, @@ -500,6 +502,7 @@ public void test_dynamic_destinationName_with_destinationType_configured_on_mess messageHandler = new JCSMPOutboundMessageHandler( dest, session, + jcsmpProperties, null, new JCSMPSessionProducerManager(session), new ExtendedProducerProperties<>(producerProperties), diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java index d9e42ab6..77804c27 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java @@ -43,11 +43,6 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import org.springframework.util.SerializationUtils; -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; import java.util.ArrayList; import java.util.Base64; import java.util.Enumeration; diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/SolaceMessageChannelBinder.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/SolaceMessageChannelBinder.java index 21011a11..05a71db9 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/SolaceMessageChannelBinder.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/SolaceMessageChannelBinder.java @@ -19,6 +19,7 @@ import com.solacesystems.jcsmp.Context; import com.solacesystems.jcsmp.Endpoint; import com.solacesystems.jcsmp.EndpointProperties; +import com.solacesystems.jcsmp.JCSMPProperties; import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.Queue; import com.solacesystems.jcsmp.XMLMessage; @@ -52,6 +53,7 @@ public class SolaceMessageChannelBinder DisposableBean { private final JCSMPSession jcsmpSession; + private final JCSMPProperties jcsmpProperties; private final Context jcsmpContext; private final JCSMPSessionProducerManager sessionProducerManager; private final AtomicBoolean consumersRemoteStopFlag = new AtomicBoolean(false); @@ -61,12 +63,13 @@ public class SolaceMessageChannelBinder private static final SolaceMessageHeaderErrorMessageStrategy errorMessageStrategy = new SolaceMessageHeaderErrorMessageStrategy(); @Nullable private SolaceBinderHealthAccessor solaceBinderHealthAccessor; - public SolaceMessageChannelBinder(JCSMPSession jcsmpSession, SolaceEndpointProvisioner solaceEndpointProvisioner) { - this(jcsmpSession, null, solaceEndpointProvisioner); + public SolaceMessageChannelBinder(JCSMPSession jcsmpSession, JCSMPProperties jcsmpProperties, SolaceEndpointProvisioner solaceEndpointProvisioner) { + this(jcsmpSession, jcsmpProperties, null, solaceEndpointProvisioner); } - public SolaceMessageChannelBinder(JCSMPSession jcsmpSession, Context jcsmpContext, SolaceEndpointProvisioner solaceEndpointProvisioner) { + public SolaceMessageChannelBinder(JCSMPSession jcsmpSession, JCSMPProperties jcsmpProperties, Context jcsmpContext, SolaceEndpointProvisioner solaceEndpointProvisioner) { super(new String[0], solaceEndpointProvisioner); this.jcsmpSession = jcsmpSession; + this.jcsmpProperties = jcsmpProperties; this.jcsmpContext = jcsmpContext; this.sessionProducerManager = new JCSMPSessionProducerManager(jcsmpSession); } diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceMessageChannelBinderConfiguration.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceMessageChannelBinderConfiguration.java index d1ba42de..890c8649 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceMessageChannelBinderConfiguration.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceMessageChannelBinderConfiguration.java @@ -97,7 +97,7 @@ private void initSession() throws JCSMPException { SolaceMessageChannelBinder solaceMessageChannelBinder(SolaceEndpointProvisioner solaceEndpointProvisioner, @Nullable SolaceBinderHealthAccessor solaceBinderHealthAccessor, @Nullable SolaceMeterAccessor solaceMeterAccessor) { - SolaceMessageChannelBinder binder = new SolaceMessageChannelBinder(jcsmpSession, context, solaceEndpointProvisioner); + SolaceMessageChannelBinder binder = new SolaceMessageChannelBinder(jcsmpSession, jcsmpProperties, context, solaceEndpointProvisioner); binder.setExtendedBindingProperties(solaceExtendedBindingProperties); binder.setSolaceMeterAccessor(solaceMeterAccessor); if (solaceBinderHealthAccessor != null) { From 6395c29c9ad519804918df57e1a32dcc96f43279 Mon Sep 17 00:00:00 2001 From: Jeffrey Douangpaseuth <11084623+Nephery@users.noreply.github.com> Date: Tue, 19 Nov 2024 12:43:55 -0500 Subject: [PATCH 2/5] update test integration sub-module --- solace-integration-test-support | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solace-integration-test-support b/solace-integration-test-support index 3586058f..0574e130 160000 --- a/solace-integration-test-support +++ b/solace-integration-test-support @@ -1 +1 @@ -Subproject commit 3586058ff45285d8949fce404800b3a85e1e1f41 +Subproject commit 0574e130e801efea3135c591a90be7546c5c791b From 4a5945434fe548b45eddd95492716a957779bdc3 Mon Sep 17 00:00:00 2001 From: Jeffrey Douangpaseuth <11084623+Nephery@users.noreply.github.com> Date: Tue, 19 Nov 2024 12:55:20 -0500 Subject: [PATCH 3/5] fix jakarta imports --- .../binder/util/JmsCompatibilityIT.java | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java index 77804c27..9b77caed 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java @@ -27,6 +27,11 @@ import com.solacesystems.jcsmp.XMLMessageProducer; import com.solacesystems.jms.SolConnectionFactory; import com.solacesystems.jms.SolJmsUtility; +import jakarta.jms.Connection; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.ObjectMessage; +import jakarta.jms.Session; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; import org.assertj.core.api.SoftAssertions; @@ -276,20 +281,19 @@ public void testPayloadFromSpringToJms(SoftAssertions softly) throws Exception { jmsConsumer.setMessageListener(msg -> { LOGGER.info("Got message {}", msg); try { - if (msg instanceof javax.jms.BytesMessage) { - javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) msg; + if (msg instanceof jakarta.jms.BytesMessage bytesMessage) { byte[] payload = new byte[(int) bytesMessage.getBodyLength()]; softly.assertThat(bytesMessage.readBytes(payload)).isEqualTo(bytesMessage.getBodyLength()); softly.assertThat(payload).isEqualTo("test".getBytes()); processedMessageTypes.add(BytesMessage.class); - } else if (msg instanceof javax.jms.TextMessage) { - softly.assertThat(((javax.jms.TextMessage) msg).getText()).isEqualTo("test"); + } else if (msg instanceof jakarta.jms.TextMessage textMessage) { + softly.assertThat(textMessage.getText()).isEqualTo("test"); processedMessageTypes.add(TextMessage.class); - } else if (msg instanceof javax.jms.StreamMessage) { - softly.assertThat(((javax.jms.StreamMessage) msg).readString()).isEqualTo("test"); + } else if (msg instanceof jakarta.jms.StreamMessage streamMessage) { + softly.assertThat(streamMessage.readString()).isEqualTo("test"); processedMessageTypes.add(StreamMessage.class); - } else if (msg instanceof javax.jms.MapMessage) { - softly.assertThat(((javax.jms.MapMessage) msg).getString("test")).isEqualTo("test"); + } else if (msg instanceof jakarta.jms.MapMessage mapMessage) { + softly.assertThat(mapMessage.getString("test")).isEqualTo("test"); processedMessageTypes.add(MapMessage.class); } else { throw new IllegalStateException(String.format("Message type %s has no test", msg.getClass())); @@ -317,20 +321,20 @@ public void testPayloadFromSpringToJms(SoftAssertions softly) throws Exception { @Test public void testPayloadFromJmsToSpring(JCSMPSession jcsmpSession, SoftAssertions softly) throws Exception { - List messages = new ArrayList<>(); + List messages = new ArrayList<>(); { - javax.jms.BytesMessage bytesMessage = jmsSession.createBytesMessage(); + jakarta.jms.BytesMessage bytesMessage = jmsSession.createBytesMessage(); bytesMessage.writeBytes("test".getBytes()); messages.add(bytesMessage); messages.add(jmsSession.createTextMessage("test")); - javax.jms.StreamMessage streamMessage = jmsSession.createStreamMessage(); + jakarta.jms.StreamMessage streamMessage = jmsSession.createStreamMessage(); streamMessage.writeString("test"); messages.add(streamMessage); - javax.jms.MapMessage mapMessage = jmsSession.createMapMessage(); + jakarta.jms.MapMessage mapMessage = jmsSession.createMapMessage(); mapMessage.setString("test", "test"); messages.add(mapMessage); } @@ -353,11 +357,11 @@ public void onReceive(BytesXMLMessage bytesXMLMessage) { } else if (msg.getPayload() instanceof String) { softly.assertThat(msg.getPayload()).isEqualTo("test"); processedMessageTypes.add(TextMessage.class); - } else if (msg.getPayload() instanceof SDTStream) { - softly.assertThat(((SDTStream) msg.getPayload()).readString()).isEqualTo("test"); + } else if (msg.getPayload() instanceof SDTStream sdtStream) { + softly.assertThat(sdtStream.readString()).isEqualTo("test"); processedMessageTypes.add(StreamMessage.class); - } else if (msg.getPayload() instanceof SDTMap) { - softly.assertThat(((SDTMap) msg.getPayload()).getString("test")).isEqualTo("test"); + } else if (msg.getPayload() instanceof SDTMap sdtMap) { + softly.assertThat(sdtMap.getString("test")).isEqualTo("test"); processedMessageTypes.add(MapMessage.class); } } catch (Exception e) { @@ -374,7 +378,7 @@ public void onException(JCSMPException e) {} jcsmpSession.addSubscription(JCSMPFactory.onlyInstance().createTopic(topicName)); messageConsumer.start(); - for (javax.jms.Message message : messages) { + for (jakarta.jms.Message message : messages) { jmsProducer.send(message); } @@ -397,7 +401,7 @@ public void testSerializedPayloadFromSpringToJms(SoftAssertions softly) throws E try { softly.assertThat(msg.getBooleanProperty(SolaceBinderHeaders.SERIALIZED_PAYLOAD)).isTrue(); - javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) msg; + jakarta.jms.BytesMessage bytesMessage = (jakarta.jms.BytesMessage) msg; byte[] receivedPayload = new byte[(int) bytesMessage.getBodyLength()]; bytesMessage.readBytes(receivedPayload); softly.assertThat(SerializationUtils.deserialize(receivedPayload)).isEqualTo(payload); From a68a3b6cc489d5295b0cce2fa84f82bc3f2913a0 Mon Sep 17 00:00:00 2001 From: Jeffrey Douangpaseuth <11084623+Nephery@users.noreply.github.com> Date: Tue, 19 Nov 2024 13:06:47 -0500 Subject: [PATCH 4/5] fix test failure --- .../binder/outbound/JCSMPOutboundMessageHandlerTest.java | 7 ++++--- .../cloud/stream/binder/SolaceMessageChannelBinder.java | 1 + 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java index 5bf42fad..e6d5e616 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java @@ -35,6 +35,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.cloud.stream.binder.BinderHeaders; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; @@ -69,7 +70,7 @@ public class JCSMPOutboundMessageHandlerTest { private ArgumentCaptor producerFlowPropertiesCaptor; private ExtendedProducerProperties producerProperties; private JCSMPSessionProducerManager sessionProducerManager; - @Mock private JCSMPProperties jcsmpProperties; + @Spy private JCSMPProperties jcsmpProperties = new JCSMPProperties(); @Mock private JCSMPSession session; @Mock private TransactedSession transactedSession; @Mock private XMLMessageProducer messageProducer; @@ -589,8 +590,8 @@ public void testJCSMPPropertiesInheritanceWorkaround( @Values(strings = { JCSMPProperties.SUPPORTED_ACK_EVENT_MODE_PER_MSG, JCSMPProperties.SUPPORTED_ACK_EVENT_MODE_WINDOWED}) String ackEventMode) { - Mockito.when(session.getProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE)).thenReturn(pubAckWindowSize); - Mockito.when(session.getProperty(JCSMPProperties.ACK_EVENT_MODE)).thenReturn(ackEventMode); + jcsmpProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, pubAckWindowSize); + jcsmpProperties.setProperty(JCSMPProperties.ACK_EVENT_MODE, ackEventMode); messageHandler.start(); assertThat(producerFlowPropertiesCaptor.getValue()) diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/SolaceMessageChannelBinder.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/SolaceMessageChannelBinder.java index 05a71db9..97741b91 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/SolaceMessageChannelBinder.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/SolaceMessageChannelBinder.java @@ -97,6 +97,7 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin JCSMPOutboundMessageHandler handler = new JCSMPOutboundMessageHandler( destination, jcsmpSession, + jcsmpProperties, errorChannel, sessionProducerManager, producerProperties, From 4717e4cc58b93708fc57d82b9f9aded00c431d1d Mon Sep 17 00:00:00 2001 From: Jeffrey Douangpaseuth <11084623+Nephery@users.noreply.github.com> Date: Tue, 19 Nov 2024 13:25:13 -0500 Subject: [PATCH 5/5] update IT infra --- .../test/junit/extension/SpringCloudStreamExtension.java | 1 + .../binder/test/spring/SpringCloudStreamContext.java | 7 +++++-- .../cloud/stream/binder/test/util/SolaceTestBinder.java | 4 ++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/junit/extension/SpringCloudStreamExtension.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/junit/extension/SpringCloudStreamExtension.java index 6624f6c4..2d5681f8 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/junit/extension/SpringCloudStreamExtension.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/junit/extension/SpringCloudStreamExtension.java @@ -53,6 +53,7 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte LOGGER.info("Creating {}", SpringCloudStreamContext.class.getSimpleName()); SpringCloudStreamContext context = new SpringCloudStreamContext( PubSubPlusExtension.getJCSMPSession(extensionContext), + PubSubPlusExtension.getJCSMPProperties(extensionContext), PubSubPlusExtension.getSempV2Api(extensionContext)); context.before(); return context; diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/spring/SpringCloudStreamContext.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/spring/SpringCloudStreamContext.java index aa8cf3d8..d519f4fd 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/spring/SpringCloudStreamContext.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/spring/SpringCloudStreamContext.java @@ -4,6 +4,7 @@ import com.solace.spring.cloud.stream.binder.properties.SolaceProducerProperties; import com.solace.spring.cloud.stream.binder.test.util.SolaceTestBinder; import com.solace.test.integration.semp.v2.SempV2Api; +import com.solacesystems.jcsmp.JCSMPProperties; import com.solacesystems.jcsmp.JCSMPSession; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtensionContext; @@ -35,10 +36,12 @@ public class SpringCloudStreamContext extends PartitionCapableBinderTests bindingNameToErrorQueueName = new HashMap<>(); private static final Logger LOGGER = LoggerFactory.getLogger(SolaceTestBinder.class); - public SolaceTestBinder(JCSMPSession jcsmpSession, SempV2Api sempV2Api) { + public SolaceTestBinder(JCSMPSession jcsmpSession, JCSMPProperties jcsmpProperties, SempV2Api sempV2Api) { this.applicationContext = new AnnotationConfigApplicationContext(Config.class); this.jcsmpSession = jcsmpSession; this.sempV2Api = sempV2Api; - SolaceMessageChannelBinder binder = new SolaceMessageChannelBinder(jcsmpSession, new SolaceEndpointProvisioner(jcsmpSession)); + SolaceMessageChannelBinder binder = new SolaceMessageChannelBinder(jcsmpSession, jcsmpProperties, new SolaceEndpointProvisioner(jcsmpSession)); binder.setApplicationContext(this.applicationContext); this.setPollableConsumerBinder(binder); }