Skip to content
HOA PHAN edited this page Jan 28, 2024 · 10 revisions

Zookeeper

https://developer.confluent.io/learn/kraft/

KRaft

https://www.youtube.com/watch?v=6YL0L4lb9iM

Pitfalls

https://www.confluent.io/blog/5-common-pitfalls-when-using-apache-kafka

Client differences

https://docs.rackspace.com/blog/Apache-Kafka-Client-Benchmarks/

Python Producer

from confluent_kafka import Producer, KafkaError
import json
import ccloud_lib
import re

if __name__ == '__main__':
    # Read the command-line arguments and the Confluent Cloud configuration.
    args = ccloud_lib.parse_args()
    config_file = args.config_file
    topic = args.topic
    conf = ccloud_lib.read_ccloud_config(config_file)

    # Create the Producer instance from the connection/auth settings only.
    producer = Producer({
        'bootstrap.servers': conf['bootstrap.servers'],
        'sasl.mechanisms': conf['sasl.mechanisms'],
        'security.protocol': conf['security.protocol'],
        'sasl.username': conf['sasl.username'],
        'sasl.password': conf['sasl.password'],
    })

    # Create topic if needed
    ccloud_lib.create_topic(conf, topic)

    # Number of messages confirmed as delivered; updated by acked().
    delivered_records = 0

    # Optional per-message on_delivery handler (triggered by poll() or flush())
    # when a message has been successfully delivered or
    # permanently failed delivery (after retries).
    def acked(err, msg):
        """Delivery report handler called on successful or failed delivery.

        Prints the failure when ``err`` is set; otherwise increments the
        module-level ``delivered_records`` counter and prints where the
        record landed (topic, partition, offset).
        """
        global delivered_records
        if err is not None:
            print("Failed to deliver message: {}".format(err))
        else:
            delivered_records += 1
            print("Produced record to topic {} partition [{}] @ offset {}"
                  .format(msg.topic(), msg.partition(), msg.offset()))

    # Load every event up front; the context manager guarantees the file
    # handle is closed even if reading fails. Each line has its trailing
    # newline and any leading/trailing '|' delimiters stripped.
    with open('events', 'r') as events_file:
        events = [line.rstrip('|\n').lstrip('|') for line in events_file]

    for event in events:
        print("Producing record: {}\t{}".format(event, event))
        # The event text is used as both the message key and the value.
        producer.produce(topic, key=event, value=event, on_delivery=acked)
        # Serve pending delivery callbacks without blocking.
        producer.poll(0)
    # Block until all outstanding messages are delivered or have failed.
    producer.flush()

    print("{} messages were produced to topic {}!".format(delivered_records, topic))

Java Consumer

package tut.kafka.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

/**
 * Kafka listener that forwards every received record to {@link BullshitService}
 * and applies simple back-pressure: a scheduled task pauses the listener
 * container while the service reports more than {@link #LIMIT_INFLIGHT}
 * items and resumes it once the backlog has drained.
 */
@Service
public class Consumer {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    /** Id of the listener container created for {@link #listen(ConsumerRecord)}. */
    private static final String LISTENER_ID = "stupidContainer";
    /** Backlog size above which consumption is paused. */
    private static final int LIMIT_INFLIGHT = 10;
    /** Upper bound on how long one regulate() run waits for a pause/resume to be observed. */
    private static final long STATE_CHANGE_TIMEOUT_NANOS = 5_000_000_000L;
    /** Sleep between two checks of the container state while waiting. */
    private static final long STATE_POLL_INTERVAL_MS = 50L;

    private final BullshitService service;
    private final KafkaListenerEndpointRegistry registry;
    /** Looked up lazily; only touched from the synchronized {@link #regulate()} path. */
    private MessageListenerContainer container = null;

    /**
     * Constructor injection is the single injection point; the fields are
     * final, so no additional field-level {@code @Autowired} is needed.
     *
     * @param service  downstream processor whose backlog drives pause/resume
     * @param registry registry used to look up the listener container by id
     */
    @Autowired
    public Consumer(
            BullshitService service,
            KafkaListenerEndpointRegistry registry
    ) {
        this.service = service;
        this.registry = registry;
    }

    /**
     * Logs the received record and hands its string form to the service.
     *
     * @param cr the record delivered by the listener container
     */
    @KafkaListener(topics = "stupidTopic", groupId = "hoa05nov", id = LISTENER_ID)
    public void listen(ConsumerRecord<?, ?> cr) {
        final String payload = cr.toString();
        // Pass the payload as an argument so it is never interpreted as a format string.
        logger.info("{}", payload);
        service.process(payload);
    }

    /** Resolves the listener container by id; leaves {@code container} null if it is not registered yet. */
    private void initContainer() {
        container = registry.getListenerContainer(LISTENER_ID);
    }

    /**
     * Runs one second after the previous run finished and pauses or resumes
     * the listener container depending on the service backlog.
     *
     * <p>The first runs only resolve the container. Waiting for the state
     * change is bounded and sleeps between checks, so a container that never
     * reports the expected state cannot pin the scheduler thread forever.
     */
    @Scheduled(fixedDelay = 1000)
    public synchronized void regulate() {
        if (container == null) {
            initContainer();
            return;
        }

        if (service.size() > LIMIT_INFLIGHT) {
            // A paused container always has a pause request set, so this one check covers both cases.
            if (!container.isPauseRequested()) {
                logger.info("PAUSING....");
                container.pause();
                if (awaitPausedState(true)) {
                    logger.info("PAUSED!");
                } else {
                    logger.warn("Pause requested but not yet effective; will be re-checked on the next run");
                }
            }
        } else if (container.isPauseRequested()) {
            // Also cancels a pause that was requested but has not taken effect yet.
            logger.info("RESUMING....");
            container.resume();
            if (awaitPausedState(false)) {
                logger.info("RESUMED!");
            } else {
                logger.warn("Resume requested but container still reports paused");
            }
        }
    }

    /**
     * Waits until {@code container.isContainerPaused()} equals {@code expected},
     * sleeping between checks, for at most {@link #STATE_CHANGE_TIMEOUT_NANOS}.
     *
     * @param expected the paused state to wait for
     * @return {@code true} if the state was observed; {@code false} on timeout or interruption
     */
    private boolean awaitPausedState(boolean expected) {
        final long start = System.nanoTime();
        while (container.isContainerPaused() != expected) {
            if (System.nanoTime() - start >= STATE_CHANGE_TIMEOUT_NANOS) {
                return false;
            }
            try {
                Thread.sleep(STATE_POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the scheduler and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}

Reactor Kafka

Kafka tool:

java -cp .:reactor-kafka-tools-1.3.18-SNAPSHOT.jar:argparse4j-0.7.0.jar reactor.kafka.tools.perf.ProducerPerformance

help

java -cp .:reactor-kafka-tools-1.3.18-SNAPSHOT.jar:argparse4j-0.7.0.jar reactor.kafka.tools.perf.ProducerPerformance

usage: producer-performance [-h] --topic TOPIC --num-records NUM-RECORDS --record-size RECORD-SIZE --throughput THROUGHPUT --producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...] [--reactive REACTIVE]
                            [--transactional-id TRANSACTIONAL-ID] [--transaction-duration-ms TRANSACTION-DURATION]

This tool is used to verify the producer performance.

optional arguments:
  -h, --help             show this help message and exit
  --topic TOPIC          produce messages to this topic
  --num-records NUM-RECORDS
                         number of messages to produce
  --record-size RECORD-SIZE
                         message size in bytes
  --throughput THROUGHPUT
                         throttle maximum message throughput to *approximately* THROUGHPUT messages/sec
  --producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
                         kafka producer related configuration properties like bootstrap.servers,client.id etc..
  --reactive REACTIVE    if true, use reactive API (default: true)
  --transactional-id TRANSACTIONAL-ID
                         The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions. (default: performance-producer-default-transactional-id)
  --transaction-duration-ms TRANSACTION-DURATION
                         The max age of each transaction. The commitTransaction will be called after this this time has elapsed. Transactions are only enabled if this value is positive. (default: 0)

eye

Clone this wiki locally