Skip to content

Commit

Permalink
API changes, and some refactoring and cleanup of C++ Headers
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Nov 26, 2018
1 parent 9821a3d commit a3753a7
Show file tree
Hide file tree
Showing 6 changed files with 418 additions and 508 deletions.
68 changes: 47 additions & 21 deletions examples/rdkafka_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ class MyHashPartitionerCb : public RdKafka::PartitionerCb {
};

void msg_consume(RdKafka::Message* message, void* opaque) {
const RdKafka::Headers *headers;

switch (message->err()) {
case RdKafka::ERR__TIMED_OUT:
break;
Expand All @@ -213,6 +215,20 @@ void msg_consume(RdKafka::Message* message, void* opaque) {
if (message->key()) {
std::cout << "Key: " << *message->key() << std::endl;
}
headers = message->headers();
if (headers) {
std::vector<RdKafka::Headers::Header> hdrs = headers->get_all();
for (size_t i = 0 ; i < hdrs.size() ; i++) {
const RdKafka::Headers::Header hdr = hdrs[i];

if (hdr.value() != NULL)
printf(" Header: %s = \"%.*s\"\n",
hdr.key().c_str(),
(int)hdr.value_size(), (const char *)hdr.value());
else
printf(" Header: %s = NULL\n", hdr.key().c_str());
}
}
printf("%.*s\n",
static_cast<int>(message->len()),
static_cast<const char *>(message->payload()));
Expand Down Expand Up @@ -479,6 +495,8 @@ int main (int argc, char **argv) {
/* Set delivery report callback */
conf->set("dr_cb", &ex_dr_cb, errstr);

conf->set("default_topic_conf", tconf, errstr);

/*
* Create producer using accumulated global configuration.
*/
Expand All @@ -490,15 +508,6 @@ int main (int argc, char **argv) {

std::cout << "% Created producer " << producer->name() << std::endl;

/*
* Create topic handle.
*/
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,
tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}

/*
* Read messages from stdin and produce to broker.
Expand All @@ -509,20 +518,36 @@ int main (int argc, char **argv) {
continue;
}

RdKafka::Headers *headers = RdKafka::Headers::create();
headers->add("my header", "header value");
headers->add("other header", "yes");

/*
* Produce message
*/
RdKafka::ErrorCode resp =
producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(line.c_str()), line.size(),
NULL, NULL);
if (resp != RdKafka::ERR_NO_ERROR)
std::cerr << "% Produce failed: " <<
RdKafka::err2str(resp) << std::endl;
else
std::cerr << "% Produced message (" << line.size() << " bytes)" <<
std::endl;
producer->produce(topic_str, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
/* Value */
const_cast<char *>(line.c_str()), line.size(),
/* Key */
NULL, 0,
/* Timestamp (defaults to now) */
0,
/* Message headers, if any */
headers,
/* Per-message opaque value passed to
* delivery report */
NULL);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "% Produce failed: " <<
RdKafka::err2str(resp) << std::endl;
delete headers; /* Headers are automatically deleted on produce()
* success. */
} else {
std::cerr << "% Produced message (" << line.size() << " bytes)" <<
std::endl;
}

producer->poll(0);
}
Expand All @@ -533,7 +558,6 @@ int main (int argc, char **argv) {
producer->poll(1000);
}

delete topic;
delete producer;


Expand Down Expand Up @@ -635,7 +659,7 @@ int main (int argc, char **argv) {
RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic,
&metadata, 5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "%% Failed to acquire metadata: "
std::cerr << "%% Failed to acquire metadata: "
<< RdKafka::err2str(err) << std::endl;
run = 0;
break;
Expand All @@ -649,6 +673,8 @@ int main (int argc, char **argv) {

}

delete conf;
delete tconf;

/*
* Wait for RdKafka to decommission.
Expand Down
14 changes: 6 additions & 8 deletions src-cpp/HeadersImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,15 @@

#include "rdkafkacpp_int.h"

RdKafka::Headers *RdKafka::Headers::create(size_t initial_count) {
return new RdKafka::HeadersImpl(initial_count, false);
RdKafka::Headers *RdKafka::Headers::create() {
return new RdKafka::HeadersImpl();
}

RdKafka::Headers *RdKafka::Headers::create(const std::vector<Header> &headers) {
if (headers.size() > 0) {
return new RdKafka::HeadersImpl(headers, false);
} else {
return new RdKafka::HeadersImpl(8, false);
}

if (headers.size() > 0)
return new RdKafka::HeadersImpl(headers);
else
return new RdKafka::HeadersImpl();
}

RdKafka::Headers::~Headers() {}
43 changes: 26 additions & 17 deletions src-cpp/ProducerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,25 +169,34 @@ RdKafka::ProducerImpl::produce (const std::string topic_name,
int32_t partition, int msgflags,
void *payload, size_t len,
const void *key, size_t key_len,
int64_t timestamp, void *msg_opaque,
RdKafka::Headers *headers) {
int64_t timestamp,
RdKafka::Headers *headers,
void *msg_opaque) {
rd_kafka_headers_t *hdrs = NULL;
RdKafka::HeadersImpl *headersimpl = NULL;
rd_kafka_resp_err_t err;

if (headers) {
hdrs = headers->c_headers();
headersimpl = static_cast<RdKafka::HeadersImpl*>(headers);
hdrs = headersimpl->c_ptr();
}

return
static_cast<RdKafka::ErrorCode>
(
rd_kafka_producev(rk_,
RD_KAFKA_V_TOPIC(topic_name.c_str()),
RD_KAFKA_V_PARTITION(partition),
RD_KAFKA_V_MSGFLAGS(msgflags),
RD_KAFKA_V_VALUE(payload, len),
RD_KAFKA_V_KEY(key, key_len),
RD_KAFKA_V_TIMESTAMP(timestamp),
RD_KAFKA_V_OPAQUE(msg_opaque),
RD_KAFKA_V_HEADERS(hdrs),
RD_KAFKA_V_END)
);
err = rd_kafka_producev(rk_,
RD_KAFKA_V_TOPIC(topic_name.c_str()),
RD_KAFKA_V_PARTITION(partition),
RD_KAFKA_V_MSGFLAGS(msgflags),
RD_KAFKA_V_VALUE(payload, len),
RD_KAFKA_V_KEY(key, key_len),
RD_KAFKA_V_TIMESTAMP(timestamp),
RD_KAFKA_V_OPAQUE(msg_opaque),
RD_KAFKA_V_HEADERS(hdrs),
RD_KAFKA_V_END);

if (!err && headersimpl) {
/* A successful producev() call will destroy the C headers. */
headersimpl->c_headers_destroyed();
delete headers;
}

return static_cast<RdKafka::ErrorCode>(err);
}
Loading

0 comments on commit a3753a7

Please sign in to comment.