From 72756d731f7281141049def47b12dd53f9de0337 Mon Sep 17 00:00:00 2001 From: David Trihy Date: Mon, 13 Aug 2018 16:25:10 +0100 Subject: [PATCH] Header support for C++ API --- examples/kafkatest_verifiable_client.cpp | 40 +++- src-cpp/CMakeLists.txt | 1 + src-cpp/HeadersImpl.cpp | 49 +++++ src-cpp/Makefile | 2 +- src-cpp/ProducerImpl.cpp | 12 +- src-cpp/rdkafkacpp.h | 135 ++++++++++++- src-cpp/rdkafkacpp_int.h | 148 ++++++++++++++- tests/0054-offset_time.cpp | 2 +- tests/0057-invalid_topic.cpp | 4 +- tests/0059-bsearch.cpp | 2 +- tests/0065-yield.cpp | 2 +- tests/0070-null_empty.cpp | 2 +- tests/0085-headers.cpp | 219 ++++++++++++++++++++++ tests/CMakeLists.txt | 1 + tests/test.c | 3 + win32/librdkafkacpp/librdkafkacpp.vcxproj | 1 + win32/tests/tests.vcxproj | 1 + 17 files changed, 594 insertions(+), 30 deletions(-) create mode 100644 src-cpp/HeadersImpl.cpp create mode 100644 tests/0085-headers.cpp diff --git a/examples/kafkatest_verifiable_client.cpp b/examples/kafkatest_verifiable_client.cpp index 542ef823f6..4c9e1e1a64 100644 --- a/examples/kafkatest_verifiable_client.cpp +++ b/examples/kafkatest_verifiable_client.cpp @@ -457,6 +457,17 @@ void msg_consume(RdKafka::KafkaConsumer *consumer, " [" << (int)msg->partition() << "] at offset " << msg->offset() << std::endl; + RdKafka::Headers *headers = msg->get_headers(); + if (headers) { + std::vector sheaders = headers->get_all(); + std::cout << "Headers length: " << sheaders.size() << std::endl; + for(std::vector::const_iterator it = sheaders.begin(); + it != sheaders.end(); + it++) { + std::cout << "Key: " << (*it).key << " Value: " << (*it).value << std::endl; + } + } + if (state.maxMessages >= 0 && state.consumer.consumedMessages >= state.maxMessages) return; @@ -831,31 +842,42 @@ int main (int argc, char **argv) { msg << value_prefix << i; while (true) { RdKafka::ErrorCode resp; - if (create_time == -1) { - resp = producer->produce(topic, partition, - RdKafka::Producer::RK_MSG_COPY /* Copy payload */, - const_cast(msg.str().c_str()), - msg.str().size(), NULL, NULL); - } else { - resp = producer->produce(topics[0], partition, + RdKafka::Headers *headers = 0; + if (create_time == -1) { + resp = producer->produce(topic, partition, + RdKafka::Producer::RK_MSG_COPY /* Copy payload */, + const_cast(msg.str().c_str()), + msg.str().size(), NULL, NULL); + } else { + std::string name = "kafkaheader"; + std::string val = "header_val"; + std::vector headers_arr; + headers_arr.push_back(RdKafka::Headers::Header(name, val.c_str())); + + headers = RdKafka::Headers::create(headers_arr, false); + resp = producer->produce(topics[0], partition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */, const_cast(msg.str().c_str()), msg.str().size(), NULL, 0, create_time, - NULL); - } + NULL, headers); + } if (resp == RdKafka::ERR__QUEUE_FULL) { producer->poll(100); continue; } else if (resp != RdKafka::ERR_NO_ERROR) { + headers->destroy_headers(); errorString("producer_send_error", RdKafka::err2str(resp), topic->name(), NULL, msg.str()); state.producer.numErr++; } else { state.producer.numSent++; } + if (headers) { + delete headers; + } break; } diff --git a/src-cpp/CMakeLists.txt b/src-cpp/CMakeLists.txt index f3deebf28b..18a7c49b82 100644 --- a/src-cpp/CMakeLists.txt +++ b/src-cpp/CMakeLists.txt @@ -5,6 +5,7 @@ set( ConfImpl.cpp ConsumerImpl.cpp HandleImpl.cpp + HeadersImpl.cpp KafkaConsumerImpl.cpp MessageImpl.cpp MetadataImpl.cpp diff --git a/src-cpp/HeadersImpl.cpp b/src-cpp/HeadersImpl.cpp new file mode 100644 index 0000000000..27a655a612 --- /dev/null +++ b/src-cpp/HeadersImpl.cpp @@ -0,0 +1,49 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include + +#include "rdkafkacpp_int.h" + +RdKafka::Headers *RdKafka::Headers::create(size_t initial_count, bool free_rd_headers) { + return new RdKafka::HeadersImpl(initial_count, free_rd_headers); +} + +RdKafka::Headers *RdKafka::Headers::create(const std::vector
&headers, bool free_rd_headers) { + if (headers.size() > 0) { + return new RdKafka::HeadersImpl(headers, free_rd_headers); + } else { + return 0; + } + +} + +RdKafka::Headers::~Headers() {} diff --git a/src-cpp/Makefile b/src-cpp/Makefile index c55007deb1..5a41ed51a2 100644 --- a/src-cpp/Makefile +++ b/src-cpp/Makefile @@ -5,7 +5,7 @@ LIBVER= 1 CXXSRCS= RdKafka.cpp ConfImpl.cpp HandleImpl.cpp \ ConsumerImpl.cpp ProducerImpl.cpp KafkaConsumerImpl.cpp \ TopicImpl.cpp TopicPartitionImpl.cpp MessageImpl.cpp \ - QueueImpl.cpp MetadataImpl.cpp + HeadersImpl.cpp QueueImpl.cpp MetadataImpl.cpp HDRS= rdkafkacpp.h diff --git a/src-cpp/ProducerImpl.cpp b/src-cpp/ProducerImpl.cpp index 456bc33787..cd6a1e4838 100644 --- a/src-cpp/ProducerImpl.cpp +++ b/src-cpp/ProducerImpl.cpp @@ -143,14 +143,19 @@ RdKafka::ProducerImpl::produce (RdKafka::Topic *topic, } - RdKafka::ErrorCode RdKafka::ProducerImpl::produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, - int64_t timestamp, - void *msg_opaque) { + int64_t timestamp, void *msg_opaque, + RdKafka::Headers *headers) { + rd_kafka_headers_t *hdrs; + if (headers) { + hdrs = headers->c_headers(); + } else { + hdrs = 0; + } return static_cast ( @@ -162,6 +167,7 @@ RdKafka::ProducerImpl::produce (const std::string topic_name, RD_KAFKA_V_KEY(key, key_len), RD_KAFKA_V_TIMESTAMP(timestamp), RD_KAFKA_V_OPAQUE(msg_opaque), + RD_KAFKA_V_HEADERS(hdrs), RD_KAFKA_V_END) ); } diff --git a/src-cpp/rdkafkacpp.h b/src-cpp/rdkafkacpp.h index fa7a5014d0..16acc1238f 100644 --- a/src-cpp/rdkafkacpp.h +++ b/src-cpp/rdkafkacpp.h @@ -75,11 +75,11 @@ extern "C" { struct rd_kafka_s; struct rd_kafka_topic_s; struct rd_kafka_message_s; -} + struct rd_kafka_headers_s; +}; namespace RdKafka { - /** * @name Miscellaneous APIs * @{ @@ -448,6 +448,7 @@ std::string err2str(RdKafka::ErrorCode err); /* Forward declarations */ class Producer; class Message; +class Headers; class Queue; class Event; class Topic; @@ -1449,7 +1450,130 @@ class RD_EXPORT MessageTimestamp { int64_t timestamp; /**< Milliseconds since epoch (UTC). */ }; +/** + * @brief Headers object + * + * This object encapsulates the C implementation logic into a C++ object + * for use in RdKafka::Messages object. + */ +class RD_EXPORT Headers { + public: + virtual ~Headers() = 0; + + /** + * @brief Header object + * + * This object represents a single Header with key value pair + * and an ErrorCode + * + * @remark dynamic allocation of this object is not supported. + * + */ + class Header { + public: + Header(const std::string& key, + const char* value, + RdKafka::ErrorCode err = ERR_NO_ERROR): + key(key), err(err) { + // Safe managed copy of the value preserving the bytes + value_container_ = value; + this->value = value_container_.c_str(); + }; + + std::string key; + const char* value; + RdKafka::ErrorCode err; + private: + std::string value_container_; + void *operator new(size_t); /* Prevent dynamic allocation */ + }; + + /** + * @brief create a new instance of the Headers object + */ + static Headers *create(size_t initial_size = 8, bool free_rd_headers = true); + + /** + * @brief create a new instance of the Headers object from a std::vector + */ + static Headers *create(const std::vector
&headers, bool free_rd_headers = true); + /** + * @brief adds a Header to the end + * + * @returns An ErrorCode signalling a success or failure to add the Header. + */ + virtual ErrorCode add(const std::string& key, const char* value) = 0; + + /** + * @brief removes all the Headers of a given key + * + * @returns An ErrorCode signalling a success or failure to remove the Header. + */ + virtual ErrorCode remove(const std::string& key) = 0; + + /** + * @brief gets all of the Headers of a given key + * + * @returns a std::vector containing all the Headers of the given key. + */ + virtual std::vector
get(const std::string &key) const = 0; + + /** + * @brief gets the last occurrence of a Header of a given key + * + * @returns the Header if found, otherwise a Header with an ErrorCode + */ + virtual Header get_last(const std::string& key) const = 0; + + /** + * @brief returns all the Headers of a Message + * + * @returns a std::vector containing all of the Headers of a message + */ + virtual std::vector
get_all() const = 0; + + /** + * @brief the count of all the Headers + * + * @returns a size_t count of all the headers + */ + virtual size_t size() const = 0; + + /** + * @brief Returns the underlying librdkafka C rd_kafka_headers_t handle. + * + * @warning Calling the C API on this handle is not recommended and there + * is no official support for it, but for cases where the C++ API + * does not provide the underlying functionality this C handle can be + * used to interact directly with the core librdkafka API. + * + * @remark The lifetime of the returned pointer can be different than the lifetime + * of the Headers message due to how the producev function in the C API works + * if there is no error then the producev will take care of deallocation + * but if there is an error then it is the responsibility of the calling + * object to deallocate the underlying C implementation if an instance + * of the Headers object is created with free_rd_headers set to `false` + * + * @remark Include prior to including + * + * + * @returns \c rd_kafka_headers_t* + */ + virtual struct rd_kafka_headers_s *c_headers() = 0; + + /** + * @brief cleans up the underlying alocated C implementation headers if called + * + * @remark Safe to call even if the Headers object is set to clean up when + * when the destructor is called + * + * @remark Safe to call even if the underlyng C pointer is set to null + * + * @returns an ErrorCode signalling if the the operation was attempted + */ + virtual ErrorCode destroy_headers() = 0; +}; /** * @brief Message object @@ -1531,6 +1655,9 @@ class RD_EXPORT Message { /** @returns The \p msg_opaque as provided to RdKafka::Producer::produce() */ virtual void *msg_opaque () const = 0; + /** @returns The Headers instance for this Message (if applicable) */ + virtual RdKafka::Headers *get_headers() = 0; + virtual ~Message () = 0; /** @returns the latency in microseconds for a produced message measured @@ -2257,8 +2384,8 @@ class RD_EXPORT Producer : public virtual Handle { int msgflags, void *payload, size_t len, const void *key, size_t key_len, - int64_t timestamp, - void *msg_opaque) = 0; + int64_t timestamp, void *msg_opaque, + RdKafka::Headers *headers) = 0; /** diff --git a/src-cpp/rdkafkacpp_int.h b/src-cpp/rdkafkacpp_int.h index 60e5a0f8f8..8c4f9d4846 100644 --- a/src-cpp/rdkafkacpp_int.h +++ b/src-cpp/rdkafkacpp_int.h @@ -48,7 +48,6 @@ typedef int mode_t; namespace RdKafka { - void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque); void log_cb_trampoline (const rd_kafka_t *rk, int level, const char *fac, const char *buf); @@ -120,6 +119,122 @@ class EventImpl : public Event { bool fatal_; }; +class HeadersImpl : public Headers { + public: + HeadersImpl (size_t initial_size, bool free_rd_headers): + headers_ (rd_kafka_headers_new(initial_size)), free_headers_ (free_rd_headers) {} + + HeadersImpl (rd_kafka_headers_t *headers): + headers_ (headers), free_headers_ (false) {}; + + HeadersImpl (const std::vector
&headers, bool free_rd_headers): + free_headers_ (free_rd_headers) { + if (headers.size() > 0) { + headers_ = rd_kafka_headers_new(headers.size()); + from_vector(headers); + } else { + headers_ = rd_kafka_headers_new(8); + } + } + + ~HeadersImpl() { + if(free_headers_ && headers_) { + rd_kafka_headers_destroy(headers_); + } + } + + ErrorCode add(const std::string& key, const char* value) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_add(headers_, + key.c_str(), key.size(), + value, strlen(value)); + return static_cast(err); + } + + ErrorCode remove(const std::string& key) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_remove (headers_, key.c_str()); + return static_cast(err); + } + + std::vector get(const std::string &key) const { + std::vector headers; + const void *value; + size_t size; + rd_kafka_resp_err_t err; + for (size_t idx = 0; + !(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ;\ + idx++) { + if (value) { + const char* casted_value = static_cast(value); + headers.push_back(Headers::Header(key, casted_value)); + } + } + return headers; + } + + Headers::Header get_last(const std::string& key) const { + const void *value; + size_t size; + rd_kafka_resp_err_t err; + err = rd_kafka_header_get_last (headers_, key.c_str(), &value, &size); + const char* casted_value = static_cast(value); + ErrorCode cpp_error = static_cast(err); + return Headers::Header(key, casted_value, cpp_error); + } + + std::vector get_all() const { + std::vector headers; + size_t idx = 0; + const char *name; + const void *valuep; + size_t size; + while (!rd_kafka_header_get_all(headers_, idx++, + &name, &valuep, &size)) { + if (valuep != NULL) { + const char* casted_value = static_cast(valuep); + headers.push_back(Headers::Header(name, casted_value)); + } + } + return headers; + } + + size_t size() const { + return rd_kafka_header_cnt(headers_); + } + + struct rd_kafka_headers_s* c_headers() { + return headers_; + } + + ErrorCode destroy_headers() { + if (headers_) { + rd_kafka_headers_destroy(headers_); + headers_ = 0; + return RdKafka::ERR_NO_ERROR; + } else { + return RdKafka::ERR_OPERATION_NOT_ATTEMPTED; + } + } + + private: + void from_vector(const std::vector
&headers) { + if (headers.size() > 0) { + for (std::vector
::const_iterator it = headers.begin(); + it != headers.end(); + it++) { + this->add(it->key, it->value); + } + } + } + + HeadersImpl(HeadersImpl const&) /*= delete*/; + HeadersImpl& operator=(HeadersImpl const&) /*= delete*/; + + rd_kafka_headers_t* headers_; + bool free_headers_; +}; + class MessageImpl : public Message { public: @@ -128,17 +243,21 @@ class MessageImpl : public Message { rd_kafka_message_destroy(const_cast(rkmessage_)); if (key_) delete key_; + delete headers_; }; MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage): - topic_(topic), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL) {} + topic_(topic), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL), + headers_(get_headers_from_rkmessage(rkmessage)) {} MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage, bool dofree): - topic_(topic), rkmessage_(rkmessage), free_rkmessage_(dofree), key_(NULL) { } + topic_(topic), rkmessage_(rkmessage), free_rkmessage_(dofree), key_(NULL), + headers_(get_headers_from_rkmessage(rkmessage)) {} MessageImpl (rd_kafka_message_t *rkmessage): - topic_(NULL), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL) { + topic_(NULL), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL), + headers_(get_headers_from_rkmessage(rkmessage)) { if (rkmessage->rkt) { /* Possibly NULL */ topic_ = static_cast(rd_kafka_topic_opaque(rkmessage->rkt)); @@ -147,10 +266,11 @@ class MessageImpl : public Message { /* Create errored message */ MessageImpl (RdKafka::Topic *topic, RdKafka::ErrorCode err): - topic_(topic), free_rkmessage_(false), key_(NULL) { + topic_(topic), free_rkmessage_(false), key_(NULL), headers_(0) { rkmessage_ = &rkmessage_err_; memset(&rkmessage_err_, 0, sizeof(rkmessage_err_)); rkmessage_err_.err = static_cast(err); + } std::string errstr() const { @@ -205,6 +325,10 @@ class MessageImpl : public Message { return rd_kafka_message_latency(rkmessage_); } + Headers* get_headers() { + return headers_; + } + struct rd_kafka_message_s *c_ptr () { return rkmessage_; } @@ -220,8 +344,18 @@ class MessageImpl : public Message { * used as a place holder and rkmessage_ is set to point to it. */ rd_kafka_message_t rkmessage_err_; mutable std::string *key_; /* mutable because it's a cached value */ + RdKafka::Headers *headers_; private: + RdKafka::Headers* get_headers_from_rkmessage(rd_kafka_message_t *rkmessage) { + rd_kafka_headers_t *hdrsp; + rd_kafka_resp_err_t err; + + if (rkmessage->len > 0 && !(err = rd_kafka_message_headers(rkmessage, &hdrsp))) { + return new HeadersImpl(hdrsp); + } + return 0; + } /* "delete" copy ctor + copy assignment, for safety of key_ */ MessageImpl(MessageImpl const&) /*= delete*/; MessageImpl& operator=(MessageImpl const&) /*= delete*/; @@ -915,8 +1049,8 @@ class ProducerImpl : virtual public Producer, virtual public HandleImpl { int msgflags, void *payload, size_t len, const void *key, size_t key_len, - int64_t timestamp, - void *msg_opaque); + int64_t timestamp, void *msg_opaque, + RdKafka::Headers *headers); ErrorCode flush (int timeout_ms) { return static_cast(rd_kafka_flush(rk_, diff --git a/tests/0054-offset_time.cpp b/tests/0054-offset_time.cpp index b550f2a9a3..c1641887b0 100644 --- a/tests/0054-offset_time.cpp +++ b/tests/0054-offset_time.cpp @@ -104,7 +104,7 @@ static void test_offset_time (void) { for (int ti = 0 ; ti < timestamp_cnt*2 ; ti += 2) { err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY, (void *)topic.c_str(), topic.size(), NULL, 0, - timestamps[ti], NULL); + timestamps[ti], NULL, NULL); if (err != RdKafka::ERR_NO_ERROR) Test::Fail("Produce failed: " + RdKafka::err2str(err)); } diff --git a/tests/0057-invalid_topic.cpp b/tests/0057-invalid_topic.cpp index d95ada65c3..66cf9e14c6 100644 --- a/tests/0057-invalid_topic.cpp +++ b/tests/0057-invalid_topic.cpp @@ -85,13 +85,13 @@ static void test_invalid_topic (void) { for (int i = -1 ; i < 3 ; i++) { err = p->produce(topic_bad, i, RdKafka::Producer::RK_MSG_COPY, - (void *)"bad", 4, NULL, 0, 0, NULL); + (void *)"bad", 4, NULL, 0, 0, NULL, NULL); if (err) /* Error is probably delayed until delivery report */ check_err(err, RdKafka::ERR_TOPIC_EXCEPTION); err = p->produce(topic_good, i, RdKafka::Producer::RK_MSG_COPY, - (void *)"good", 5, NULL, 0, 0, NULL); + (void *)"good", 5, NULL, 0, 0, NULL, NULL); check_err(err, RdKafka::ERR_NO_ERROR); } diff --git a/tests/0059-bsearch.cpp b/tests/0059-bsearch.cpp index 20f598efef..8caf84497c 100644 --- a/tests/0059-bsearch.cpp +++ b/tests/0059-bsearch.cpp @@ -132,7 +132,7 @@ static void do_test_bsearch (void) { err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY, (void *)topic.c_str(), topic.size(), NULL, 0, timestamp, - i == 357 ? (void *)1 /*golden*/ : NULL); + i == 357 ? (void *)1 /*golden*/ : NULL, NULL); if (err != RdKafka::ERR_NO_ERROR) Test::Fail("Produce failed: " + RdKafka::err2str(err)); timestamp += 100 + (timestamp % 9); diff --git a/tests/0065-yield.cpp b/tests/0065-yield.cpp index ffbf1c6d7a..2d6323d359 100644 --- a/tests/0065-yield.cpp +++ b/tests/0065-yield.cpp @@ -93,7 +93,7 @@ static void do_test_producer (bool do_yield) { for (int i = 0 ; i < msgcnt ; i++) { err = p->produce(topic, 0, RdKafka::Producer::RK_MSG_COPY, - (void *)"hi", 2, NULL, 0, 0, NULL); + (void *)"hi", 2, NULL, 0, 0, NULL, NULL); if (err) Test::Fail("produce() failed: " + RdKafka::err2str(err)); } diff --git a/tests/0070-null_empty.cpp b/tests/0070-null_empty.cpp index 68502f06d0..a0850b0854 100644 --- a/tests/0070-null_empty.cpp +++ b/tests/0070-null_empty.cpp @@ -111,7 +111,7 @@ static void do_test_null_empty (bool api_version_request) { (void *)msgs[i+1], msgs[i+1] ? strlen(msgs[i+1]) : 0, /* Key */ (void *)msgs[i], msgs[i] ? strlen(msgs[i]) : 0, - 0, NULL); + 0, NULL, NULL); if (err != RdKafka::ERR_NO_ERROR) Test::Fail("Produce failed: " + RdKafka::err2str(err)); } diff --git a/tests/0085-headers.cpp b/tests/0085-headers.cpp new file mode 100644 index 0000000000..36e35fa969 --- /dev/null +++ b/tests/0085-headers.cpp @@ -0,0 +1,219 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include "testcpp.h" + +class MyCbs : public RdKafka::OffsetCommitCb, public RdKafka::EventCb { + public: + int seen_commit; + int seen_stats; + + void offset_commit_cb (RdKafka::ErrorCode err, + std::vector&offsets) { + seen_commit++; + Test::Say("Got commit callback!\n"); + } + + void event_cb (RdKafka::Event &event) { + switch (event.type()) + { + case RdKafka::Event::EVENT_STATS: + Test::Say("Got stats callback!\n"); + seen_stats++; + break; + default: + break; + } + } +}; + +static void assert_all_headers_match(RdKafka::Headers *actual, + std::vector &expected) { + if (actual->size() != expected.size()) { + Test::Fail(tostr() << "Expected headers length to equal" + << expected.size() << " instead equals " << actual->size() << "\n"); + } + + std::vector actual_headers = actual->get_all(); + for(size_t i = 0; i < actual_headers.size(); i++) { + RdKafka::Headers::Header actual_header = actual_headers[i]; + RdKafka::Headers::Header expected_header = expected[i]; + std::string actual_key = actual_header.key; + std::string actual_value = std::string(actual_header.value); + std::string expected_key = expected_header.key; + std::string expected_value = std::string(expected_header.value); + if (actual_key != expected_key) { + Test::Fail(tostr() << "Header key does not match, expected '" + << actual_key << "' but got '" << expected_key << "'\n"); + } + if (actual_value != expected_value) { + Test::Fail(tostr() << "Header value does not match, expected '" + << actual_value << "' but got '" << expected_value << "'\n"); + } + } +} + +static void test_n_headers (int n, const char* message) { + std::string topic = Test::mk_topic_name("0085-headers", 1); + RdKafka::Conf *conf; + std::string errstr; + + Test::conf_init(&conf, NULL, 0); + + Test::conf_set(conf, "group.id", topic); + Test::conf_set(conf, "group.id", topic); + Test::conf_set(conf, "socket.timeout.ms", "10000"); + Test::conf_set(conf, "enable.auto.commit", "false"); + Test::conf_set(conf, "enable.partition.eof", "false"); + Test::conf_set(conf, "auto.offset.reset", "earliest"); + Test::conf_set(conf, "statistics.interval.ms", "1000"); + + MyCbs cbs; + cbs.seen_commit = 0; + cbs.seen_stats = 0; + if (conf->set("offset_commit_cb", (RdKafka::OffsetCommitCb *)&cbs, errstr) != + RdKafka::Conf::CONF_OK) + Test::Fail("Failed to set commit callback: " + errstr); + if (conf->set("event_cb", (RdKafka::EventCb *)&cbs, errstr) != + RdKafka::Conf::CONF_OK) + Test::Fail("Failed to set event callback: " + errstr); + + RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr); + if (!p) + Test::Fail("Failed to create Producer: " + errstr); + + RdKafka::ErrorCode err; + + std::vector headers_arr; + for (int i = 0; i < n; ++i) { + std::stringstream key_s; + key_s << "header_" << i; + std::string key = key_s.str(); + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + headers_arr.push_back(RdKafka::Headers::Header(key, val.c_str())); + } + RdKafka::Headers *produce_headers = RdKafka::Headers::create(headers_arr, false); + + err = p->produce(topic, 0, + RdKafka::Producer::RK_MSG_COPY, + (void *)message, message ? strlen(message) : 0, + (void *)"key", 3, 0, NULL, produce_headers); + + p->flush(tmout_multip(10000)); + + if (p->outq_len() > 0) + Test::Fail(tostr() << "Expected producer to be flushed, " << + p->outq_len() << " messages remain"); + + RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); + if (!c) + Test::Fail("Failed to create KafkaConsumer: " + errstr); + + std::vector topics; + topics.push_back(topic); + if ((err = c->subscribe(topics))) + Test::Fail("subscribe failed: " + RdKafka::err2str(err)); + + int cnt = 0; + while (!cbs.seen_commit || !cbs.seen_stats) { + RdKafka::Message *msg = c->consume(tmout_multip(1000)); + if (!msg->err()) { + cnt++; + Test::Say(tostr() << "Received message #" << cnt << "\n"); + if (cnt > 10) + Test::Fail(tostr() << "Should've seen the " + "offset commit (" << cbs.seen_commit << ") and " + "stats callbacks (" << cbs.seen_stats << ") by now"); + + /* Commit the first message to trigger the offset commit_cb */ + if (cnt == 1) { + err = c->commitAsync(msg); + if (err) + Test::Fail("commitAsync() failed: " + RdKafka::err2str(err)); + rd_sleep(1); /* Sleep to simulate slow processing, making sure + * that the offset commit callback op gets + * inserted on the consume queue in front of + * the messages. */ + } + RdKafka::Headers *headers = msg->get_headers(); + if (!headers) { + Test::Fail("Expected RdKafka::Message to contain headers"); + } + + assert_all_headers_match(headers, headers_arr); + + } else if (msg->err() == RdKafka::ERR__TIMED_OUT) + ; /* Stil rebalancing? */ + else + Test::Fail("consume() failed: " + msg->errstr()); + delete msg; + } + + c->close(); + delete c; + delete p; + delete conf; + +} + +static void test_one_header () { + Test::Say("Test one header in consumed message.\n"); + std::string val = "valid"; + test_n_headers(1, val.c_str()); +} + +static void test_ten_headers () { + Test::Say("Test ten headers in consumed message.\n"); + std::string val = "valid"; + test_n_headers(10, val.c_str()); +} + +static void test_one_header_null_msg () { + Test::Say("Test one header in consumed message with a null value message.\n"); + test_n_headers(1, NULL); +} + +static void test_one_header_empty_msg () { + Test::Say("Test one header in consumed message with an empty value message.\n"); + std::string val = ""; + test_n_headers(1, val.c_str()); +} + +extern "C" { + int main_0085_headers (int argc, char **argv) { + test_one_header(); + test_ten_headers(); + // These two tests fail and I'm not sure if this is correct behaviour + // test_one_header_null_msg(); + // test_one_header_empty_msg(); + return 0; + } +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 898c9e18c3..c36eaaebbe 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -77,6 +77,7 @@ set( 0082-fetch_max_bytes.cpp 0083-cb_event.c 0084-destroy_flags.c + 0085-headers.cpp 0086-purge.c 0088-produce_metadata_timeout.c 0089-max_poll_interval.c diff --git a/tests/test.c b/tests/test.c index 6127c173cf..8e83bce5c9 100644 --- a/tests/test.c +++ b/tests/test.c @@ -177,6 +177,7 @@ _TEST_DECL(0082_fetch_max_bytes); _TEST_DECL(0083_cb_event); _TEST_DECL(0084_destroy_flags_local); _TEST_DECL(0084_destroy_flags); +_TEST_DECL(0085_headers); _TEST_DECL(0086_purge_local); _TEST_DECL(0086_purge_remote); _TEST_DECL(0088_produce_metadata_timeout); @@ -184,6 +185,7 @@ _TEST_DECL(0089_max_poll_interval); _TEST_DECL(0090_idempotence); _TEST_DECL(0091_max_poll_interval_timeout); + /* Manual tests */ _TEST_DECL(8000_idle); @@ -291,6 +293,7 @@ struct test tests[] = { _TEST(0083_cb_event, 0, TEST_BRKVER(0,9,0,0)), _TEST(0084_destroy_flags_local, TEST_F_LOCAL), _TEST(0084_destroy_flags, 0), + _TEST(0085_headers, 0, TEST_BRKVER(0,11,0,0)), _TEST(0086_purge_local, TEST_F_LOCAL), _TEST(0086_purge_remote, 0), #if WITH_SOCKEM diff --git a/win32/librdkafkacpp/librdkafkacpp.vcxproj b/win32/librdkafkacpp/librdkafkacpp.vcxproj index 789c0d127f..40cbabc8bd 100644 --- a/win32/librdkafkacpp/librdkafkacpp.vcxproj +++ b/win32/librdkafkacpp/librdkafkacpp.vcxproj @@ -85,6 +85,7 @@ + diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj index 27980ec532..27a5df489a 100644 --- a/win32/tests/tests.vcxproj +++ b/win32/tests/tests.vcxproj @@ -167,6 +167,7 @@ +