allow to have a dedicated key for headers content

Signed-off-by: Thomas Tych <[email protected]>
ttych committed Nov 13, 2023
1 parent ea0f10a commit 71b2806
Showing 6 changed files with 168 additions and 16 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -72,6 +72,8 @@ Consume events by a single consumer.
topics <listening topics (separate with comma ',')>
format <input text type (text|json|ltsv|msgpack)> :default => json
message_key <key (Optional, for text format only, default is message)>
add_headers <If true, add kafka's message headers to record>
headers_key <record key to store kafka's message headers (Optional)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>

@@ -122,6 +124,7 @@ Consume events by kafka consumer group features.
message_key <key (Optional, for text format only, default is message)>
kafka_message_key <key (Optional, if specified, set kafka's message key to this key)>
add_headers <If true, add kafka's message headers to record>
headers_key <record key to store kafka's message headers (Optional)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
retry_emit_limit <Wait retry_emit_limit x 1s when BufferQueueLimitError happens. The default is nil and it means waiting until BufferQueueLimitError is resolved>
@@ -159,6 +162,7 @@ With the introduction of the rdkafka-ruby based input plugin we hope to support
message_key <key (Optional, for text format only, default is message)>
kafka_message_key <key (Optional, if specified, set kafka's message key to this key)>
add_headers <If true, add kafka's message headers to record>
headers_key <record key to store kafka's message headers (Optional)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
retry_emit_limit <Wait retry_emit_limit x 1s when BufferQueueLimitError happens. The default is nil and it means waiting until BufferQueueLimitError is resolved>
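For illustration only (not part of this commit), a consumer source combining the two
options might look like the sketch below; the broker address, topic name and consumer
group are placeholders:

    <source>
      @type kafka_group
      brokers localhost:9092
      consumer_group fluentd
      topics my-topic
      format text
      add_headers true
      headers_key kafka_headers
    </source>

With headers_key unset, headers are merged into the record root and can collide with
payload fields such as "message"; with it set, they are nested under the named key and
the rest of the record is left untouched.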
21 changes: 20 additions & 1 deletion lib/fluent/plugin/in_kafka.rb
@@ -51,6 +51,11 @@ class Fluent::KafkaInput < Fluent::Input
config_param :kafka_message_key, :string, :default => nil,
:desc => "Set kafka's message key to this field"

config_param :add_headers, :bool, :default => false,
:desc => "Add kafka's message headers to event record"
config_param :headers_key, :string, :default => nil,
:desc => "Record key to store kafka's message headers"

# Kafka#fetch_messages options
config_param :max_bytes, :integer, :default => nil,
:desc => "Maximum number of bytes to fetch."
@@ -235,6 +240,8 @@ def start
@record_time_key,
@tag_source,
@record_tag_key,
@add_headers,
@headers_key,
opt)
}
@topic_watchers.each {|tw|
@@ -259,7 +266,7 @@ def run
end

class TopicWatcher < Coolio::TimerWatcher
def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, time_source, record_time_key, tag_source, record_tag_key, options={})
def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, time_source, record_time_key, tag_source, record_tag_key, add_headers, headers_key, options={})
@topic_entry = topic_entry
@kafka = kafka
@callback = method(:consume)
@@ -274,6 +281,8 @@ def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, off
@record_time_key = record_time_key
@tag_source = tag_source
@record_tag_key = record_tag_key
@add_headers = add_headers
@headers_key = headers_key

@next_offset = @topic_entry.offset
if @topic_entry.offset == -1 && offset_manager
@@ -332,6 +341,16 @@ def consume
if @kafka_message_key
record[@kafka_message_key] = msg.key
end
if @add_headers
if @headers_key
headers_record = record[@headers_key] = {}
else
headers_record = record
end
msg.headers.each_pair { |k, v|
headers_record[k] = v
}
end
es.add(record_time, record)
rescue => e
$log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
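A sketch of the resulting record shapes (illustrative Ruby, not part of the diff),
following the merge semantics of the branch above:

    # Given a parsed record and the message headers:
    record      = { 'message' => 'Hello' }
    msg_headers = { 'header1' => 'content1' }

    # add_headers true, headers_key unset: headers land in the record root,
    # so a header named 'message' would overwrite the parsed payload.
    record.merge(msg_headers)
    # => { 'message' => 'Hello', 'header1' => 'content1' }

    # add_headers true, headers_key 'kafka_headers': headers are nested under
    # the dedicated key, leaving existing record fields untouched.
    record.merge('kafka_headers' => msg_headers)
    # => { 'message' => 'Hello', 'kafka_headers' => { 'header1' => 'content1' } }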
20 changes: 16 additions & 4 deletions lib/fluent/plugin/in_kafka_group.rb
@@ -20,6 +20,8 @@ class Fluent::KafkaGroupInput < Fluent::Input
:desc => "For 'text' format only."
config_param :add_headers, :bool, :default => false,
:desc => "Add kafka's message headers to event record"
config_param :headers_key, :string, :default => nil,
:desc => "Record key to store kafka's message headers"
config_param :add_prefix, :string, :default => nil,
:desc => "Tag prefix (Optional)"
config_param :add_suffix, :string, :default => nil,
@@ -259,7 +261,7 @@ def reconnect_consumer
end

def process_batch_with_record_tag(batch)
es = {}
batch.messages.each { |msg|
begin
record = @parser_proc.call(msg)
@@ -285,8 +287,13 @@ def process_batch_with_record_tag(batch)
record[@kafka_message_key] = msg.key
end
if @add_headers
if @headers_key
headers_record = record[@headers_key] = {}
else
headers_record = record
end
msg.headers.each_pair { |k, v|
record[k] = v
headers_record[k] = v
}
end
es[tag].add(record_time, record)
@@ -332,8 +339,13 @@ def process_batch(batch)
record[@kafka_message_key] = msg.key
end
if @add_headers
if @headers_key
headers_record = record[@headers_key] = {}
else
headers_record = record
end
msg.headers.each_pair { |k, v|
record[k] = v
headers_record[k] = v
}
end
es.add(record_time, record)
@@ -355,7 +367,7 @@ def run
if @tag_source == :record
process_batch_with_record_tag(batch)
else
process_batch(batch)
end
}
rescue ForShutdown
9 changes: 8 additions & 1 deletion lib/fluent/plugin/in_rdkafka_group.rb
@@ -18,6 +18,8 @@ class Fluent::Plugin::RdKafkaGroupInput < Fluent::Plugin::Input
:desc => "For 'text' format only."
config_param :add_headers, :bool, :default => false,
:desc => "Add kafka's message headers to event record"
config_param :headers_key, :string, :default => nil,
:desc => "Record key to store kafka's message headers"
config_param :add_prefix, :string, :default => nil,
:desc => "Tag prefix (Optional)"
config_param :add_suffix, :string, :default => nil,
@@ -254,8 +256,13 @@ def run
record[@kafka_message_key] = msg.key
end
if @add_headers
if @headers_key
headers_record = record[@headers_key] = {}
else
headers_record = record
end
msg.headers.each_pair { |k, v|
record[k] = v
headers_record[k] = v
}
end
es.add(record_time, record)
60 changes: 59 additions & 1 deletion test/plugin/test_in_kafka.rb
@@ -21,7 +21,6 @@ def create_driver(conf = CONFIG)
Fluent::Test::Driver::Input.new(Fluent::KafkaInput).configure(conf)
end


def test_configure
d = create_driver
assert_equal TOPIC_NAME, d.instance.topics
@@ -63,4 +62,63 @@ def test_consume
assert_equal expected, d.events[0][2]
end
end

class ConsumeWithHeadersTest < self
CONFIG_TEMPLATE = %[
@type kafka
brokers localhost:9092
format text
@label @kafka
topics %<topic>s
%<conf_adds>s
].freeze

def topic_random
"kafka-input-#{SecureRandom.uuid}"
end

def kafka_test_context(conf_adds: '', topic: topic_random, conf_template: CONFIG_TEMPLATE)
kafka = Kafka.new(['localhost:9092'], client_id: 'kafka')
producer = kafka.producer(required_acks: 1)

config = format(conf_template, topic: topic, conf_adds: conf_adds)
driver = create_driver(config)

yield topic, producer, driver
ensure
kafka.delete_topic(topic)
kafka.close
end

def test_with_headers_content_merged_into_record
conf_adds = 'add_headers true'
kafka_test_context(conf_adds: conf_adds) do |topic, producer, driver|
driver.run(expect_records: 1, timeout: 5) do
producer.produce('Hello, fluent-plugin-kafka!', topic: topic, headers: { header1: 'content1' })
producer.deliver_messages
end

expected = { 'message' => 'Hello, fluent-plugin-kafka!',
'header1' => 'content1' }
assert_equal expected, driver.events[0][2]
end
end

def test_with_headers_content_merged_under_dedicated_key
conf_adds = %(
add_headers true
headers_key kafka_headers
)
kafka_test_context(conf_adds: conf_adds) do |topic, producer, driver|
driver.run(expect_records: 1, timeout: 5) do
producer.produce('Hello, fluent-plugin-kafka!', topic: topic, headers: { header1: 'content1' })
producer.deliver_messages
end

expected = { 'message' => 'Hello, fluent-plugin-kafka!',
'kafka_headers' => { 'header1' => 'content1' } }
assert_equal expected, driver.events[0][2]
end
end
end
end
70 changes: 61 additions & 9 deletions test/plugin/test_in_kafka_group.rb
@@ -23,7 +23,6 @@ def create_driver(conf = CONFIG)
Fluent::Test::Driver::Input.new(Fluent::KafkaGroupInput).configure(conf)
end


def test_configure
d = create_driver
assert_equal [TOPIC_NAME], d.instance.topics
@@ -48,14 +47,6 @@ def teardown
end

def test_consume
conf = %[
@type kafka
brokers localhost:9092
format text
@label @kafka
refresh_topic_interval 0
topics #{TOPIC_NAME}
]
d = create_driver

d.run(expect_records: 1, timeout: 10) do
@@ -66,4 +57,65 @@ def test_consume
assert_equal expected, d.events[0][2]
end
end

class ConsumeWithHeadersTest < self
CONFIG_TEMPLATE = %(
@type kafka
brokers localhost:9092
consumer_group fluentd
format text
refresh_topic_interval 0
@label @kafka
topics %<topic>s
%<conf_adds>s
).freeze

def topic_random
"kafka-input-#{SecureRandom.uuid}"
end

def kafka_test_context(conf_adds: '', topic: topic_random, conf_template: CONFIG_TEMPLATE)
kafka = Kafka.new(['localhost:9092'], client_id: 'kafka')
producer = kafka.producer(required_acks: 1)

config = format(conf_template, topic: topic, conf_adds: conf_adds)
driver = create_driver(config)

yield topic, producer, driver
ensure
kafka.delete_topic(topic)
kafka.close
end

def test_with_headers_content_merged_into_record
conf_adds = 'add_headers true'
kafka_test_context(conf_adds: conf_adds) do |topic, producer, driver|
driver.run(expect_records: 1, timeout: 5) do
producer.produce('Hello, fluent-plugin-kafka!', topic: topic, headers: { header1: 'content1' })
producer.deliver_messages
end

expected = { 'message' => 'Hello, fluent-plugin-kafka!',
'header1' => 'content1' }
assert_equal expected, driver.events[0][2]
end
end

def test_with_headers_content_merged_under_dedicated_key
conf_adds = %(
add_headers true
headers_key kafka_headers
)
kafka_test_context(conf_adds: conf_adds) do |topic, producer, driver|
driver.run(expect_records: 1, timeout: 5) do
producer.produce('Hello, fluent-plugin-kafka!', topic: topic, headers: { header1: 'content1' })
producer.deliver_messages
end

expected = { 'message' => 'Hello, fluent-plugin-kafka!',
'kafka_headers' => { 'header1' => 'content1' } }
assert_equal expected, driver.events[0][2]
end
end
end
end
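Note (not part of the diff): both test suites above are integration tests and assume a
Kafka broker reachable at localhost:9092; each case produces to a randomly named topic
and deletes it in the ensure block, so repeated runs leave no broker state behind.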
