Skip to content

Commit

Permalink
Improve the varint encoding for edges by avoiding copy and extra memo…
Browse files Browse the repository at this point in the history
…ry (#1516)

This pull request enhances the previous varint encoding implementation by:

- use serialized, batch encoding (no slower than the previous implementation, thanks to batching)
- reuse the ie list when encoding, avoid extra memory allocation and copy
- use the shrink API to adjust the size of the encoded blob writer.

After this pull request, compacting edges with varint no longer requires extra memory,
and the encoding execution time has been optimized (almost half) as well.

Fixes #1373

Signed-off-by: Tao He <[email protected]>
  • Loading branch information
sighingnow authored Aug 12, 2023
1 parent b85a478 commit 6d7dfc4
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 76 deletions.
9 changes: 8 additions & 1 deletion modules/basic/ds/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ template <typename T>
class ArrayBuilder : public ArrayBaseBuilder<T> {
public:
ArrayBuilder(Client& client, size_t size)
: ArrayBaseBuilder<T>(client), size_(size) {
: ArrayBaseBuilder<T>(client), client_(client), size_(size) {
VINEYARD_CHECK_OK(client.CreateBlob(size_ * sizeof(T), buffer_writer_));
this->data_ = reinterpret_cast<T*>(buffer_writer_->data());
}
Expand Down Expand Up @@ -67,6 +67,12 @@ class ArrayBuilder : public ArrayBaseBuilder<T> {
memcpy(data_, data, size_ * sizeof(T));
}

/**
 * @brief Abort the blob writer if the builder is destroyed before being
 * sealed.
 */
~ArrayBuilder() {
  // Nothing to do once sealed, or when no writer is held.
  if (this->sealed() || buffer_writer_ == nullptr) {
    return;
  }
  // The status returned by Abort() is handed to VINEYARD_DISCARD.
  VINEYARD_DISCARD(buffer_writer_->Abort(client_));
}

/**
* @brief Get the size of the array, i.e., number of elements in the array.
*
Expand Down Expand Up @@ -108,6 +114,7 @@ class ArrayBuilder : public ArrayBaseBuilder<T> {
}

private:
Client& client_;
std::unique_ptr<BlobWriter> buffer_writer_;
T* data_;
size_t size_;
Expand Down
70 changes: 69 additions & 1 deletion modules/basic/ds/arrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,81 @@ template class NumericArrayBuilder<double>;
template <typename T>
FixedNumericArrayBuilder<T>::FixedNumericArrayBuilder(Client& client,
const size_t size)
: NumericArrayBaseBuilder<T>(client), size_(size) {
: NumericArrayBaseBuilder<T>(client), client_(client), size_(size) {
if (size_ > 0) {
VINEYARD_CHECK_OK(client.CreateBlob(size_ * sizeof(T), writer_));
data_ = reinterpret_cast<T*>(writer_->data());
}
}

// Client-only constructor: forwards the client to the base builder and keeps
// a reference to it in client_. No blob is created here; the static Make()
// factories construct the builder through this overload and then assign
// size_, writer_ and data_ themselves.
template <typename T>
FixedNumericArrayBuilder<T>::FixedNumericArrayBuilder(Client& client)
: NumericArrayBaseBuilder<T>(client), client_(client) {}

/**
 * @brief Abort the held blob writer when the builder goes away without
 * having been sealed.
 */
template <typename T>
FixedNumericArrayBuilder<T>::~FixedNumericArrayBuilder() {
  // A sealed builder, or one without a writer, has nothing to abort.
  if (this->sealed() || !writer_) {
    return;
  }
  // The status returned by Abort() is handed to VINEYARD_DISCARD.
  VINEYARD_DISCARD(writer_->Abort(client_));
}

/**
 * @brief Create a builder for `size` elements, returning a status instead of
 * going through the checking public constructor.
 *
 * @param client Client used to create the backing blob.
 * @param size Number of elements; when zero, no blob is created.
 * @param out Receives the newly constructed builder.
 * @return Status::OK() on success; a failing CreateBlob is routed through
 *         RETURN_ON_ERROR.
 */
template <typename T>
Status FixedNumericArrayBuilder<T>::Make(
    Client& client, const size_t size,
    std::shared_ptr<FixedNumericArrayBuilder<T>>& out) {
  out.reset(new FixedNumericArrayBuilder<T>(client));
  out->size_ = size;
  if (size == 0) {
    // Empty builder: writer_ and data_ keep their defaults.
    return Status::OK();
  }
  RETURN_ON_ERROR(client.CreateBlob(size * sizeof(T), out->writer_));
  out->data_ = reinterpret_cast<T*>(out->writer_->data());
  return Status::OK();
}

/**
 * @brief Create a builder of `size` elements on top of an existing blob
 * writer rather than creating a new blob.
 *
 * @param client Client bound to the new builder.
 * @param writer Blob writer to adopt; it is only stored when `size > 0`.
 * @param size Number of elements the builder reports.
 * @param out Receives the newly constructed builder (assigned even when an
 *            error is returned).
 * @return Status::Invalid when `size > 0` and `writer` is null, otherwise
 *         Status::OK().
 */
template <typename T>
Status FixedNumericArrayBuilder<T>::Make(
    Client& client, std::unique_ptr<BlobWriter> writer, const size_t size,
    std::shared_ptr<FixedNumericArrayBuilder<T>>& out) {
  out.reset(new FixedNumericArrayBuilder<T>(client));
  out->size_ = size;
  if (size == 0) {
    // Empty builder: the passed writer is not adopted.
    return Status::OK();
  }
  if (!writer) {
    return Status::Invalid(
        "cannot make builder of size > 0 with a null buffer");
  }
  out->writer_ = std::move(writer);
  out->data_ = reinterpret_cast<T*>(out->writer_->data());
  return Status::OK();
}

/**
 * @brief Shrink the held blob writer to `size` elements.
 *
 * @param size New element count; the writer is asked for
 *             `size * sizeof(T)` bytes.
 * @return A default-constructed Status when no writer is held, otherwise
 *         the status returned by the writer's Shrink(). size_ is updated
 *         only when that status is ok.
 */
template <typename T>
Status FixedNumericArrayBuilder<T>::Shrink(const size_t size) {
  if (!writer_) {
    return Status();
  }
  Status status = writer_->Shrink(client_, size * sizeof(T));
  if (status.ok()) {
    size_ = size;
  }
  return status;
}

/**
 * @brief Hand the internal blob writer over to the caller.
 *
 * @param writer Receives the builder's writer (moved out of writer_).
 * @return Status::ObjectSealed if the builder is already sealed, otherwise
 *         Status::OK(). On success data_ is reset to nullptr and size_ to 0.
 */
template <typename T>
Status FixedNumericArrayBuilder<T>::Release(
    std::unique_ptr<BlobWriter>& writer) {
  if (!this->sealed()) {
    writer = std::move(writer_);
    size_ = 0;
    data_ = nullptr;
    return Status::OK();
  }
  return Status::ObjectSealed(
      "sealed builder cannot release its internal buffer");
}

template <typename T>
size_t FixedNumericArrayBuilder<T>::size() const {
return size_;
Expand Down
79 changes: 77 additions & 2 deletions modules/basic/ds/arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,19 @@ class FixedNumericArrayBuilder : public NumericArrayBaseBuilder<T> {

FixedNumericArrayBuilder(Client& client, const size_t size);

~FixedNumericArrayBuilder();

static Status Make(Client& client, const size_t size,
std::shared_ptr<FixedNumericArrayBuilder<T>>& out);

static Status Make(Client& client, std::unique_ptr<BlobWriter> writer,
const size_t size,
std::shared_ptr<FixedNumericArrayBuilder<T>>& out);

Status Shrink(const size_t size);

Status Release(std::unique_ptr<BlobWriter>& writer);

size_t size() const;

T* MutablePointer(int64_t i) const;
Expand All @@ -100,6 +113,9 @@ class FixedNumericArrayBuilder : public NumericArrayBaseBuilder<T> {
Status Build(Client& client) override;

private:
explicit FixedNumericArrayBuilder(Client& client);

Client& client_;
size_t size_ = 0;
std::unique_ptr<BlobWriter> writer_ = nullptr;
T* data_ = nullptr;
Expand Down Expand Up @@ -225,14 +241,69 @@ class FixedSizeBinaryArrayBuilder : public FixedSizeBinaryArrayBaseBuilder {
template <typename T>
class PodArrayBuilder : public FixedSizeBinaryArrayBaseBuilder {
public:
explicit PodArrayBuilder(Client& client, size_t size)
: FixedSizeBinaryArrayBaseBuilder(client), size_(size) {
PodArrayBuilder(Client& client, size_t size)
: FixedSizeBinaryArrayBaseBuilder(client), client_(client), size_(size) {
if (size != 0) {
VINEYARD_CHECK_OK(client.CreateBlob(size * sizeof(T), buffer_));
data_ = reinterpret_cast<T*>(buffer_->Buffer()->mutable_data());
}
}

/**
 * @brief Abort the held blob writer when the builder is destroyed without
 * having been sealed.
 */
~PodArrayBuilder() {
  // A sealed builder, or one without a buffer, has nothing to abort.
  if (this->sealed() || !buffer_) {
    return;
  }
  // The status returned by Abort() is handed to VINEYARD_DISCARD.
  VINEYARD_DISCARD(buffer_->Abort(client_));
}

/**
 * @brief Create a builder for `size` elements of T, returning a status
 * instead of going through the checking public constructor.
 *
 * @param client Client used to create the backing blob.
 * @param size Number of elements; when zero, no blob is created.
 * @param out Receives the newly constructed builder.
 * @return Status::OK() on success; a failing CreateBlob is routed through
 *         RETURN_ON_ERROR.
 */
static Status Make(Client& client, const size_t size,
                   std::shared_ptr<PodArrayBuilder<T>>& out) {
  out = std::shared_ptr<PodArrayBuilder<T>>(new PodArrayBuilder<T>(client));
  out->size_ = size;
  if (out->size_ > 0) {
    // The blob writer member of this class is `buffer_`.
    RETURN_ON_ERROR(client.CreateBlob(out->size_ * sizeof(T), out->buffer_));
    out->data_ = reinterpret_cast<T*>(out->buffer_->data());
  }
  return Status::OK();
}

/**
 * @brief Create a builder of `size` elements on top of an existing blob
 * writer rather than creating a new blob.
 *
 * @param client Client bound to the new builder.
 * @param buffer Blob writer to adopt; it is only stored when `size > 0`.
 * @param size Number of elements the builder reports.
 * @param out Receives the newly constructed builder (assigned even when an
 *            error is returned).
 * @return Status::Invalid when `size > 0` and `buffer` is null, otherwise
 *         Status::OK().
 */
static Status Make(Client& client, std::unique_ptr<BlobWriter> buffer,
                   const size_t size,
                   std::shared_ptr<PodArrayBuilder<T>>& out) {
  out.reset(new PodArrayBuilder<T>(client));
  out->size_ = size;
  if (size == 0) {
    // Empty builder: the passed buffer is not adopted.
    return Status::OK();
  }
  if (!buffer) {
    return Status::Invalid(
        "cannot make builder of size > 0 with a null buffer");
  }
  out->buffer_ = std::move(buffer);
  out->data_ = reinterpret_cast<T*>(out->buffer_->data());
  return Status::OK();
}

/**
 * @brief Shrink the held blob writer to `size` elements.
 *
 * @param size New element count; the writer is asked for
 *             `size * sizeof(T)` bytes.
 * @return A default-constructed Status when no buffer is held, otherwise
 *         the status returned by the writer's Shrink(). size_ is updated
 *         only when that status is ok.
 */
Status Shrink(const size_t size) {
  if (!buffer_) {
    return Status();
  }
  Status status = buffer_->Shrink(client_, size * sizeof(T));
  if (status.ok()) {
    size_ = size;
  }
  return status;
}

/**
 * @brief Hand the internal blob writer over to the caller.
 *
 * @param buffer Receives the builder's writer (moved out of buffer_).
 * @return Status::ObjectSealed if the builder is already sealed, otherwise
 *         Status::OK(). On success data_ is reset to nullptr and size_ to 0.
 */
Status Release(std::unique_ptr<BlobWriter>& buffer) {
  if (!this->sealed()) {
    buffer = std::move(buffer_);
    size_ = 0;
    data_ = nullptr;
    return Status::OK();
  }
  return Status::ObjectSealed(
      "sealed builder cannot release its internal buffer");
}

T* MutablePointer(int64_t i) const {
if (data_) {
return data_ + i;
Expand Down Expand Up @@ -260,6 +331,10 @@ class PodArrayBuilder : public FixedSizeBinaryArrayBaseBuilder {
}

private:
// Client-only constructor: forwards the client to the base builder and keeps
// a reference to it in client_. No blob is created, and size_ (which has no
// default initializer) is not set here; the static Make() factories assign
// size_ right after constructing through this overload.
explicit PodArrayBuilder(Client& client)
: FixedSizeBinaryArrayBaseBuilder(client), client_(client) {}

Client& client_;
size_t size_;
std::unique_ptr<BlobWriter> buffer_;
T* data_ = nullptr;
Expand Down
10 changes: 5 additions & 5 deletions modules/graph/fragment/arrow_fragment_builder_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1353,11 +1353,11 @@ BasicArrowFragmentBuilder<OID_T, VID_T, VERTEX_MAP_T, COMPACT>::initEdges(
<< (vineyard::GetCurrentTime() - gen_edge_start_time) << " seconds";

if (this->compact_edges_) {
varint_encoding_edges(client_, this->directed_, this->vertex_label_num_,
this->edge_label_num_, ie_lists_, oe_lists_,
compact_ie_lists_, compact_oe_lists_,
ie_offsets_lists_, oe_offsets_lists_,
ie_boffsets_lists_, oe_boffsets_lists_, concurrency);
BOOST_LEAF_CHECK(varint_encoding_edges(
client_, this->directed_, this->vertex_label_num_,
this->edge_label_num_, ie_lists_, oe_lists_, compact_ie_lists_,
compact_oe_lists_, ie_offsets_lists_, oe_offsets_lists_,
ie_boffsets_lists_, oe_boffsets_lists_, concurrency));
}
return {};
}
Expand Down
4 changes: 2 additions & 2 deletions modules/graph/fragment/property_graph_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ boost::leaf::result<void> varint_encoding_edges(
Client& client, const bool directed,
const property_graph_types::LABEL_ID_TYPE vertex_label_num,
const property_graph_types::LABEL_ID_TYPE edge_label_num,
const std::vector<std::vector<std::shared_ptr<
std::vector<std::vector<std::shared_ptr<
PodArrayBuilder<property_graph_utils::NbrUnit<VID_T, EID_T>>>>>&
ie_lists,
const std::vector<std::vector<std::shared_ptr<
std::vector<std::vector<std::shared_ptr<
PodArrayBuilder<property_graph_utils::NbrUnit<VID_T, EID_T>>>>>&
oe_lists,
std::vector<std::vector<std::shared_ptr<FixedUInt8Builder>>>&
Expand Down
Loading

0 comments on commit 6d7dfc4

Please sign in to comment.