Skip to content

Commit

Permalink
FIX : modification variables configuration kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
pierre-maraval committed Jul 9, 2024
1 parent e3f71c8 commit e536ed8
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 45 deletions.
17 changes: 4 additions & 13 deletions src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,12 @@
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.concurrency.nbThread}")
private int nbThread;

@Value("${spring.kafka.consumer.bootstrap-servers}")
@Value("${abes.kafka.bootstrap-servers}")
private String bootstrapAddress;


@Value("${spring.kafka.consumer.properties.isolation.level}")
private String isolationLevel;

@Value("${spring.kafka.registry.url}")
@Value("${abes.kafka.registry.url}")
private String registryUrl;

@Value("${spring.kafka.auto.register.schema}")
private boolean autoRegisterSchema;

@Bean
public ConsumerFactory<String, String> consumerKbartFactory() {
Expand All @@ -47,7 +38,7 @@ public ConsumerFactory<String, String> consumerKbartFactory() {
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,("SchedulerCoordinator"+ UUID.randomUUID()));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new DefaultKafkaConsumerFactory<>(props);
}
Expand All @@ -70,7 +61,7 @@ public Map<String, Object> producerConfigsWithTransaction() {
//props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, autoRegisterSchema);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
//props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout);
return props;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public TopicConsumer(ObjectMapper mapper, KbartService service, EmailService ema
*
* @param ligneKbart message kafka récupéré par le Consumer Kafka
*/
@KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
@KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${abes.kafka.concurrency.nbThread}")
public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
String filename = ligneKbart.key();
if (!this.workInProgress.containsKey(filename)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
@Repository
@BaconDbConfiguration
public interface LigneKbartRepository extends JpaRepository<LigneKbart, Integer> {

@Transactional
void deleteAllByIdProviderPackage(Integer idProviderPackage);
}
14 changes: 6 additions & 8 deletions src/main/resources/application-dev.properties
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Producer properties
spring.kafka.producer.bootstrap-servers=
spring.kafka.consumer.bootstrap-servers=
spring.kafka.registry.url=
spring.kafka.auto.register.schema=false
spring.kafka.concurrency.nbThread=
abes.kafka.bootstrap-servers=
abes.kafka.registry.url=
abes.kafka.concurrency.nbThread=

topic.groupid.source.kbart=lignesKbart
topic.groupid.source.errors=errors

url.onlineId2Ppn=
url.printId2Ppn=
Expand Down Expand Up @@ -41,8 +42,5 @@ spring.sql.bacon.init.mode=never
mail.ws.url=
mail.ws.recipient=

topic.groupid.source.kbart=lignesKbart
topic.groupid.source.errors=errors

logging.level.root=info
logging.level.fr.abes.bestppn=debug
16 changes: 7 additions & 9 deletions src/main/resources/application-prod.properties
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Producer properties
spring.kafka.producer.bootstrap-servers=
spring.kafka.consumer.bootstrap-servers=
spring.kafka.registry.url=
spring.kafka.auto.register.schema=false
spring.kafka.concurrency.nbThread=
abes.kafka.bootstrap-servers=
abes.kafka.registry.url=
abes.kafka.concurrency.nbThread=

topic.groupid.source.kbart=lignesKbart
topic.groupid.source.errors=errors

url.onlineId2Ppn=
url.printId2Ppn=
Expand Down Expand Up @@ -38,7 +39,4 @@ spring.sql.bacon.init.mode=never

# Mailing
mail.ws.url=
mail.ws.recipient=

topic.groupid.source.kbart=lignesKbart
topic.groupid.source.errors=errors
mail.ws.recipient=
14 changes: 6 additions & 8 deletions src/main/resources/application-test.properties
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Producer properties
spring.kafka.producer.bootstrap-servers=
spring.kafka.consumer.bootstrap-servers=
spring.kafka.registry.url=
spring.kafka.auto.register.schema=false
spring.kafka.concurrency.nbThread=
abes.kafka.bootstrap-servers=
abes.kafka.registry.url=
abes.kafka.concurrency.nbThread=

topic.groupid.source.kbart=lignesKbart
topic.groupid.source.errors=errors

url.onlineId2Ppn=
url.printId2Ppn=
Expand Down Expand Up @@ -39,6 +40,3 @@ spring.sql.bacon.init.mode=never
# Mailing
mail.ws.url=
mail.ws.recipient=

topic.groupid.source.kbart=lignesKbart
topic.groupid.source.errors=errors
5 changes: 0 additions & 5 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ server.port=8083
log4j2.logdir=logs
logging.config=classpath:log4j2-all.xml

# Common Kafka Properties
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.properties.isolation.level=read_committed


# Topic Kafka
topic.name.target.kbart=bacon.kbart.withppn.toload
Expand Down

0 comments on commit e536ed8

Please sign in to comment.