An implementation of Spring’s Cloud Stream Binder for integrating with Solace PubSub+ message brokers. The Spring Cloud Stream Binder project provides a higher-level abstraction towards messaging that standardizes the development of distributed message-based systems.
❗
|
|
- Overview
- Spring Cloud Stream Binder
- Using it in your Application
- Configuration Options
- Native Payload Types
- Generated Queue Name Syntax
- Consumer Concurrency
- Batched Messaging
- Partitioning
- Manual Message Acknowledgment
- Dynamic Producer Destinations
- Failed Consumer Message Error Handling
- Consumer Bindings Pause/Resume
- Failed Producer Message Error Handling
- Publisher Confirmations
- Solace Binder Health Indicator
- Solace Binder Metrics
- Resources
The Solace implementation of the Spring Cloud Stream Binder maps the following concepts from Spring to Solace:
-
Destinations to topics/subscriptions
-
Producer bindings always sends messages to topics
-
-
Consumer groups to durable queues
-
A consumer group’s queue is subscribed to its destination subscription (default)
-
Consumer bindings always receives messages from queues
-
-
Anonymous consumer groups to temporary queues (When no group is specified; used for SCS Publish-Subscribe Model)
In Solace, the above setup is called topic-to-queue mapping. So a typical message flow would then appear as follows:
-
Producer bindings publish messages to their destination topics
-
Each consumer groups' queue receives the messages published to their destination topic
-
The PubSub+ broker distributes messages in a round-robin fashion to each consumer binding for a particular consumer group
ℹ️Round-robin distribution only occurs if the consumer group’s queue is configured for non-exclusive access. If the queue has exclusive access, then only one consumer will receive messages.
❗
|
Since consumer bindings always consumes from queues it is required that Assured Delivery is enabled on the Solace PubSub+ Message VPN being used (Assured Delivery is automatically enabled if using Solace Cloud). Additionally, the client username’s client profile must be allowed to send and receive guaranteed messages. |
For the sake of brevity, it will be assumed that you have a basic understanding of the Spring Cloud Stream project. If not, then please refer to Spring’s documentation. This document will solely focus on discussing components unique to Solace.
This project extends the Spring Cloud Stream Binder project. If you are new to Spring Cloud Stream, check out their documentation.
The following is a brief excerpt from that document:
Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to provide connectivity to message brokers. It provides opinionated configuration of middleware from several vendors, introducing the concepts of persistent publish-subscribe semantics, consumer groups, and partitions.
The releases from this project are hosted in Maven Central.
The easiest way to get started is to include the spring-cloud-starter-stream-solace
in your application.
Here is how to include the spring cloud stream starter in your project using Gradle and Maven.
// Solace Spring Cloud Stream Binder
compile("com.solace.spring.cloud:spring-cloud-starter-stream-solace:5.6.0")
Starting in Spring Cloud Stream version 3 the recommended way to define binding and binding names is to use the Functional approach, which uses Spring Cloud Functions. You can learn more in the Spring Cloud Function support and Functional Binding Names sections of the reference guide.
Given this example app:
@SpringBootApplication
public class SampleAppApplication {
public static void main(String[] args) {
SpringApplication.run(SampleAppApplication.class, args);
}
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
}
An applicable Solace configuration file may look like:
spring:
cloud:
function:
definition: uppercase
stream:
bindings:
uppercase-in-0:
destination: queuename
group: myconsumergroup
binder: solace-broker
uppercase-out-0:
destination: uppercase/topic
binder: solace-broker
binders:
solace-broker:
type: solace
environment:
solace: # (1)
java:
host: tcp://localhost:55555
msgVpn: default
clientUsername: default
clientPassword: default
connectRetries: -1
reconnectRetries: -1
# apiProperties:
# ssl_trust_store: <path_to_trust_store>
# ssl_trust_store_password: <trust_store_password>
# ssl_validate_certificate: true
-
The latter half of this configuration where the Solace session is configured actually originates from the JCSMP Spring Boot Auto-Configuration project. See Solace Session Properties for more info.
For more samples see Solace Spring Cloud Samples repository.
For step-by-step instructions refer Solace Spring Cloud Stream tutorial and check out the blogs.
Configuration of the Solace Spring Cloud Stream Binder is done through Spring Boot’s externalized configuration. This is where users can control the binder’s configuration options as well as the Solace Java API properties.
For general binder configuration options and properties, refer to the Spring Cloud Stream Reference Documentation.
The binder’s Solace session is configurable using properties prefixed by solace.java
or spring.cloud.stream.binders.<binder-name>.environment.solace.java
.
❗
|
This binder leverages the JCSMP Spring Boot Auto-Configuration project to configure its session. See the JCSMP Spring Boot Auto-Configuration documentation for more info on how to configure these properties. |
See Creating a Simple Solace Binding for a simple example of how to configure a session for this binder.
💡
|
Additional session properties not available under the usual See JCSMP Spring Boot Auto-Configuration documentation for more info about |
💡
|
The Solace session can be configured to use OAuth2 authentication. See JCSMP Spring Boot: Using OAuth2 Authentication Scheme for more info. |
The following properties are available for Solace consumers only and must be prefixed with spring.cloud.stream.solace.bindings.<bindingName>.consumer.
where bindingName
looks something like functionName-in-0
as defined in Functional Binding Names.
See SolaceCommonProperties and SolaceConsumerProperties for the most updated list.
- endpointType
-
Specifies whether the configured type of endpoint messages are consumed from is a
queue
or atopic_endpoint
.When set to
topic_endpoint
, then instead of provisioning a queue for the consumer group’s endpoint, the binder will instead provision a topic endpoint.Default:
queue
- provisionDurableQueue
-
Whether to provision durable queues for non-anonymous consumer groups. This should only be set to
false
if you have externally pre-provisioned the required queue on the message broker.Default:
true
See: Generated Queue Name Syntax - addDestinationAsSubscriptionToQueue
-
Whether to add the Destination as a subscription to queue during provisioning.
Default:
true
- selector
-
If specified, enables client applications to choose which messages they are interested in receiving, as determined by the messages’ header field and property values.
A selector has a conditional expression syntax that is a subset of SQL92 Selector can be used with Queue or a Topic Endpoint Subscription
Default:
null
See: https://docs.solace.com/API/Solace-JMS-API/Selectors.htm - queueNameExpression
-
A SpEL expression for creating the consumer group’s queue name.
Default:
"'scst/' + (isAnonymous ? 'an/' : 'wk/') + (group?.trim() + '/') + 'plain/' + destination.trim().replaceAll('[*>]', '_')"
See: Generated Queue Name Syntax⚠️ Modifying this can cause naming conflicts between the queue names of consumer groups. ⚠️ While the default SpEL expression will consistently return a value adhering to Generated Queue Name Syntax, directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice. - queueAccessType
-
Access type for the consumer group queue.
Default:
0
(ACCESSTYPE_NONEXCLUSIVE)
See: TheACCESSTYPE_
prefixed constants for other possible values - queuePermission
-
Permissions for the consumer group queue.
Default:
2
(PERMISSION_CONSUME)
See: ThePERMISSION_
prefixed constants for other possible values - queueDiscardBehaviour
-
If specified, whether to notify sender if a message fails to be enqueued to the consumer group queue.
Default:
null
- queueMaxMsgRedelivery
-
Sets the maximum message redelivery count on consumer group queue. (Zero means retry forever).
Default:
null
- queueMaxMsgSize
-
Maximum message size for the consumer group queue.
Default:
null
- queueQuota
-
Message spool quota for the consumer group queue.
Default:
null
- queueRespectsMsgTtl
-
Whether the consumer group queue respects Message TTL.
Default:
null
- queueAdditionalSubscriptions
-
An array of additional topic subscriptions to be applied on the consumer group queue.
These subscriptions may also contain wildcards.Default:
String[0]
See: Overview for more info on how this binder uses topic-to-queue mapping to implement Spring Cloud Streams consumer groups. - polledConsumerWaitTimeInMillis
-
Maximum wait time for polled consumers to receive a message from their consumer group queue.
Only applicable whenbatchMode
isfalse
.Default:
100
- transacted
-
When set to
true
, messages will be received using local transactions.Default:
false
ℹ️The maximum transaction size is 256 messages.
The size of the transaction is controlled by the batched message’s size. See Batch Consumers for more info. - batchMaxSize
-
The maximum number of messages per batch.
Only applicable whenbatchMode
istrue
.Default:
255
- batchWaitStrategy
-
The waiting strategy for accumulating batches.
Only applicable whenbatchMode
istrue
.Default:
respect_timeout
ℹ️The waiting strategy works alongside the batchMaxSize
option.- respect_timeout
-
Adheres to the
batchTimeout
consumer config option. - immediate
-
Immediately collects the batch once no more messages are available on the endpoint.
- batchTimeout
-
The maximum wait time in milliseconds to receive a batch of messages. If this timeout is reached, then the messages that have already been received will be used to create the batch. A value of
0
means wait forever.
Only applicable whenbatchMode
istrue
.Default:
5000
- autoBindErrorQueue
-
Whether to automatically create a durable error queue to which messages will be republished when message processing failures are encountered. Only applies once all internal retries have been exhausted.
Default:
false
💡Your ACL Profile must allow for publishing to this queue if you decide to use autoBindErrorQueue
. - provisionErrorQueue
-
Whether to provision durable queues for error queues when
autoBindErrorQueue
istrue
. This should only be set tofalse
if you have externally pre-provisioned the required queue on the message broker.Default:
true
See: Generated Error Queue Name Syntax - errorQueueNameExpression
-
A SpEL expression for creating the error queue’s name.
Default:
"'scst/error/' + (isAnonymous ? 'an/' : 'wk/') + (group?.trim() + '/') + 'plain/' + destination.trim().replaceAll('[*>]', '_')"
See: Generated Error Queue Name Syntax⚠️ Modifying this can cause naming conflicts between the error queue names. ⚠️ While the default SpEL expression will consistently return a value adhering to Generated Queue Name Syntax, directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice. - errorQueueMaxDeliveryAttempts
-
Maximum number of attempts to send a failed message to the error queue. When all delivery attempts have been exhausted, the failed message will be requeued.
Default:
3
- errorQueueAccessType
-
Access type for the error queue.
Default:
0
(ACCESSTYPE_NONEXCLUSIVE)
See: TheACCESSTYPE_
prefixed constants for other possible values - errorQueuePermission
-
Permissions for the error queue.
Default:
2
(PERMISSION_CONSUME)
See: ThePERMISSION_
prefixed constants for other possible values - errorQueueDiscardBehaviour
-
If specified, whether to notify sender if a message fails to be enqueued to the error queue.
Default:
null
- errorQueueMaxMsgRedelivery
-
Sets the maximum message redelivery count on the error queue. (Zero means retry forever).
Default:
null
- errorQueueMaxMsgSize
-
Maximum message size for the error queue.
Default:
null
- errorQueueQuota
-
Message spool quota for the error queue.
Default:
null
- errorQueueRespectsMsgTtl
-
Whether the error queue respects Message TTL.
Default:
null
- errorMsgDmqEligible
-
The eligibility for republished messages to be moved to a Dead Message Queue.
Default:
null
- errorMsgTtl
-
The number of milliseconds before republished messages are discarded or moved to a Dead Message Queue.
Default:
null
- headerExclusions
-
The list of headers to exclude when converting consumed Solace message to Spring message.
Default: Empty
List<String>
The following properties are available for Solace producers only and must be prefixed with spring.cloud.stream.solace.bindings.<bindingName>.producer.
where bindingName
looks something like functionName-out-0
as defined in Functional Binding Names.
See SolaceCommonProperties and SolaceProducerProperties for the most updated list.
- destinationType
-
Specifies whether the configured
destination
is atopic
or aqueue
.When set to
topic
, thedestination
name is a topic subscription added on a queue.When set to
queue
, the producer binds to a queue matching thedestination
name. The queue can be auto-provisioned withprovisionDurableQueue=true
however, all naming prefix and queue name generation options do not apply. A queue will be provisioned using thedestination
name explicitly.Default:
topic
- headerExclusions
-
The list of headers to exclude from the published message. Excluding Solace message headers is not supported.
Default: Empty
List<String>
- nonserializableHeaderConvertToString
-
When set to
true
, irreversibly convert non-serializable headers to strings. An exception is thrown otherwise.Default:
false
❗Non-serializable headers should have a meaningful toString()
implementation. Otherwise enabling this feature may result in potential data loss. - transacted
-
When set to
true
, messages will be delivered using local transactions.Default:
false
⚠️ A transacted producer cannot be used by multiple threads. ℹ️The maximum transaction size is 256 messages.
The size of the transaction is 1 when the binding receives a regular Spring message. Otherwise, if it receives a batched message, then the transaction size is equal to the batch size. - provisionDurableQueue
-
Whether to provision durable queues for non-anonymous consumer groups or queue destinations. This should only be set to
false
if you have externally pre-provisioned the required queue on the message broker.Default:
true
See: Generated Queue Name Syntax - addDestinationAsSubscriptionToQueue
-
Whether to add the Destination as a subscription to queue during provisioning.
Default:
true
ℹ️Does not apply when destinationType=queue
. - queueNameExpression
-
A SpEL expression for creating the consumer group’s queue name.
Default:
"'scst/' + (isAnonymous ? 'an/' : 'wk/') + (group?.trim() + '/') + 'plain/' + destination.trim().replaceAll('[*>]', '_')"
See: Generated Queue Name Syntax⚠️ Modifying this can cause naming conflicts between the queue names of consumer groups. ⚠️ While the default SpEL expression will consistently return a value adhering to Generated Queue Name Syntax, directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice. - queueNameExpressionsForRequiredGroups
-
A mapping of required consumer groups to queue name SpEL expressions.
By default, queueNameExpression will be used to generate a required group’s queue name if it isn’t specified within this configuration option.
Default:
Empty Map<String, String>
See: Generated Queue Name Syntax⚠️ Modifying this can cause naming conflicts between the queue names of consumer groups. ⚠️ While the default SpEL expression will consistently return a value adhering to Generated Queue Name Syntax, directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice. - queueAccessType
-
Access type for binder provisioned queues.
Default:
0
(ACCESSTYPE_NONEXCLUSIVE)
See: TheACCESSTYPE_
prefixed constants for other possible values - queuePermission
-
Permissions for binder provisioned queues.
Default:
2
(PERMISSION_CONSUME)
See: ThePERMISSION_
prefixed constants for other possible values - queueDiscardBehaviour
-
Queue discard behaviour for binder provisioned queues. Whether to notify sender if a message fails to be enqueued to the endpoint. A null value means use the appliance default.
Default:
null
- queueMaxMsgRedelivery
-
Sets the maximum message redelivery count for binder provisioned queues. (Zero means retry forever).
Default:
null
- queueMaxMsgSize
-
Maximum message size for binder provisioned queues.
Default:
null
- queueQuota
-
Message spool quota for binder provisioned queues.
Default:
null
- queueRespectsMsgTtl
-
Whether the binder provisioned queues respect Message TTL.
Default:
null
- queueAdditionalSubscriptions
-
A mapping of required consumer groups to arrays of additional topic subscriptions to be applied on each consumer group’s queue.
These subscriptions may also contain wildcards.Default: Empty
Map<String,String[]>
See: Overview for more info on how this binder uses topic-to-queue mapping to implement Spring Cloud Streams consumer groups.ℹ️Does not apply when destinationType=queue
.
These properties configure the Solace connection’s health indicator configurable under solace.health-check.connection
.
- reconnectAttemptsUntilDown
-
The number of session reconnect attempts until the health goes
DOWN
. This will happen regardless if the underlying session is actually still reconnecting. Setting this to0
will disable this feature.This feature operates independently of the PubSub+ session reconnect feature. Meaning that if PubSub+ session reconnect is configured to retry less than the value given to this property, then this feature effectively does nothing.
Default:
0
Solace-defined Spring headers to get/set Solace metadata from/to Spring Message
headers.
|
solace_ is a header space reserved for Solace-defined headers. Creating new solace_ -prefixed headers is not supported. Doing so may cause unexpected side-effects in future versions of this binder.
|
🔥
|
Refer to each header’s documentation for their expected usage scenario. Using headers outside of their intended type and access-control is not supported. |
ℹ️
|
Header inheritance applies to Solace message headers in processor message handlers:
|
These headers are to get/set Solace message properties.
💡
|
Use SolaceHeaders instead of hardcoding the header names. This class also contains the same documentation that you see here. |
Header Name | Type | Access | Description |
---|---|---|---|
|
|
Read/Write |
The message ID (a string for an application-specific message identifier). This is the |
|
|
Read/Write |
The application message type. This is the |
|
|
Read/Write |
The correlation ID. |
|
|
Read |
The number of times the message has been delivered. Note that, while the Delivery Count feature is in controlled availability, |
|
|
Read |
The destination this message was published to. |
|
|
Read |
Whether one or more messages have been discarded prior to the current message. |
|
|
Read/Write |
Whether the message is eligible to be moved to a Dead Message Queue. |
|
|
Read/Write |
The UTC time (in milliseconds, from midnight, January 1, 1970 UTC) when the message is supposed to expire. |
|
|
Read/Write |
The HTTP content encoding header value from interaction with an HTTP client. |
|
|
Read/Write |
Indicates whether this message is a reply. |
|
|
Read/Write |
Priority value in the range of 0–255, or -1 if it is not set. |
|
|
Read |
The receive timestamp (in milliseconds, from midnight, January 1, 1970 UTC). |
|
|
Read |
Indicates if the message has been delivered by the broker to the API before. |
|
|
Read |
Specifies a Replication Group Message ID as a replay start location. |
|
|
Read/Write |
The replyTo destination for the message. |
|
|
Read/Write |
The Sender ID for the message. |
|
|
Read/Write |
The send timestamp (in milliseconds, from midnight, January 1, 1970 UTC). |
|
|
Read/Write |
The sequence number. |
|
|
Read/Write |
The number of milliseconds before the message is discarded or moved to a Dead Message Queue. |
|
|
Read/Write |
When an application sends a message, it can optionally attach application-specific data along with the message, such as user data. |
These headers are to get/set Solace Spring Cloud Stream Binder properties.
These can be used for:
-
Getting/Setting Solace Binder metadata
-
Directive actions for the binder when producing/consuming messages
💡
|
Use SolaceBinderHeaders instead of hardcoding the header names. This class also contains the same documentation that you see here. |
Header Name | Type | Access | Default Value | Description |
---|---|---|---|---|
|
|
Read |
Only applicable when The consolidated list of message headers for a batch of messages where the headers for each payload element is in this list’s corresponding index. |
|
|
|
Write |
A CorrelationData instance for messaging confirmations |
|
|
|
Read |
|
A static number set by the publisher to indicate the Spring Cloud Stream Solace message version. |
|
|
Read |
Present and true to indicate when the PubSub+ message payload was null. |
|
|
|
Write |
The partition key for PubSub+ partitioned queues. |
|
|
|
Internal Binder Use Only |
Is |
|
|
|
Internal Binder Use Only |
A JSON String array of header names where each entry indicates that that header’s value was serialized by a Solace Spring Cloud Stream binder before publishing it to a broker. |
|
|
|
Internal Binder Use Only |
|
The encoding algorithm used to encode the headers indicated by |
|
|
Write |
Only applicable when topic Specifies that the dynamic destination is a topic queue Specifies that the dynamic destination is a queue When absent, the binding’s configured destination-type is used. |
Below are the payload types natively supported by this binder (before/after Content Type Negotiation):
Payload Type | PubSub+ Message Type | Notes |
---|---|---|
|
Binary Message |
Basic PubSub+ payload type. |
|
Text Message |
Basic PubSub+ payload type. |
|
Stream Message |
Basic PubSub+ payload type. |
|
Map Message |
Basic PubSub+ payload type. |
|
XML-Content Message |
Basic PubSub+ payload type. Only available for consumption. |
|
Bytes Message |
This is not a basic payload type supported by the PubSub+ broker, but is one defined and coordinated by this binder. Publishing: When a Consuming: When the binder consumes a binary message which has the |
💡
|
Typically, the Spring Cloud Stream framework will convert a published payload into a If this occurs, but you wish to publish other message types, then one option is to set See Content Type Negotiation for more info on how Spring Cloud Streams converts payloads and other options to control message conversion. |
Spring messages can’t contain null payloads, however, message handlers can differentiate between null payloads and empty payloads by looking at the solace_scst_nullPayload
header. The binder adds the solace_scst_nullPayload
header when a Solace message with null payload is consumed from the wire. When that is the case, the binder sets the Spring message’s payload to a null equivalent payload. Null equivalent payloads are one of the following: empty byte[]
, empty String
, empty SDTMap
, or empty SDTStream
.
ℹ️
|
Applications can’t differentiate between null payloads and empty payloads when consuming binary messages or XML-content messages from the wire. This is because Solace always converts empty payloads to null payloads when those message types are published. |
By default, generated consumer group queue names have the following form:
<prefix>/<familiarity-modifier>/<group>/<destination-encoding>/<encoded-destination>
- prefix
-
A static prefix
scst
. - familiarity-modifier
-
Indicates the durability of the consumer group (
wk
for well-known oran
for anonymous). - group
-
The consumer
group
name. - destination-encoding
-
Indicates the encoding scheme used to encode the destination in the queue name (currently only
plain
is supported). - encoded-destination
-
The encoded
destination
as per<destination-encoding>
.
The queueNameExpression
property’s default SpEL expression conforms to the above format, however, users can provide any valid SpEL expression in order to generate custom queue names. Valid expressions evaluate against the following context:
Context Variable | Description |
---|---|
|
The binding’s destination name. |
|
The binding’s consumer group name. |
|
Indicates whether the consumer is an anonymous consumer group |
|
The configured Solace binding properties. |
|
The configured Spring binding properties. |
By default, generated error queue names have the following form:
<prefix>/error/<familiarity-modifier>/<group>/<destination-encoding>/<encoded-destination>
The definitions of each segment of the error queue matches that from Generated Queue Name Syntax, with the following exceptions:
- group
-
The consumer
group
name.
The errorQueueNameExpression
property’s default SpEL expression conforms to the above format. Users can provide any valid SpEL expression in order to generate custom error queue names using the same evaluation context as described in Generated Queue Name Syntax.
Configure Spring Cloud Stream’s concurrency consumer property to enable concurrent message consumption for a particular consumer binding.
Though note that there are few limitations:
-
concurrency
> 1 is not supported for exclusive queues. -
concurrency
> 1 is not supported for consumer bindings which are a part of anonymous consumer groups. -
concurrency
> 1 is ignored for polled consumers. -
concurrency
> 1 is not supported with auto-provisioned topic endpoints. -
Setting
provisionDurableQueue
tofalse
disables endpoint configuration validation. Meaning that point 1 cannot be validated. In this scenario, it is the developer’s responsibility to ensure that point 1 is followed.
Batch consumers can be enabled by setting spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
to true
. In which case, batched messages may be consumed as follows:
@Bean
Consumer<Message<List<Payload>>> input() {
return batchMsg -> { // (1)
List<Payload> batchedPayloads = batchMsg.getPayload();
List<Map<String, Object>> batchedHeaders = (List<Map<String, Object>>) batchMsg.getHeaders().get(SolaceBinderHeaders.BATCHED_HEADERS); // (2)
for (int i = 0; i < batchedPayloads.size(); i++) {
Payload payload = batchedPayloads.get(i);
Map<String, Object> headers = batchedHeaders.get(i);
// Process inidividual message payload and its headers
}
};
}
-
A batch of messages is really just a single Spring
Message
whose payload is a list of individual message payloads. -
The
solace_scst_batchedHeaders
message header contains the consolidated list of message headers for each of the individual messages in the batch.
💡
|
Transacted Batch Consumers
By default, batched messages are non-transacted (i.e. When |
💡
|
Resolving Batch Message Conversion Issues
If the Spring Cloud Stream framework fails to convert the batch message, consider setting one of the following consumer config options:
See Content Type Negotiation for more info on how Spring Cloud Streams converts payloads and other options to control message conversion. See Native Payload Types for more info regarding this binder’s natively supported payload types. |
To create a batch of messages, the binder will consume messages from the PubSub+ broker until either a maximum batch size or timeout has been achieved. After which, the binder will compose the batch message and send it to the consumer handler for processing. Both these batching parameters can be configured using the batchMaxSize
, batchWaitStrategy
, and batchTimeout
consumer config options.
Similar to batch consumers, batched messages may also be published through the producer binding:
@Bean
Supplier<Message<List<Payload>>> output() {
return () -> {
List<Payload> batchedPayloads = new ArrayList<>();
List<Map<String, Object>> batchedHeaders = new ArrayList<>();
for (int i = 0; i < 100; i++) {
// Create batched message contents
batchedPayloads.add(new Payload(i));
batchedHeaders.add(Map.of("my-header", "my-header-value"));
}
// construct batched message
return MessageBuilder.withPayload(batchedPayloads)
.setHeader(SolaceBinderHeaders.BATCHED_HEADERS, batchedHeaders)
.build();
};
}
The producer binding will look for the solace_scst_batchedHeaders
message header to determine if the supplied Spring message is either a batched Spring message or a regular Spring message.
If the producer binding detects that it has received a batched Spring message, then it will individually publish each item in the batch.
ℹ️
|
Publishing Batched Messages using Transacted Producer Bindings
When |
ℹ️
|
The Solace PubSub+ broker supports partitioning natively. The partitioning abstraction as described in the Spring Cloud Stream documentation is not supported. |
To publish messages that are intended for partitioned queues, you must provide a partition key by setting the solace_scst_partitionKey
message header (accessible through the SolaceBinderHeaders.PARTITION_KEY
constant).
For example:
public class MyMessageBuilder {
public Message<String> buildMeAMessage() {
return MessageBuilder.withPayload("payload")
.setHeader(SolaceBinderHeaders.PARTITION_KEY, "partition-key")
.build();
}
}
As for consuming messages from partitioned queues, this is handled transparently by the PubSub+ broker. That is to say, consuming messages from a partitioned queue is no different from consuming messages from any other queue.
See Partitioned Queues for more.
ℹ️
|
Manual message acknowledgment is not supported for consumers where transacted is set to true .
|
Message handlers can disable auto-acknowledgement and manually invoke the acknowledgement callback as follows:
public void consume(Message<?> message) {
AcknowledgmentCallback acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(message); // (1)
acknowledgmentCallback.noAutoAck(); // (2)
try {
AckUtils.accept(acknowledgmentCallback); // (3)
} catch (SolaceAcknowledgmentException e) {} // (4)
}
-
Get the message’s acknowledgement callback header
-
Disable auto-acknowledgement
-
Acknowledge the message with the
ACCEPT
status -
Handle any acknowledgment exceptions
Refer to the AckUtils documentation and AcknowledgmentCallback documentation for more info on these objects.
💡
|
If manual acknowledgement is to be done outside of the message handler’s thread, then make sure auto-acknowledgement is disabled within the message handler’s thread and not an external one. Otherwise, the binder will auto-acknowledge the message when the message handler returns. |
For each acknowledgement status, the binder will perform the following actions:
Status | Action |
---|---|
ACCEPT |
Acknowledge the message. |
REJECT |
If Refer to Failed Consumer Message Error Handling for more info. |
REQUEUE |
For both, the consumer in a defined consumer group or in an anonymous group, signal the Solace broker to requeue/redeliver the message. The message will be redelivered until it is Refer to Message Redelivery for more info. |
❗
|
Acknowledgements may throw Example: A |
ℹ️
|
Manual acknowledgements do not support any application-internal error handling strategies (i.e. retry template, error channel forwarding, etc). Also, throwing an exception in the message handler will always acknowledge the message in some way regardless if auto-acknowledgment is disabled. |
💡
|
If asynchronously acknowledging messages, then if these messages aren’t acknowledged in a timely manner, it is likely for the message consumption rate to stall due to the consumer queue’s configured "Maximum Delivered Unacknowledged Messages per Flow". This property can be configured for dynamically created queues by using queue templates. However note that as per our documentation, anonymous consumer group queues (i.e. temporary queues) will not match a queue template’s name filter. Only the queue template defined in the client profile’s "Copy Settings From Queue Template" setting will apply to those. |
Spring Cloud Stream has a reserved message header called scst_targetDestination
(retrievable via BinderHeaders.TARGET_DESTINATION
), which allows for messages to be redirected from their bindings' configured destination to the target destination specified by this header.
For this binder’s implementation of this header, the target destination defines the exact Solace topic or queue to which a message will be sent. i.e. No post-processing is done.
This binder also adds a reserved message header called solace_scst_targetDestinationType
(retrievable via SolaceBinderHeaders.TARGET_DESTINATION_TYPE
), which allows to override the configured producer destination-type
.
public class MyMessageBuilder {
public Message<String> buildMeAMessage() {
return MessageBuilder.withPayload("payload")
.setHeader(BinderHeaders.TARGET_DESTINATION, "some-dynamic-destination") // (1)
.setHeader(SolaceBinderHeaders.TARGET_DESTINATION_TYPE, "topic") // (2)
.build();
}
}
-
This message will be sent to the
some-dynamic-destination
topic, ignoring the producer’s configured destination. -
Optionally, the configured producer
destination-type
can be overridden.
ℹ️
|
Those 2 headers are cleared from the message before it is sent off to the message broker. So you should attach that information to your message payload if you want to get that information on the consumer-side. |
ℹ️
|
Dynamic Producer Destinations with StreamBridge
This binder does not support the usage of StreamBridge’s dynamic destination feature, which automatically creates and caches unknown output bindings on-the-fly. Instead, set the public void sendMessage(StreamBridge streamBridge, String myDynamicDestination, Message<?> message) {
Message<?> messageWithDestination = MessageBuilder.fromMessage(message)
.setHeader(BinderHeaders.TARGET_DESTINATION, myDynamicDestination)
.build();
streamBridge.send("some-pre-defined-output-binding", messageWithDestination);
} Then in your application’s configuration file, configure your predefined output binding: spring.cloud.stream.output-bindings=some-pre-defined-output-binding For more info, see Sending arbitrary data to an output (e.g. Foreign event-driven sources). |
The Spring cloud stream framework already provides a number of application-internal reprocessing strategies for failed messages during message consumption. You can read more about that here:
However, after all internal error handling strategies have been exhausted, the Solace implementation of the binder would either:
-
Redeliver the failed message (default)
-
Republish the message to another queue (an error queue) for an external application/binding to process
Message Redelivery is a simple error handling strategy in which failed messages are redelivered to the application from the consumer group’s queue on the PubSub+ broker. This is very similar to simply enabling the retry template (setting maxAttempts
to a value greater than 1
), but allows for the failed messages to be re-processed by the message broker.
❗
|
The internal implementation of redelivery has changed from Solace Binder v5.0.0. Previously, redelivery was initiated by rebinding consumer flows; however, as of v5.0.0 and later, the Solace API now leverages the Solace broker’s native NACK (Negative Acknowledgement) capabilities. Here is what happens under the hood when this is triggered:
The redelivery may result in message duplication, and the application should be designed to handle this. |
ℹ️
|
Error queue republishing is not supported for consumers where transacted is set to true .
|
First, it must be noted that an Error Queue is different from a Dead Message Queue (DMQ). In particular, a DMQ is used to capture re-routed failed messages as a consequence of Solace PubSub+ messaging features such as TTL expiration or exceeding a message’s max redelivery count. Whereas the purpose of an Error Queue is to capture re-routed messages which have been successfully consumed from the message broker, yet cannot be processed by the application.
An Error Queue can be provisioned for a particular consumer group by setting the autoBindErrorQueue
consumer config option to true
. This Error Queue is simply another durable queue which is named as per the Generated Error Queue Name Syntax section. And like the queues used for consumer groups, its endpoint properties can be configured by means of any consumer properties whose names begin with "errorQueue".
ℹ️
|
Error Queues should not be used with anonymous consumer groups. Since the names of anonymous consumer groups, and in turn the name of their would-be Error Queues, are randomly generated at runtime, it would provide little value to create bindings to these Error Queues because of their unpredictable naming and temporary existence. Also, your environment will be polluted with orphaned Error Queues whenever these consumers rebind. |
When locally reprocessing failed messages with Spring’s Retry Template (i.e. when consumer maxAttempts > 1
), mutations of nested objects within the Spring Message<?>
may persist between retries.
SDTMap
payload and failing the messagepublic Function<Message<SDTMap>, Message<SDTMap>> transform() {
return message -> {
if (!message.getPayload().containsKey("new-key")) { // (1)
message.getPayload().putString("new-key", "value");
}
// failing message processing to trigger retry template
throw new RuntimeException("Failed processing");
};
}
-
Here, this example only invokes this if-statement if the
SDTMap
payload does not contain the key"new-key"
.If the consumer binding was configured with
maxAttempts > 1
, then on the following reprocessing attempts, the payload will still contain the key"new-key"
from the previous attempt.
If this behavior is undesirable, then you should configure your consumers maxAttempts
to 1
and rely on Message Redelivery to handle reprocessing.
The Solace binder supports pausing and resuming consumer bindings. See Spring Cloud Stream documentation to learn how to pause and resume consumer bindings.
ℹ️
|
There is no guarantee that the effect of pausing a binding will be instantaneous: messages already in-flight or being processed by the binder may still be delivered after the call to pause returns. |
By default, asynchronous producer errors aren’t handled by the framework. Producer error channels can be enabled using the errorChannelEnabled
producer config option.
Beyond that, this binder also supports using a Future
to wait for publish confirmations. See [Publisher Confirms] for more info.
For each message you can create a new CorrelationData
instance and set it as the value of your message’s SolaceBinderHeaders.CONFIRM_CORRELATION
header.
ℹ️
|
CorrelationData can be extended to add more correlation info. The SolaceBinderHeaders.CONFIRM_CORRELATION header is not reflected in the actual message published to the broker.
|
Now using CorrelationData.getFuture().get()
, you can wait for a publish acknowledgment from the broker. If the publish failed, then this future will throw an exception.
For example:
@Autowired
private StreamBridge streamBridge;
public void send(String payload, long timeout, TimeUnit unit) {
CorrelationData correlationData = new CorrelationData();
Message<SensorReading> message = MessageBuilder.withPayload(payload)
.setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, correlationData)
.build();
streamBridge.send("output-destination", message);
try {
correlationData.getFuture().get(timeout, unit);
// Do success logic
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// Do failure logic
}
}
ℹ️
|
CorrelationData with Batched Messages
When using Batch Producers, the The
|
Solace binders can report health statuses via the Spring Boot Actuator health endpoint. To enable this feature, add Spring Boot Actuator to the classpath. To manually disable this feature, set management.health.binders.enabled=false
.
Health Status | Description |
---|---|
UP |
Status indicating that the binder is functioning as expected. |
RECONNECTING |
Status indicating that the binder is actively trying to reconnect to the message broker. This is a custom health status. It isn’t included in the health severity order list ( |
DOWN |
Status indicating that the binder has suffered an unexpected failure. For instance, the binder may have exhausted all reconnection attempts. User intervention is likely required. |
Leveraging Spring Metrics, the Solace PubSub+ binder exposes the following metrics:
Name | Type | Tags | Description |
---|---|---|---|
|
Base Units: |
|
Message payload size. This is the payload size of the messages received (if |
|
Base Units: |
|
Total message size. This is the total size of the messages received (if |
For more information about Spring Cloud Streams try these resources:
For more information about Solace technology in general please visit these resources:
-
The Solace Developer Portal website at: https://solace.dev
-
Ask the Solace community