-
Notifications
You must be signed in to change notification settings - Fork 1
Kafka
HOA PHAN edited this page Jan 30, 2024
·
10 revisions
https://www.youtube.com/watch?v=li2aowPnezA
https://developer.confluent.io/learn/kraft/
https://www.youtube.com/watch?v=6YL0L4lb9iM
https://www.youtube.com/watch?v=uXEYuDwm7e4
https://www.confluent.io/blog/5-common-pitfalls-when-using-apache-kafka
https://docs.rackspace.com/blog/Apache-Kafka-Client-Benchmarks/
from confluent_kafka import Producer, KafkaError
import json
import ccloud_lib
import re
if __name__ == '__main__':
# Read arguments and configurations and initialize
args = ccloud_lib.parse_args()
config_file = args.config_file
topic = args.topic
conf = ccloud_lib.read_ccloud_config(config_file)
# Create Producer instance
producer = Producer({
'bootstrap.servers': conf['bootstrap.servers'],
'sasl.mechanisms': conf['sasl.mechanisms'],
'security.protocol': conf['security.protocol'],
'sasl.username': conf['sasl.username'],
'sasl.password': conf['sasl.password'],
})
# Create topic if needed
ccloud_lib.create_topic(conf, topic)
delivered_records = 0
# Optional per-message on_delivery handler (triggered by poll() or flush())
# when a message has been successfully delivered or
# permanently failed delivery (after retries).
def acked(err, msg):
global delivered_records
"""Delivery report handler called on
successful or failed delivery of message
"""
if err is not None:
print("Failed to deliver message: {}".format(err))
else:
delivered_records += 1
print("Produced record to topic {} partition [{}] @ offset {}"
.format(msg.topic(), msg.partition(), msg.offset()))
file1 = open('events', 'r')
events = [line.rstrip('|\n').lstrip('|') for line in file1.readlines()]
for event in events:
print("Producing record: {}\t{}".format(event, event))
producer.produce(topic, key=event, value=event, on_delivery=acked)
producer.poll(0)
producer.flush()
print("{} messages were produced to topic {}!".format(delivered_records, topic))
package tut.kafka.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
@Autowired
private final BullshitService service;
@Autowired
private final KafkaListenerEndpointRegistry registry;
private MessageListenerContainer container = null;
private static final int LIMIT_INFLIGHT = 10;
public Consumer(
BullshitService service,
KafkaListenerEndpointRegistry registry
) {
this.service = service;
this.registry = registry;
}
@KafkaListener(topics = "stupidTopic", groupId = "hoa05nov", id = "stupidContainer")
public void listen(ConsumerRecord<?, ?> cr) {
logger.info(cr.toString());
service.process(cr.toString());
}
private void initContainer() {
registry.getAllListenerContainers().stream()
.filter(c -> "stupidContainer".equals(c.getListenerId()))
.findAny()
.ifPresent(c -> container = c);
}
@Scheduled(fixedDelay = 1000)
public synchronized void regulate() {
service.size();
if (null == container) {
initContainer();
} else {
if (service.size() > LIMIT_INFLIGHT) {
if(!container.isPauseRequested() && !container.isContainerPaused()){
logger.info("PAUSING....");
container.pause();
while(!container.isContainerPaused()){}
logger.info("PAUSED!");
}
} else {
if(container.isContainerPaused()){
logger.info("RESUMING....");
container.resume();
while(container.isContainerPaused()){}
logger.info("RESUMED!");
}
}
}
}
}
java -cp .:reactor-kafka-tools-1.3.18-SNAPSHOT.jar:argparse4j-0.7.0.jar reactor.kafka.tools.perf.ProducerPerformance
help
java -cp .:reactor-kafka-tools-1.3.18-SNAPSHOT.jar:argparse4j-0.7.0.jar reactor.kafka.tools.perf.ProducerPerformance
usage: producer-performance [-h] --topic TOPIC --num-records NUM-RECORDS --record-size RECORD-SIZE --throughput THROUGHPUT --producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...] [--reactive REACTIVE]
[--transactional-id TRANSACTIONAL-ID] [--transaction-duration-ms TRANSACTION-DURATION]
This tool is used to verify the producer performance.
optional arguments:
-h, --help show this help message and exit
--topic TOPIC produce messages to this topic
--num-records NUM-RECORDS
number of messages to produce
--record-size RECORD-SIZE
message size in bytes
--throughput THROUGHPUT
throttle maximum message throughput to *approximately* THROUGHPUT messages/sec
--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
kafka producer related configuration properties like bootstrap.servers,client.id etc..
--reactive REACTIVE if true, use reactive API (default: true)
--transactional-id TRANSACTIONAL-ID
The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions. (default: performance-producer-default-transactional-id)
--transaction-duration-ms TRANSACTION-DURATION
The max age of each transaction. The commitTransaction will be called after this this time has elapsed. Transactions are only enabled if this value is positive. (default: 0)