Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

[Vorbereitung] Do not use this! #2

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions account-service-provider/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,20 @@
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package de.sample.schulung.accounts.kafka;

import java.util.UUID;

/*
* This is the Kafka API.
*/
public record CustomerEventKafkaDto(
String eventType,
UUID uuid,
CustomerKafkaDto customer
) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package de.sample.schulung.accounts.kafka;

import de.sample.schulung.accounts.domain.events.CustomerCreatedEvent;
import de.sample.schulung.accounts.domain.events.CustomerDeletedEvent;
import de.sample.schulung.accounts.domain.events.CustomerReplacedEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class CustomerEventKafkaDtoMapper {

private final CustomerKafkaDtoMapper customerDtoMapper;

public CustomerEventKafkaDto map(CustomerCreatedEvent event) {
return new CustomerEventKafkaDto(
"CREATED",
event.customer().getUuid(),
customerDtoMapper.map(event.customer())
);
}

public CustomerEventKafkaDto map(CustomerReplacedEvent event) {
return new CustomerEventKafkaDto(
"REPLACED",
event.customer().getUuid(),
customerDtoMapper.map(event.customer())
);
}

public CustomerEventKafkaDto map(CustomerDeletedEvent event) {
return new CustomerEventKafkaDto(
"DELETED",
event.uuid(),
null
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package de.sample.schulung.accounts.kafka;

import de.sample.schulung.accounts.domain.events.CustomerCreatedEvent;
import de.sample.schulung.accounts.kafka.interceptor.KafkaProducer;
import de.sample.schulung.accounts.kafka.interceptor.KafkaRecord;
import lombok.RequiredArgsConstructor;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
@RequiredArgsConstructor
public class CustomerEventsProducer {

private final CustomerEventKafkaDtoMapper eventDtoMapper;

@EventListener
@KafkaProducer(topic = KafkaConstants.DEFAULT_CUSTOMER_EVENTS_TOPIC)
public KafkaRecord<UUID, CustomerEventKafkaDto> handle(CustomerCreatedEvent event) {
return new KafkaRecord<>(
event.customer().getUuid(),
eventDtoMapper.map(event)
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package de.sample.schulung.accounts.kafka;

import lombok.Getter;
import lombok.Setter;

import java.time.LocalDate;

@Getter
@Setter
public class CustomerKafkaDto {

private String name;
private LocalDate dateOfBirth;
private String state;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package de.sample.schulung.accounts.kafka;

import de.sample.schulung.accounts.domain.Customer;
import org.mapstruct.Mapper;

@Mapper(componentModel = "spring")
public interface CustomerKafkaDtoMapper {

CustomerKafkaDto map(Customer source);

default String mapState(Customer.CustomerState source) {
return switch (source) {
case ACTIVE -> "active";
case LOCKED -> "locked";
case DISABLED -> "disabled";
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package de.sample.schulung.accounts.kafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaConfiguration {

@Bean
public NewTopic customerEventsTopic() {
return TopicBuilder
.name(KafkaConstants.DEFAULT_CUSTOMER_EVENTS_TOPIC)
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package de.sample.schulung.accounts.kafka;

import lombok.experimental.UtilityClass;

@UtilityClass
public class KafkaConstants {

public final String DEFAULT_CUSTOMER_EVENTS_TOPIC = "customer-events-vorlage";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package de.sample.schulung.accounts.kafka.interceptor;

import java.lang.annotation.*;

/**
* Annotate a method to get the return value
* sent to a Kafka topic. The method can return a simple object that is then sent as a value
* without a key, or it can return an instance of {@link KafkaRecord}.
* If the method returns <tt>null</tt> (or has a void return type), no message is produced.
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface KafkaProducer {

/**
* The name of the topic.
*
* @return the name of the topic
*/
String topic();

/**
* The partition. Leave empty, if the Partitioner should do the job.
*
* @return the partition
*/
int partition() default -1;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package de.sample.schulung.accounts.kafka.interceptor;

import org.aopalliance.intercept.MethodInterceptor;
import org.springframework.aop.Pointcut;
import org.springframework.aop.framework.autoproxy.AbstractBeanFactoryAwareAdvisingPostProcessor;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.aop.support.annotation.AnnotationMatchingPointcut;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("unused")
public class KafkaProducerInterceptor
extends AbstractBeanFactoryAwareAdvisingPostProcessor
implements InitializingBean {

private final MethodInterceptor advice;

public KafkaProducerInterceptor(final KafkaTemplate<Object, Object> kafkaTemplate) {
this.advice = invocation -> {
final var result = invocation.proceed();
if (result == null) {
return null;
}
final var annotation = AnnotationUtils.findAnnotation(invocation.getMethod(), KafkaProducer.class);
if (annotation != null) {
final Object key;
final Object value;
if (result instanceof KafkaRecord<?, ?> r) {
key = r.key();
value = r.value();
} else {
key = null;
value = result;
}
//noinspection DataFlowIssue
kafkaTemplate.send(
annotation.topic(),
annotation.partition() < 0 ? null : annotation.partition(),
key,
value
);
}
return result;
};
}

@Override
public void afterPropertiesSet() {
Pointcut pointcut = new AnnotationMatchingPointcut(null, KafkaProducer.class, true);
this.advisor = new DefaultPointcutAdvisor(pointcut, advice);
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package de.sample.schulung.accounts.kafka.interceptor;

/**
* Return a KafkaRecord from a {@link KafkaProducer} method
* to get a key and a value sent.
*/
public record KafkaRecord<K, V>(
K key,
V value
) {

}
12 changes: 11 additions & 1 deletion account-service-provider/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@ spring:
console:
path: /db
enabled: true
kafka:
admin:
auto-create: true
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
producer:
key-serializer: org.apache.kafka.common.serialization.UUIDSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # Jackson
properties:
# do not serialize the class name into the message
"[spring.json.add.type.headers]": false
application:
customers:
initialization:
enabled: ${CUSTOMERS_INITIALIZATION_ENABLED:true}
enabled: ${CUSTOMERS_INITIALIZATION_ENABLED:false}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts;

import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
Expand All @@ -18,6 +19,7 @@
@SpringBootTest
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
class AccountsApiTests {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts;

import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
Expand All @@ -15,6 +16,7 @@
@SpringBootTest
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
public class IndexPageTests {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import de.sample.schulung.accounts.domain.CustomersService;
import de.sample.schulung.accounts.domain.NotFoundException;
import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
Expand All @@ -28,6 +29,7 @@
)
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
public class AccountsBoundaryTests {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts.domain;

import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
import org.springframework.boot.test.context.SpringBootTest;
Expand All @@ -15,6 +16,7 @@
}
)
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
public class CustomersInitializerTests {

@MockBean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts.domain;

import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
Expand All @@ -12,6 +13,7 @@

@SpringBootTest
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
public class CustomersServiceTest {

@Autowired
Expand Down
Loading