Skip to content

Commit

Permalink
Header support for C++ API
Browse files Browse the repository at this point in the history
  • Loading branch information
davidtrihy-genesys authored and edenhill committed Nov 26, 2018
1 parent 5422b75 commit 72756d7
Show file tree
Hide file tree
Showing 17 changed files with 594 additions and 30 deletions.
40 changes: 31 additions & 9 deletions examples/kafkatest_verifiable_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,17 @@ void msg_consume(RdKafka::KafkaConsumer *consumer,
" [" << (int)msg->partition() << "] at offset " <<
msg->offset() << std::endl;

RdKafka::Headers *headers = msg->get_headers();
if (headers) {
std::vector<RdKafka::Headers::Header> sheaders = headers->get_all();
std::cout << "Headers length: " << sheaders.size() << std::endl;
for(std::vector<RdKafka::Headers::Header>::const_iterator it = sheaders.begin();
it != sheaders.end();
it++) {
std::cout << "Key: " << (*it).key << " Value: " << (*it).value << std::endl;
}
}

if (state.maxMessages >= 0 &&
state.consumer.consumedMessages >= state.maxMessages)
return;
Expand Down Expand Up @@ -831,31 +842,42 @@ int main (int argc, char **argv) {
msg << value_prefix << i;
while (true) {
RdKafka::ErrorCode resp;
if (create_time == -1) {
resp = producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(msg.str().c_str()),
msg.str().size(), NULL, NULL);
} else {
resp = producer->produce(topics[0], partition,
RdKafka::Headers *headers = 0;
if (create_time == -1) {
resp = producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(msg.str().c_str()),
msg.str().size(), NULL, NULL);
} else {
std::string name = "kafkaheader";
std::string val = "header_val";
std::vector<RdKafka::Headers::Header> headers_arr;
headers_arr.push_back(RdKafka::Headers::Header(name, val.c_str()));

headers = RdKafka::Headers::create(headers_arr, false);
resp = producer->produce(topics[0], partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(msg.str().c_str()),
msg.str().size(),
NULL, 0,
create_time,
NULL);
}
NULL, headers);
}

if (resp == RdKafka::ERR__QUEUE_FULL) {
producer->poll(100);
continue;
} else if (resp != RdKafka::ERR_NO_ERROR) {
headers->destroy_headers();
errorString("producer_send_error",
RdKafka::err2str(resp), topic->name(), NULL, msg.str());
state.producer.numErr++;
} else {
state.producer.numSent++;
}
if (headers) {
delete headers;
}
break;
}

Expand Down
1 change: 1 addition & 0 deletions src-cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ set(
ConfImpl.cpp
ConsumerImpl.cpp
HandleImpl.cpp
HeadersImpl.cpp
KafkaConsumerImpl.cpp
MessageImpl.cpp
MetadataImpl.cpp
Expand Down
49 changes: 49 additions & 0 deletions src-cpp/HeadersImpl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* librdkafka - Apache Kafka C/C++ library
*
* Copyright (c) 2014 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include <iostream>
#include <string>
#include <list>
#include <cerrno>

#include "rdkafkacpp_int.h"

RdKafka::Headers *RdKafka::Headers::create(size_t initial_count, bool free_rd_headers) {
return new RdKafka::HeadersImpl(initial_count, free_rd_headers);
}

RdKafka::Headers *RdKafka::Headers::create(const std::vector<Header> &headers, bool free_rd_headers) {
if (headers.size() > 0) {
return new RdKafka::HeadersImpl(headers, free_rd_headers);
} else {
return 0;
}

}

RdKafka::Headers::~Headers() {}
2 changes: 1 addition & 1 deletion src-cpp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ LIBVER= 1
CXXSRCS= RdKafka.cpp ConfImpl.cpp HandleImpl.cpp \
ConsumerImpl.cpp ProducerImpl.cpp KafkaConsumerImpl.cpp \
TopicImpl.cpp TopicPartitionImpl.cpp MessageImpl.cpp \
QueueImpl.cpp MetadataImpl.cpp
HeadersImpl.cpp QueueImpl.cpp MetadataImpl.cpp

HDRS= rdkafkacpp.h

Expand Down
12 changes: 9 additions & 3 deletions src-cpp/ProducerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,19 @@ RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,

}


RdKafka::ErrorCode
RdKafka::ProducerImpl::produce (const std::string topic_name,
int32_t partition, int msgflags,
void *payload, size_t len,
const void *key, size_t key_len,
int64_t timestamp,
void *msg_opaque) {
int64_t timestamp, void *msg_opaque,
RdKafka::Headers *headers) {
rd_kafka_headers_t *hdrs;
if (headers) {
hdrs = headers->c_headers();
} else {
hdrs = 0;
}
return
static_cast<RdKafka::ErrorCode>
(
Expand All @@ -162,6 +167,7 @@ RdKafka::ProducerImpl::produce (const std::string topic_name,
RD_KAFKA_V_KEY(key, key_len),
RD_KAFKA_V_TIMESTAMP(timestamp),
RD_KAFKA_V_OPAQUE(msg_opaque),
RD_KAFKA_V_HEADERS(hdrs),
RD_KAFKA_V_END)
);
}
135 changes: 131 additions & 4 deletions src-cpp/rdkafkacpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ extern "C" {
struct rd_kafka_s;
struct rd_kafka_topic_s;
struct rd_kafka_message_s;
}
struct rd_kafka_headers_s;
};

namespace RdKafka {


/**
* @name Miscellaneous APIs
* @{
Expand Down Expand Up @@ -448,6 +448,7 @@ std::string err2str(RdKafka::ErrorCode err);
/* Forward declarations */
class Producer;
class Message;
class Headers;
class Queue;
class Event;
class Topic;
Expand Down Expand Up @@ -1449,7 +1450,130 @@ class RD_EXPORT MessageTimestamp {
int64_t timestamp; /**< Milliseconds since epoch (UTC). */
};

/**
* @brief Headers object
*
* This object encapsulates the C implementation logic into a C++ object
* for use in RdKafka::Messages object.
*/
class RD_EXPORT Headers {
public:
virtual ~Headers() = 0;

/**
* @brief Header object
*
* This object represents a single Header with key value pair
* and an ErrorCode
*
* @remark dynamic allocation of this object is not supported.
*
*/
class Header {
public:
Header(const std::string& key,
const char* value,
RdKafka::ErrorCode err = ERR_NO_ERROR):
key(key), err(err) {
// Safe managed copy of the value preserving the bytes
value_container_ = value;
this->value = value_container_.c_str();
};

std::string key;
const char* value;
RdKafka::ErrorCode err;
private:
std::string value_container_;
void *operator new(size_t); /* Prevent dynamic allocation */
};

/**
* @brief create a new instance of the Headers object
*/
static Headers *create(size_t initial_size = 8, bool free_rd_headers = true);

/**
* @brief create a new instance of the Headers object from a std::vector
*/
static Headers *create(const std::vector<Header> &headers, bool free_rd_headers = true);

/**
* @brief adds a Header to the end
*
* @returns An ErrorCode signalling a success or failure to add the Header.
*/
virtual ErrorCode add(const std::string& key, const char* value) = 0;

/**
* @brief removes all the Headers of a given key
*
* @returns An ErrorCode signalling a success or failure to remove the Header.
*/
virtual ErrorCode remove(const std::string& key) = 0;

/**
* @brief gets all of the Headers of a given key
*
* @returns a std::vector containing all the Headers of the given key.
*/
virtual std::vector<Header> get(const std::string &key) const = 0;

/**
* @brief gets the last occurrence of a Header of a given key
*
* @returns the Header if found, otherwise a Header with an ErrorCode
*/
virtual Header get_last(const std::string& key) const = 0;

/**
* @brief returns all the Headers of a Message
*
* @returns a std::vector containing all of the Headers of a message
*/
virtual std::vector<Header> get_all() const = 0;

/**
* @brief the count of all the Headers
*
* @returns a size_t count of all the headers
*/
virtual size_t size() const = 0;

/**
* @brief Returns the underlying librdkafka C rd_kafka_headers_t handle.
*
* @warning Calling the C API on this handle is not recommended and there
* is no official support for it, but for cases where the C++ API
* does not provide the underlying functionality this C handle can be
* used to interact directly with the core librdkafka API.
*
* @remark The lifetime of the returned pointer can be different than the lifetime
* of the Headers message due to how the producev function in the C API works
* if there is no error then the producev will take care of deallocation
* but if there is an error then it is the responsibility of the calling
* object to deallocate the underlying C implementation if an instance
* of the Headers object is created with free_rd_headers set to `false`
*
* @remark Include <rdkafka/rdkafka.h> prior to including
* <rdkafka/rdkafkacpp.h>
*
* @returns \c rd_kafka_headers_t*
*/
virtual struct rd_kafka_headers_s *c_headers() = 0;

/**
* @brief cleans up the underlying alocated C implementation headers if called
*
* @remark Safe to call even if the Headers object is set to clean up when
* when the destructor is called
*
* @remark Safe to call even if the underlyng C pointer is set to null
*
* @returns an ErrorCode signalling if the the operation was attempted
*/
virtual ErrorCode destroy_headers() = 0;
};

/**
* @brief Message object
Expand Down Expand Up @@ -1531,6 +1655,9 @@ class RD_EXPORT Message {
/** @returns The \p msg_opaque as provided to RdKafka::Producer::produce() */
virtual void *msg_opaque () const = 0;

/** @returns The Headers instance for this Message (if applicable) */
virtual RdKafka::Headers *get_headers() = 0;

virtual ~Message () = 0;

/** @returns the latency in microseconds for a produced message measured
Expand Down Expand Up @@ -2257,8 +2384,8 @@ class RD_EXPORT Producer : public virtual Handle {
int msgflags,
void *payload, size_t len,
const void *key, size_t key_len,
int64_t timestamp,
void *msg_opaque) = 0;
int64_t timestamp, void *msg_opaque,
RdKafka::Headers *headers) = 0;


/**
Expand Down
Loading

0 comments on commit 72756d7

Please sign in to comment.