Skip to content

Commit

Permalink
feat: add micrometer tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
helios57 committed Sep 14, 2024
1 parent 518b559 commit 2410796
Show file tree
Hide file tree
Showing 25 changed files with 445 additions and 96 deletions.
22 changes: 22 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@
<artifactId>micrometer-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down Expand Up @@ -208,6 +213,23 @@
<artifactId>gson-fire</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-integration-test</artifactId>
<exclusions>
<exclusion>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.wavefront</groupId>
<artifactId>wavefront-sdk-java</artifactId>
<version>3.4.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.solace.spring.cloud.stream.binder;

import com.solace.spring.cloud.stream.binder.health.SolaceBinderHealthAccessor;
import com.solace.spring.cloud.stream.binder.inbound.*;
import com.solace.spring.cloud.stream.binder.inbound.JCSMPInboundQueueMessageProducer;
import com.solace.spring.cloud.stream.binder.inbound.topic.JCSMPInboundTopicMessageMultiplexer;
import com.solace.spring.cloud.stream.binder.inbound.topic.JCSMPInboundTopicMessageProducer;
import com.solace.spring.cloud.stream.binder.meter.SolaceMeterAccessor;
Expand All @@ -12,6 +12,7 @@
import com.solace.spring.cloud.stream.binder.provisioning.SolaceConsumerDestination;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceEndpointProvisioner;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceProvisioningUtil;
import com.solace.spring.cloud.stream.binder.tracing.TracingProxy;
import com.solace.spring.cloud.stream.binder.util.*;
import com.solacesystems.jcsmp.*;
import lombok.Setter;
Expand All @@ -20,14 +21,13 @@
import org.springframework.cloud.stream.binder.*;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
Expand All @@ -46,24 +46,27 @@ public class SolaceMessageChannelBinder
private final JCSMPSessionProducerManager sessionProducerManager;
private final AtomicBoolean consumersRemoteStopFlag = new AtomicBoolean(false);
private final String errorHandlerProducerKey = UUID.randomUUID().toString();
@Setter
private SolaceMeterAccessor solaceMeterAccessor;
private final Optional<SolaceMeterAccessor> solaceMeterAccessor;
private final Optional<TracingProxy> tracingProxy;
private final Optional<SolaceBinderHealthAccessor> solaceBinderHealthAccessor;
@Setter
private SolaceExtendedBindingProperties extendedBindingProperties = new SolaceExtendedBindingProperties();
private static final SolaceMessageHeaderErrorMessageStrategy errorMessageStrategy = new SolaceMessageHeaderErrorMessageStrategy();
@Nullable
private SolaceBinderHealthAccessor solaceBinderHealthAccessor;

public SolaceMessageChannelBinder(JCSMPSession jcsmpSession, SolaceEndpointProvisioner solaceEndpointProvisioner) {
this(jcsmpSession, null, solaceEndpointProvisioner);
}

public SolaceMessageChannelBinder(JCSMPSession jcsmpSession, Context jcsmpContext, SolaceEndpointProvisioner solaceEndpointProvisioner) {
public SolaceMessageChannelBinder(JCSMPSession jcsmpSession,
Context jcsmpContext,
SolaceEndpointProvisioner solaceEndpointProvisioner,
Optional<SolaceMeterAccessor> solaceMeterAccessor,
Optional<TracingProxy> tracingProxy,
Optional<SolaceBinderHealthAccessor> solaceBinderHealthAccessor) {
super(new String[0], solaceEndpointProvisioner);
this.jcsmpSession = jcsmpSession;
this.jcsmpContext = jcsmpContext;
this.solaceMeterAccessor = solaceMeterAccessor;
this.tracingProxy = tracingProxy;
this.solaceBinderHealthAccessor=solaceBinderHealthAccessor;
this.sessionProducerManager = new JCSMPSessionProducerManager(jcsmpSession);
this.jcsmpInboundTopicMessageMultiplexer = new JCSMPInboundTopicMessageMultiplexer(jcsmpSession, () -> this.solaceMeterAccessor);
this.jcsmpInboundTopicMessageMultiplexer = new JCSMPInboundTopicMessageMultiplexer(jcsmpSession, this.solaceMeterAccessor, this.tracingProxy);
}

@Override
Expand Down Expand Up @@ -92,7 +95,8 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin
errorChannel,
sessionProducerManager,
producerProperties,
solaceMeterAccessor);
solaceMeterAccessor,
tracingProxy);

if (errorChannel != null) {
handler.setErrorMessageStrategy(errorMessageStrategy);
Expand Down Expand Up @@ -120,11 +124,9 @@ protected MessageProducer createQueueMessageProducer(ConsumerDestination destina
jcsmpSession,
properties,
getConsumerEndpointProperties(properties),
solaceMeterAccessor);

if (solaceBinderHealthAccessor != null) {
adapter.setSolaceBinderHealthAccessor(solaceBinderHealthAccessor);
}
solaceMeterAccessor,
tracingProxy,
solaceBinderHealthAccessor);

adapter.setRemoteStopFlag(consumersRemoteStopFlag);
adapter.setPostStart(getConsumerPostStart(solaceDestination, properties));
Expand Down Expand Up @@ -209,10 +211,6 @@ public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEn
return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
}

public void setSolaceBinderHealthAccessor(@Nullable SolaceBinderHealthAccessor solaceBinderHealthAccessor) {
this.solaceBinderHealthAccessor = solaceBinderHealthAccessor;
}

/**
* WORKAROUND (SOL-4272) ----------------------------------------------------------
* Temporary endpoints are only provisioned when the consumer is created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private static SessionCacheEntry createSession(JCSMPProperties jcsmpProperties,
}
throw new RuntimeException(e);
}
SolaceEndpointProvisioner solaceEndpointProvisioner = new SolaceEndpointProvisioner(jcsmpSession, jcsmpSessionEventHandler);
SolaceEndpointProvisioner solaceEndpointProvisioner = new SolaceEndpointProvisioner(jcsmpSession);
return new SessionCacheEntry(properties, jcsmpSessionEventHandler, jcsmpSession, context, solaceEndpointProvisioner);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
import com.solace.spring.cloud.stream.binder.meter.SolaceMeterAccessor;
import com.solace.spring.cloud.stream.binder.properties.SolaceExtendedBindingProperties;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceEndpointProvisioner;
import com.solace.spring.cloud.stream.binder.tracing.TracingProxy;
import com.solacesystems.jcsmp.Context;
import com.solacesystems.jcsmp.JCSMPSession;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.lang.Nullable;

import java.util.Optional;

@RequiredArgsConstructor
@Configuration
Expand All @@ -24,13 +26,17 @@ public class SolaceMessageChannelBinderConfiguration {
private final Context context;

@Bean
SolaceMessageChannelBinder solaceMessageChannelBinder(SolaceEndpointProvisioner solaceEndpointProvisioner, @Nullable SolaceBinderHealthAccessor solaceBinderHealthAccessor, @Nullable SolaceMeterAccessor solaceMeterAccessor) {
SolaceMessageChannelBinder binder = new SolaceMessageChannelBinder(jcsmpSession, context, solaceEndpointProvisioner);
SolaceMessageChannelBinder solaceMessageChannelBinder(SolaceEndpointProvisioner solaceEndpointProvisioner,
Optional<SolaceMeterAccessor> solaceMeterAccessor,
Optional<TracingProxy> tracingProxy,
Optional<SolaceBinderHealthAccessor> solaceBinderHealthAccessor) {
SolaceMessageChannelBinder binder = new SolaceMessageChannelBinder(jcsmpSession,
context,
solaceEndpointProvisioner,
solaceMeterAccessor,
tracingProxy,
solaceBinderHealthAccessor);
binder.setExtendedBindingProperties(solaceExtendedBindingProperties);
binder.setSolaceMeterAccessor(solaceMeterAccessor);
if (solaceBinderHealthAccessor != null) {
binder.setSolaceBinderHealthAccessor(solaceBinderHealthAccessor);
}
return binder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.solace.spring.cloud.stream.binder.config;

import com.solace.spring.cloud.stream.binder.tracing.TracingImpl;
import com.solace.spring.cloud.stream.binder.tracing.TracingProxy;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@ConditionalOnBean({Tracer.class, Propagator.class})
@Configuration
public class SolaceTracerConfiguration {

@Bean

public TracingImpl tracingImpl(@Autowired Tracer tracer, @Autowired Propagator propagator) {
return new TracingImpl(tracer, propagator);
}

@Bean
public TracingProxy tracingProxy(TracingImpl tracing) {
return new TracingProxy(tracing);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.solace.spring.cloud.stream.binder.inbound.acknowledge.SolaceAckUtil;
import com.solace.spring.cloud.stream.binder.meter.SolaceMeterAccessor;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solace.spring.cloud.stream.binder.tracing.TracingProxy;
import com.solace.spring.cloud.stream.binder.util.FlowReceiverContainer;
import com.solace.spring.cloud.stream.binder.util.SolaceAcknowledgmentException;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -15,6 +16,7 @@
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand All @@ -30,7 +32,8 @@ public class BasicInboundXMLMessageListener extends InboundXMLMessageListener {
Consumer<Message<?>> messageConsumer,
JCSMPAcknowledgementCallbackFactory ackCallbackFactory,
BiFunction<Message<?>, RuntimeException, Boolean> errorHandlerFunction,
@Nullable SolaceMeterAccessor solaceMeterAccessor,
Optional<SolaceMeterAccessor> solaceMeterAccessor,
Optional<TracingProxy> tracingProxy,
@Nullable AtomicBoolean remoteStopFlag,
ThreadLocal<AttributeAccessor> attributesHolder,
boolean needHolderAndAttributes) {
Expand All @@ -40,6 +43,7 @@ public class BasicInboundXMLMessageListener extends InboundXMLMessageListener {
messageConsumer,
ackCallbackFactory,
solaceMeterAccessor,
tracingProxy,
remoteStopFlag,
attributesHolder,
needHolderAndAttributes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.solace.spring.cloud.stream.binder.inbound.acknowledge.SolaceAckUtil;
import com.solace.spring.cloud.stream.binder.meter.SolaceMeterAccessor;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solace.spring.cloud.stream.binder.tracing.TracingProxy;
import com.solace.spring.cloud.stream.binder.util.*;
import com.solacesystems.jcsmp.*;
import lombok.Getter;
Expand All @@ -20,6 +21,7 @@
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand All @@ -34,22 +36,33 @@ abstract class InboundXMLMessageListener implements Runnable {
private final XMLMessageMapper xmlMessageMapper;
private final Consumer<Message<?>> messageConsumer;
private final JCSMPAcknowledgementCallbackFactory ackCallbackFactory;
@Nullable
private final SolaceMeterAccessor solaceMeterAccessor;
private final Optional<SolaceMeterAccessor> solaceMeterAccessor;
private final Optional<TracingProxy> tracingProxy;
private final boolean needHolder;
private final boolean needAttributes;
@Getter
private final AtomicBoolean stopFlag = new AtomicBoolean(false);
private final Supplier<Boolean> remoteStopFlag;
private final LargeMessageSupport largeMessageSupport = new LargeMessageSupport();

InboundXMLMessageListener(FlowReceiverContainer flowReceiverContainer, ConsumerDestination consumerDestination, ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties, Consumer<Message<?>> messageConsumer, JCSMPAcknowledgementCallbackFactory ackCallbackFactory, @Nullable SolaceMeterAccessor solaceMeterAccessor, @Nullable AtomicBoolean remoteStopFlag, ThreadLocal<AttributeAccessor> attributesHolder, boolean needHolder, boolean needAttributes) {
InboundXMLMessageListener(FlowReceiverContainer flowReceiverContainer,
ConsumerDestination consumerDestination,
ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties,
Consumer<Message<?>> messageConsumer,
JCSMPAcknowledgementCallbackFactory ackCallbackFactory,
Optional<SolaceMeterAccessor> solaceMeterAccessor,
Optional<TracingProxy> tracingProxy,
@Nullable AtomicBoolean remoteStopFlag,
ThreadLocal<AttributeAccessor> attributesHolder,
boolean needHolder,
boolean needAttributes) {
this.flowReceiverContainer = flowReceiverContainer;
this.consumerDestination = consumerDestination;
this.consumerProperties = consumerProperties;
this.messageConsumer = messageConsumer;
this.ackCallbackFactory = ackCallbackFactory;
this.solaceMeterAccessor = solaceMeterAccessor;
this.tracingProxy = tracingProxy;
this.remoteStopFlag = () -> remoteStopFlag != null && remoteStopFlag.get();
this.attributesHolder = attributesHolder;
this.needHolder = needHolder;
Expand Down Expand Up @@ -101,8 +114,8 @@ private void receive() throws UnboundFlowReceiverContainerException, StaleSessio
return;
}

if (solaceMeterAccessor != null && messageContainer != null) {
solaceMeterAccessor.recordMessage(consumerProperties.getBindingName(), messageContainer.getMessage());
if (solaceMeterAccessor.isPresent() && messageContainer != null) {
solaceMeterAccessor.get().recordMessage(consumerProperties.getBindingName(), messageContainer.getMessage());
}

try {
Expand All @@ -129,7 +142,15 @@ private void processMessage(MessageContainer messageContainer) {

private void processMessage(BytesXMLMessage bytesXMLMessage, AcknowledgmentCallback acknowledgmentCallback) {
try {
handleMessage(() -> createOneMessage(bytesXMLMessage, acknowledgmentCallback), m -> sendOneToConsumer(m, bytesXMLMessage), acknowledgmentCallback);
Supplier<Message<?>> createMessageSupplier = () -> createOneMessage(bytesXMLMessage, acknowledgmentCallback);
Consumer<Message<?>> sendToCustomerConsumer = m -> sendOneToConsumer(m, bytesXMLMessage);
if (tracingProxy.isPresent()) {
SDTMap tracingHeader = bytesXMLMessage.getProperties().getMap(TracingProxy.TRACING_HEADER_KEY);
if (tracingHeader != null) {
sendToCustomerConsumer = tracingProxy.get().wrapInTracingContext(tracingHeader, sendToCustomerConsumer);
}
}
handleMessage(createMessageSupplier, sendToCustomerConsumer, acknowledgmentCallback);
} catch (SolaceAcknowledgmentException e) {
throw e;
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 2410796

Please sign in to comment.