Skip to content

Commit

Permalink
Merge pull request #6 from protegeproject/WHO-setup
Browse files Browse the repository at this point in the history
increase timeout
  • Loading branch information
matthewhorridge authored Jun 27, 2024
2 parents 3e9a315 + ce8c322 commit ca8cbca
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private R handleResponse(Message rabbitResponse) {

if(exception != null) {
try {
logger.error("Found error on response " + exception);
logger.error("Found error on response {}. Action : {}" ,exception, rabbitResponse.getMessageProperties().getHeaders().get(Headers.METHOD));
throw objectMapper.readValue(exception, CommandExecutionException.class);
} catch (JsonProcessingException e) {
logger.error("Error ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public void dispatchEvent(Event event) {
message.getMessageProperties().getHeaders().put(PROJECT_ID, projectId);
}
eventRabbitTemplate.convertAndSend(RabbitMQEventsConfiguration.EVENT_EXCHANGE, "", message);
logger.info("Sent event message!");
} catch (JsonProcessingException | AmqpException e) {
logger.info("Could not serialize event: {}", e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public RabbitMQEventHandlerWrapper(List<EventHandler<? extends Event>> eventHand

@Override
public void onMessage(Message message) {
logger.info("Handling event with id {}", message.getMessageProperties().getMessageId());
EventHandler eventHandler = eventHandlers.stream()
.filter(handler -> {
String channel = String.valueOf(message.getMessageProperties().getHeaders().get(CHANNEL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ public RabbitMqCommandHandlerWrapper(List<CommandHandler<? extends Request, ? ex

@Override
public void onMessage(Message message, Channel channel) throws Exception {
logger.info("Received message " + message);

var replyChannel = message.getMessageProperties().getReplyTo();
if (replyChannel == null) {
String errorMessage = Headers.REPLY_CHANNEL + " header is missing. Cannot reply to message.";
Expand Down Expand Up @@ -86,7 +84,6 @@ public void onMessage(Message message, Channel channel) throws Exception {
}

CommandHandler handler = extractHandler(messageType);
logger.info("Dispatch handling to {}", handler.getClass());
parseAndHandleRequest(handler, message, channel, new UserId(userId), accessToken);
}

Expand Down Expand Up @@ -171,10 +168,15 @@ private void authorizeAndReplyToRequest(CommandHandler<Q,R> handler,

private void handleAndReplyToRequest(CommandHandler<Q,R> handler, Channel channel, Message message, UserId userId, Q request, String accessToken) {
var executionContext = new ExecutionContext(userId, accessToken);
long startTime = System.currentTimeMillis();

try {
var response = handler.handleRequest(request, executionContext);
response.subscribe(r -> {
replyWithSuccessResponse(channel, message, userId, r);
long endtime = System.currentTimeMillis();
logger.info("Request executed " + request.getChannel() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");

}, throwable -> {
if (throwable instanceof CommandExecutionException ex) {
logger.info(
Expand All @@ -183,14 +185,23 @@ private void handleAndReplyToRequest(CommandHandler<Q,R> handler, Channel channe
throwable.getMessage(),
request);
replyWithErrorResponse(message,channel, userId, ex.getStatus());
long endtime = System.currentTimeMillis();
logger.info("Request failed " + request.getChannel() + "with error " + throwable.getMessage() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");
}
else {
replyWithErrorResponse(message, channel, userId, HttpStatus.INTERNAL_SERVER_ERROR);
long endtime = System.currentTimeMillis();

logger.info("Request failed " + request.getChannel() + "with error " + throwable.getMessage() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");

}
});
} catch (Throwable throwable) {
logger.error("Uncaught exception when handling request", throwable);
replyWithErrorResponse(message, channel, userId, HttpStatus.INTERNAL_SERVER_ERROR);
long endtime = System.currentTimeMillis();
logger.info("Request failed " + request.getChannel() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory c
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(replyQueue);
container.setConcurrency("15-20");
return container;
}

Expand All @@ -125,6 +126,7 @@ public SimpleMessageListenerContainer messageListenerContainers() {
container.setQueueNames(getCommandQueue());
container.setConnectionFactory(connectionFactory);
container.setMessageListener(rabbitMqCommandHandlerWrapper());
container.setConcurrency("15-20");
return container;
}

Expand Down

0 comments on commit ca8cbca

Please sign in to comment.