Python Producer

from confluent_kafka import Producer
import ccloud_lib

if __name__ == '__main__':
    # Read arguments and configurations and initialize
    args = ccloud_lib.parse_args()
    config_file = args.config_file
    topic = args.topic
    conf = ccloud_lib.read_ccloud_config(config_file)

    # Create Producer instance
    producer = Producer({
        'bootstrap.servers': conf['bootstrap.servers'],
        'sasl.mechanisms': conf['sasl.mechanisms'],
        'security.protocol': conf['security.protocol'],
        'sasl.username': conf['sasl.username'],
        'sasl.password': conf['sasl.password'],
    })

    # Create topic if needed
    ccloud_lib.create_topic(conf, topic)

    delivered_records = 0

    # Optional per-message on_delivery handler (triggered by poll() or flush())
    # when a message has been successfully delivered or
    # permanently failed delivery (after retries).
    def acked(err, msg):
        """Delivery report handler called on
        successful or failed delivery of message
        """
        global delivered_records
        if err is not None:
            print("Failed to deliver message: {}".format(err))
        else:
            delivered_records += 1
            print("Produced record to topic {} partition [{}] @ offset {}"
                  .format(msg.topic(), msg.partition(), msg.offset()))

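    # Read one event per line from the 'events' file, stripping the
    # surrounding '|' delimiters and the trailing newline
    with open('events', 'r') as event_file:
        events = [line.rstrip('|\n').lstrip('|') for line in event_file]

    for event in events:
        print("Producing record: {}\t{}".format(event, event))
        producer.produce(topic, key=event, value=event, on_delivery=acked)
        # poll() serves delivery reports (on_delivery) from previous produce() calls
        producer.poll(0)

    # Block until all outstanding messages are delivered, so the count below is final
    producer.flush()

    print("{} messages were produced to topic {}!".format(delivered_records, topic))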
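The line-cleaning step in the script can be checked on its own, without a broker. The sketch below assumes the events file is pipe-delimited (for example `|login|`); the helper name `clean_event` and the sample lines are hypothetical and only illustrate what `rstrip('|\n')` followed by `lstrip('|')` does.

```python
def clean_event(line):
    """Strip trailing '|' / newline characters and leading '|' characters."""
    # rstrip('|\n') removes any run of '|' and '\n' at the end of the line;
    # lstrip('|') then removes any run of '|' at the start.
    return line.rstrip('|\n').lstrip('|')


# Hypothetical sample lines, not taken from a real events file.
sample_lines = ["|login|\n", "|page_view|user=42|\n", "logout\n"]
cleaned = [clean_event(line) for line in sample_lines]
print(cleaned)
```

Only the outer delimiters are removed; a pipe inside the line (as in the second sample) is kept, so the whole remaining string becomes both the key and the value of the produced record.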

Reactor Kafka

Kafka tool:

java -cp .:reactor-kafka-tools-1.3.18-SNAPSHOT.jar:argparse4j-0.7.0.jar


usage: producer-performance [-h] --topic TOPIC --num-records NUM-RECORDS --record-size RECORD-SIZE --throughput THROUGHPUT --producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...] [--reactive REACTIVE]
                            [--transactional-id TRANSACTIONAL-ID] [--transaction-duration-ms TRANSACTION-DURATION]

This tool is used to verify the producer performance.

optional arguments:
  -h, --help             show this help message and exit
  --topic TOPIC          produce messages to this topic
  --num-records NUM-RECORDS
                         number of messages to produce
  --record-size RECORD-SIZE
                         message size in bytes
  --throughput THROUGHPUT
                         throttle maximum message throughput to *approximately* THROUGHPUT messages/sec
  --producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
                         kafka producer related configuration properties like bootstrap.servers, etc.
  --reactive REACTIVE    if true, use reactive API (default: true)
  --transactional-id TRANSACTIONAL-ID
                         The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions. (default: performance-producer-default-transactional-id)
  --transaction-duration-ms TRANSACTION-DURATION
                         The max age of each transaction. The commitTransaction will be called after this time has elapsed. Transactions are only enabled if this value is positive. (default: 0)
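As a rough illustration of how the options in the help text fit together, the sketch below assembles an argument list for producer-performance in Python. The function name `build_perf_args` and all sample values (topic name, record counts, broker address) are hypothetical. The list covers only the tool's own options; the `java -cp ...` prefix and the main class are not shown here and would have to be prepended.

```python
def build_perf_args(topic, num_records, record_size, throughput,
                    producer_props, reactive=None,
                    transactional_id=None, transaction_duration_ms=None):
    """Return producer-performance options as a list of strings."""
    args = [
        "--topic", topic,
        "--num-records", str(num_records),
        "--record-size", str(record_size),
        "--throughput", str(throughput),
        "--producer-props",
    ]
    # Each producer property is passed as PROP-NAME=PROP-VALUE.
    args += ["{}={}".format(name, value)
             for name, value in producer_props.items()]
    # The remaining options have defaults, so add them only when given.
    if reactive is not None:
        args += ["--reactive", "true" if reactive else "false"]
    if transactional_id is not None:
        args += ["--transactional-id", transactional_id]
    if transaction_duration_ms is not None:
        args += ["--transaction-duration-ms", str(transaction_duration_ms)]
    return args


# Hypothetical values for a local test run.
perf_args = build_perf_args(
    topic="test-topic",
    num_records=1000,
    record_size=100,
    throughput=500,
    producer_props={"bootstrap.servers": "localhost:9092"},
    reactive=False,
)
print(" ".join(perf_args))
```

Leaving `transaction_duration_ms` unset keeps the default of 0, which according to the help text means transactions stay disabled.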


