diff --git a/examples/rdkafka_example.cpp b/examples/rdkafka_example.cpp index 944843c3dc..dec77476ba 100644 --- a/examples/rdkafka_example.cpp +++ b/examples/rdkafka_example.cpp @@ -203,6 +203,8 @@ class MyHashPartitionerCb : public RdKafka::PartitionerCb { }; void msg_consume(RdKafka::Message* message, void* opaque) { + const RdKafka::Headers *headers; + switch (message->err()) { case RdKafka::ERR__TIMED_OUT: break; @@ -213,6 +215,20 @@ void msg_consume(RdKafka::Message* message, void* opaque) { if (message->key()) { std::cout << "Key: " << *message->key() << std::endl; } + headers = message->headers(); + if (headers) { + std::vector hdrs = headers->get_all(); + for (size_t i = 0 ; i < hdrs.size() ; i++) { + const RdKafka::Headers::Header hdr = hdrs[i]; + + if (hdr.value() != NULL) + printf(" Header: %s = \"%.*s\"\n", + hdr.key().c_str(), + (int)hdr.value_size(), (const char *)hdr.value()); + else + printf(" Header: %s = NULL\n", hdr.key().c_str()); + } + } printf("%.*s\n", static_cast(message->len()), static_cast(message->payload())); @@ -479,6 +495,8 @@ int main (int argc, char **argv) { /* Set delivery report callback */ conf->set("dr_cb", &ex_dr_cb, errstr); + conf->set("default_topic_conf", tconf, errstr); + /* * Create producer using accumulated global configuration. */ @@ -490,15 +508,6 @@ int main (int argc, char **argv) { std::cout << "% Created producer " << producer->name() << std::endl; - /* - * Create topic handle. - */ - RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, - tconf, errstr); - if (!topic) { - std::cerr << "Failed to create topic: " << errstr << std::endl; - exit(1); - } /* * Read messages from stdin and produce to broker. @@ -509,20 +518,36 @@ int main (int argc, char **argv) { continue; } + RdKafka::Headers *headers = RdKafka::Headers::create(); + headers->add("my header", "header value"); + headers->add("other header", "yes"); + /* * Produce message */ RdKafka::ErrorCode resp = - producer->produce(topic, partition, - RdKafka::Producer::RK_MSG_COPY /* Copy payload */, - const_cast(line.c_str()), line.size(), - NULL, NULL); - if (resp != RdKafka::ERR_NO_ERROR) - std::cerr << "% Produce failed: " << - RdKafka::err2str(resp) << std::endl; - else - std::cerr << "% Produced message (" << line.size() << " bytes)" << - std::endl; + producer->produce(topic_str, partition, + RdKafka::Producer::RK_MSG_COPY /* Copy payload */, + /* Value */ + const_cast(line.c_str()), line.size(), + /* Key */ + NULL, 0, + /* Timestamp (defaults to now) */ + 0, + /* Message headers, if any */ + headers, + /* Per-message opaque value passed to + * delivery report */ + NULL); + if (resp != RdKafka::ERR_NO_ERROR) { + std::cerr << "% Produce failed: " << + RdKafka::err2str(resp) << std::endl; + delete headers; /* Headers are automatically deleted on produce() + * success. */ + } else { + std::cerr << "% Produced message (" << line.size() << " bytes)" << + std::endl; + } producer->poll(0); } @@ -533,7 +558,6 @@ int main (int argc, char **argv) { producer->poll(1000); } - delete topic; delete producer; @@ -635,7 +659,7 @@ int main (int argc, char **argv) { RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, &metadata, 5000); if (err != RdKafka::ERR_NO_ERROR) { - std::cerr << "%% Failed to acquire metadata: " + std::cerr << "%% Failed to acquire metadata: " << RdKafka::err2str(err) << std::endl; run = 0; break; @@ -649,6 +673,8 @@ int main (int argc, char **argv) { } + delete conf; + delete tconf; /* * Wait for RdKafka to decommission. 
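For reference, a minimal producer-side sketch distilled from the example above, assuming only the API introduced in this patch; the broker address ("localhost:9092") and topic name ("my_topic") are placeholders and error handling is trimmed:

#include <iostream>
#include <string>
#include "rdkafkacpp.h"

/* Minimal sketch: produce one message with two headers.
 * Ownership rule from this patch: produce() frees the Headers on success,
 * the caller must delete them if produce() fails. */
static void produce_with_headers_sketch() {
  std::string errstr;
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  conf->set("bootstrap.servers", "localhost:9092", errstr); /* placeholder broker */

  RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
  if (!producer) {
    std::cerr << "Failed to create producer: " << errstr << std::endl;
    return;
  }

  RdKafka::Headers *headers = RdKafka::Headers::create();
  headers->add("my header", "header value");
  headers->add("other header", "yes");

  std::string value = "hello";
  RdKafka::ErrorCode err =
      producer->produce("my_topic" /* placeholder topic */,
                        RdKafka::Topic::PARTITION_UA,
                        RdKafka::Producer::RK_MSG_COPY,
                        const_cast<char *>(value.c_str()), value.size(),
                        NULL, 0,  /* no key */
                        0,        /* timestamp: now */
                        headers,  /* freed by produce() on success */
                        NULL);    /* no per-message opaque */
  if (err != RdKafka::ERR_NO_ERROR) {
    std::cerr << "produce() failed: " << RdKafka::err2str(err) << std::endl;
    delete headers;  /* produce() failed: the headers are still ours to free */
  }

  producer->flush(10 * 1000);
  delete producer;
  delete conf;
}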
diff --git a/src-cpp/HeadersImpl.cpp b/src-cpp/HeadersImpl.cpp index d7b5f9357d..b31912c677 100644 --- a/src-cpp/HeadersImpl.cpp +++ b/src-cpp/HeadersImpl.cpp @@ -33,17 +33,15 @@ #include "rdkafkacpp_int.h" -RdKafka::Headers *RdKafka::Headers::create(size_t initial_count) { - return new RdKafka::HeadersImpl(initial_count, false); +RdKafka::Headers *RdKafka::Headers::create() { + return new RdKafka::HeadersImpl(); } RdKafka::Headers *RdKafka::Headers::create(const std::vector<Header>
&headers) { - if (headers.size() > 0) { - return new RdKafka::HeadersImpl(headers, false); - } else { - return new RdKafka::HeadersImpl(8, false); - } - + if (headers.size() > 0) + return new RdKafka::HeadersImpl(headers); + else + return new RdKafka::HeadersImpl(); } RdKafka::Headers::~Headers() {} diff --git a/src-cpp/ProducerImpl.cpp b/src-cpp/ProducerImpl.cpp index 5df139e72b..c8631fd694 100644 --- a/src-cpp/ProducerImpl.cpp +++ b/src-cpp/ProducerImpl.cpp @@ -169,25 +169,34 @@ RdKafka::ProducerImpl::produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, - int64_t timestamp, void *msg_opaque, - RdKafka::Headers *headers) { + int64_t timestamp, + RdKafka::Headers *headers, + void *msg_opaque) { rd_kafka_headers_t *hdrs = NULL; + RdKafka::HeadersImpl *headersimpl = NULL; + rd_kafka_resp_err_t err; + if (headers) { - hdrs = headers->c_headers(); + headersimpl = static_cast(headers); + hdrs = headersimpl->c_ptr(); } - return - static_cast - ( - rd_kafka_producev(rk_, - RD_KAFKA_V_TOPIC(topic_name.c_str()), - RD_KAFKA_V_PARTITION(partition), - RD_KAFKA_V_MSGFLAGS(msgflags), - RD_KAFKA_V_VALUE(payload, len), - RD_KAFKA_V_KEY(key, key_len), - RD_KAFKA_V_TIMESTAMP(timestamp), - RD_KAFKA_V_OPAQUE(msg_opaque), - RD_KAFKA_V_HEADERS(hdrs), - RD_KAFKA_V_END) - ); + err = rd_kafka_producev(rk_, + RD_KAFKA_V_TOPIC(topic_name.c_str()), + RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_MSGFLAGS(msgflags), + RD_KAFKA_V_VALUE(payload, len), + RD_KAFKA_V_KEY(key, key_len), + RD_KAFKA_V_TIMESTAMP(timestamp), + RD_KAFKA_V_OPAQUE(msg_opaque), + RD_KAFKA_V_HEADERS(hdrs), + RD_KAFKA_V_END); + + if (!err && headersimpl) { + /* A successful producev() call will destroy the C headers. */ + headersimpl->c_headers_destroyed(); + delete headers; + } + + return static_cast(err); } diff --git a/src-cpp/rdkafkacpp.h b/src-cpp/rdkafkacpp.h index 70685a4ff2..0787b8ccd3 100644 --- a/src-cpp/rdkafkacpp.h +++ b/src-cpp/rdkafkacpp.h @@ -76,7 +76,6 @@ extern "C" { struct rd_kafka_s; struct rd_kafka_topic_s; struct rd_kafka_message_s; - struct rd_kafka_headers_s; }; namespace RdKafka { @@ -1451,22 +1450,24 @@ class RD_EXPORT MessageTimestamp { int64_t timestamp; /**< Milliseconds since epoch (UTC). */ }; + /** * @brief Headers object * - * This object encapsulates the C implementation logic into a C++ object - * for use in RdKafka::Messages object. + * Represents message headers. + * + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers * * @remark Requires Apache Kafka >= 0.11.0 brokers */ class RD_EXPORT Headers { - public: +public: virtual ~Headers() = 0; /** * @brief Header object * - * This object represents a single Header with key value pair + * This object represents a single Header with a key value pair * and an ErrorCode * * @remark dynamic allocation of this object is not supported. @@ -1477,111 +1478,107 @@ class RD_EXPORT Headers { * @brief Header object to encapsulate a single Header * * @param key the string value for the header key - * - * @param value the bytes of the header value - * + * @param value the bytes of the header value, or NULL * @param value_size the length in bytes of the header value + * + * @remark key and value are copied. 
+ * */ Header(const std::string &key, const void *value, size_t value_size): key_(key), err_(ERR_NO_ERROR), value_size_(value_size) { - value_ = copy_value(value, value_size); - }; + value_ = copy_value(value, value_size); + } /** * @brief Header object to encapsulate a single Header * * @param key the string value for the header key - * * @param value the bytes of the header value - * * @param value_size the length in bytes of the header value - * * @param err the error code if one returned * - * @remark The error code is used for when the Header is constructed internally - * by using something like RdKafka::Headers::get_last which constructs + * @remark The error code is used for when the Header is constructed + * internally by using RdKafka::Headers::get_last which constructs * a Header encapsulating the ErrorCode in the process */ Header(const std::string &key, const void *value, size_t value_size, - const RdKafka::ErrorCode &err): - key_(key), err_(err), value_size_(value_size) { + const RdKafka::ErrorCode err): + key_(key), err_(err), value_size_(value_size) { value_ = copy_value(value, value_size); - }; + } /** * @brief Copy constructor * - * @param other the other Header used for the copy constructor + * @param other other Header used for the copy constructor */ - Header(const Header &other) - { - key_ = other.key_; - err_ = other.err_; - value_size_ = other.value_size_; - - value_ = copy_value(other.value_, value_size_); + Header(const Header &other): + key_(other.key_), err_(other.err_), value_size_(other.value_size_) { + value_ = copy_value(other.value_, value_size_); } Header& operator=(const Header &other) { - if(&other == this) { - return *this; - } + if (&other == this) { + return *this; + } - key_ = other.key_; - err_ = other.err_; - value_size_ = other.value_size_; + key_ = other.key_; + err_ = other.err_; + value_size_ = other.value_size_; - value_ = copy_value(other.value_, value_size_); - - return *this; + value_ = copy_value(other.value_, value_size_); + + return *this; } ~Header() { - if (value_ != NULL) { - free(value_); - } + if (value_ != NULL) + free(value_); } - - /** @returns Key the Key associated with this Header */ + + /** @returns the key/name associated with this Header */ std::string key() const { - return key_; + return key_; } - /** @returns Value returns the binary value */ + /** @returns returns the binary value, or NULL */ const void *value() const { - return value_; + return value_; } - /** @returns Value returns the value casted to a C string */ + /** @returns returns the value casted to a nul-terminated C string, + * or NULL. 
*/ const char *value_string() const { - return static_cast(value_); + return static_cast(value_); } - /** @returns Value Size the length of the Value in bytes */ + /** @returns Value Size the length of the Value in bytes */ size_t value_size() const { - return value_size_; + return value_size_; } - /** @returns Error Code the error code of this Header (usually ERR_NO_ERROR) */ + /** @returns the error code of this Header (usually ERR_NO_ERROR) */ RdKafka::ErrorCode err() const { - return err_; + return err_; } - - private: - char *copy_value(const void* value, size_t value_size) { - char * dest = NULL; - if (value != NULL) { - dest = (char*) malloc(value_size + 1); - memcpy(dest, (char*)value, value_size); - dest[value_size] = '\0'; - } - return dest; + + private: + char *copy_value(const void *value, size_t value_size) { + if (!value) + return NULL; + + char *dest = (char *)malloc(value_size + 1); + memcpy(dest, (const char *)value, value_size); + dest[value_size] = '\0'; + + return dest; } + std::string key_; RdKafka::ErrorCode err_; char *value_; @@ -1590,130 +1587,103 @@ class RD_EXPORT Headers { }; /** - * @brief create a new instance of the Headers object + * @brief Create a new instance of the Headers object * - * @params initial_size initial size to set the Headers list to - * - * @returns Empty Headers list set to the initial size + * @returns an empty Headers list */ - static Headers *create(size_t initial_size); + static Headers *create(); /** - * @brief create a new instance of the Headers object from a std::vector + * @brief Create a new instance of the Headers object from a std::vector * - * @params headers std::vector of RdKafka::Headers::Header objects + * @params headers std::vector of RdKafka::Headers::Header objects. + * The headers are copied, not referenced. * - * @returns Headers list from std::vector set to the size of the std::vector + * @returns a Headers list from std::vector set to the size of the std::vector */ static Headers *create(const std::vector
&headers); - /** - * @brief adds a Header to the end - * - * @param key the header key as a std::string - * - * @param value the value as a binary value + /** + * @brief Adds a Header to the end of the list. * - * @param value_size the size of the value added + * @param key header key/name + * @param value binary value, or NULL + * @param value_size size of the value * - * @returns An ErrorCode signalling a success or failure to add the header. + * @returns an ErrorCode signalling success or failure to add the header. */ - virtual ErrorCode add(const std::string& key, const void* value, size_t value_size) = 0; + virtual ErrorCode add(const std::string &key, const void *value, + size_t value_size) = 0; - /** - * @brief adds a Header to the end - * - * @param key the header key as a std::string + /** + * @brief Adds a Header to the end of the list. + * + * Convenience method for adding a std::string as a value for the header. * - * @param value the value as a std::string + * @param key header key/name + * @param value value string * - * @remark convenience method for adding a std::string as a value for the header + * @returns an ErrorCode signalling success or failure to add the header. + */ + virtual ErrorCode add(const std::string &key, const std::string &value) = 0; + + /** + * @brief Adds a Header to the end of the list. + * + * This method makes a copy of the passed header. + * + * @param header Existing header to copy * - * @returns An ErrorCode signalling a success or failure to add the header. + * @returns an ErrorCode signalling success or failure to add the header. */ - virtual ErrorCode add(const std::string& key, const std::string &value) = 0; + virtual ErrorCode add(const Header &header) = 0; - /** - * @brief removes all the Headers of a given key + /** + * @brief Removes all the Headers of a given key * - * @param key the header key as a std::string you want to remove + * @param key header key/name to remove * - * @remark if duplicate keys exist this will remove all of them - * * @returns An ErrorCode signalling a success or failure to remove the Header. */ - virtual ErrorCode remove(const std::string& key) = 0; + virtual ErrorCode remove(const std::string &key) = 0; - /** - * @brief gets all of the Headers of a given key + /** + * @brief Gets all of the Headers of a given key * - * @param key the header key as a std::string you want to get + * @param key header key/name * - * @remark if duplicate keys exist this will return them all as a std::vector + * @remark If duplicate keys exist this will return them all as a std::vector * * @returns a std::vector containing all the Headers of the given key. */ virtual std::vector
get(const std::string &key) const = 0; - /** - * @brief gets the last occurrence of a Header of a given key + /** + * @brief Gets the last occurrence of a Header of a given key * - * @param key the header key as a std::string you want to get + * @param key header key/name * - * @remark this will only return the most recently added header + * @remark This will only return the most recently added header * - * @returns the Header if found, otherwise a Header with an ErrorCode + * @returns the Header if found, otherwise a Header with an err set to + * ERR__NOENT. */ - virtual Header get_last(const std::string& key) const = 0; + virtual Header get_last(const std::string &key) const = 0; - /** - * @brief returns all the Headers of a Message + /** + * @brief Returns all Headers * - * @returns a std::vector containing all of the Headers of a message + * @returns a std::vector containing all of the Headers */ virtual std::vector
get_all() const = 0; - - /** - * @brief the count of all the Headers - * - * @returns a size_t count of all the headers - */ - virtual size_t size() const = 0; - /** - * @brief Returns the underlying librdkafka C rd_kafka_headers_t handle. - * - * @warning Calling the C API on this handle is not recommended and there - * is no official support for it, but for cases where the C++ API - * does not provide the underlying functionality this C handle can be - * used to interact directly with the core librdkafka API. - * - * @remark The lifetime of the returned pointer can be different than the lifetime - * of the Headers message due to how the producev function in the C API works - * if there is no error then the producev will take care of deallocation - * but if there is an error then it is the responsibility of the calling - * object to deallocate the underlying C implementation if an instance - * of the Headers object is created with free_rd_headers set to `false` - * - * @remark Include prior to including - * - * - * @returns \c rd_kafka_headers_t* - */ - virtual struct rd_kafka_headers_s *c_headers() = 0; - - /** - * @brief cleans up the underlying allocated C implementation headers if called - * - * @remark Safe to call even if the Headers object is set to clean up when - * when the destructor is called - * - * @remark Safe to call even if the underlyng C pointer is set to null + * @returns the number of headers. */ - virtual void destroy_headers() = 0; + virtual size_t size() const = 0; }; + /** * @brief Message object * @@ -1823,8 +1793,19 @@ class RD_EXPORT Message { */ virtual Status status () const = 0; - /** @returns The Headers instance for this Message (if applicable) */ - virtual RdKafka::Headers *get_headers() = 0; + /** @returns the Headers instance for this Message, or NULL if there + * are no headers. + * + * @remark The lifetime of the Headers are the same as the Message. */ + virtual RdKafka::Headers *headers () = 0; + + /** @returns the Headers instance for this Message (if applicable). + * If NULL is returned the reason is given in \p err, which + * is either ERR__NOENT if there were no headers, or another + * error code if header parsing failed. + * + * @remark The lifetime of the Headers are the same as the Message. */ + virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0; }; /**@}*/ @@ -2528,13 +2509,17 @@ class RD_EXPORT Producer : public virtual Handle { /** * @brief produce() variant that that allows for Header support on produce * Otherwise identical to produce() above. + * + * @warning The \p headers will be freed/deleted if the produce() call + * succeeds, or left untouched if produce() fails. 
*/ virtual ErrorCode produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, - int64_t timestamp, void *msg_opaque, - RdKafka::Headers *headers) = 0; + int64_t timestamp, + RdKafka::Headers *headers, + void *msg_opaque) = 0; /** diff --git a/src-cpp/rdkafkacpp_int.h b/src-cpp/rdkafkacpp_int.h index 782ee5f031..4b04fce066 100644 --- a/src-cpp/rdkafkacpp_int.h +++ b/src-cpp/rdkafkacpp_int.h @@ -119,16 +119,16 @@ class EventImpl : public Event { bool fatal_; }; + class HeadersImpl : public Headers { public: - HeadersImpl (size_t initial_size, bool free_rd_headers): - headers_ (rd_kafka_headers_new(initial_size)), free_headers_ (free_rd_headers) {} + HeadersImpl (): + headers_ (rd_kafka_headers_new(8)) {} HeadersImpl (rd_kafka_headers_t *headers): - headers_ (headers), free_headers_ (false) {}; + headers_ (headers) {} - HeadersImpl (const std::vector
<Header> &headers, bool free_rd_headers): - free_headers_ (free_rd_headers) { + HeadersImpl (const std::vector<Header>
&headers) { if (headers.size() > 0) { headers_ = rd_kafka_headers_new(headers.size()); from_vector(headers); @@ -138,128 +138,20 @@ class HeadersImpl : public Headers { } ~HeadersImpl() { - if(free_headers_ && headers_) { + if (headers_) { rd_kafka_headers_destroy(headers_); } } - ErrorCode add(const std::string& key, const char* value) { + ErrorCode add(const std::string& key, const char *value) { rd_kafka_resp_err_t err; err = rd_kafka_header_add(headers_, key.c_str(), key.size(), - value, strlen(value)); - return static_cast(err); - } - - ErrorCode remove(const std::string& key) { - rd_kafka_resp_err_t err; - err = rd_kafka_header_remove (headers_, key.c_str()); + value, -1); return static_cast(err); } - std::vector get(const std::string &key) const { - std::vector headers; - const void *value; - size_t size; - rd_kafka_resp_err_t err; - for (size_t idx = 0; - !(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ;\ - idx++) { - if (value) { - const char* casted_value = static_cast(value); - headers.push_back(Headers::Header(key, casted_value)); - } - } - return headers; - } - - Headers::Header get_last(const std::string& key) const { - const void *value; - size_t size; - rd_kafka_resp_err_t err; - err = rd_kafka_header_get_last (headers_, key.c_str(), &value, &size); - const char* casted_value = static_cast(value); - ErrorCode cpp_error = static_cast(err); - return Headers::Header(key, casted_value, cpp_error); - } - - std::vector get_all() const { - std::vector headers; - size_t idx = 0; - const char *name; - const void *valuep; - size_t size; - while (!rd_kafka_header_get_all(headers_, idx++, - &name, &valuep, &size)) { - if (valuep != NULL) { - const char* casted_value = static_cast(valuep); - headers.push_back(Headers::Header(name, casted_value)); - } - } - return headers; - } - - size_t size() const { - return rd_kafka_header_cnt(headers_); - } - - struct rd_kafka_headers_s* c_headers() { - return headers_; - } - - ErrorCode destroy_headers() { - if (headers_) { - rd_kafka_headers_destroy(headers_); - headers_ = 0; - return RdKafka::ERR_NO_ERROR; - } else { - return RdKafka::ERR_OPERATION_NOT_ATTEMPTED; - } - } - - private: - void from_vector(const std::vector
<Header> &headers) { - if (headers.size() > 0) { - for (std::vector<Header>
::const_iterator it = headers.begin(); - it != headers.end(); - it++) { - this->add(it->key, it->value); - } - } - } - - HeadersImpl(HeadersImpl const&) /*= delete*/; - HeadersImpl& operator=(HeadersImpl const&) /*= delete*/; - - rd_kafka_headers_t* headers_; - bool free_headers_; -}; - -class HeadersImpl : public Headers { - public: - HeadersImpl (size_t initial_size, bool free_rd_headers): - headers_ (rd_kafka_headers_new(initial_size)), free_headers_ (free_rd_headers) {} - - HeadersImpl (rd_kafka_headers_t *headers, bool free_rd_headers): - headers_ (headers), free_headers_ (free_rd_headers) {}; - - HeadersImpl (const std::vector
&headers, bool free_rd_headers): - free_headers_ (free_rd_headers) { - if (headers.size() > 0) { - headers_ = rd_kafka_headers_new(headers.size()); - from_vector(headers); - } else { - headers_ = rd_kafka_headers_new(8); - } - } - - ~HeadersImpl() { - if (free_headers_ && headers_) { - rd_kafka_headers_destroy(headers_); - } - } - - ErrorCode add(const std::string &key, const void *value, size_t value_size) { + ErrorCode add(const std::string& key, const void *value, size_t value_size) { rd_kafka_resp_err_t err; err = rd_kafka_header_add(headers_, key.c_str(), key.size(), @@ -275,7 +167,15 @@ class HeadersImpl : public Headers { return static_cast(err); } - ErrorCode remove(const std::string &key) { + ErrorCode add(const Header &header) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_add(headers_, + header.key().c_str(), header.key().size(), + header.value(), header.value_size()); + return static_cast(err); + } + + ErrorCode remove(const std::string& key) { rd_kafka_resp_err_t err; err = rd_kafka_header_remove (headers_, key.c_str()); return static_cast(err); @@ -287,11 +187,10 @@ class HeadersImpl : public Headers { size_t size; rd_kafka_resp_err_t err; for (size_t idx = 0; - !(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ; + !(err = rd_kafka_header_get(headers_, idx, key.c_str(), + &value, &size)) ; idx++) { - if (value) { - headers.push_back(Headers::Header(key, value, size)); - } + headers.push_back(Headers::Header(key, value, size)); } return headers; } @@ -301,21 +200,19 @@ class HeadersImpl : public Headers { size_t size; rd_kafka_resp_err_t err; err = rd_kafka_header_get_last(headers_, key.c_str(), &value, &size); - ErrorCode cpp_error = static_cast(err); - return Headers::Header(key, value, size, cpp_error); + return Headers::Header(key, value, size, + static_cast(err)); } std::vector get_all() const { std::vector headers; size_t idx = 0; const char *name; - const void *value; + const void *valuep; size_t size; while (!rd_kafka_header_get_all(headers_, idx++, - &name, &value, &size)) { - if (value != NULL) { - headers.push_back(Headers::Header(name, value, size)); - } + &name, &valuep, &size)) { + headers.push_back(Headers::Header(name, valuep, size)); } return headers; } @@ -324,36 +221,34 @@ class HeadersImpl : public Headers { return rd_kafka_header_cnt(headers_); } - struct rd_kafka_headers_s* c_headers() { - return headers_; + /** @brief Reset the C headers pointer to NULL. */ + void c_headers_destroyed() { + headers_ = NULL; } - void destroy_headers() { - if (headers_) { - rd_kafka_headers_destroy(headers_); - headers_ = 0; - } + /** @returns the underlying C headers, or NULL. */ + rd_kafka_headers_t *c_ptr() { + return headers_; } - - private: + + +private: void from_vector(const std::vector
<Header> &headers) { - if (headers.size() > 0) { - for (std::vector<Header>
::const_iterator it = headers.begin(); - it != headers.end(); - it++) { - this->add(it->key(), it->value(), it->value_size()); - } - } + if (headers.size() == 0) + return; + for (std::vector<Header>
::const_iterator it = headers.begin(); + it != headers.end(); it++) + this->add(*it); } HeadersImpl(HeadersImpl const&) /*= delete*/; HeadersImpl& operator=(HeadersImpl const&) /*= delete*/; - rd_kafka_headers_t* headers_; - bool free_headers_; + rd_kafka_headers_t *headers_; }; + class MessageImpl : public Message { public: ~MessageImpl () { @@ -367,16 +262,16 @@ class MessageImpl : public Message { MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage): topic_(topic), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL), - headers_(get_headers_from_rkmessage(rkmessage)) {} + headers_(NULL) {} MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage, bool dofree): topic_(topic), rkmessage_(rkmessage), free_rkmessage_(dofree), key_(NULL), - headers_(get_headers_from_rkmessage(rkmessage)) {} + headers_(NULL) {} MessageImpl (rd_kafka_message_t *rkmessage): topic_(NULL), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL), - headers_(get_headers_from_rkmessage(rkmessage)) { + headers_(NULL) { if (rkmessage->rkt) { /* Possibly NULL */ topic_ = static_cast(rd_kafka_topic_opaque(rkmessage->rkt)); @@ -451,8 +346,27 @@ class MessageImpl : public Message { return static_cast(rd_kafka_message_status(rkmessage_)); } - Headers* get_headers() { - return headers_; + Headers *headers () { + ErrorCode err; + return headers(&err); + } + + Headers *headers (ErrorCode *err) { + *err = ERR_NO_ERROR; + + if (!headers_) { + rd_kafka_headers_t *c_hdrs; + rd_kafka_resp_err_t c_err; + + if ((c_err = rd_kafka_message_detach_headers(rkmessage_, &c_hdrs))) { + *err = static_cast(c_err); + return NULL; + } + + headers_ = new HeadersImpl(c_hdrs); + } + + return headers_; } RdKafka::Topic *topic_; @@ -462,21 +376,13 @@ class MessageImpl : public Message { * used as a place holder and rkmessage_ is set to point to it. 
*/ rd_kafka_message_t rkmessage_err_; mutable std::string *key_; /* mutable because it's a cached value */ - RdKafka::Headers *headers_; private: - RdKafka::Headers* get_headers_from_rkmessage(rd_kafka_message_t *rkmessage) { - rd_kafka_headers_t *hdrsp; - rd_kafka_resp_err_t err; - - if (rkmessage->len > 0 && !(err = rd_kafka_message_detach_headers(rkmessage, &hdrsp))) { - return new HeadersImpl(hdrsp, free_rkmessage_); - } - return NULL; - } /* "delete" copy ctor + copy assignment, for safety of key_ */ MessageImpl(MessageImpl const&) /*= delete*/; MessageImpl& operator=(MessageImpl const&) /*= delete*/; + + RdKafka::Headers *headers_; }; @@ -1173,8 +1079,9 @@ class ProducerImpl : virtual public Producer, virtual public HandleImpl { int msgflags, void *payload, size_t len, const void *key, size_t key_len, - int64_t timestamp, void *msg_opaque, - RdKafka::Headers *headers); + int64_t timestamp, + RdKafka::Headers *headers, + void *msg_opaque); ErrorCode flush (int timeout_ms) { return static_cast(rd_kafka_flush(rk_, diff --git a/tests/0085-headers.cpp b/tests/0085-headers.cpp index 72f7189d48..2ce24b6e3e 100644 --- a/tests/0085-headers.cpp +++ b/tests/0085-headers.cpp @@ -29,22 +29,27 @@ #include #include "testcpp.h" + +static RdKafka::Producer *producer; +static RdKafka::KafkaConsumer *consumer; +static std::string topic; + static void assert_all_headers_match(RdKafka::Headers *actual, - RdKafka::Headers *expected) { + const RdKafka::Headers *expected) { if (!actual) { Test::Fail("Expected RdKafka::Message to contain headers"); } if (actual->size() != expected->size()) { - Test::Fail(tostr() << "Expected headers length to equal " + Test::Fail(tostr() << "Expected headers length to equal " << expected->size() << " instead equals " << actual->size() << "\n"); } std::vector actual_headers = actual->get_all(); std::vector expected_headers = expected->get_all(); - Test::Say(tostr() << "Header size " << actual_headers.size() << "\n"); + Test::Say(3, tostr() << "Header size " << actual_headers.size() << "\n"); for(size_t i = 0; i < actual_headers.size(); i++) { RdKafka::Headers::Header actual_header = actual_headers[i]; - RdKafka::Headers::Header expected_header = expected_headers[i]; + const RdKafka::Headers::Header expected_header = expected_headers[i]; std::string actual_key = actual_header.key(); std::string actual_value = std::string( actual_header.value_string(), @@ -56,185 +61,108 @@ static void assert_all_headers_match(RdKafka::Headers *actual, expected_header.value_size() ); - Test::Say(tostr() << "Expected Key " << expected_key << " Expected val " << expected_value - << " Actual key " << actual_key << " Actual val " << actual_value << "\n"); + Test::Say(3, + tostr() << + "Expected Key " << expected_key << + ", Expected val " << expected_value << + ", Actual key " << actual_key << + ", Actual val " << actual_value << "\n"); if (actual_key != expected_key) { - Test::Fail(tostr() << "Header key does not match, expected '" + Test::Fail(tostr() << "Header key does not match, expected '" << actual_key << "' but got '" << expected_key << "'\n"); } if (actual_value != expected_value) { - Test::Fail(tostr() << "Header value does not match, expected '" + Test::Fail(tostr() << "Header value does not match, expected '" << actual_value << "' but got '" << expected_value << "'\n"); } } } static void test_headers (RdKafka::Headers *produce_headers, - RdKafka::Headers *compare_headers) { - std::string topic = Test::mk_topic_name("0085-headers", 1); - RdKafka::Conf *conf; - std::string errstr; - - 
Test::conf_init(&conf, NULL, 0); - - Test::conf_set(conf, "group.id", topic); - - RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr); - if (!p) - Test::Fail("Failed to create Producer: " + errstr); + const RdKafka::Headers *compare_headers) { RdKafka::ErrorCode err; - err = p->produce(topic, 0, - RdKafka::Producer::RK_MSG_COPY, - (void *)"message", 7, - (void *)"key", 3, 0, NULL, produce_headers); + err = producer->produce(topic, 0, + RdKafka::Producer::RK_MSG_COPY, + (void *)"message", 7, + (void *)"key", 3, 0, produce_headers, NULL); + if (err) + Test::Fail("produce() failed: " + RdKafka::err2str(err)); - p->flush(tmout_multip(10000)); + producer->flush(tmout_multip(10*1000)); - if (p->outq_len() > 0) + if (producer->outq_len() > 0) Test::Fail(tostr() << "Expected producer to be flushed, " << - p->outq_len() << " messages remain"); - - RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); - if (!c) - Test::Fail("Failed to create KafkaConsumer: " + errstr); - std::vector parts; - parts.push_back(RdKafka::TopicPartition::create(topic, 0, - RdKafka::Topic::OFFSET_BEGINNING)); - err = c->assign(parts); - if (err != RdKafka::ERR_NO_ERROR) - Test::Fail("assign() failed: " + RdKafka::err2str(err)); - RdKafka::TopicPartition::destroy(parts); + producer->outq_len() << " messages remain"); int cnt = 0; bool running = true; while (running) { - RdKafka::Message *msg = c->consume(10000); - Test::Say(tostr() << msg->err()); + RdKafka::Message *msg = consumer->consume(10*1000); + if (msg->err() == RdKafka::ERR_NO_ERROR) { cnt++; - Test::Say(tostr() << "Received message #" << cnt << "\n"); - RdKafka::Headers *headers = msg->get_headers(); + RdKafka::Headers *headers = msg->headers(); if (compare_headers->size() > 0) { assert_all_headers_match(headers, compare_headers); } else { if (headers != 0) { - Test::Fail("Expected get_headers to return a NULL pointer"); + Test::Fail("Expected headers to return a NULL pointer"); } } running = false; - } else if (msg->err() == RdKafka::ERR__TIMED_OUT) { - Test::Say("I'm rebalancing?"); - /* Stil rebalancing? 
*/ } else { Test::Fail("consume() failed: " + msg->errstr()); } delete msg; } - c->close(); - delete c; - delete p; - delete conf; } -static void test_one_header () { - Test::Say("Test one header in consumed message.\n"); - int num_hdrs = 1; - RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); - RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); +static void test_headers (int num_hdrs) { + Test::Say(tostr() << "Test " << num_hdrs << + " headers in consumed message.\n"); + RdKafka::Headers *produce_headers = RdKafka::Headers::create(); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(); for (int i = 0; i < num_hdrs; ++i) { std::stringstream key_s; key_s << "header_" << i; std::string key = key_s.str(); - std::stringstream val_s; - val_s << "value_" << i; - std::string val = val_s.str(); - produce_headers->add(key, val); - compare_headers->add(key, val); - } - test_headers(produce_headers, compare_headers); -} -static void test_ten_headers () { - Test::Say("Test ten headers in consumed message.\n"); - int num_hdrs = 10; - RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); - RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); - for (int i = 0; i < num_hdrs; ++i) { - std::stringstream key_s; - key_s << "header_" << i; - std::string key = key_s.str(); - std::stringstream val_s; - val_s << "value_" << i; - std::string val = val_s.str(); - produce_headers->add(key, val); - compare_headers->add(key, val); - } - test_headers(produce_headers, compare_headers); -} - -static void test_add_with_void_param () { - Test::Say("Test adding one header using add method that takes void*.\n"); - int num_hdrs = 1; - RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); - RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); - for (int i = 0; i < num_hdrs; ++i) { - std::stringstream key_s; - key_s << "header_" << i; - std::string key = key_s.str(); - std::stringstream val_s; - val_s << "value_" << i; - std::string val = val_s.str(); - produce_headers->add(key, val.c_str(), val.size()); - compare_headers->add(key, val.c_str(), val.size()); - } - test_headers(produce_headers, compare_headers); -} - -static void test_no_headers () { - Test::Say("Test no headers produced.\n"); - int num_hdrs = 0; - RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); - RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); - for (int i = 0; i < num_hdrs; ++i) { - std::stringstream key_s; - key_s << "header_" << i; - std::string key = key_s.str(); - std::stringstream val_s; - val_s << "value_" << i; - std::string val = val_s.str(); - produce_headers->add(key, val); - compare_headers->add(key, val); - } - test_headers(produce_headers, compare_headers); -} - -static void test_header_with_null_value () { - Test::Say("Test one header with null value.\n"); - int num_hdrs = 1; - RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); - RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); - for (int i = 0; i < num_hdrs; ++i) { - std::stringstream key_s; - key_s << "header_" << i; - std::string key = key_s.str(); - std::stringstream val_s; - val_s << "value_" << i; - std::string val = val_s.str(); - produce_headers->add(key, NULL, 0); - compare_headers->add(key, NULL, 0); + if ((i % 4) == 0) { + /* NULL value */ + produce_headers->add(key, NULL, 0); + compare_headers->add(key, NULL, 0); + } else if ((i % 5) == 0) { + /* Empty value, use 
different methods for produce + * and compare to make sure they behave the same way. */ + std::string val = ""; + produce_headers->add(key, val); + compare_headers->add(key, "", 0); + } else if ((i % 6) == 0) { + /* Binary value (no nul-term) */ + produce_headers->add(key, "binary", 6); + compare_headers->add(key, "binary"); /* auto-nul-terminated */ + } else { + /* Standard string value */ + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(key, val); + compare_headers->add(key, val); + } } test_headers(produce_headers, compare_headers); + delete compare_headers; } static void test_duplicate_keys () { Test::Say("Test multiple headers with duplicate keys.\n"); int num_hdrs = 4; - RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); - RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *produce_headers = RdKafka::Headers::create(); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(); for (int i = 0; i < num_hdrs; ++i) { std::string dup_key = "dup_key"; std::stringstream val_s; @@ -244,12 +172,12 @@ static void test_duplicate_keys () { compare_headers->add(dup_key, val); } test_headers(produce_headers, compare_headers); + delete compare_headers; } static void test_remove_after_add () { Test::Say("Test removing after adding headers.\n"); - int num_hdrs = 1; - RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *headers = RdKafka::Headers::create(); // Add one unique key std::string key_one = "key1"; @@ -264,7 +192,7 @@ static void test_remove_after_add () { // Assert header length is 2 size_t expected_size = 2; if (headers->size() != expected_size) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_size << ", instead got " << headers->size() << "\n"); } @@ -273,16 +201,17 @@ static void test_remove_after_add () { headers->remove(key_one); size_t expected_remove_size = 1; if (headers->size() != expected_remove_size) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_remove_size << ", instead got " << headers->size() << "\n"); } + + delete headers; } static void test_remove_all_duplicate_keys () { Test::Say("Test removing duplicate keys removes all headers.\n"); - int num_hdrs = 4; - RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *headers = RdKafka::Headers::create(); // Add one unique key std::string key_one = "key1"; @@ -298,7 +227,7 @@ static void test_remove_all_duplicate_keys () { // Assert header length is 3 size_t expected_size = 3; if (headers->size() != expected_size) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_size << ", instead got " << headers->size() << "\n"); } @@ -307,16 +236,17 @@ static void test_remove_all_duplicate_keys () { headers->remove(dup_key); size_t expected_size_remove = 1; if (headers->size() != expected_size_remove) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_size_remove << ", instead got " << headers->size() << "\n"); } + + delete headers; } static void test_get_last_gives_last_added_val () { Test::Say("Test get_last returns the last added value of duplicate keys.\n"); - int num_hdrs = 1; - RdKafka::Headers *headers = 
RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *headers = RdKafka::Headers::create(); // Add two duplicate keys std::string dup_key = "dup_key"; @@ -330,7 +260,7 @@ static void test_get_last_gives_last_added_val () { // Assert header length is 3 size_t expected_size = 3; if (headers->size() != expected_size) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_size << ", instead got " << headers->size() << "\n"); } @@ -339,16 +269,17 @@ static void test_get_last_gives_last_added_val () { RdKafka::Headers::Header last = headers->get_last(dup_key); std::string value = std::string(last.value_string()); if (value != val_three) { - Test::Fail(tostr() << "Expected get_last to return " << val_two + Test::Fail(tostr() << "Expected get_last to return " << val_two << " as the value of the header instead got " << value << "\n"); } + + delete headers; } static void test_get_of_key_returns_all () { Test::Say("Test get returns all the headers of a duplicate key.\n"); - int num_hdrs = 1; - RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *headers = RdKafka::Headers::create(); // Add two duplicate keys std::string unique_key = "unique"; @@ -364,7 +295,7 @@ static void test_get_of_key_returns_all () { // Assert header length is 4 size_t expected_size = 4; if (headers->size() != expected_size) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_size << ", instead got " << headers->size() << "\n"); } @@ -373,24 +304,78 @@ static void test_get_of_key_returns_all () { std::vector get = headers->get(dup_key); size_t expected_get_size = 3; if (get.size() != expected_get_size) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_get_size << ", instead got " << headers->size() << "\n"); } + + delete headers; +} + +static void test_failed_produce () { + + RdKafka::Headers *headers = RdKafka::Headers::create(); + headers->add("my", "header"); + + RdKafka::ErrorCode err; + + err = producer->produce(topic, 999 /* invalid partition */, + RdKafka::Producer::RK_MSG_COPY, + (void *)"message", 7, + (void *)"key", 3, 0, headers, NULL); + if (!err) + Test::Fail("Expected produce() to fail"); + + delete headers; } extern "C" { int main_0085_headers (int argc, char **argv) { - test_one_header(); - test_ten_headers(); - test_add_with_void_param(); - test_no_headers(); - test_header_with_null_value(); + topic = Test::mk_topic_name("0085-headers", 1); + + RdKafka::Conf *conf; + std::string errstr; + + Test::conf_init(&conf, NULL, 0); + + RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr); + if (!p) + Test::Fail("Failed to create Producer: " + errstr); + + Test::conf_set(conf, "group.id", topic); + + RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); + if (!c) + Test::Fail("Failed to create KafkaConsumer: " + errstr); + + delete conf; + + std::vector parts; + parts.push_back(RdKafka::TopicPartition::create(topic, 0, + RdKafka::Topic:: + OFFSET_BEGINNING)); + RdKafka::ErrorCode err = c->assign(parts); + if (err != RdKafka::ERR_NO_ERROR) + Test::Fail("assign() failed: " + RdKafka::err2str(err)); + RdKafka::TopicPartition::destroy(parts); + + producer = p; + consumer = c; + + test_headers(0); + test_headers(1); + test_headers(261); test_duplicate_keys(); test_remove_after_add(); test_remove_all_duplicate_keys(); 
test_get_last_gives_last_added_val(); test_get_of_key_returns_all(); + test_failed_produce(); + + c->close(); + delete c; + delete p; + return 0; } }
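For reference, a minimal consumer-side sketch of the header accessors exercised by this test, assuming a message already returned by KafkaConsumer::consume(); the header key "my header" is purely illustrative:

#include <iostream>
#include <string>
#include <vector>
#include "rdkafkacpp.h"

/* Minimal sketch: inspect the headers of a consumed message.
 * Per this patch, Message::headers() may return NULL (no headers) and the
 * returned Headers share the Message's lifetime, so they must not be used
 * or deleted after the Message itself is deleted. */
static void dump_headers(RdKafka::Message *msg) {
  RdKafka::Headers *headers = msg->headers();
  if (!headers) {
    std::cout << "No headers" << std::endl;
    return;
  }

  std::vector<RdKafka::Headers::Header> all = headers->get_all();
  for (size_t i = 0; i < all.size(); i++) {
    const RdKafka::Headers::Header &hdr = all[i];
    std::cout << "Header: " << hdr.key() << " = ";
    if (hdr.value())
      std::cout << std::string(static_cast<const char *>(hdr.value()),
                               hdr.value_size());
    else
      std::cout << "NULL";
    std::cout << std::endl;
  }

  /* Most recently added value for a given key;
   * err() is ERR__NOENT if the key is not present. */
  RdKafka::Headers::Header last = headers->get_last("my header");
  if (last.err() == RdKafka::ERR_NO_ERROR && last.value())
    std::cout << "Last 'my header': " << last.value_string() << std::endl;
}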