Commit f7256f7
eventuate-tram#2 Building/Rebuilding CQRS views. Starting customer view reimplementation.

dartartem committed Jun 4, 2018
Parent: 37757cc
Showing 8 changed files with 199 additions and 22 deletions.
CustomerSnapshotEvent.java
@@ -0,0 +1,40 @@
package io.eventuate.examples.tram.ordersandcustomers.commondomain;

public class CustomerSnapshotEvent {
  private Long id;
  private String name;
  private Money creditLimit;

  public CustomerSnapshotEvent() {
  }

  public CustomerSnapshotEvent(Long id, String name, Money creditLimit) {
    this.id = id;
    this.name = name;
    this.creditLimit = creditLimit;
  }

  public Long getId() {
    return id;
  }

  public void setId(Long id) {
    this.id = id;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public Money getCreditLimit() {
    return creditLimit;
  }

  public void setCreditLimit(Money creditLimit) {
    this.creditLimit = creditLimit;
  }
}
CustomerSnapshotStartingOffsetEvent.java
@@ -0,0 +1,42 @@
package io.eventuate.examples.tram.ordersandcustomers.commondomain;

import io.eventuate.tram.events.common.DomainEvent;

public class CustomerSnapshotStartingOffsetEvent implements DomainEvent {
  private String topic;
  private int partition;
  private long offset;

  public CustomerSnapshotStartingOffsetEvent() {
  }

  public CustomerSnapshotStartingOffsetEvent(String topic, int partition, long offset) {
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
  }

Review thread on the constructor:
  @cer (Jun 4, 2018): This event is published to a topic, partition, and offset; it does not need to contain this information.
  @dartartem (author, Jun 4, 2018): But this data is not for this event; it is for the snapshot. It only describes where to find the snapshot.
  @cer (Jun 4, 2018): This event is published to all partitions of the Customer topic before any snapshot events are published. See the other comments about how this event's topic, partition, and offset are returned by the /export endpoint and then used by a command-line tool to initialize the subscriber's offsets.

  public String getTopic() {
    return topic;
  }

  public void setTopic(String topic) {
    this.topic = topic;
  }

  public int getPartition() {
    return partition;
  }

  public void setPartition(int partition) {
    this.partition = partition;
  }

  public long getOffset() {
    return offset;
  }

  public void setOffset(long offset) {
    this.offset = offset;
  }
}
@@ -2,11 +2,13 @@

import io.eventuate.examples.tram.ordersandcustomers.customers.service.CustomerService;
import io.eventuate.examples.tram.ordersandcustomers.customers.service.OrderEventConsumer;
import io.eventuate.local.java.kafka.producer.EventuateKafkaProducer;
import io.eventuate.tram.consumer.kafka.TramConsumerKafkaConfiguration;
import io.eventuate.tram.events.publisher.TramEventsPublisherConfiguration;
import io.eventuate.tram.events.subscriber.DomainEventDispatcher;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.jdbc.TramMessageProducerJdbcConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -36,4 +38,9 @@ public CustomerService customerService() {
    return new CustomerService();
  }

  @Bean
  public EventuateKafkaProducer eventuateKafkaProducer(@Value("${eventuatelocal.kafka.bootstrap.servers}") String eventuateKafkaBootstrapServers) {
    return new EventuateKafkaProducer(eventuateKafkaBootstrapServers);
  }

}
CustomerService.java
@@ -1,24 +1,67 @@
package io.eventuate.examples.tram.ordersandcustomers.customers.service;

import io.eventuate.examples.tram.ordersandcustomers.commondomain.CustomerSnapshotEvent;
import io.eventuate.examples.tram.ordersandcustomers.commondomain.CustomerSnapshotStartingOffsetEvent;
import io.eventuate.examples.tram.ordersandcustomers.commondomain.Money;
import io.eventuate.examples.tram.ordersandcustomers.customers.domain.Customer;
import io.eventuate.examples.tram.ordersandcustomers.customers.domain.CustomerRepository;
import io.eventuate.javaclient.commonimpl.JSonMapper;
import io.eventuate.local.java.kafka.producer.EventuateKafkaProducer;
import io.eventuate.tram.events.ResultWithEvents;
import io.eventuate.tram.events.publisher.DomainEventPublisher;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.StreamSupport;

public class CustomerService {
  private Logger logger = LoggerFactory.getLogger(getClass());

  @Autowired
  private CustomerRepository customerRepository;

  @Autowired
  private DomainEventPublisher domainEventPublisher;

  @Autowired
  private EventuateKafkaProducer eventuateKafkaProducer;

  public Customer createCustomer(String name, Money creditLimit) {
    ResultWithEvents<Customer> customerWithEvents = Customer.create(name, creditLimit);
    Customer customer = customerRepository.save(customerWithEvents.result);
    domainEventPublisher.publish(Customer.class, customer.getId(), customerWithEvents.events);
    return customer;
  }

  public void exportSnapshots() {
    AtomicReference<CompletableFuture<?>> metadataFuture = new AtomicReference<>();

    StreamSupport
        .stream(customerRepository.findAll().spliterator(), false)
        .forEach(customer -> {
          CustomerSnapshotEvent customerSnapshotEvent = new CustomerSnapshotEvent(customer.getId(), customer.getName(), customer.getCreditLimit());

          CompletableFuture<?> metadata = eventuateKafkaProducer.send("CustomerSnapshot", null, JSonMapper.toJson(customerSnapshotEvent));

          metadataFuture.compareAndSet(null, metadata);
        });

Review thread on the eventuateKafkaProducer.send call:
  @dartartem (author, Jun 4, 2018): Not sure whether there should be round-robin dispatching; if not, this low-level API can probably be replaced by the Tram MessageProducer.
  @cer (Jun 4, 2018): CustomerSnapshotEvent should be published using the Tram API DomainEventPublisher to the Customer topic (like all the other customer events), with the correct aggregateId.
  @dartartem (author, Jun 4, 2018): OK.

    try {
      RecordMetadata metadata = (RecordMetadata) metadataFuture.get().get();

      eventuateKafkaProducer.send("CustomerSnapshotStartingOffset",
          null,
          JSonMapper.toJson(new CustomerSnapshotStartingOffsetEvent(metadata.topic(),
              metadata.partition(),
              metadata.offset())));

Review thread on the null message key:
  @dartartem (author, Jun 4, 2018): The low-level API is used because the key should be null. But why is it necessary to publish this event to all partitions? And how should duplicates be handled? (For now there is one subscriber that listens to all partitions.)
  @cer (Jun 4, 2018): CustomerSnapshotStartingOffsetEvent should be published before publishing any snapshot events.
  @cer (Jun 4, 2018): The Order History View needs to know the starting offset for every (topic, partition) pair, for topic = Customer or Order. If a topic has N partitions, then you publish the CustomerSnapshotStartingOffset N times using a message key. That's how you then know the starting offset for all partitions.
  @dartartem (author, Jun 4, 2018): When I asked about offsets in the Google doc, you said: "When you publish an event Kafka returns the offset." So where should I find the offset to publish? I thought there are snapshot events (CustomerSnapshotEvent) and events that keep the offsets of the snapshot events (CustomerSnapshotStartingOffsetEvent), and that I needed to publish the snapshot events plus an event containing the starting offset of the first snapshot. But you said I need to publish the offsets before publishing the snapshots. So should I ask Kafka for the last known offset of the aggregate in each partition? Also, if these offsets are returned by the API and then used by a command-line tool, why do we need to publish CustomerSnapshotStartingOffset to Kafka (which is designed to keep the offsets of snapshots) if our goal is just to read the snapshots from the proper Kafka offset?
  @cer (Jun 4, 2018): Not exactly:
  1. Lock the Customer table - this prevents updates/events from being published.
  2. Publish CustomerSnapshotStartingOffsetEvent to all partitions of the Customer topic. Kafka returns the offset of each of those events. This is the starting point for the consumer, returned by the /export REST endpoint.
  3. Publish CustomerSnapshotEvents.
  4. Unlock the Customer table.
  @dartartem (author, Jun 4, 2018): It seems I've got it now, thank you.

Review comment on the CustomerSnapshotStartingOffsetEvent constructor call:
  @cer (Jun 4, 2018): The event's constructor should not have any arguments.

    } catch (InterruptedException | ExecutionException e) {
      logger.error(e.getMessage(), e);
      //TODO: report error to client
    }
  }
}
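cer's four-step protocol maps directly onto code. Below is a minimal sketch, not part of this commit: lockCustomerTable()/unlockCustomerTable() and CUSTOMER_TOPIC are hypothetical, the producer is a plain Kafka KafkaProducer<String, String> rather than EventuateKafkaProducer, and it assumes CustomerSnapshotEvent is made a DomainEvent so that DomainEventPublisher can publish it, as cer suggests above.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;

// Sketch of the export protocol, intended to live inside CustomerService.
public List<CustomerSnapshotStartingOffsetEvent> exportSnapshots(KafkaProducer<String, String> producer) throws Exception {
  List<CustomerSnapshotStartingOffsetEvent> startingOffsets = new ArrayList<>();

  lockCustomerTable(); // 1. block updates so no customer events are published mid-export (hypothetical helper)
  try {
    // 2. publish a no-arg marker event to every partition of the Customer topic;
    //    Kafka returns each marker's offset, which becomes the subscriber's starting point
    for (PartitionInfo p : producer.partitionsFor(CUSTOMER_TOPIC)) {
      RecordMetadata md = producer.send(new ProducerRecord<>(CUSTOMER_TOPIC, p.partition(), null,
          JSonMapper.toJson(new CustomerSnapshotStartingOffsetEvent()))).get();
      startingOffsets.add(new CustomerSnapshotStartingOffsetEvent(md.topic(), md.partition(), md.offset()));
    }

    // 3. publish one snapshot event per customer via the Tram API, with the correct aggregateId
    for (Customer customer : customerRepository.findAll()) {
      domainEventPublisher.publish(Customer.class, customer.getId(), Collections.singletonList(
          new CustomerSnapshotEvent(customer.getId(), customer.getName(), customer.getCreditLimit())));
    }
  } finally {
    unlockCustomerTable(); // 4. resume normal event publishing (hypothetical helper)
  }

  return startingOffsets; // the /export endpoint returns these for the command-line tool
}

Sending the markers with the plain producer allows targeting each partition explicitly (a null key alone would only round-robin), which is why the low-level API is still needed for step 2 even if the snapshots themselves move to the Tram API.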
@@ -25,4 +25,9 @@ public CreateCustomerResponse createCustomer(@RequestBody CreateCustomerRequest
    Customer customer = customerService.createCustomer(createCustomerRequest.getName(), createCustomerRequest.getCreditLimit());
    return new CreateCustomerResponse(customer.getId());
  }

  @RequestMapping(value = "/export-snapshots", method = RequestMethod.POST)
  public void exportSnapshots() {
    customerService.exportSnapshots();
  }

Review comment on exportSnapshots:
  @cer (Jun 4, 2018): This should return the offsets of all of the CustomerSnapshotStartingOffsetEvents.
}
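Per cer's comment, the endpoint would return the marker offsets instead of void. A sketch, assuming CustomerService.exportSnapshots() is changed to return the offsets as in the sketch above; ExportSnapshotsResponse is a hypothetical DTO, not part of this commit.

import java.util.List;

// Hypothetical DTO carrying the starting offset of the marker event in each partition.
public class ExportSnapshotsResponse {
  private List<CustomerSnapshotStartingOffsetEvent> startingOffsets;

  public ExportSnapshotsResponse() {
  }

  public ExportSnapshotsResponse(List<CustomerSnapshotStartingOffsetEvent> startingOffsets) {
    this.startingOffsets = startingOffsets;
  }

  public List<CustomerSnapshotStartingOffsetEvent> getStartingOffsets() {
    return startingOffsets;
  }

  public void setStartingOffsets(List<CustomerSnapshotStartingOffsetEvent> startingOffsets) {
    this.startingOffsets = startingOffsets;
  }
}

// Revised endpoint (sketch): a command-line tool can read these offsets and seed the subscriber.
@RequestMapping(value = "/export-snapshots", method = RequestMethod.POST)
public ExportSnapshotsResponse exportSnapshots() {
  return new ExportSnapshotsResponse(customerService.exportSnapshots());
}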
2 changes: 1 addition & 1 deletion gradle.properties
@@ -6,6 +6,6 @@ eventuateMavenRepoUrl=https://dl.bintray.com/eventuateio-oss/eventuate-maven-rel

springBootVersion=1.4.5.RELEASE
eventuateClientVersion=0.20.1.RELEASE
-eventuateTramVersion=0.7.0.RELEASE
+eventuateTramVersion=0.6.0-SNAPSHOT
dockerComposePluginVersion=0.4.5
version=0.1.0-SNAPSHOT
CustomerHistoryEventConsumer.java
@@ -1,28 +1,68 @@
 package io.eventuate.examples.tram.ordersandcustomers.orderhistory.backend;
 
-import io.eventuate.examples.tram.ordersandcustomers.commondomain.CustomerCreatedEvent;
-import io.eventuate.tram.events.subscriber.DomainEventEnvelope;
-import io.eventuate.tram.events.subscriber.DomainEventHandlers;
-import io.eventuate.tram.events.subscriber.DomainEventHandlersBuilder;
+import com.google.common.collect.ImmutableList;
+import io.eventuate.examples.tram.ordersandcustomers.commondomain.CustomerSnapshotEvent;
+import io.eventuate.examples.tram.ordersandcustomers.commondomain.CustomerSnapshotStartingOffsetEvent;
+import io.eventuate.javaclient.commonimpl.JSonMapper;
+import io.eventuate.local.java.kafka.consumer.EventuateKafkaConsumer;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+
+import javax.annotation.PostConstruct;
+import java.util.UUID;
 
 
 public class CustomerHistoryEventConsumer {
 
   @Autowired
   private OrderHistoryViewService orderHistoryViewService;
 
+  @Value("${eventuatelocal.kafka.bootstrap.servers}")
+  private String kafkaBootstrapServers;
+
-  public DomainEventHandlers domainEventHandlers() {
-    return DomainEventHandlersBuilder
-        .forAggregateType("io.eventuate.examples.tram.ordersandcustomers.customers.domain.Customer")
-        .onEvent(CustomerCreatedEvent.class, this::customerCreatedEventHandler)
-        .build();
-  }
+//  @Autowired
+//  private EventuateKafkaConsumer eventuateKafkaConsumer;
+//  public DomainEventHandlers domainEventHandlers() {
+//    return DomainEventHandlersBuilder
+//        .forAggregateType("io.eventuate.examples.tram.ordersandcustomers.customers.domain.Customer")
+//        .onEvent(CustomerCreatedEvent.class, this::customerCreatedEventHandler)
+//        .build();
+//  }
+//
+//  private void customerCreatedEventHandler(DomainEventEnvelope<CustomerCreatedEvent> domainEventEnvelope) {
+//    CustomerCreatedEvent customerCreatedEvent = domainEventEnvelope.getEvent();
+//    orderHistoryViewService.createCustomer(Long.parseLong(domainEventEnvelope.getAggregateId()),
+//        customerCreatedEvent.getName(), customerCreatedEvent.getCreditLimit());
+//  }
 
+  @PostConstruct
+  public void init() {
+    EventuateKafkaConsumer offsetConsumer = new EventuateKafkaConsumer(UUID.randomUUID().toString(), (offsetRecord, offsetCallback) -> {

Review comment on the offset consumer:
  @cer (Jun 4, 2018): The offsets are returned from the /export API. There needs to be a command-line tool for setting a subscription's offsets (in Kafka). Once the offsets have been set, you start the CustomerOrderHistoryViewService.

+      CustomerSnapshotStartingOffsetEvent snapshotStartingOffsetEvent = JSonMapper.fromJson(offsetRecord.value(),
+          CustomerSnapshotStartingOffsetEvent.class);
+
+      EventuateKafkaConsumer snapshotConsumer = new EventuateKafkaConsumer(UUID.randomUUID().toString(),
+          (snapshotRecord, snapshotCallback) -> {
+            CustomerSnapshotEvent customerSnapshotEvent = JSonMapper.fromJson(snapshotRecord.value(), CustomerSnapshotEvent.class);
+
+            orderHistoryViewService.createCustomer(customerSnapshotEvent.getId(),
+                customerSnapshotEvent.getName(),
+                customerSnapshotEvent.getCreditLimit());
+
+            snapshotCallback.accept(null, null);
+          },
+          ImmutableList.of("CustomerSnapshot"),
+          kafkaBootstrapServers);
+
+      //TODO: fix the consumer (error: No current assignment for partition CustomerSnapshot-1)
+      // snapshotConsumer.setTopicPartitionOffsets(ImmutableMap.of(new TopicPartition(snapshotStartingOffsetEvent.getTopic(),
+      //     snapshotStartingOffsetEvent.getPartition()), snapshotStartingOffsetEvent.getOffset()));
+
+      snapshotConsumer.start();

Review comment on snapshotConsumer.start():
  @dartartem (author, Jun 4, 2018): This consumer should be stopped. But to know when, it needs, for example, the snapshot count. It is possible to count the snapshots when the messages are sent.

+
+      offsetCallback.accept(null, null);
+    }, ImmutableList.of("CustomerSnapshotStartingOffset"), kafkaBootstrapServers);
+
-  private void customerCreatedEventHandler(DomainEventEnvelope<CustomerCreatedEvent> domainEventEnvelope) {
-    CustomerCreatedEvent customerCreatedEvent = domainEventEnvelope.getEvent();
-    orderHistoryViewService.createCustomer(Long.parseLong(domainEventEnvelope.getAggregateId()),
-        customerCreatedEvent.getName(), customerCreatedEvent.getCreditLimit());
+    offsetConsumer.start();
   }
 }
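cer's comments call for a command-line tool that seeds a subscription's offsets in Kafka before the view service starts consuming. A minimal sketch using the plain Kafka consumer API; the argument layout is illustrative, and the (topic, partition, offset) triples come from the /export response. Using assign() plus commitSync() rather than subscribe() plus seek() avoids the "No current assignment for partition" error mentioned in the TODO above, which occurs when seeking before partitions are assigned.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Commits a starting offset for one (group, topic, partition); run once per partition.
public class SetSubscriptionOffset {
  public static void main(String[] args) {
    String bootstrapServers = args[0];
    String groupId = args[1]; // the view's subscriber id, e.g. "customerHistoryServiceEvents"
    TopicPartition tp = new TopicPartition(args[2], Integer.parseInt(args[3]));
    long offset = Long.parseLong(args[4]); // from the /export response

    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", groupId);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(tp)); // no subscribe(), so no rebalance to wait for
      consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(offset)));
    }
  }
}

Once every partition's offset is committed for the group, starting the view service with that subscriber id makes its consumer begin reading at the snapshot markers.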
@@ -34,11 +34,11 @@ public CustomerHistoryEventConsumer customerHistoryEventConsumer() {
     return new CustomerHistoryEventConsumer();
   }
 
-  @Bean("customerHistoryDomainEventDispatcher")
-  public DomainEventDispatcher customerHistoryDomainEventDispatcher(CustomerHistoryEventConsumer customerHistoryEventConsumer,
-                                                                    MessageConsumer messageConsumer) {
-
-    return new DomainEventDispatcher("customerHistoryServiceEvents",
-        customerHistoryEventConsumer.domainEventHandlers(), messageConsumer);
-  }
+//  @Bean("customerHistoryDomainEventDispatcher")
+//  public DomainEventDispatcher customerHistoryDomainEventDispatcher(CustomerHistoryEventConsumer customerHistoryEventConsumer,
+//                                                                    MessageConsumer messageConsumer) {
+//
+//    return new DomainEventDispatcher("customerHistoryServiceEvents",
+//        customerHistoryEventConsumer.domainEventHandlers(), messageConsumer);
+//  }
 }
