Skip to content

Commit

Permalink
Merge pull request #199 from SolaceProducts/crushton/DATAGO-78976_cme…
Browse files Browse the repository at this point in the history
…a_metrics

DATAGO-78976: Quality | EMA metrics for number of ongoing jobs + health check metrics
  • Loading branch information
CameronRushton authored Aug 30, 2024
2 parents 7a53aa5 + da3cdc3 commit 2169498
Show file tree
Hide file tree
Showing 18 changed files with 211 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public ScanCommandMessage(String messagingServiceId,
this(messagingServiceId, scanId, scanTypes, destinations, null);
}



@Override
public String toLog() {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.solace.maas.ep.common.metrics;

public class ObservabilityConstants {
public static final String MAAS_EMA_SCAN_EVENT_SENT = "maas.ema.scan_event.sent";
public static final String MAAS_EMA_CONFIG_PUSH_EVENT_SENT = "maas.ema.config_push_event.sent";
public static final String MAAS_EMA_HEARTBEAT_EVENT_SENT = "maas.ema.heartbeat_event.sent";

public static final String MAAS_EMA_SCAN_EVENT_RECEIVED = "maas.ema.scan_event.received";
public static final String MAAS_EMA_CONFIG_PUSH_EVENT_RECEIVED = "maas.ema.config_push_event.received";

public static final String MAAS_EMA_CONFIG_PUSH_EVENT_CYCLE_TIME = "maas.ema.config_push_event.cycle_time";

public static final String STATUS_TAG = "status";
public static final String ORG_ID_TAG = "org_id";
public static final String SCAN_ID_TAG = "scan_id";

private ObservabilityConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import com.solace.maas.ep.event.management.agent.processor.CommandLogStreamingProcessor;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import com.solace.maas.ep.event.management.agent.util.MdcTaskDecorator;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand All @@ -24,14 +26,21 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_CONFIG_PUSH_EVENT_CYCLE_TIME;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_CONFIG_PUSH_EVENT_SENT;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.ORG_ID_TAG;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.STATUS_TAG;
import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID;
import static com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants.ACTOR_ID;
import static com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants.TRACE_ID;
Expand All @@ -49,18 +58,21 @@ public class CommandManager {
private final EventPortalProperties eventPortalProperties;
private final ThreadPoolTaskExecutor configPushPool;
private final Optional<CommandLogStreamingProcessor> commandLogStreamingProcessorOpt;
private final MeterRegistry meterRegistry;

public CommandManager(TerraformManager terraformManager,
CommandMapper commandMapper,
CommandPublisher commandPublisher,
MessagingServiceDelegateService messagingServiceDelegateService,
EventPortalProperties eventPortalProperties,
Optional<CommandLogStreamingProcessor> commandLogStreamingProcessorOpt) {
Optional<CommandLogStreamingProcessor> commandLogStreamingProcessorOpt,
MeterRegistry meterRegistry) {
this.terraformManager = terraformManager;
this.commandMapper = commandMapper;
this.commandPublisher = commandPublisher;
this.messagingServiceDelegateService = messagingServiceDelegateService;
this.eventPortalProperties = eventPortalProperties;
this.meterRegistry = meterRegistry;
configPushPool = new ThreadPoolTaskExecutor();
configPushPool.setCorePoolSize(eventPortalProperties.getCommandThreadPoolMinSize());
configPushPool.setMaxPoolSize(eventPortalProperties.getCommandThreadPoolMaxSize());
Expand All @@ -73,6 +85,7 @@ public CommandManager(TerraformManager terraformManager,

public void execute(CommandMessage request) {
CommandRequest requestBO = commandMapper.map(request);
requestBO.setCreatedTime(Instant.now());
CompletableFuture.runAsync(() -> configPush(requestBO), configPushPool)
.exceptionally(e -> {
log.error("Error running command", e);
Expand Down Expand Up @@ -221,6 +234,14 @@ private void finalizeAndSendResponse(CommandRequest request) {
response.setTraceId(MDC.get(TRACE_ID));
response.setActorId(MDC.get(ACTOR_ID));
commandPublisher.sendCommandResponse(response, topicVars);
meterRegistry.counter(MAAS_EMA_CONFIG_PUSH_EVENT_SENT, ORG_ID_TAG, response.getOrgId(),
STATUS_TAG, response.getStatus().name()).increment();
Timer jobCycleTime = Timer
.builder(MAAS_EMA_CONFIG_PUSH_EVENT_CYCLE_TIME)
.tag(ORG_ID_TAG, response.getOrgId())
.tag(STATUS_TAG, request.getStatus().name())
.register(meterRegistry);
jobCycleTime.record(request.getLifetime(ChronoUnit.MILLIS), TimeUnit.MILLISECONDS);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.jacoco.ExcludeFromJacocoGeneratedReport;
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.info.BuildProperties;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_HEARTBEAT_EVENT_SENT;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.ORG_ID_TAG;

@ExcludeFromJacocoGeneratedReport
@Component
Expand All @@ -23,21 +31,33 @@ public class HeartbeatGenerator {
private final String runtimeAgentId;
private final String topic;
private final String runtimeAgentVersion;
private final MeterRegistry meterRegistry;

public HeartbeatGenerator(SolaceConfiguration solaceConfiguration,
EventPortalProperties eventPortalProperties,
SolacePublisher solacePublisher,
BuildProperties buildProperties) {
BuildProperties buildProperties,
MeterRegistry meterRegistry) {
this.solacePublisher = solacePublisher;
this.runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
topic = solaceConfiguration.getTopicPrefix() + "heartbeat/v1";
this.runtimeAgentVersion = getFormattedVersion(buildProperties.getVersion());
this.meterRegistry = meterRegistry;
}

@Scheduled(fixedRate = 5000)
public void sendHeartbeat() {
HeartbeatMessage message = new HeartbeatMessage(runtimeAgentId, Instant.now().toString(), runtimeAgentVersion);
solacePublisher.publish(message, topic);
boolean result = solacePublisher.publish(message, topic);
logHealthMetric(message, result);
}

private void logHealthMetric(HeartbeatMessage message, boolean isHealthy) {
List<Tag> tags = new ArrayList<>();
if (Objects.nonNull(message.getOrgId())) {
tags.add(Tag.of(ORG_ID_TAG, message.getOrgId()));
}
meterRegistry.gauge(MAAS_EMA_HEARTBEAT_EVENT_SENT, tags, isHealthy ? 1 : 0);
}

private String getFormattedVersion(String version) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
package com.solace.maas.ep.event.management.agent.publisher;

import com.solace.maas.ep.event.management.agent.plugin.constants.ScanStatus;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessage;
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.Map;

import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_SENT;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.ORG_ID_TAG;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.STATUS_TAG;

@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class ScanDataPublisher {

private final SolacePublisher solacePublisher;
private final MeterRegistry meterRegistry;

public ScanDataPublisher(SolacePublisher solacePublisher) {
public ScanDataPublisher(SolacePublisher solacePublisher,
MeterRegistry meterRegistry) {
this.solacePublisher = solacePublisher;
this.meterRegistry = meterRegistry;
}

/**
Expand Down Expand Up @@ -43,6 +53,11 @@ public void sendScanData(MOPMessage message, Map<String, String> topicDetails) {
topicDetails.get("scanId"),
topicDetails.get("scanType"));

solacePublisher.publish(message, topicString);
boolean isSuccessful = solacePublisher.publish(message, topicString);

meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT,
STATUS_TAG, isSuccessful ? ScanStatus.COMPLETE.name() : ScanStatus.FAILED.name(),
SCAN_ID_TAG, topicDetails.get("scanId"),
ORG_ID_TAG, topicDetails.get("orgId")).increment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import com.solace.maas.ep.event.management.agent.plugin.route.exceptions.ScanOverallStatusException;
import com.solace.maas.ep.event.management.agent.plugin.route.exceptions.ScanStatusException;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
Expand All @@ -14,15 +15,23 @@
import java.util.List;
import java.util.Map;

import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_SENT;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.ORG_ID_TAG;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.STATUS_TAG;

@Slf4j
@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class ScanStatusPublisher {

private final SolacePublisher solacePublisher;
private final MeterRegistry meterRegistry;

public ScanStatusPublisher(SolacePublisher solacePublisher) {
public ScanStatusPublisher(SolacePublisher solacePublisher,
MeterRegistry meterRegistry) {
this.solacePublisher = solacePublisher;
this.meterRegistry = meterRegistry;
}

/**
Expand All @@ -46,6 +55,9 @@ public void sendOverallScanStatus(ScanStatusMessage message, Map<String, String>
} catch (Exception e) {
throw new ScanOverallStatusException("Over all status exception: " + e.getMessage(),
Map.of(scanId, List.of(e)), "Overall status", Arrays.asList(scanType.split(",")), ScanStatus.valueOf(status));
} finally {
meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT, STATUS_TAG, status, SCAN_ID_TAG, scanId,
ORG_ID_TAG, topicDetails.get("orgId")).increment();
}
}

Expand All @@ -72,6 +84,9 @@ public void sendScanDataStatus(ScanDataStatusMessage message, Map<String, String
} catch (Exception e) {
throw new ScanStatusException("Route status exception: " + e.getMessage(),
Map.of(scanId, List.of(e)), "Route status", List.of(scanType), ScanStatus.valueOf(status));
} finally {
meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT, STATUS_TAG, status, SCAN_ID_TAG, scanId,
ORG_ID_TAG, topicDetails.get("orgId")).increment();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ public String scan(ScanRequestBO scanRequestBO) {
brokerScanType, e.getKey()))
.filter(Objects::nonNull)
.filter(list -> !list.isEmpty())
.collect(Collectors.toList()).stream()
.toList().stream()
)
.collect(Collectors.toList()).stream().flatMap(List::stream).collect(Collectors.toList());
.toList().stream().flatMap(List::stream).toList();

return scanService.singleScan(routes, groupId, scanId, traceId, actorId, messagingServiceEntity, runtimeAgentId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanItemBO;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanTypeBO;
import com.solace.maas.ep.event.management.agent.util.IDGenerator;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils;
import org.apache.camel.Exchange;
Expand All @@ -40,6 +41,10 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_SENT;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.STATUS_TAG;

/**
* Responsible for initiating and managing Messaging Service scans.
*/
Expand All @@ -62,11 +67,17 @@ public class ScanService {

private final IDGenerator idGenerator;

private final MeterRegistry meterRegistry;

public ScanService(ScanRepository repository,
ScanRecipientHierarchyRepository scanRecipientHierarchyRepository,
ScanTypeRepository scanTypeRepository, ScanStatusRepository scanStatusRepository, ScanRouteService scanRouteService,
RouteService routeService, ProducerTemplate producerTemplate,
IDGenerator idGenerator) {
ScanTypeRepository scanTypeRepository,
ScanStatusRepository scanStatusRepository,
ScanRouteService scanRouteService,
RouteService routeService,
ProducerTemplate producerTemplate,
IDGenerator idGenerator,
MeterRegistry meterRegistry) {
this.repository = repository;
this.scanRecipientHierarchyRepository = scanRecipientHierarchyRepository;
this.scanTypeRepository = scanTypeRepository;
Expand All @@ -75,6 +86,7 @@ public ScanService(ScanRepository repository,
this.routeService = routeService;
this.producerTemplate = producerTemplate;
this.idGenerator = idGenerator;
this.meterRegistry = meterRegistry;
}

/**
Expand Down Expand Up @@ -284,6 +296,7 @@ public void sendScanStatus(String groupId, String scanId, String traceId, String
exchange.getIn().setHeader(RouteConstants.SCAN_STATUS, status);
exchange.getIn().setHeader(RouteConstants.SCAN_STATUS_DESC, "");
});
meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT, STATUS_TAG, status.name(), SCAN_ID_TAG, scanId).increment();
}

protected CompletableFuture<Exchange> scanAsync(String groupId, String scanId, String traceId, String actorId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ public class CommandMessageHandler extends SolaceDirectMessageHandler<CommandMes

public CommandMessageHandler(
SolaceConfiguration solaceConfiguration,
SolaceSubscriber solaceSubscriber, CommandMessageProcessor commandMessageProcessor) {
SolaceSubscriber solaceSubscriber,
CommandMessageProcessor commandMessageProcessor) {
super(solaceConfiguration.getTopicPrefix() + "command/v1/>", solaceSubscriber);
this.commandMessageProcessor = commandMessageProcessor;
}
Expand Down
Loading

0 comments on commit 2169498

Please sign in to comment.