Skip to content

Commit

Permalink
DATAGO-70002: Import support in EMA (#163)
Browse files Browse the repository at this point in the history
Co-authored-by: Cameron Rushton <[email protected]>
  • Loading branch information
gregmeldrum and CameronRushton authored Feb 13, 2024
1 parent e099ccd commit 8069186
Show file tree
Hide file tree
Showing 11 changed files with 564 additions and 137 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.solace.maas.ep.common.messages;

import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle;
import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessage;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPProtocol;
Expand All @@ -15,6 +16,7 @@ public class CommandMessage extends MOPMessage {
private String commandCorrelationId;
private String context;
private String serviceId;
private JobStatus status;
private List<CommandBundle> commandBundles;

public CommandMessage() {
Expand All @@ -24,6 +26,7 @@ public CommandMessage() {
public CommandMessage(String serviceId,
String commandCorrelationId,
String context,
JobStatus status,
List<CommandBundle> commandBundles) {
super();
withMessageType(MOPMessageType.generic)
Expand All @@ -33,6 +36,7 @@ public CommandMessage(String serviceId,
this.serviceId = serviceId;
this.commandCorrelationId = commandCorrelationId;
this.context = context;
this.status = status;
this.commandBundles = commandBundles;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.command.model.Command;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandRequest;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandResult;
import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus;
import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService;
Expand All @@ -14,6 +15,7 @@
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import com.solace.maas.ep.event.management.agent.util.MdcTaskDecorator;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
Expand All @@ -25,8 +27,10 @@
import java.util.concurrent.CompletableFuture;

import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID;
import static com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager.LOG_LEVEL_ERROR;
import static com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager.setCommandError;
import static com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants.ACTOR_ID;
import static com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants.TRACE_ID;
import static com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformUtils.LOG_LEVEL_ERROR;
import static com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformUtils.setCommandError;

@Slf4j
@Service
Expand Down Expand Up @@ -57,26 +61,26 @@ public CommandManager(TerraformManager terraformManager, CommandMapper commandMa
}

public void execute(CommandMessage request) {

CompletableFuture.runAsync(() -> configPush(request), configPushPool)
CommandRequest requestBO = commandMapper.map(request);
CompletableFuture.runAsync(() -> configPush(requestBO), configPushPool)
.exceptionally(e -> {
log.error("Error running command", e);
Command firstCommand = request.getCommandBundles().get(0).getCommands().get(0);
Command firstCommand = requestBO.getCommandBundles().get(0).getCommands().get(0);
setCommandError(firstCommand, (Exception) e);
sendResponse(request);
finalizeAndSendResponse(requestBO);
return null;
});
}

public void configPush(CommandMessage request) {
public void configPush(CommandRequest request) {
Map<String, String> envVars;
try {
envVars = setBrokerSpecificEnvVars(request.getServiceId());
} catch (Exception e) {
log.error("Error getting terraform variables", e);
Command firstCommand = request.getCommandBundles().get(0).getCommands().get(0);
setCommandError(firstCommand, e);
sendResponse(request);
finalizeAndSendResponse(request);
return;
}

Expand All @@ -86,7 +90,7 @@ public void configPush(CommandMessage request) {
try {
switch (command.getCommandType()) {
case terraform:
terraformManager.execute(commandMapper.map(request), command, envVars);
terraformManager.execute(request, command, envVars);
break;
default:
command.setResult(CommandResult.builder()
Expand All @@ -103,18 +107,37 @@ public void configPush(CommandMessage request) {
log.error("Error executing command", e);
setCommandError(command, e);
}
if (exitEarlyOnFailedCommand(bundle, command)) {
break;
}
}
}
sendResponse(request);

finalizeAndSendResponse(request);
}

private static boolean exitEarlyOnFailedCommand(CommandBundle bundle, Command command) {
return Boolean.TRUE.equals(bundle.getExitOnFailure())
&& Boolean.FALSE.equals(command.getIgnoreResult())
&& (command.getResult() == null || JobStatus.error.equals(command.getResult().getStatus()));
}

private void sendResponse(CommandMessage request) {
private void finalizeAndSendResponse(CommandRequest request) {
request.determineStatus();
Map<String, String> topicVars = Map.of(
"orgId", request.getOrgId(),
"orgId", eventPortalProperties.getOrganizationId(),
"runtimeAgentId", eventPortalProperties.getRuntimeAgentId(),
COMMAND_CORRELATION_ID, request.getCommandCorrelationId()
);
commandPublisher.sendCommandResponse(request, topicVars);
CommandMessage response = new CommandMessage(request.getServiceId(),
request.getCommandCorrelationId(),
request.getContext(),
request.getStatus(),
request.getCommandBundles());
response.setOrgId(eventPortalProperties.getOrganizationId());
response.setTraceId(MDC.get(TRACE_ID));
response.setActorId(MDC.get(ACTOR_ID));
commandPublisher.sendCommandResponse(response, topicVars);
}

private Map<String, String> setBrokerSpecificEnvVars(String messagingServiceId) {
Expand Down
Loading

0 comments on commit 8069186

Please sign in to comment.