Skip to content

Commit

Permalink
Applied most review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
davidtrihy-genesys authored and edenhill committed Nov 26, 2018
1 parent c44fd07 commit 9821a3d
Show file tree
Hide file tree
Showing 11 changed files with 531 additions and 209 deletions.
40 changes: 9 additions & 31 deletions examples/kafkatest_verifiable_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,17 +457,6 @@ void msg_consume(RdKafka::KafkaConsumer *consumer,
" [" << (int)msg->partition() << "] at offset " <<
msg->offset() << std::endl;

RdKafka::Headers *headers = msg->get_headers();
if (headers) {
std::vector<RdKafka::Headers::Header> sheaders = headers->get_all();
std::cout << "Headers length: " << sheaders.size() << std::endl;
for(std::vector<RdKafka::Headers::Header>::const_iterator it = sheaders.begin();
it != sheaders.end();
it++) {
std::cout << "Key: " << (*it).key << " Value: " << (*it).value << std::endl;
}
}

if (state.maxMessages >= 0 &&
state.consumer.consumedMessages >= state.maxMessages)
return;
Expand Down Expand Up @@ -842,42 +831,31 @@ int main (int argc, char **argv) {
msg << value_prefix << i;
while (true) {
RdKafka::ErrorCode resp;
RdKafka::Headers *headers = 0;
if (create_time == -1) {
resp = producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(msg.str().c_str()),
msg.str().size(), NULL, NULL);
} else {
std::string name = "kafkaheader";
std::string val = "header_val";
std::vector<RdKafka::Headers::Header> headers_arr;
headers_arr.push_back(RdKafka::Headers::Header(name, val.c_str()));

headers = RdKafka::Headers::create(headers_arr, false);
resp = producer->produce(topics[0], partition,
if (create_time == -1) {
resp = producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(msg.str().c_str()),
msg.str().size(), NULL, NULL);
} else {
resp = producer->produce(topics[0], partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(msg.str().c_str()),
msg.str().size(),
NULL, 0,
create_time,
NULL, headers);
}
NULL);
}

if (resp == RdKafka::ERR__QUEUE_FULL) {
producer->poll(100);
continue;
} else if (resp != RdKafka::ERR_NO_ERROR) {
headers->destroy_headers();
errorString("producer_send_error",
RdKafka::err2str(resp), topic->name(), NULL, msg.str());
state.producer.numErr++;
} else {
state.producer.numSent++;
}
if (headers) {
delete headers;
}
break;
}

Expand Down
10 changes: 5 additions & 5 deletions src-cpp/HeadersImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@

#include "rdkafkacpp_int.h"

RdKafka::Headers *RdKafka::Headers::create(size_t initial_count, bool free_rd_headers) {
return new RdKafka::HeadersImpl(initial_count, free_rd_headers);
RdKafka::Headers *RdKafka::Headers::create(size_t initial_count) {
return new RdKafka::HeadersImpl(initial_count, false);
}

RdKafka::Headers *RdKafka::Headers::create(const std::vector<Header> &headers, bool free_rd_headers) {
RdKafka::Headers *RdKafka::Headers::create(const std::vector<Header> &headers) {
if (headers.size() > 0) {
return new RdKafka::HeadersImpl(headers, free_rd_headers);
return new RdKafka::HeadersImpl(headers, false);
} else {
return 0;
return new RdKafka::HeadersImpl(8, false);
}

}
Expand Down
26 changes: 23 additions & 3 deletions src-cpp/ProducerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,39 @@ RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,

}

RdKafka::ErrorCode
RdKafka::ProducerImpl::produce (const std::string topic_name,
int32_t partition, int msgflags,
void *payload, size_t len,
const void *key, size_t key_len,
int64_t timestamp, void *msg_opaque) {
return
static_cast<RdKafka::ErrorCode>
(
rd_kafka_producev(rk_,
RD_KAFKA_V_TOPIC(topic_name.c_str()),
RD_KAFKA_V_PARTITION(partition),
RD_KAFKA_V_MSGFLAGS(msgflags),
RD_KAFKA_V_VALUE(payload, len),
RD_KAFKA_V_KEY(key, key_len),
RD_KAFKA_V_TIMESTAMP(timestamp),
RD_KAFKA_V_OPAQUE(msg_opaque),
RD_KAFKA_V_END)
);
}

RdKafka::ErrorCode
RdKafka::ProducerImpl::produce (const std::string topic_name,
int32_t partition, int msgflags,
void *payload, size_t len,
const void *key, size_t key_len,
int64_t timestamp, void *msg_opaque,
RdKafka::Headers *headers) {
rd_kafka_headers_t *hdrs;
rd_kafka_headers_t *hdrs = NULL;
if (headers) {
hdrs = headers->c_headers();
} else {
hdrs = 0;
}

return
static_cast<RdKafka::ErrorCode>
(
Expand Down
Loading

0 comments on commit 9821a3d

Please sign in to comment.