diff --git a/src-cpp/rdkafkacpp.h b/src-cpp/rdkafkacpp.h index 16acc1238f..8e61ece983 100644 --- a/src-cpp/rdkafkacpp.h +++ b/src-cpp/rdkafkacpp.h @@ -1474,16 +1474,25 @@ class RD_EXPORT Headers { Header(const std::string& key, const char* value, RdKafka::ErrorCode err = ERR_NO_ERROR): - key(key), err(err) { - // Safe managed copy of the value preserving the bytes - value_container_ = value; - this->value = value_container_.c_str(); + key_(key), err_(err) { + value_container_.assign(value); }; - - std::string key; - const char* value; - RdKafka::ErrorCode err; + + std::string key() const { + return key_; + } + + const char* value() const { + return value_container_.c_str(); + } + + RdKafka::ErrorCode err() const { + return err_; + } + private: + std::string key_; + RdKafka::ErrorCode err_; std::string value_container_; void *operator new(size_t); /* Prevent dynamic allocation */ }; @@ -1533,6 +1542,7 @@ class RD_EXPORT Headers { */ virtual std::vector
get_all() const = 0; + /** * @brief the count of all the Headers * diff --git a/src-cpp/rdkafkacpp_int.h b/src-cpp/rdkafkacpp_int.h index 8c4f9d4846..776c3babb5 100644 --- a/src-cpp/rdkafkacpp_int.h +++ b/src-cpp/rdkafkacpp_int.h @@ -235,6 +235,122 @@ class HeadersImpl : public Headers { bool free_headers_; }; +class HeadersImpl : public Headers { + public: + HeadersImpl (size_t initial_size, bool free_rd_headers): + headers_ (rd_kafka_headers_new(initial_size)), free_headers_ (free_rd_headers) {} + + HeadersImpl (rd_kafka_headers_t *headers): + headers_ (headers), free_headers_ (false) {}; + + HeadersImpl (const std::vector
&headers, bool free_rd_headers): + free_headers_ (free_rd_headers) { + if (headers.size() > 0) { + headers_ = rd_kafka_headers_new(headers.size()); + from_vector(headers); + } else { + headers_ = rd_kafka_headers_new(8); + } + } + + ~HeadersImpl() { + if(free_headers_ && headers_) { + rd_kafka_headers_destroy(headers_); + } + } + + ErrorCode add(const std::string& key, const char* value) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_add(headers_, + key.c_str(), key.size(), + value, strlen(value)); + return static_cast(err); + } + + ErrorCode remove(const std::string& key) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_remove (headers_, key.c_str()); + return static_cast(err); + } + + std::vector get(const std::string &key) const { + std::vector headers; + const void *value; + size_t size; + rd_kafka_resp_err_t err; + for (size_t idx = 0; + !(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ;\ + idx++) { + if (value) { + const char* casted_value = static_cast(value); + headers.push_back(Headers::Header(key, casted_value)); + } + } + return headers; + } + + Headers::Header get_last(const std::string& key) const { + const void *value; + size_t size; + rd_kafka_resp_err_t err; + err = rd_kafka_header_get_last (headers_, key.c_str(), &value, &size); + const char* casted_value = static_cast(value); + ErrorCode cpp_error = static_cast(err); + return Headers::Header(key, casted_value, cpp_error); + } + + std::vector get_all() const { + std::vector headers; + size_t idx = 0; + const char *name; + const void *valuep; + size_t size; + while (!rd_kafka_header_get_all(headers_, idx++, + &name, &valuep, &size)) { + if (valuep != NULL) { + const char* casted_value = static_cast(valuep); + headers.push_back(Headers::Header(name, casted_value)); + } + } + return headers; + } + + size_t size() const { + return rd_kafka_header_cnt(headers_); + } + + struct rd_kafka_headers_s* c_headers() { + return headers_; + } + + ErrorCode destroy_headers() { + if (headers_) { + rd_kafka_headers_destroy(headers_); + headers_ = 0; + return RdKafka::ERR_NO_ERROR; + } else { + return RdKafka::ERR_OPERATION_NOT_ATTEMPTED; + } + } + + private: + void from_vector(const std::vector
&headers) { + if (headers.size() > 0) { + for (std::vector
::const_iterator it = headers.begin(); + it != headers.end(); + it++) { + this->add(it->key(), it->value()); + } + } + } + + HeadersImpl(HeadersImpl const&) /*= delete*/; + HeadersImpl& operator=(HeadersImpl const&) /*= delete*/; + + rd_kafka_headers_t* headers_; + bool free_headers_; +}; + class MessageImpl : public Message { public: diff --git a/tests/test.c b/tests/test.c index 8e83bce5c9..2baf032831 100644 --- a/tests/test.c +++ b/tests/test.c @@ -185,7 +185,6 @@ _TEST_DECL(0089_max_poll_interval); _TEST_DECL(0090_idempotence); _TEST_DECL(0091_max_poll_interval_timeout); - /* Manual tests */ _TEST_DECL(8000_idle);