Skip to content

Commit

Permalink
Header support for C++ API
Browse files Browse the repository at this point in the history
  • Loading branch information
davidtrihy-genesys authored and edenhill committed Nov 26, 2018
1 parent 72756d7 commit c44fd07
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 9 deletions.
26 changes: 18 additions & 8 deletions src-cpp/rdkafkacpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -1474,16 +1474,25 @@ class RD_EXPORT Headers {
Header(const std::string& key,
const char* value,
RdKafka::ErrorCode err = ERR_NO_ERROR):
key(key), err(err) {
// Safe managed copy of the value preserving the bytes
value_container_ = value;
this->value = value_container_.c_str();
key_(key), err_(err) {
value_container_.assign(value);
};

std::string key;
const char* value;
RdKafka::ErrorCode err;

std::string key() const {
return key_;
}

const char* value() const {
return value_container_.c_str();
}

RdKafka::ErrorCode err() const {
return err_;
}

private:
std::string key_;
RdKafka::ErrorCode err_;
std::string value_container_;
void *operator new(size_t); /* Prevent dynamic allocation */
};
Expand Down Expand Up @@ -1533,6 +1542,7 @@ class RD_EXPORT Headers {
*/
virtual std::vector<Header> get_all() const = 0;


/**
* @brief the count of all the Headers
*
Expand Down
116 changes: 116 additions & 0 deletions src-cpp/rdkafkacpp_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,122 @@ class HeadersImpl : public Headers {
bool free_headers_;
};

class HeadersImpl : public Headers {
public:
HeadersImpl (size_t initial_size, bool free_rd_headers):
headers_ (rd_kafka_headers_new(initial_size)), free_headers_ (free_rd_headers) {}

HeadersImpl (rd_kafka_headers_t *headers):
headers_ (headers), free_headers_ (false) {};

HeadersImpl (const std::vector<Header> &headers, bool free_rd_headers):
free_headers_ (free_rd_headers) {
if (headers.size() > 0) {
headers_ = rd_kafka_headers_new(headers.size());
from_vector(headers);
} else {
headers_ = rd_kafka_headers_new(8);
}
}

~HeadersImpl() {
if(free_headers_ && headers_) {
rd_kafka_headers_destroy(headers_);
}
}

ErrorCode add(const std::string& key, const char* value) {
rd_kafka_resp_err_t err;
err = rd_kafka_header_add(headers_,
key.c_str(), key.size(),
value, strlen(value));
return static_cast<RdKafka::ErrorCode>(err);
}

ErrorCode remove(const std::string& key) {
rd_kafka_resp_err_t err;
err = rd_kafka_header_remove (headers_, key.c_str());
return static_cast<RdKafka::ErrorCode>(err);
}

std::vector<Headers::Header> get(const std::string &key) const {
std::vector<Headers::Header> headers;
const void *value;
size_t size;
rd_kafka_resp_err_t err;
for (size_t idx = 0;
!(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ;\
idx++) {
if (value) {
const char* casted_value = static_cast<const char*>(value);
headers.push_back(Headers::Header(key, casted_value));
}
}
return headers;
}

Headers::Header get_last(const std::string& key) const {
const void *value;
size_t size;
rd_kafka_resp_err_t err;
err = rd_kafka_header_get_last (headers_, key.c_str(), &value, &size);
const char* casted_value = static_cast<const char*>(value);
ErrorCode cpp_error = static_cast<RdKafka::ErrorCode>(err);
return Headers::Header(key, casted_value, cpp_error);
}

std::vector<Headers::Header> get_all() const {
std::vector<Headers::Header> headers;
size_t idx = 0;
const char *name;
const void *valuep;
size_t size;
while (!rd_kafka_header_get_all(headers_, idx++,
&name, &valuep, &size)) {
if (valuep != NULL) {
const char* casted_value = static_cast<const char*>(valuep);
headers.push_back(Headers::Header(name, casted_value));
}
}
return headers;
}

size_t size() const {
return rd_kafka_header_cnt(headers_);
}

struct rd_kafka_headers_s* c_headers() {
return headers_;
}

ErrorCode destroy_headers() {
if (headers_) {
rd_kafka_headers_destroy(headers_);
headers_ = 0;
return RdKafka::ERR_NO_ERROR;
} else {
return RdKafka::ERR_OPERATION_NOT_ATTEMPTED;
}
}

private:
void from_vector(const std::vector<Header> &headers) {
if (headers.size() > 0) {
for (std::vector<Header>::const_iterator it = headers.begin();
it != headers.end();
it++) {
this->add(it->key(), it->value());
}
}
}

HeadersImpl(HeadersImpl const&) /*= delete*/;
HeadersImpl& operator=(HeadersImpl const&) /*= delete*/;

rd_kafka_headers_t* headers_;
bool free_headers_;
};


class MessageImpl : public Message {
public:
Expand Down
1 change: 0 additions & 1 deletion tests/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ _TEST_DECL(0089_max_poll_interval);
_TEST_DECL(0090_idempotence);
_TEST_DECL(0091_max_poll_interval_timeout);


/* Manual tests */
_TEST_DECL(8000_idle);

Expand Down

0 comments on commit c44fd07

Please sign in to comment.