diff --git a/common/src/main/java/io/eventuate/examples/tram/ordersandcustomers/commondomain/CustomerSnapshotEvent.java b/common/src/main/java/io/eventuate/examples/tram/ordersandcustomers/commondomain/CustomerSnapshotEvent.java new file mode 100644 index 00000000..7c82dab9 --- /dev/null +++ b/common/src/main/java/io/eventuate/examples/tram/ordersandcustomers/commondomain/CustomerSnapshotEvent.java @@ -0,0 +1,40 @@ +package io.eventuate.examples.tram.ordersandcustomers.commondomain; + +public class CustomerSnapshotEvent { + private Long id; + private String name; + private Money creditLimit; + + public CustomerSnapshotEvent() { + } + + public CustomerSnapshotEvent(Long id, String name, Money creditLimit) { + this.id = id; + this.name = name; + this.creditLimit = creditLimit; + } + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Money getCreditLimit() { + return creditLimit; + } + + public void setCreditLimit(Money creditLimit) { + this.creditLimit = creditLimit; + } +} diff --git a/common/src/main/java/io/eventuate/examples/tram/ordersandcustomers/commondomain/CustomerSnapshotStartingOffsetEvent.java b/common/src/main/java/io/eventuate/examples/tram/ordersandcustomers/commondomain/CustomerSnapshotStartingOffsetEvent.java new file mode 100644 index 00000000..31eb2f80 --- /dev/null +++ b/common/src/main/java/io/eventuate/examples/tram/ordersandcustomers/commondomain/CustomerSnapshotStartingOffsetEvent.java @@ -0,0 +1,42 @@ +package io.eventuate.examples.tram.ordersandcustomers.commondomain; + +import io.eventuate.tram.events.common.DomainEvent; + +public class CustomerSnapshotStartingOffsetEvent implements DomainEvent { + private String topic; + private int partition; + private long offset; + + public CustomerSnapshotStartingOffsetEvent() { + } + + public CustomerSnapshotStartingOffsetEvent(String topic, int partition, long offset) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public int getPartition() { + return partition; + } + + public void setPartition(int partition) { + this.partition = partition; + } + + public long getOffset() { + return offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } +} diff --git a/customer-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/customers/CustomerConfiguration.java b/customer-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/customers/CustomerConfiguration.java index f155bb38..a230d9b5 100644 --- a/customer-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/customers/CustomerConfiguration.java +++ b/customer-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/customers/CustomerConfiguration.java @@ -2,11 +2,13 @@ import io.eventuate.examples.tram.ordersandcustomers.customers.service.CustomerService; import io.eventuate.examples.tram.ordersandcustomers.customers.service.OrderEventConsumer; +import io.eventuate.local.java.kafka.producer.EventuateKafkaProducer; import io.eventuate.tram.consumer.kafka.TramConsumerKafkaConfiguration; import io.eventuate.tram.events.publisher.TramEventsPublisherConfiguration; import io.eventuate.tram.events.subscriber.DomainEventDispatcher; import io.eventuate.tram.messaging.consumer.MessageConsumer; import io.eventuate.tram.messaging.producer.jdbc.TramMessageProducerJdbcConfiguration; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -36,4 +38,9 @@ public CustomerService customerService() { return new CustomerService(); } + @Bean + public EventuateKafkaProducer eventuateKafkaProducer(@Value("${eventuatelocal.kafka.bootstrap.servers}") String eventuateKafkaBootstrapServers) { + return new EventuateKafkaProducer(eventuateKafkaBootstrapServers); + } + } diff --git a/customer-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/customers/service/CustomerService.java b/customer-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/customers/service/CustomerService.java index 4234c9be..0b37d42d 100644 --- a/customer-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/customers/service/CustomerService.java +++ b/customer-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/customers/service/CustomerService.java @@ -1,13 +1,26 @@ package io.eventuate.examples.tram.ordersandcustomers.customers.service; +import io.eventuate.examples.tram.ordersandcustomers.commondomain.CustomerSnapshotEvent; +import io.eventuate.examples.tram.ordersandcustomers.commondomain.CustomerSnapshotStartingOffsetEvent; import io.eventuate.examples.tram.ordersandcustomers.commondomain.Money; import io.eventuate.examples.tram.ordersandcustomers.customers.domain.Customer; import io.eventuate.examples.tram.ordersandcustomers.customers.domain.CustomerRepository; +import io.eventuate.javaclient.commonimpl.JSonMapper; +import io.eventuate.local.java.kafka.producer.EventuateKafkaProducer; import io.eventuate.tram.events.ResultWithEvents; import io.eventuate.tram.events.publisher.DomainEventPublisher; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.StreamSupport; + public class CustomerService { + private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private CustomerRepository customerRepository; @@ -15,10 +28,40 @@ public class CustomerService { @Autowired private DomainEventPublisher domainEventPublisher; + @Autowired + private EventuateKafkaProducer eventuateKafkaProducer; + public Customer createCustomer(String name, Money creditLimit) { ResultWithEvents customerWithEvents = Customer.create(name, creditLimit); Customer customer = customerRepository.save(customerWithEvents.result); domainEventPublisher.publish(Customer.class, customer.getId(), customerWithEvents.events); return customer; } + + public void exportSnapshots() { + AtomicReference> metadataFuture = new AtomicReference<>(); + + StreamSupport + .stream(customerRepository.findAll().spliterator(), false) + .forEach(customer -> { + CustomerSnapshotEvent customerSnapshotEvent = new CustomerSnapshotEvent(customer.getId(), customer.getName(), customer.getCreditLimit()); + + CompletableFuture metadata = eventuateKafkaProducer.send("CustomerSnapshot", null, JSonMapper.toJson(customerSnapshotEvent)); + metadataFuture.compareAndSet(null, metadata); + }); + + try { + RecordMetadata metadata = (RecordMetadata)metadataFuture.get().get(); + + eventuateKafkaProducer.send("CustomerSnapshotStartingOffset", + null, + JSonMapper.toJson(new CustomerSnapshotStartingOffsetEvent(metadata.topic(), + metadata.partition(), + metadata.offset()))); + + } catch (InterruptedException | ExecutionException e) { + logger.error(e.getMessage(), e); + //TODO: report error to client + } + } } diff --git a/customer-service/src/main/java/io/eventuate/examples/tram/ordersandcustomers/customers/web/CustomerController.java b/customer-service/src/main/java/io/eventuate/examples/tram/ordersandcustomers/customers/web/CustomerController.java index fb7bb16d..a091eab3 100644 --- a/customer-service/src/main/java/io/eventuate/examples/tram/ordersandcustomers/customers/web/CustomerController.java +++ b/customer-service/src/main/java/io/eventuate/examples/tram/ordersandcustomers/customers/web/CustomerController.java @@ -25,4 +25,9 @@ public CreateCustomerResponse createCustomer(@RequestBody CreateCustomerRequest Customer customer = customerService.createCustomer(createCustomerRequest.getName(), createCustomerRequest.getCreditLimit()); return new CreateCustomerResponse(customer.getId()); } + + @RequestMapping(value = "/export-snapshots", method = RequestMethod.POST) + public void exportSnapshots() { + customerService.exportSnapshots(); + } } diff --git a/gradle.properties b/gradle.properties index 01cad4db..3b7b358f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -6,6 +6,6 @@ eventuateMavenRepoUrl=https://dl.bintray.com/eventuateio-oss/eventuate-maven-rel springBootVersion=1.4.5.RELEASE eventuateClientVersion=0.20.1.RELEASE -eventuateTramVersion=0.7.0.RELEASE +eventuateTramVersion=0.6.0-SNAPSHOT dockerComposePluginVersion=0.4.5 version=0.1.0-SNAPSHOT \ No newline at end of file diff --git a/order-history-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistory/backend/CustomerHistoryEventConsumer.java b/order-history-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistory/backend/CustomerHistoryEventConsumer.java index 6bab791d..321c82ce 100644 --- a/order-history-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistory/backend/CustomerHistoryEventConsumer.java +++ b/order-history-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistory/backend/CustomerHistoryEventConsumer.java @@ -1,10 +1,15 @@ package io.eventuate.examples.tram.ordersandcustomers.orderhistory.backend; -import io.eventuate.examples.tram.ordersandcustomers.commondomain.CustomerCreatedEvent; -import io.eventuate.tram.events.subscriber.DomainEventEnvelope; -import io.eventuate.tram.events.subscriber.DomainEventHandlers; -import io.eventuate.tram.events.subscriber.DomainEventHandlersBuilder; +import com.google.common.collect.ImmutableList; +import io.eventuate.examples.tram.ordersandcustomers.commondomain.CustomerSnapshotEvent; +import io.eventuate.examples.tram.ordersandcustomers.commondomain.CustomerSnapshotStartingOffsetEvent; +import io.eventuate.javaclient.commonimpl.JSonMapper; +import io.eventuate.local.java.kafka.consumer.EventuateKafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +import javax.annotation.PostConstruct; +import java.util.UUID; public class CustomerHistoryEventConsumer { @@ -12,17 +17,52 @@ public class CustomerHistoryEventConsumer { @Autowired private OrderHistoryViewService orderHistoryViewService; + @Value("${eventuatelocal.kafka.bootstrap.servers}") + private String kafkaBootstrapServers; - public DomainEventHandlers domainEventHandlers() { - return DomainEventHandlersBuilder - .forAggregateType("io.eventuate.examples.tram.ordersandcustomers.customers.domain.Customer") - .onEvent(CustomerCreatedEvent.class, this::customerCreatedEventHandler) - .build(); - } +// @Autowired +// private EventuateKafkaConsumer eventuateKafkaConsumer; +// public DomainEventHandlers domainEventHandlers() { +// return DomainEventHandlersBuilder +// .forAggregateType("io.eventuate.examples.tram.ordersandcustomers.customers.domain.Customer") +// .onEvent(CustomerCreatedEvent.class, this::customerCreatedEventHandler) +// .build(); +// } +// +// private void customerCreatedEventHandler(DomainEventEnvelope domainEventEnvelope) { +// CustomerCreatedEvent customerCreatedEvent = domainEventEnvelope.getEvent(); +// orderHistoryViewService.createCustomer(Long.parseLong(domainEventEnvelope.getAggregateId()), +// customerCreatedEvent.getName(), customerCreatedEvent.getCreditLimit()); +// } + + @PostConstruct + public void init() { + EventuateKafkaConsumer offsetConsumer = new EventuateKafkaConsumer(UUID.randomUUID().toString(), (offsetRecord, offsetCallback) -> { + CustomerSnapshotStartingOffsetEvent snapshotStartingOffsetEvent = JSonMapper.fromJson(offsetRecord.value(), + CustomerSnapshotStartingOffsetEvent.class); + + EventuateKafkaConsumer snapshotConsumer = new EventuateKafkaConsumer(UUID.randomUUID().toString(), + (snapshotRecord, snapshotCallback) -> { + CustomerSnapshotEvent customerSnapshotEvent = JSonMapper.fromJson(snapshotRecord.value(), CustomerSnapshotEvent.class); + + orderHistoryViewService.createCustomer(customerSnapshotEvent.getId(), + customerSnapshotEvent.getName(), + customerSnapshotEvent.getCreditLimit()); + + offsetCallback.accept(null, null); + }, + ImmutableList.of("CustomerSnapshot"), + kafkaBootstrapServers); + + //TODO: fix consumer fix (error: No current assignment for partition CustomerSnapshot-1) +// snapshotConsumer.setTopicPartitionOffsets(ImmutableMap.of(new TopicPartition(snapshotStartingOffsetEvent.getTopic(), +// snapshotStartingOffsetEvent.getPartition()), snapshotStartingOffsetEvent.getOffset())); + + snapshotConsumer.start(); + + offsetCallback.accept(null, null); + }, ImmutableList.of("CustomerSnapshotStartingOffset"), kafkaBootstrapServers); - private void customerCreatedEventHandler(DomainEventEnvelope domainEventEnvelope) { - CustomerCreatedEvent customerCreatedEvent = domainEventEnvelope.getEvent(); - orderHistoryViewService.createCustomer(Long.parseLong(domainEventEnvelope.getAggregateId()), - customerCreatedEvent.getName(), customerCreatedEvent.getCreditLimit()); + offsetConsumer.start(); } } diff --git a/order-history-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistory/backend/OrderHistoryViewBackendConfiguration.java b/order-history-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistory/backend/OrderHistoryViewBackendConfiguration.java index 0740903c..2a0e4455 100644 --- a/order-history-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistory/backend/OrderHistoryViewBackendConfiguration.java +++ b/order-history-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistory/backend/OrderHistoryViewBackendConfiguration.java @@ -34,11 +34,11 @@ public CustomerHistoryEventConsumer customerHistoryEventConsumer() { return new CustomerHistoryEventConsumer(); } - @Bean("customerHistoryDomainEventDispatcher") - public DomainEventDispatcher customerHistoryDomainEventDispatcher(CustomerHistoryEventConsumer customerHistoryEventConsumer, - MessageConsumer messageConsumer) { - - return new DomainEventDispatcher("customerHistoryServiceEvents", - customerHistoryEventConsumer.domainEventHandlers(), messageConsumer); - } +// @Bean("customerHistoryDomainEventDispatcher") +// public DomainEventDispatcher customerHistoryDomainEventDispatcher(CustomerHistoryEventConsumer customerHistoryEventConsumer, +// MessageConsumer messageConsumer) { +// +// return new DomainEventDispatcher("customerHistoryServiceEvents", +// customerHistoryEventConsumer.domainEventHandlers(), messageConsumer); +// } }