Skip to content

Commit

Permalink
Fixing poppy p1
Browse files Browse the repository at this point in the history
  • Loading branch information
Vignesh-kalyanasundaram committed May 30, 2024
1 parent 90fa132 commit 1615266
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class KafkaListenerConfiguration {

@Bean(KAFKA_CONSUMER_FACTORY)
public ConsumerFactory<String, DomainEvent> kafkaDomainEventConsumerFactory() {
var consumerProperties = kafkaProperties.buildConsumerProperties();
var consumerProperties = kafkaProperties.buildConsumerProperties(null);

JsonDeserializer<DomainEvent> deserializer = new JsonDeserializer<>(mapper);
deserializer.setTypeResolver(typeResolver);
Expand Down Expand Up @@ -83,7 +83,7 @@ public ConcurrentKafkaListenerContainerFactory<String, DomainEvent> kafkaDomainE

@Bean("kafkaInitialContributionConsumer")
public ConsumerFactory<String, InstanceIterationEvent> kafkaInitialContributionEventConsumerFactory() {
var consumerProperties = kafkaProperties.buildConsumerProperties();
var consumerProperties = kafkaProperties.buildConsumerProperties(null);

JsonDeserializer<InstanceIterationEvent> deserializer = new JsonDeserializer<>(InstanceIterationEvent.class);
deserializer.setUseTypeHeaders(false);
Expand All @@ -107,7 +107,7 @@ public ConcurrentKafkaListenerContainerFactory<String, InstanceIterationEvent> k
factory.setCommonErrorHandler(errorHandler());
return factory;
}

@Bean(BATCH_EVENT_PROCESSOR_RETRY_TEMPLATE)
public RetryTemplate batchEventRetryTemplate() {
return RetryTemplate.builder()
Expand All @@ -121,9 +121,13 @@ public DefaultErrorHandler errorHandler() {
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
log.info("inside errorHandler for Ongoing contribution");
// logic to execute when all the retry attempts are exhausted
executionService.runTenantScoped(getContributionJobContext().getTenantId(),
try {
executionService.runTenantScoped(getContributionJobContext().getTenantId(),
() -> contributionJobRunner.completeContribution(getContributionJobContext()));
endContributionJobContext();
endContributionJobContext();
} catch (Exception ex) {
log.warn("Exception while processing error handler {} ", ex.getMessage());
}
}, fixedBackOff);
errorHandler.addRetryableExceptions(ServiceSuspendedException.class);
errorHandler.addRetryableExceptions(SocketTimeOutExceptionWrapper.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.folio.innreach.external.exception.ServiceSuspendedException;
import org.folio.innreach.external.exception.SocketTimeOutExceptionWrapper;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Service;
Expand All @@ -28,24 +29,29 @@ public class BatchDomainEventProcessor {

@Qualifier(value = BATCH_EVENT_PROCESSOR_RETRY_TEMPLATE)
private final RetryTemplate retryTemplate;
@Value("${innReachTenants}")
private String innReachTenants;

public <T> void process(List<DomainEvent<T>> batch, Consumer<DomainEvent<T>> recordProcessor) {
log.debug("process:: parameters batch: {}, recordProcessor: {}", batch, recordProcessor);
var tenantEventsMap = batch.stream().collect(Collectors.groupingBy(DomainEvent::getTenant));
for (var tenantEventsEntry : tenantEventsMap.entrySet()) {
var tenantId = tenantEventsEntry.getKey();
var events = tenantEventsEntry.getValue();

try {
executionService.runTenantScoped(tenantId,
() -> processTenantEvents(events, recordProcessor));
}
catch (ServiceSuspendedException | FeignException | InnReachConnectionException | SocketTimeOutExceptionWrapper e) {
log.info("exception thrown from process", e);
throw e;
}
catch (ListenerExecutionFailedException listenerExecutionFailedException) {
log.warn("Consuming this event [{}] not permitted for system user [tenantId={}]", recordProcessor, tenantId);
if (innReachTenants.contains(tenantId)) {
try {
executionService.runTenantScoped(tenantId,
() -> processTenantEvents(events, recordProcessor));
}
catch (ServiceSuspendedException | FeignException | InnReachConnectionException | SocketTimeOutExceptionWrapper e) {
log.info("exception thrown from process", e);
throw e;
}
catch (ListenerExecutionFailedException listenerExecutionFailedException) {
log.warn("Consuming this event [{}] not permitted for system user [tenantId={}]", recordProcessor, tenantId);
}
} else {
log.warn("Ignoring event of a unknown tenant {}, events {}", tenantId, events);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ spring:
consumer:
max-poll-records: 20
group-id: ${ENV:folio}-mod-innreach-events-group
auto-offset-reset: earliest
security:
protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}
ssl:
Expand Down Expand Up @@ -132,6 +133,7 @@ retryable-update:
retry-interval-ms: 0
okapi.url: ${OKAPI_URL:http://okapi:9130}
environment: ${ENV:folio}
innReachTenants: ${INNREACH_TENANTS}
kafka:
listener:
loan:
Expand Down

0 comments on commit 1615266

Please sign in to comment.