diff --git a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/SolaceSinkTaskIT.java b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/SolaceSinkTaskIT.java index de5ccb3..fbc16a3 100644 --- a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/SolaceSinkTaskIT.java +++ b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/SolaceSinkTaskIT.java @@ -73,6 +73,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.instanceOf; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -343,9 +345,12 @@ public void testCommitRollback(boolean autoFlush, SempV2Api sempV2Api, Queue que assertThat(thrown.getMessage(), containsString("Error in committing transaction")); assertThat(thrown.getCause(), instanceOf(RollbackException.class)); assertThat(thrown.getCause().getMessage(), containsString("Document Is Too Large")); - // only 1 message is logged to the broker for failure - assertEquals(1, sempV2Api.monitor().getMsgVpnQueue(vpnName, queue.getName(), null) - .getData().getMaxMsgSizeExceededDiscardedMsgCount()); + + // If the txn fails and needs to rollback, the API might not try to send subsequent messages to the broker. + // Resulting in only 1 failed message being reported by the broker. + assertThat(sempV2Api.monitor().getMsgVpnQueue(vpnName, queue.getName(), null).getData() + .getMaxMsgSizeExceededDiscardedMsgCount(), + either(equalTo(1L)).or(equalTo(2L))); } @CartesianTest(name = "[{index}] destinationType={0}, autoFlush={1}")