Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The event suddenly died and could not be handled #203

Open
ThienPhuc92 opened this issue Mar 12, 2024 · 0 comments
Open

The event suddenly died and could not be handled #203

ThienPhuc92 opened this issue Mar 12, 2024 · 0 comments

Comments

@ThienPhuc92
Copy link

ThienPhuc92 commented Mar 12, 2024

Problem

I register event consumers according to the following structure
My version core and message is 0.32.0.RELEASE

public DomainEventHandlers domainEventHandlers() {
        log.debug("Building Domain Event Handler for Engine Event Consumer");
        String engineEventKafkaTopic = globalSetting.get(AppSettingKey.EVENTUATE.CMD_TOPIC);
        return DomainEventHandlersBuilder
            .forAggregateType(engineEventKafkaTopic)
            .onEvent(CmdUpdatedStatusEvent.class, this::handleCmdUpdatedStatusEvent)
           .build();
}

private void handleCmdUpdatedStatusEvent(DomainEventEnvelope<CmdUpdatedStatusEvent> event) {
        log.info("Received CmdUpdatedStatusEvent: {}", event.getEvent());
        try {
            cmdEventHandler.handleEvent(event.getEvent());
        } catch (Exception exception) {
            log.debug("Error when handle cmd updated status event!", exception);
            saveErrorMessage(event, exception.getMessage());
        }
    }

private <T extends DomainEvent> void saveErrorMessage(DomainEventEnvelope<T> event, String messageLog) {
        try {
            Message message = event.getMessage();

            CreateReceivedMessagesErrorCommand createReceivedMessagesErrorCommand = new CreateReceivedMessagesErrorCommand();
            createReceivedMessagesErrorCommand.setMessageId(message.getId());
            createReceivedMessagesErrorCommand.setEventType(event.getEvent().getClass().getSimpleName());
            createReceivedMessagesErrorCommand.setHeader(objectMapper.writeValueAsString(message.getHeaders()));
            createReceivedMessagesErrorCommand.setPayload(message.getPayload());
            createReceivedMessagesErrorCommand.setLog(messageLog);
            createReceivedMessagesErrorCommand.setStatus(ReceivedMessagesErrorStatus.PENDING);
            createReceivedMessagesErrorCommand.setCreationTime(Instant.now());

            saveReceivedMessagesErrorUseCase.create(createReceivedMessagesErrorCommand);
        } catch (Exception e) {
            log.error("An error occurred while trying convert and send command to create received message error", e);
        }
    }

Everything was fine, and then the event suddenly died for no apparent reason
When I restart the application the application continues to consume events and after a period of a few days it automatically stops consuming

Below is the processing log of eventuate

Log
When eventuate is consumed, this segment usually only has 1 offset. But when the event has a problem, all the offsets are gathered here

To commit CI_DOP_V3_EVENT_CONSUMER io.eventuate.messaging.kafka.basic.consumer.OffsetTracker@59d137a1[state={CI_DOP_V3_EVENT-0=io.eventuate.messaging.kafka.basic.consumer.TopicPartitionOffsets@1f75b245[unprocessed=[52274]

image

I using mysql and I see that it always locked on before the event break down
OBJECT_NAME received_messages
LOCK_TYPE TABLE
LOCK_MODE IX

Do you have any ideals about this issue? Thank you very much!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant