diff --git a/modules/basic/ds/arrow_utils.cc b/modules/basic/ds/arrow_utils.cc index 0c039c08..0b213965 100644 --- a/modules/basic/ds/arrow_utils.cc +++ b/modules/basic/ds/arrow_utils.cc @@ -47,6 +47,8 @@ std::shared_ptr ConcatenateTables( std::shared_ptr FromAnyType(AnyType type) { switch (type) { + case AnyType::Undefined: + return arrow::null(); case AnyType::Int32: return arrow::int32(); case AnyType::UInt32: @@ -59,6 +61,12 @@ std::shared_ptr FromAnyType(AnyType type) { return arrow::float32(); case AnyType::Double: return arrow::float64(); + case AnyType::String: + return arrow::large_utf8(); + case AnyType::Date32: + return arrow::int32(); + case AnyType::Date64: + return arrow::int64(); default: return arrow::null(); } diff --git a/modules/basic/ds/dataframe.cc b/modules/basic/ds/dataframe.cc index fbc27b24..cc144d1a 100644 --- a/modules/basic/ds/dataframe.cc +++ b/modules/basic/ds/dataframe.cc @@ -78,19 +78,42 @@ const std::shared_ptr DataFrame::AsBatch(bool copy) const { } else if (auto tensor = std::dynamic_pointer_cast>(df_col)) { num_rows = tensor->shape()[0]; + } else if (auto tensor = + std::dynamic_pointer_cast>(df_col)) { + num_rows = tensor->shape()[0]; } - std::shared_ptr copied_buffer; - if (copy) { - CHECK_ARROW_ERROR_AND_ASSIGN( - copied_buffer, - df_col->buffer()->CopySlice(0, df_col->buffer()->size())); - } else { - copied_buffer = df_col->buffer(); + std::vector> buffer{ + nullptr /* null bitmap */}; + + // process the second buffer for std::string type + if (auto tensor = std::dynamic_pointer_cast>(df_col)) { + std::shared_ptr copied_buffer; + if (copy) { + CHECK_ARROW_ERROR_AND_ASSIGN( + copied_buffer, + df_col->buffer()->CopySlice(0, df_col->auxiliary_buffer()->size())); + } else { + copied_buffer = df_col->buffer(); + } + buffer.push_back(copied_buffer); + } + + // process buffer + { + std::shared_ptr copied_buffer; + if (copy) { + CHECK_ARROW_ERROR_AND_ASSIGN( + copied_buffer, + df_col->buffer()->CopySlice(0, df_col->buffer()->size())); + } else { + copied_buffer = df_col->buffer(); + } + buffer.push_back(copied_buffer); } columns[i] = arrow::MakeArray(arrow::ArrayData::Make( - FromAnyType(df_col->value_type()), num_rows, {nullptr, copied_buffer})); + FromAnyType(df_col->value_type()), num_rows, buffer)); std::shared_ptr sca; CHECK_ARROW_ERROR_AND_ASSIGN(sca, columns[i]->GetScalar(0)); diff --git a/modules/basic/ds/tensor.h b/modules/basic/ds/tensor.h index 8065dc84..0f8cf914 100644 --- a/modules/basic/ds/tensor.h +++ b/modules/basic/ds/tensor.h @@ -51,6 +51,11 @@ class ITensorBuilder { */ template class TensorBuilder : public ITensorBuilder, public TensorBaseBuilder { + public: + using value_t = T; + using value_pointer_t = T*; + using value_const_pointer_t = const T*; + public: /** * @brief Initialize the TensorBuilder with the tensor shape. @@ -136,7 +141,7 @@ class TensorBuilder : public ITensorBuilder, public TensorBaseBuilder { * @brief Get the data pointer of the tensor. * */ - inline T* data() const { return this->data_; } + inline value_pointer_t data() const { return this->data_; } /** * @brief Build the tensor. @@ -153,6 +158,147 @@ class TensorBuilder : public ITensorBuilder, public TensorBaseBuilder { T* data_; }; +/** + * @brief TensorBuilder is used for building tensors that supported by vineyard + * + * @tparam T + */ +template <> +class TensorBuilder : public ITensorBuilder, + public TensorBaseBuilder { + public: + using value_t = detail::arrow_string_view; + using value_pointer_t = uint8_t*; + using value_const_pointer_t = const uint8_t*; + + public: + /** + * @brief Initialize the TensorBuilder with the tensor shape. + * + * @param client The client connected to the vineyard server. + * @param shape The shape of the tensor. + */ + TensorBuilder(Client& client, std::vector const& shape) + : TensorBaseBuilder(client) { + this->set_value_type_(AnyType(AnyTypeEnum::value)); + this->set_shape_(shape); + this->buffer_writer_ = std::make_shared(); + } + + /** + * @brief Initialize the TensorBuilder for a partition of a GlobalTensor. + * + * @param client The client connected to the vineyard server. + * @param shape The shape of the partition. + * @param partition_index The partition index in the global tensor. + */ + TensorBuilder(Client& client, std::vector const& shape, + std::vector const& partition_index) + : TensorBuilder(client, shape) { + this->set_partition_index_(partition_index); + } + + /** + * @brief Get the shape of the tensor. + * + * @return The shape vector where the ith element represents + * the size of the ith axis. + */ + std::vector const& shape() const { return this->shape_; } + + /** + * @brief Get the index of this partition in the global tensor. + * + * @return The index vector where the ith element represents the index + * in the ith axis. + */ + std::vector const& partition_index() const { + return this->partition_index_; + } + + /** + * @brief Set the shape of the tensor. + * + * @param shape The vector for the shape, where the ith element + * represents the size of the shape in the ith axis. + */ + void set_shape(std::vector const& shape) { this->set_shape_(shape); } + + /** + * @brief Set the index in the global tensor. + * + * @param partition_index The vector of indices, where the ith element + * represents the index in the ith axis. + */ + void set_partition_index(std::vector const& partition_index) { + this->set_partition_index_(partition_index); + } + + /** + * @brief Get the strides of the tensor. + * + * @return The strides of the tensor. The definition of the tensor's strides + * can be found in https://pytorch.org/docs/stable/tensor_attributes.html + */ + std::vector strides() const { + std::vector vec(this->shape_.size()); + vec[this->shape_.size() - 1] = 1 /* special case for std::string */; + for (size_t i = this->shape_.size() - 1; i > 0; --i) { + vec[i - 1] = vec[i] * this->shape_[i]; + } + return vec; + } + + /** + * @brief Get the data pointer of the tensor. + * + */ + inline value_pointer_t data() const { + return const_cast(this->buffer_writer_->value_data()); + } + + /** + * @brief Append value to the builder. + */ + inline Status Append(value_t const& value) { + RETURN_ON_ARROW_ERROR( + this->buffer_writer_->Append(value.data(), value.size())); + return Status::OK(); + } + + /** + * @brief Append value to the builder. + */ + inline Status Append(value_const_pointer_t value, const size_t length) { + RETURN_ON_ARROW_ERROR(this->buffer_writer_->Append(value, length)); + return Status::OK(); + } + + /** + * @brief Append value to the builder. + */ + inline Status Append(std::string const& value) { + RETURN_ON_ARROW_ERROR(this->buffer_writer_->Append(value)); + return Status::OK(); + } + + /** + * @brief Build the tensor. + * + * @param client The client connceted to the vineyard server. + */ + Status Build(Client& client) override { + std::shared_ptr array; + RETURN_ON_ARROW_ERROR_AND_ASSIGN(array, buffer_writer_->Finish()); + this->set_buffer_(std::make_shared( + client, std::dynamic_pointer_cast(array))); + return Status::OK(); + } + + private: + std::shared_ptr buffer_writer_; +}; + class GlobalTensorBaseBuilder; /** diff --git a/modules/basic/ds/tensor.vineyard-mod b/modules/basic/ds/tensor.vineyard-mod index 21a91c51..86da6ce8 100644 --- a/modules/basic/ds/tensor.vineyard-mod +++ b/modules/basic/ds/tensor.vineyard-mod @@ -30,6 +30,7 @@ limitations under the License. #include "arrow/io/api.h" #include "basic/ds/array.vineyard.h" +#include "basic/ds/arrow.h" #include "basic/ds/types.h" #include "client/client.h" #include "client/ds/blob.h" @@ -43,20 +44,6 @@ namespace vineyard { #pragma GCC diagnostic ignored "-Wattributes" #endif -template -class TensorBaseBuilder; - -class ITensor : public Object { - public: - [[shared]] virtual std::vector const& shape() const = 0; - - [[shared]] virtual std::vector const& partition_index() const = 0; - - [[shared]] virtual AnyType value_type() const = 0; - - [[shared]] virtual const std::shared_ptr buffer() const = 0; -}; - namespace detail { #if ARROW_VERSION_MAJOR >= 10 @@ -77,10 +64,29 @@ struct ArrowTensorType { } // namespace detail +template +class TensorBaseBuilder; + +class ITensor : public Object { + public: + [[shared]] virtual std::vector const& shape() const = 0; + + [[shared]] virtual std::vector const& partition_index() const = 0; + + [[shared]] virtual AnyType value_type() const = 0; + + [[shared]] virtual const std::shared_ptr buffer() const = 0; + + [[shared]] virtual const std::shared_ptr auxiliary_buffer() + const = 0; +}; + template class [[vineyard]] Tensor : public ITensor, public BareRegistered> { public: using value_t = T; + using value_pointer_t = T*; + using value_const_pointer_t = const T*; using ArrowTensorT = typename detail::ArrowTensorType::type; /** @@ -130,10 +136,19 @@ class [[vineyard]] Tensor : public ITensor, public BareRegistered> { * * @return The data pointer. */ - [[shared]] const T* data() const { + [[shared]] value_const_pointer_t data() const { return reinterpret_cast(buffer_->data()); } + /** + * @brief Get the data in the tensor by index. + * + * @return The data reference. + */ + [[shared]] const value_t operator[](size_t index) const { + return this->data()[index]; + } + /** * @brief Get the buffer of the tensor. * @@ -144,12 +159,16 @@ class [[vineyard]] Tensor : public ITensor, public BareRegistered> { return this->buffer_->Buffer(); } + [[shared]] const std::shared_ptr auxiliary_buffer() + const override { + return nullptr; + } + /** * @brief Return a view of the original tensor so that it can be used as * arrow's Tensor. * */ - template ::value>> [[shared]] const std::shared_ptr ArrowTensor() { return std::make_shared(buffer_->Buffer(), shape()); } @@ -164,6 +183,117 @@ class [[vineyard]] Tensor : public ITensor, public BareRegistered> { friend class TensorBaseBuilder; }; +template <> +class [[vineyard]] Tensor + : public ITensor, public BareRegistered> { + public: + using value_t = detail::arrow_string_view; + using value_pointer_t = uint8_t*; + using value_const_pointer_t = const uint8_t*; + using ArrowTensorT = + typename detail::ArrowTensorType::type; + + /** + * @brief Get the strides of the tensor. + * + * @return The strides of the tensor. The definition of the tensor's strides + * can be found in https://pytorch.org/docs/stable/tensor_attributes.html + */ + [[shared]] std::vector strides() const { + std::vector vec(shape_.size()); + vec[shape_.size() - 1] = 1 /* special case for tensors */; + for (size_t i = shape_.size() - 1; i > 0; --i) { + vec[i - 1] = vec[i] * shape_[i]; + } + return vec; + } + + /** + * @brief Get the shape of the tensor. + * + * @return The shape vector where the ith element represents + * the size of the ith axis. + */ + [[shared]] std::vector const& shape() const override { + return shape_; + } + + /** + * @brief Get the index of this partition in the global tensor. + * + * @return The index vector where the ith element represents the index + * in the ith axis. + */ + [[shared]] std::vector const& partition_index() const override { + return partition_index_; + } + + /** + * @brief Get the type of tensor's elements. + * + * @return The type of the tensor's elements. + */ + [[shared]] AnyType value_type() const override { return this->value_type_; } + + /** + * @brief Get the data pointer to the tensor's data buffer. + * + * @return The data pointer. + */ + [[shared]] value_const_pointer_t data() const { + return this->buffer_->GetArray()->raw_data(); + } + + /** + * @brief Get the data in the tensor by index. + * + * @return The data reference. + */ + [[shared]] const value_t operator[](size_t index) const { + return this->buffer_->GetArray()->GetView(index); + } + + /** + * @brief Get the buffer of the tensor. + * + * @return The shared pointer to an arrow buffer which + * holds the data buffer of the tensor. + */ + [[shared]] const std::shared_ptr buffer() const override { + return this->buffer_->GetArray()->value_data(); + } + + /** + * @brief Get the buffer of the tensor. + * + * @return The shared pointer to an arrow buffer which + * holds the data buffer of the tensor. + */ + [[shared]] const std::shared_ptr auxiliary_buffer() + const override { + return this->buffer_->GetArray()->value_data(); + } + + /** + * @brief Return a view of the original tensor so that it can be used as + * arrow's Tensor. + * + */ + [[shared]] const std::shared_ptr ArrowTensor() { + // No corresponding arrow tensor type for std::string. + return nullptr; + } + + private: + [[shared]] AnyType value_type_; + [[shared]] std::shared_ptr buffer_; + [[shared]] Tuple shape_; + [[shared]] Tuple partition_index_; + + friend class Client; + friend class TensorBaseBuilder; +}; + #ifdef __GNUC__ #pragma GCC diagnostic pop #endif diff --git a/modules/graph/fragment/arrow_fragment.vineyard-mod b/modules/graph/fragment/arrow_fragment.vineyard-mod index 33051839..62fd0cac 100644 --- a/modules/graph/fragment/arrow_fragment.vineyard-mod +++ b/modules/graph/fragment/arrow_fragment.vineyard-mod @@ -268,6 +268,11 @@ class [[vineyard]] ArrowFragment return IsInnerVertex(v) ? GetInnerVertexId(v) : GetOuterVertexId(v); } + internal_oid_t GetInternalId(const vertex_t& v) const { + return IsInnerVertex(v) ? GetInnerVertexInternalId(v) + : GetOuterVertexInternalId(v); + } + fid_t GetFragId(const vertex_t& u) const { return IsInnerVertex(u) ? fid_ : vid_parser_.GetFid(GetOuterVertexGid(u)); } @@ -356,19 +361,27 @@ class [[vineyard]] ArrowFragment } inline oid_t GetInnerVertexId(const vertex_t& v) const { + return oid_t(GetInnerVertexInternalId(v)); + } + + inline internal_oid_t GetInnerVertexInternalId(const vertex_t& v) const { internal_oid_t internal_oid; vid_t gid = vid_parser_.GenerateId(fid_, vid_parser_.GetLabelId(v.GetValue()), vid_parser_.GetOffset(v.GetValue())); CHECK(vm_ptr_->GetOid(gid, internal_oid)); - return oid_t(internal_oid); + return internal_oid; } inline oid_t GetOuterVertexId(const vertex_t& v) const { + return oid_t(GetOuterVertexInternalId(v)); + } + + inline internal_oid_t GetOuterVertexInternalId(const vertex_t& v) const { vid_t gid = GetOuterVertexGid(v); internal_oid_t internal_oid; CHECK(vm_ptr_->GetOid(gid, internal_oid)); - return oid_t(internal_oid); + return internal_oid; } inline oid_t Gid2Oid(const vid_t& gid) const { diff --git a/python/vineyard/core/codegen/cpp.py b/python/vineyard/core/codegen/cpp.py index d19f7516..96dbce12 100644 --- a/python/vineyard/core/codegen/cpp.py +++ b/python/vineyard/core/codegen/cpp.py @@ -249,7 +249,7 @@ def codegen_construct( base_builder_tpl = ''' {class_header} -class {class_name}BaseBuilder: public ObjectBuilder {{ +class {class_name}BaseBuilder{type_params}: public ObjectBuilder {{ public: {using_alias} @@ -675,6 +675,7 @@ def codegen_using_alia(alia, extent): def codegen_base_builder( class_header, + type_parameters, class_name, class_name_elaborated, fields, @@ -714,8 +715,14 @@ def codegen_base_builder( else: post_ctor = '' + if type_parameters: + type_params = '<' + ', '.join(type_parameters) + '>' + else: + type_params = '' + code = base_builder_tpl.format( class_header=class_header, + type_params=type_params, class_name=class_name, class_name_elaborated=class_name_elaborated, post_construct=post_ctor, @@ -827,6 +834,7 @@ def generate_base_builder( fields, using_alias_values, header, + type_parameters, name, name_elaborated, has_post_ctor, @@ -836,6 +844,7 @@ def generate_base_builder( print('base_builder: ', name, [(n.type.spelling, n.spelling) for n in fields]) return codegen_base_builder( header, + type_parameters, name, name_elaborated, fields, @@ -883,7 +892,13 @@ def codegen( # pylint: disable=too-many-statements code_blocks = [] for kind, namespaces, node in to_reflect: - fields, using_alias, first_member_offset, has_post_ctor = find_fields(node) + ( + fields, + using_alias, + type_parameters, + first_member_offset, + has_post_ctor, + ) = find_fields(node) name, ts = check_class(node) members, _methods = split_members_and_methods(fields) @@ -899,10 +914,12 @@ def codegen( # pylint: disable=too-many-statements content[t.start.offset : t.end.offset] for (_, t) in ts ] # with `typename` - name_elaborated = generate_template_type(name, ts_names) + name_elaborated = generate_template_type(name, ts_names, type_parameters) - header = generate_template_header(ts_names) - header_elaborated = generate_template_header(ts_name_values) + header = generate_template_header(ts_names, type_parameters) + header_elaborated = generate_template_header( + ts_name_values, type_parameters + ) meth_create = generate_create_meth(header, name, name_elaborated) meth_construct = generate_construct_meth( @@ -927,6 +944,7 @@ def codegen( # pylint: disable=too-many-statements members, using_alias_values, header_elaborated, + type_parameters, name, name_elaborated, has_post_ctor, diff --git a/python/vineyard/core/codegen/parsing.py b/python/vineyard/core/codegen/parsing.py index 2caa61f8..fb9693fd 100644 --- a/python/vineyard/core/codegen/parsing.py +++ b/python/vineyard/core/codegen/parsing.py @@ -449,16 +449,25 @@ def traverse(node, to_reflect, to_include, namespaces=None): def find_fields(definition): - fields, using_alias, first_mmeber_offset, has_post_construct = [], [], -1, False + fields = [] + using_alias = [] + type_parameters = [] + first_mmeber_offset = -1 + has_post_construct = False for child in definition.get_children(): if first_mmeber_offset == -1: if child.kind not in [ CursorKind.TEMPLATE_TYPE_PARAMETER, CursorKind.CXX_BASE_SPECIFIER, CursorKind.ANNOTATE_ATTR, + CursorKind.NAMESPACE_REF, + CursorKind.TYPE_REF, ]: first_mmeber_offset = child.extent.start.offset + if child.kind == CursorKind.TYPE_REF: + type_parameters.append(child.spelling) + if child.kind == CursorKind.FIELD_DECL: attribute = check_serialize_attribute(child) if attribute in ['shared', 'distributed']: @@ -484,7 +493,7 @@ def find_fields(definition): using_alias.append((child.spelling, child.extent)) continue - return fields, using_alias, first_mmeber_offset, has_post_construct + return fields, using_alias, type_parameters, first_mmeber_offset, has_post_construct def find_distributed_field(definitions: List["CursorKind"]) -> "CursorKind": @@ -524,9 +533,12 @@ def check_class(node): return node.spelling, template_parameters -def generate_template_header(ts): +def generate_template_header(ts, type_parameters): if not ts: - return '' + if not type_parameters: + return '' + else: + return 'template <>' ps = [] for t in ts: if t.startswith('typename'): @@ -536,7 +548,9 @@ def generate_template_header(ts): return 'template<{ps}>'.format(ps=', '.join(ps)) -def generate_template_type(name, ts): +def generate_template_type(name, ts, type_parameters): + if type_parameters: + return '{name}<{ps}>'.format(name=name, ps=', '.join(type_parameters)) if not ts: return name return '{name}<{ps}>'.format(name=name, ps=', '.join(ts)) diff --git a/python/vineyard/data/tensor.py b/python/vineyard/data/tensor.py index b0dbfd51..ef100497 100644 --- a/python/vineyard/data/tensor.py +++ b/python/vineyard/data/tensor.py @@ -74,6 +74,14 @@ def numpy_ndarray_resolver(obj): return pickle.loads(view, fix_imports=True) value_type = normalize_dtype(value_type_name, meta.get('value_type_meta_', None)) + # process string ndarray from c++ + if value_type_name in ['str', 'string', 'std::string', 'std::__1::string']: + from .arrow import string_array_resolver + + return string_array_resolver(obj.member('buffer_')).to_numpy( + zero_copy_only=False + ) + shape = from_json(meta['shape_']) if 'order_' in meta: order = from_json(meta['order_'])