From 9821a3d10d26c8af42171d92b9c8602cffe5f1b5 Mon Sep 17 00:00:00 2001 From: David Trihy Date: Mon, 15 Oct 2018 15:25:05 +0000 Subject: [PATCH] Applied most review comments --- examples/kafkatest_verifiable_client.cpp | 40 +-- src-cpp/HeadersImpl.cpp | 10 +- src-cpp/ProducerImpl.cpp | 26 +- src-cpp/rdkafkacpp.h | 181 +++++++++-- src-cpp/rdkafkacpp_int.h | 74 +++-- tests/0054-offset_time.cpp | 2 +- tests/0057-invalid_topic.cpp | 4 +- tests/0059-bsearch.cpp | 2 +- tests/0065-yield.cpp | 2 +- tests/0070-null_empty.cpp | 2 +- tests/0085-headers.cpp | 397 ++++++++++++++++------- 11 files changed, 531 insertions(+), 209 deletions(-) diff --git a/examples/kafkatest_verifiable_client.cpp b/examples/kafkatest_verifiable_client.cpp index 4c9e1e1a64..542ef823f6 100644 --- a/examples/kafkatest_verifiable_client.cpp +++ b/examples/kafkatest_verifiable_client.cpp @@ -457,17 +457,6 @@ void msg_consume(RdKafka::KafkaConsumer *consumer, " [" << (int)msg->partition() << "] at offset " << msg->offset() << std::endl; - RdKafka::Headers *headers = msg->get_headers(); - if (headers) { - std::vector sheaders = headers->get_all(); - std::cout << "Headers length: " << sheaders.size() << std::endl; - for(std::vector::const_iterator it = sheaders.begin(); - it != sheaders.end(); - it++) { - std::cout << "Key: " << (*it).key << " Value: " << (*it).value << std::endl; - } - } - if (state.maxMessages >= 0 && state.consumer.consumedMessages >= state.maxMessages) return; @@ -842,42 +831,31 @@ int main (int argc, char **argv) { msg << value_prefix << i; while (true) { RdKafka::ErrorCode resp; - RdKafka::Headers *headers = 0; - if (create_time == -1) { - resp = producer->produce(topic, partition, - RdKafka::Producer::RK_MSG_COPY /* Copy payload */, - const_cast(msg.str().c_str()), - msg.str().size(), NULL, NULL); - } else { - std::string name = "kafkaheader"; - std::string val = "header_val"; - std::vector headers_arr; - headers_arr.push_back(RdKafka::Headers::Header(name, val.c_str())); - - headers = RdKafka::Headers::create(headers_arr, false); - resp = producer->produce(topics[0], partition, + if (create_time == -1) { + resp = producer->produce(topic, partition, + RdKafka::Producer::RK_MSG_COPY /* Copy payload */, + const_cast(msg.str().c_str()), + msg.str().size(), NULL, NULL); + } else { + resp = producer->produce(topics[0], partition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */, const_cast(msg.str().c_str()), msg.str().size(), NULL, 0, create_time, - NULL, headers); - } + NULL); + } if (resp == RdKafka::ERR__QUEUE_FULL) { producer->poll(100); continue; } else if (resp != RdKafka::ERR_NO_ERROR) { - headers->destroy_headers(); errorString("producer_send_error", RdKafka::err2str(resp), topic->name(), NULL, msg.str()); state.producer.numErr++; } else { state.producer.numSent++; } - if (headers) { - delete headers; - } break; } diff --git a/src-cpp/HeadersImpl.cpp b/src-cpp/HeadersImpl.cpp index 27a655a612..d7b5f9357d 100644 --- a/src-cpp/HeadersImpl.cpp +++ b/src-cpp/HeadersImpl.cpp @@ -33,15 +33,15 @@ #include "rdkafkacpp_int.h" -RdKafka::Headers *RdKafka::Headers::create(size_t initial_count, bool free_rd_headers) { - return new RdKafka::HeadersImpl(initial_count, free_rd_headers); +RdKafka::Headers *RdKafka::Headers::create(size_t initial_count) { + return new RdKafka::HeadersImpl(initial_count, false); } -RdKafka::Headers *RdKafka::Headers::create(const std::vector
<Header> &headers, bool free_rd_headers) {
+RdKafka::Headers *RdKafka::Headers::create(const std::vector<Header>
&headers) { if (headers.size() > 0) { - return new RdKafka::HeadersImpl(headers, free_rd_headers); + return new RdKafka::HeadersImpl(headers, false); } else { - return 0; + return new RdKafka::HeadersImpl(8, false); } } diff --git a/src-cpp/ProducerImpl.cpp b/src-cpp/ProducerImpl.cpp index cd6a1e4838..5df139e72b 100644 --- a/src-cpp/ProducerImpl.cpp +++ b/src-cpp/ProducerImpl.cpp @@ -143,6 +143,27 @@ RdKafka::ProducerImpl::produce (RdKafka::Topic *topic, } +RdKafka::ErrorCode +RdKafka::ProducerImpl::produce (const std::string topic_name, + int32_t partition, int msgflags, + void *payload, size_t len, + const void *key, size_t key_len, + int64_t timestamp, void *msg_opaque) { + return + static_cast + ( + rd_kafka_producev(rk_, + RD_KAFKA_V_TOPIC(topic_name.c_str()), + RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_MSGFLAGS(msgflags), + RD_KAFKA_V_VALUE(payload, len), + RD_KAFKA_V_KEY(key, key_len), + RD_KAFKA_V_TIMESTAMP(timestamp), + RD_KAFKA_V_OPAQUE(msg_opaque), + RD_KAFKA_V_END) + ); +} + RdKafka::ErrorCode RdKafka::ProducerImpl::produce (const std::string topic_name, int32_t partition, int msgflags, @@ -150,12 +171,11 @@ RdKafka::ProducerImpl::produce (const std::string topic_name, const void *key, size_t key_len, int64_t timestamp, void *msg_opaque, RdKafka::Headers *headers) { - rd_kafka_headers_t *hdrs; + rd_kafka_headers_t *hdrs = NULL; if (headers) { hdrs = headers->c_headers(); - } else { - hdrs = 0; } + return static_cast ( diff --git a/src-cpp/rdkafkacpp.h b/src-cpp/rdkafkacpp.h index 8e61ece983..70685a4ff2 100644 --- a/src-cpp/rdkafkacpp.h +++ b/src-cpp/rdkafkacpp.h @@ -50,9 +50,10 @@ #include #include #include +#include +#include #include - #ifdef _MSC_VER #undef RD_EXPORT #ifdef LIBRDKAFKA_STATICLIB @@ -1455,6 +1456,8 @@ class RD_EXPORT MessageTimestamp { * * This object encapsulates the C implementation logic into a C++ object * for use in RdKafka::Messages object. + * + * @remark Requires Apache Kafka >= 0.11.0 brokers */ class RD_EXPORT Headers { public: @@ -1467,55 +1470,175 @@ class RD_EXPORT Headers { * and an ErrorCode * * @remark dynamic allocation of this object is not supported. 
- * */ class Header { public: - Header(const std::string& key, - const char* value, - RdKafka::ErrorCode err = ERR_NO_ERROR): - key_(key), err_(err) { - value_container_.assign(value); + /** + * @brief Header object to encapsulate a single Header + * + * @param key the string value for the header key + * + * @param value the bytes of the header value + * + * @param value_size the length in bytes of the header value + */ + Header(const std::string &key, + const void *value, + size_t value_size): + key_(key), err_(ERR_NO_ERROR), value_size_(value_size) { + value_ = copy_value(value, value_size); + }; + + /** + * @brief Header object to encapsulate a single Header + * + * @param key the string value for the header key + * + * @param value the bytes of the header value + * + * @param value_size the length in bytes of the header value + * + * @param err the error code if one returned + * + * @remark The error code is used for when the Header is constructed internally + * by using something like RdKafka::Headers::get_last which constructs + * a Header encapsulating the ErrorCode in the process + */ + Header(const std::string &key, + const void *value, + size_t value_size, + const RdKafka::ErrorCode &err): + key_(key), err_(err), value_size_(value_size) { + value_ = copy_value(value, value_size); }; + + /** + * @brief Copy constructor + * + * @param other the other Header used for the copy constructor + */ + Header(const Header &other) + { + key_ = other.key_; + err_ = other.err_; + value_size_ = other.value_size_; + + value_ = copy_value(other.value_, value_size_); + } + + Header& operator=(const Header &other) + { + if(&other == this) { + return *this; + } + + key_ = other.key_; + err_ = other.err_; + value_size_ = other.value_size_; + + value_ = copy_value(other.value_, value_size_); + + return *this; + } + + ~Header() { + if (value_ != NULL) { + free(value_); + } + } + /** @returns Key the Key associated with this Header */ std::string key() const { return key_; } - const char* value() const { - return value_container_.c_str(); + /** @returns Value returns the binary value */ + const void *value() const { + return value_; + } + + /** @returns Value returns the value casted to a C string */ + const char *value_string() const { + return static_cast(value_); } + /** @returns Value Size the length of the Value in bytes */ + size_t value_size() const { + return value_size_; + } + + /** @returns Error Code the error code of this Header (usually ERR_NO_ERROR) */ RdKafka::ErrorCode err() const { return err_; } private: + char *copy_value(const void* value, size_t value_size) { + char * dest = NULL; + if (value != NULL) { + dest = (char*) malloc(value_size + 1); + memcpy(dest, (char*)value, value_size); + dest[value_size] = '\0'; + } + return dest; + } std::string key_; RdKafka::ErrorCode err_; - std::string value_container_; + char *value_; + size_t value_size_; void *operator new(size_t); /* Prevent dynamic allocation */ }; /** * @brief create a new instance of the Headers object + * + * @params initial_size initial size to set the Headers list to + * + * @returns Empty Headers list set to the initial size */ - static Headers *create(size_t initial_size = 8, bool free_rd_headers = true); + static Headers *create(size_t initial_size); /** * @brief create a new instance of the Headers object from a std::vector + * + * @params headers std::vector of RdKafka::Headers::Header objects + * + * @returns Headers list from std::vector set to the size of the std::vector */ - static Headers *create(const 
std::vector
<Header> &headers, bool free_rd_headers = true);
+  static Headers *create(const std::vector<Header>
&headers); /** * @brief adds a Header to the end + * + * @param key the header key as a std::string + * + * @param value the value as a binary value + * + * @param value_size the size of the value added * - * @returns An ErrorCode signalling a success or failure to add the Header. + * @returns An ErrorCode signalling a success or failure to add the header. */ - virtual ErrorCode add(const std::string& key, const char* value) = 0; + virtual ErrorCode add(const std::string& key, const void* value, size_t value_size) = 0; + + /** + * @brief adds a Header to the end + * + * @param key the header key as a std::string + * + * @param value the value as a std::string + * + * @remark convenience method for adding a std::string as a value for the header + * + * @returns An ErrorCode signalling a success or failure to add the header. + */ + virtual ErrorCode add(const std::string& key, const std::string &value) = 0; /** * @brief removes all the Headers of a given key + * + * @param key the header key as a std::string you want to remove + * + * @remark if duplicate keys exist this will remove all of them * * @returns An ErrorCode signalling a success or failure to remove the Header. */ @@ -1523,6 +1646,10 @@ class RD_EXPORT Headers { /** * @brief gets all of the Headers of a given key + * + * @param key the header key as a std::string you want to get + * + * @remark if duplicate keys exist this will return them all as a std::vector * * @returns a std::vector containing all the Headers of the given key. */ @@ -1530,6 +1657,10 @@ class RD_EXPORT Headers { /** * @brief gets the last occurrence of a Header of a given key + * + * @param key the header key as a std::string you want to get + * + * @remark this will only return the most recently added header * * @returns the Header if found, otherwise a Header with an ErrorCode */ @@ -1573,16 +1704,14 @@ class RD_EXPORT Headers { virtual struct rd_kafka_headers_s *c_headers() = 0; /** - * @brief cleans up the underlying alocated C implementation headers if called + * @brief cleans up the underlying allocated C implementation headers if called * * @remark Safe to call even if the Headers object is set to clean up when * when the destructor is called * * @remark Safe to call even if the underlyng C pointer is set to null - * - * @returns an ErrorCode signalling if the the operation was attempted */ - virtual ErrorCode destroy_headers() = 0; + virtual void destroy_headers() = 0; }; /** @@ -1665,9 +1794,6 @@ class RD_EXPORT Message { /** @returns The \p msg_opaque as provided to RdKafka::Producer::produce() */ virtual void *msg_opaque () const = 0; - /** @returns The Headers instance for this Message (if applicable) */ - virtual RdKafka::Headers *get_headers() = 0; - virtual ~Message () = 0; /** @returns the latency in microseconds for a produced message measured @@ -1696,6 +1822,9 @@ class RD_EXPORT Message { * @brief Returns the message's persistance status in the topic log. */ virtual Status status () const = 0; + + /** @returns The Headers instance for this Message (if applicable) */ + virtual RdKafka::Headers *get_headers() = 0; }; /**@}*/ @@ -2390,6 +2519,16 @@ class RD_EXPORT Producer : public virtual Handle { * message timestamp (microseconds since beginning of epoch, UTC). * Otherwise identical to produce() above. 
*/ + virtual ErrorCode produce (const std::string topic_name, int32_t partition, + int msgflags, + void *payload, size_t len, + const void *key, size_t key_len, + int64_t timestamp, void *msg_opaque) = 0; + + /** + * @brief produce() variant that that allows for Header support on produce + * Otherwise identical to produce() above. + */ virtual ErrorCode produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, diff --git a/src-cpp/rdkafkacpp_int.h b/src-cpp/rdkafkacpp_int.h index 776c3babb5..782ee5f031 100644 --- a/src-cpp/rdkafkacpp_int.h +++ b/src-cpp/rdkafkacpp_int.h @@ -240,8 +240,8 @@ class HeadersImpl : public Headers { HeadersImpl (size_t initial_size, bool free_rd_headers): headers_ (rd_kafka_headers_new(initial_size)), free_headers_ (free_rd_headers) {} - HeadersImpl (rd_kafka_headers_t *headers): - headers_ (headers), free_headers_ (false) {}; + HeadersImpl (rd_kafka_headers_t *headers, bool free_rd_headers): + headers_ (headers), free_headers_ (free_rd_headers) {}; HeadersImpl (const std::vector
&headers, bool free_rd_headers): free_headers_ (free_rd_headers) { @@ -254,20 +254,28 @@ class HeadersImpl : public Headers { } ~HeadersImpl() { - if(free_headers_ && headers_) { + if (free_headers_ && headers_) { rd_kafka_headers_destroy(headers_); } } - ErrorCode add(const std::string& key, const char* value) { + ErrorCode add(const std::string &key, const void *value, size_t value_size) { rd_kafka_resp_err_t err; err = rd_kafka_header_add(headers_, key.c_str(), key.size(), - value, strlen(value)); + value, value_size); return static_cast(err); } - ErrorCode remove(const std::string& key) { + ErrorCode add(const std::string &key, const std::string &value) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_add(headers_, + key.c_str(), key.size(), + value.c_str(), value.size()); + return static_cast(err); + } + + ErrorCode remove(const std::string &key) { rd_kafka_resp_err_t err; err = rd_kafka_header_remove (headers_, key.c_str()); return static_cast(err); @@ -279,11 +287,10 @@ class HeadersImpl : public Headers { size_t size; rd_kafka_resp_err_t err; for (size_t idx = 0; - !(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ;\ + !(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ; idx++) { if (value) { - const char* casted_value = static_cast(value); - headers.push_back(Headers::Header(key, casted_value)); + headers.push_back(Headers::Header(key, value, size)); } } return headers; @@ -293,23 +300,21 @@ class HeadersImpl : public Headers { const void *value; size_t size; rd_kafka_resp_err_t err; - err = rd_kafka_header_get_last (headers_, key.c_str(), &value, &size); - const char* casted_value = static_cast(value); + err = rd_kafka_header_get_last(headers_, key.c_str(), &value, &size); ErrorCode cpp_error = static_cast(err); - return Headers::Header(key, casted_value, cpp_error); + return Headers::Header(key, value, size, cpp_error); } std::vector get_all() const { std::vector headers; size_t idx = 0; const char *name; - const void *valuep; + const void *value; size_t size; while (!rd_kafka_header_get_all(headers_, idx++, - &name, &valuep, &size)) { - if (valuep != NULL) { - const char* casted_value = static_cast(valuep); - headers.push_back(Headers::Header(name, casted_value)); + &name, &value, &size)) { + if (value != NULL) { + headers.push_back(Headers::Header(name, value, size)); } } return headers; @@ -323,13 +328,10 @@ class HeadersImpl : public Headers { return headers_; } - ErrorCode destroy_headers() { + void destroy_headers() { if (headers_) { rd_kafka_headers_destroy(headers_); headers_ = 0; - return RdKafka::ERR_NO_ERROR; - } else { - return RdKafka::ERR_OPERATION_NOT_ATTEMPTED; } } @@ -339,7 +341,7 @@ class HeadersImpl : public Headers { for (std::vector
::const_iterator it = headers.begin(); it != headers.end(); it++) { - this->add(it->key(), it->value()); + this->add(it->key(), it->value(), it->value_size()); } } } @@ -358,8 +360,9 @@ class MessageImpl : public Message { if (free_rkmessage_) rd_kafka_message_destroy(const_cast(rkmessage_)); if (key_) - delete key_; - delete headers_; + delete key_; + if (headers_) + delete headers_; }; MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage): @@ -382,11 +385,10 @@ class MessageImpl : public Message { /* Create errored message */ MessageImpl (RdKafka::Topic *topic, RdKafka::ErrorCode err): - topic_(topic), free_rkmessage_(false), key_(NULL), headers_(0) { + topic_(topic), free_rkmessage_(false), key_(NULL), headers_(NULL) { rkmessage_ = &rkmessage_err_; memset(&rkmessage_err_, 0, sizeof(rkmessage_err_)); rkmessage_err_.err = static_cast(err); - } std::string errstr() const { @@ -441,10 +443,6 @@ class MessageImpl : public Message { return rd_kafka_message_latency(rkmessage_); } - Headers* get_headers() { - return headers_; - } - struct rd_kafka_message_s *c_ptr () { return rkmessage_; } @@ -453,6 +451,10 @@ class MessageImpl : public Message { return static_cast(rd_kafka_message_status(rkmessage_)); } + Headers* get_headers() { + return headers_; + } + RdKafka::Topic *topic_; rd_kafka_message_t *rkmessage_; bool free_rkmessage_; @@ -467,10 +469,10 @@ class MessageImpl : public Message { rd_kafka_headers_t *hdrsp; rd_kafka_resp_err_t err; - if (rkmessage->len > 0 && !(err = rd_kafka_message_headers(rkmessage, &hdrsp))) { - return new HeadersImpl(hdrsp); + if (rkmessage->len > 0 && !(err = rd_kafka_message_detach_headers(rkmessage, &hdrsp))) { + return new HeadersImpl(hdrsp, free_rkmessage_); } - return 0; + return NULL; } /* "delete" copy ctor + copy assignment, for safety of key_ */ MessageImpl(MessageImpl const&) /*= delete*/; @@ -1161,6 +1163,12 @@ class ProducerImpl : virtual public Producer, virtual public HandleImpl { const std::vector *key, void *msg_opaque); + ErrorCode produce (const std::string topic_name, int32_t partition, + int msgflags, + void *payload, size_t len, + const void *key, size_t key_len, + int64_t timestamp, void *msg_opaque); + ErrorCode produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, diff --git a/tests/0054-offset_time.cpp b/tests/0054-offset_time.cpp index c1641887b0..b550f2a9a3 100644 --- a/tests/0054-offset_time.cpp +++ b/tests/0054-offset_time.cpp @@ -104,7 +104,7 @@ static void test_offset_time (void) { for (int ti = 0 ; ti < timestamp_cnt*2 ; ti += 2) { err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY, (void *)topic.c_str(), topic.size(), NULL, 0, - timestamps[ti], NULL, NULL); + timestamps[ti], NULL); if (err != RdKafka::ERR_NO_ERROR) Test::Fail("Produce failed: " + RdKafka::err2str(err)); } diff --git a/tests/0057-invalid_topic.cpp b/tests/0057-invalid_topic.cpp index 66cf9e14c6..d95ada65c3 100644 --- a/tests/0057-invalid_topic.cpp +++ b/tests/0057-invalid_topic.cpp @@ -85,13 +85,13 @@ static void test_invalid_topic (void) { for (int i = -1 ; i < 3 ; i++) { err = p->produce(topic_bad, i, RdKafka::Producer::RK_MSG_COPY, - (void *)"bad", 4, NULL, 0, 0, NULL, NULL); + (void *)"bad", 4, NULL, 0, 0, NULL); if (err) /* Error is probably delayed until delivery report */ check_err(err, RdKafka::ERR_TOPIC_EXCEPTION); err = p->produce(topic_good, i, RdKafka::Producer::RK_MSG_COPY, - (void *)"good", 5, NULL, 0, 0, NULL, NULL); + (void *)"good", 5, NULL, 0, 0, NULL); check_err(err, 
RdKafka::ERR_NO_ERROR); } diff --git a/tests/0059-bsearch.cpp b/tests/0059-bsearch.cpp index 8caf84497c..20f598efef 100644 --- a/tests/0059-bsearch.cpp +++ b/tests/0059-bsearch.cpp @@ -132,7 +132,7 @@ static void do_test_bsearch (void) { err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY, (void *)topic.c_str(), topic.size(), NULL, 0, timestamp, - i == 357 ? (void *)1 /*golden*/ : NULL, NULL); + i == 357 ? (void *)1 /*golden*/ : NULL); if (err != RdKafka::ERR_NO_ERROR) Test::Fail("Produce failed: " + RdKafka::err2str(err)); timestamp += 100 + (timestamp % 9); diff --git a/tests/0065-yield.cpp b/tests/0065-yield.cpp index 2d6323d359..ffbf1c6d7a 100644 --- a/tests/0065-yield.cpp +++ b/tests/0065-yield.cpp @@ -93,7 +93,7 @@ static void do_test_producer (bool do_yield) { for (int i = 0 ; i < msgcnt ; i++) { err = p->produce(topic, 0, RdKafka::Producer::RK_MSG_COPY, - (void *)"hi", 2, NULL, 0, 0, NULL, NULL); + (void *)"hi", 2, NULL, 0, 0, NULL); if (err) Test::Fail("produce() failed: " + RdKafka::err2str(err)); } diff --git a/tests/0070-null_empty.cpp b/tests/0070-null_empty.cpp index a0850b0854..68502f06d0 100644 --- a/tests/0070-null_empty.cpp +++ b/tests/0070-null_empty.cpp @@ -111,7 +111,7 @@ static void do_test_null_empty (bool api_version_request) { (void *)msgs[i+1], msgs[i+1] ? strlen(msgs[i+1]) : 0, /* Key */ (void *)msgs[i], msgs[i] ? strlen(msgs[i]) : 0, - 0, NULL, NULL); + 0, NULL); if (err != RdKafka::ERR_NO_ERROR) Test::Fail("Produce failed: " + RdKafka::err2str(err)); } diff --git a/tests/0085-headers.cpp b/tests/0085-headers.cpp index 36e35fa969..72f7189d48 100644 --- a/tests/0085-headers.cpp +++ b/tests/0085-headers.cpp @@ -29,45 +29,36 @@ #include #include "testcpp.h" -class MyCbs : public RdKafka::OffsetCommitCb, public RdKafka::EventCb { - public: - int seen_commit; - int seen_stats; - - void offset_commit_cb (RdKafka::ErrorCode err, - std::vector&offsets) { - seen_commit++; - Test::Say("Got commit callback!\n"); - } - - void event_cb (RdKafka::Event &event) { - switch (event.type()) - { - case RdKafka::Event::EVENT_STATS: - Test::Say("Got stats callback!\n"); - seen_stats++; - break; - default: - break; - } - } -}; - static void assert_all_headers_match(RdKafka::Headers *actual, - std::vector &expected) { - if (actual->size() != expected.size()) { - Test::Fail(tostr() << "Expected headers length to equal" - << expected.size() << " instead equals " << actual->size() << "\n"); + RdKafka::Headers *expected) { + if (!actual) { + Test::Fail("Expected RdKafka::Message to contain headers"); + } + if (actual->size() != expected->size()) { + Test::Fail(tostr() << "Expected headers length to equal " + << expected->size() << " instead equals " << actual->size() << "\n"); } std::vector actual_headers = actual->get_all(); + std::vector expected_headers = expected->get_all(); + Test::Say(tostr() << "Header size " << actual_headers.size() << "\n"); for(size_t i = 0; i < actual_headers.size(); i++) { RdKafka::Headers::Header actual_header = actual_headers[i]; - RdKafka::Headers::Header expected_header = expected[i]; - std::string actual_key = actual_header.key; - std::string actual_value = std::string(actual_header.value); - std::string expected_key = expected_header.key; - std::string expected_value = std::string(expected_header.value); + RdKafka::Headers::Header expected_header = expected_headers[i]; + std::string actual_key = actual_header.key(); + std::string actual_value = std::string( + actual_header.value_string(), + actual_header.value_size() + ); + std::string 
expected_key = expected_header.key(); + std::string expected_value = std::string( + actual_header.value_string(), + expected_header.value_size() + ); + + Test::Say(tostr() << "Expected Key " << expected_key << " Expected val " << expected_value + << " Actual key " << actual_key << " Actual val " << actual_value << "\n"); + if (actual_key != expected_key) { Test::Fail(tostr() << "Header key does not match, expected '" << actual_key << "' but got '" << expected_key << "'\n"); @@ -79,7 +70,8 @@ static void assert_all_headers_match(RdKafka::Headers *actual, } } -static void test_n_headers (int n, const char* message) { +static void test_headers (RdKafka::Headers *produce_headers, + RdKafka::Headers *compare_headers) { std::string topic = Test::mk_topic_name("0085-headers", 1); RdKafka::Conf *conf; std::string errstr; @@ -87,22 +79,6 @@ static void test_n_headers (int n, const char* message) { Test::conf_init(&conf, NULL, 0); Test::conf_set(conf, "group.id", topic); - Test::conf_set(conf, "group.id", topic); - Test::conf_set(conf, "socket.timeout.ms", "10000"); - Test::conf_set(conf, "enable.auto.commit", "false"); - Test::conf_set(conf, "enable.partition.eof", "false"); - Test::conf_set(conf, "auto.offset.reset", "earliest"); - Test::conf_set(conf, "statistics.interval.ms", "1000"); - - MyCbs cbs; - cbs.seen_commit = 0; - cbs.seen_stats = 0; - if (conf->set("offset_commit_cb", (RdKafka::OffsetCommitCb *)&cbs, errstr) != - RdKafka::Conf::CONF_OK) - Test::Fail("Failed to set commit callback: " + errstr); - if (conf->set("event_cb", (RdKafka::EventCb *)&cbs, errstr) != - RdKafka::Conf::CONF_OK) - Test::Fail("Failed to set event callback: " + errstr); RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr); if (!p) @@ -110,21 +86,9 @@ static void test_n_headers (int n, const char* message) { RdKafka::ErrorCode err; - std::vector headers_arr; - for (int i = 0; i < n; ++i) { - std::stringstream key_s; - key_s << "header_" << i; - std::string key = key_s.str(); - std::stringstream val_s; - val_s << "value_" << i; - std::string val = val_s.str(); - headers_arr.push_back(RdKafka::Headers::Header(key, val.c_str())); - } - RdKafka::Headers *produce_headers = RdKafka::Headers::create(headers_arr, false); - err = p->produce(topic, 0, RdKafka::Producer::RK_MSG_COPY, - (void *)message, message ? 
strlen(message) : 0, + (void *)"message", 7, (void *)"key", 3, 0, NULL, produce_headers); p->flush(tmout_multip(10000)); @@ -136,84 +100,297 @@ static void test_n_headers (int n, const char* message) { RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); if (!c) Test::Fail("Failed to create KafkaConsumer: " + errstr); - - std::vector topics; - topics.push_back(topic); - if ((err = c->subscribe(topics))) - Test::Fail("subscribe failed: " + RdKafka::err2str(err)); + std::vector parts; + parts.push_back(RdKafka::TopicPartition::create(topic, 0, + RdKafka::Topic::OFFSET_BEGINNING)); + err = c->assign(parts); + if (err != RdKafka::ERR_NO_ERROR) + Test::Fail("assign() failed: " + RdKafka::err2str(err)); + RdKafka::TopicPartition::destroy(parts); int cnt = 0; - while (!cbs.seen_commit || !cbs.seen_stats) { - RdKafka::Message *msg = c->consume(tmout_multip(1000)); - if (!msg->err()) { + bool running = true; + + while (running) { + RdKafka::Message *msg = c->consume(10000); + Test::Say(tostr() << msg->err()); + if (msg->err() == RdKafka::ERR_NO_ERROR) { cnt++; Test::Say(tostr() << "Received message #" << cnt << "\n"); - if (cnt > 10) - Test::Fail(tostr() << "Should've seen the " - "offset commit (" << cbs.seen_commit << ") and " - "stats callbacks (" << cbs.seen_stats << ") by now"); - - /* Commit the first message to trigger the offset commit_cb */ - if (cnt == 1) { - err = c->commitAsync(msg); - if (err) - Test::Fail("commitAsync() failed: " + RdKafka::err2str(err)); - rd_sleep(1); /* Sleep to simulate slow processing, making sure - * that the offset commit callback op gets - * inserted on the consume queue in front of - * the messages. */ - } RdKafka::Headers *headers = msg->get_headers(); - if (!headers) { - Test::Fail("Expected RdKafka::Message to contain headers"); + if (compare_headers->size() > 0) { + assert_all_headers_match(headers, compare_headers); + } else { + if (headers != 0) { + Test::Fail("Expected get_headers to return a NULL pointer"); + } } - - assert_all_headers_match(headers, headers_arr); - - } else if (msg->err() == RdKafka::ERR__TIMED_OUT) - ; /* Stil rebalancing? */ - else + running = false; + } else if (msg->err() == RdKafka::ERR__TIMED_OUT) { + Test::Say("I'm rebalancing?"); + /* Stil rebalancing? 
*/ + } else { Test::Fail("consume() failed: " + msg->errstr()); + } delete msg; } - c->close(); delete c; delete p; delete conf; - } static void test_one_header () { Test::Say("Test one header in consumed message.\n"); - std::string val = "valid"; - test_n_headers(1, val.c_str()); + int num_hdrs = 1; + RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + for (int i = 0; i < num_hdrs; ++i) { + std::stringstream key_s; + key_s << "header_" << i; + std::string key = key_s.str(); + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(key, val); + compare_headers->add(key, val); + } + test_headers(produce_headers, compare_headers); } static void test_ten_headers () { Test::Say("Test ten headers in consumed message.\n"); - std::string val = "valid"; - test_n_headers(10, val.c_str()); + int num_hdrs = 10; + RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + for (int i = 0; i < num_hdrs; ++i) { + std::stringstream key_s; + key_s << "header_" << i; + std::string key = key_s.str(); + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(key, val); + compare_headers->add(key, val); + } + test_headers(produce_headers, compare_headers); } -static void test_one_header_null_msg () { - Test::Say("Test one header in consumed message with a null value message.\n"); - test_n_headers(1, NULL); +static void test_add_with_void_param () { + Test::Say("Test adding one header using add method that takes void*.\n"); + int num_hdrs = 1; + RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + for (int i = 0; i < num_hdrs; ++i) { + std::stringstream key_s; + key_s << "header_" << i; + std::string key = key_s.str(); + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(key, val.c_str(), val.size()); + compare_headers->add(key, val.c_str(), val.size()); + } + test_headers(produce_headers, compare_headers); } -static void test_one_header_empty_msg () { - Test::Say("Test one header in consumed message with an empty value message.\n"); - std::string val = ""; - test_n_headers(1, val.c_str()); +static void test_no_headers () { + Test::Say("Test no headers produced.\n"); + int num_hdrs = 0; + RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + for (int i = 0; i < num_hdrs; ++i) { + std::stringstream key_s; + key_s << "header_" << i; + std::string key = key_s.str(); + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(key, val); + compare_headers->add(key, val); + } + test_headers(produce_headers, compare_headers); +} + +static void test_header_with_null_value () { + Test::Say("Test one header with null value.\n"); + int num_hdrs = 1; + RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + for (int i = 0; i < num_hdrs; ++i) { + std::stringstream key_s; + key_s << "header_" << i; + std::string key = key_s.str(); + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(key, NULL, 0); + 
compare_headers->add(key, NULL, 0); + } + test_headers(produce_headers, compare_headers); +} + +static void test_duplicate_keys () { + Test::Say("Test multiple headers with duplicate keys.\n"); + int num_hdrs = 4; + RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + for (int i = 0; i < num_hdrs; ++i) { + std::string dup_key = "dup_key"; + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(dup_key, val); + compare_headers->add(dup_key, val); + } + test_headers(produce_headers, compare_headers); +} + +static void test_remove_after_add () { + Test::Say("Test removing after adding headers.\n"); + int num_hdrs = 1; + RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + + // Add one unique key + std::string key_one = "key1"; + std::string val_one = "val_one"; + headers->add(key_one, val_one); + + // Add a second unique key + std::string key_two = "key2"; + std::string val_two = "val_two"; + headers->add(key_two, val_one); + + // Assert header length is 2 + size_t expected_size = 2; + if (headers->size() != expected_size) { + Test::Fail(tostr() << "Expected header->size() to equal " + << expected_size << ", instead got " + << headers->size() << "\n"); + } + + // Remove key_one and assert headers == 1 + headers->remove(key_one); + size_t expected_remove_size = 1; + if (headers->size() != expected_remove_size) { + Test::Fail(tostr() << "Expected header->size() to equal " + << expected_remove_size << ", instead got " + << headers->size() << "\n"); + } +} + +static void test_remove_all_duplicate_keys () { + Test::Say("Test removing duplicate keys removes all headers.\n"); + int num_hdrs = 4; + RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + + // Add one unique key + std::string key_one = "key1"; + std::string val_one = "val_one"; + headers->add(key_one, val_one); + + // Add 2 duplicate keys + std::string dup_key = "dup_key"; + std::string val_two = "val_two"; + headers->add(dup_key, val_one); + headers->add(dup_key, val_two); + + // Assert header length is 3 + size_t expected_size = 3; + if (headers->size() != expected_size) { + Test::Fail(tostr() << "Expected header->size() to equal " + << expected_size << ", instead got " + << headers->size() << "\n"); + } + + // Remove key_one and assert headers == 1 + headers->remove(dup_key); + size_t expected_size_remove = 1; + if (headers->size() != expected_size_remove) { + Test::Fail(tostr() << "Expected header->size() to equal " + << expected_size_remove << ", instead got " + << headers->size() << "\n"); + } +} + +static void test_get_last_gives_last_added_val () { + Test::Say("Test get_last returns the last added value of duplicate keys.\n"); + int num_hdrs = 1; + RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + + // Add two duplicate keys + std::string dup_key = "dup_key"; + std::string val_one = "val_one"; + std::string val_two = "val_two"; + std::string val_three = "val_three"; + headers->add(dup_key, val_one); + headers->add(dup_key, val_two); + headers->add(dup_key, val_three); + + // Assert header length is 3 + size_t expected_size = 3; + if (headers->size() != expected_size) { + Test::Fail(tostr() << "Expected header->size() to equal " + << expected_size << ", instead got " + << headers->size() << "\n"); + } + + // Get last of duplicate key and assert it equals val_two + RdKafka::Headers::Header last = headers->get_last(dup_key); + std::string value = 
std::string(last.value_string()); + if (value != val_three) { + Test::Fail(tostr() << "Expected get_last to return " << val_two + << " as the value of the header instead got " + << value << "\n"); + } +} + +static void test_get_of_key_returns_all () { + Test::Say("Test get returns all the headers of a duplicate key.\n"); + int num_hdrs = 1; + RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + + // Add two duplicate keys + std::string unique_key = "unique"; + std::string dup_key = "dup_key"; + std::string val_one = "val_one"; + std::string val_two = "val_two"; + std::string val_three = "val_three"; + headers->add(unique_key, val_one); + headers->add(dup_key, val_one); + headers->add(dup_key, val_two); + headers->add(dup_key, val_three); + + // Assert header length is 4 + size_t expected_size = 4; + if (headers->size() != expected_size) { + Test::Fail(tostr() << "Expected header->size() to equal " + << expected_size << ", instead got " + << headers->size() << "\n"); + } + + // Get all of the duplicate key + std::vector get = headers->get(dup_key); + size_t expected_get_size = 3; + if (get.size() != expected_get_size) { + Test::Fail(tostr() << "Expected header->size() to equal " + << expected_get_size << ", instead got " + << headers->size() << "\n"); + } } extern "C" { int main_0085_headers (int argc, char **argv) { test_one_header(); test_ten_headers(); - // These two tests fail and I'm not sure if this is correct behaviour - // test_one_header_null_msg(); - // test_one_header_empty_msg(); + test_add_with_void_param(); + test_no_headers(); + test_header_with_null_value(); + test_duplicate_keys(); + test_remove_after_add(); + test_remove_all_duplicate_keys(); + test_get_last_gives_last_added_val(); + test_get_of_key_returns_all(); return 0; } }
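
Usage sketch, for reference only (not part of the diff above): a minimal example of how the Headers API declared in this patch fits together on the produce and consume sides. The broker address (localhost:9092), topic name (test-topic), header keys and values, and the include path are illustrative placeholders; error handling is trimmed, and ownership/cleanup of the Headers object after produce() follows the tests in 0085-headers.cpp, which create the headers and do not free them after producing.

#include <iostream>
#include <string>
#include <vector>

#include "rdkafkacpp.h"  /* or <librdkafka/rdkafkacpp.h> for an installed build */

/* Consume side: dump every header attached to a message, if any. */
void print_headers (RdKafka::Message *msg) {
  RdKafka::Headers *hdrs = msg->get_headers();
  if (!hdrs)
    return;
  std::vector<RdKafka::Headers::Header> all = hdrs->get_all();
  for (std::vector<RdKafka::Headers::Header>::const_iterator it = all.begin();
       it != all.end(); it++)
    std::cout << it->key() << " = "
              << std::string(it->value_string(), it->value_size())
              << " (" << it->value_size() << " bytes)" << std::endl;
}

int main () {
  std::string errstr;

  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  if (conf->set("bootstrap.servers", "localhost:9092", errstr) !=
      RdKafka::Conf::CONF_OK) {
    std::cerr << errstr << std::endl;
    return 1;
  }

  RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
  if (!producer) {
    std::cerr << "Failed to create producer: " << errstr << std::endl;
    return 1;
  }

  /* Build a header list: one std::string value and one binary value. */
  RdKafka::Headers *headers = RdKafka::Headers::create(8);
  headers->add("client", std::string("example-producer"));
  const char raw[4] = {0x01, 0x02, 0x03, 0x04};
  headers->add("magic", raw, sizeof(raw));

  /* Produce with headers via the produce() overload that takes an
   * RdKafka::Headers pointer, as exercised in tests/0085-headers.cpp. */
  std::string payload = "message with headers";
  RdKafka::ErrorCode err =
      producer->produce("test-topic", RdKafka::Topic::PARTITION_UA,
                        RdKafka::Producer::RK_MSG_COPY,
                        const_cast<char *>(payload.c_str()), payload.size(),
                        NULL, 0,   /* no key */
                        0,         /* let librdkafka assign the timestamp */
                        NULL,      /* msg_opaque */
                        headers);
  if (err != RdKafka::ERR_NO_ERROR)
    std::cerr << "produce() failed: " << RdKafka::err2str(err) << std::endl;

  producer->flush(10000);

  delete producer;
  delete conf;
  return 0;
}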