From 6167ead4777a45de49f793a282248e9916efd523 Mon Sep 17 00:00:00 2001
From: "yusheng.ma"
Date: Fri, 11 Oct 2024 11:22:05 +0800
Subject: [PATCH] add async build api

Signed-off-by: yusheng.ma
---
 include/knowhere/expected.h         |  1 +
 include/knowhere/index/index.h      | 23 ++++++++-
 include/knowhere/index/index_node.h | 18 +++++++
 include/knowhere/index/interrupt.h  | 73 +++++++++++++++++++++++++++++
 src/index/index.cc                  | 46 ++++++++++++++++++
 src/index/interrupt.cc              | 58 +++++++++++++++++++++++
 6 files changed, 218 insertions(+), 1 deletion(-)
 create mode 100644 include/knowhere/index/interrupt.h
 create mode 100644 src/index/interrupt.cc

diff --git a/include/knowhere/expected.h b/include/knowhere/expected.h
index ea487c4ff..efe263325 100644
--- a/include/knowhere/expected.h
+++ b/include/knowhere/expected.h
@@ -45,6 +45,7 @@ enum class Status {
     invalid_index_error = 23,
     invalid_cluster_error = 24,
     cluster_inner_error = 25,
+    timeout = 26,
 };
 
 inline std::string
diff --git a/include/knowhere/index/index.h b/include/knowhere/index/index.h
index 9c3992f07..073f9727f 100644
--- a/include/knowhere/index/index.h
+++ b/include/knowhere/index/index.h
@@ -17,8 +17,9 @@
 #include "knowhere/dataset.h"
 #include "knowhere/expected.h"
 #include "knowhere/index/index_node.h"
-
+#include "knowhere/index/interrupt.h"
 namespace knowhere {
+
 template <typename T1>
 class Index {
  public:
@@ -141,6 +142,26 @@ class Index {
     Status
     Build(const DataSetPtr dataset, const Json& json);
 
+    /**
+     * @brief Schedules an index build on the global build thread pool and returns at once.
+     *
+     * The scheduled task calls back into this index, so the index must stay alive until
+     * Interrupt::Get() has returned on the handle.
+     *
+     * @param dataset Data to build the index from.
+     * @param json Build parameters.
+     * @param timeout KNOWHERE_WITH_CARDINAL only: time budget measured from this call.
+     * @return Handle that owns the future of the build; call Interrupt::Get() for the result.
+     */
+#ifdef KNOWHERE_WITH_CARDINAL
+    std::unique_ptr<Interrupt>
+    BuildAsync(const DataSetPtr dataset, const Json& json,
+               const std::chrono::seconds timeout = std::chrono::seconds::max());
+#else
+    std::unique_ptr<Interrupt>
+    BuildAsync(const DataSetPtr dataset, const Json& json);
+#endif
+
     Status
     Train(const DataSetPtr dataset, const Json& json);
 
diff --git a/include/knowhere/index/index_node.h b/include/knowhere/index/index_node.h
index ea3b98161..3200c624d 100644
--- a/include/knowhere/index/index_node.h
+++ b/include/knowhere/index/index_node.h
@@ -34,6 +34,8 @@
 
 namespace knowhere {
 
+class Interrupt;
+
 class IndexNode : public Object {
  public:
     IndexNode(const int32_t ver) : version_(ver) {
@@ -71,6 +73,22 @@ class IndexNode : public Object {
         return Add(dataset, std::move(cfg));
     }
 
+    /**
+     * @brief Builds the index using the provided dataset, configuration and interrupt handle.
+     *
+     * This default implementation ignores the (unnamed) handle and forwards to Build().
+     *
+     * @param dataset Data to build the index from.
+     * @param cfg Build configuration.
+     * @return Status reported by Build().
+     */
+#ifdef KNOWHERE_WITH_CARDINAL
+    virtual Status
+    BuildAsync(const DataSetPtr dataset, std::shared_ptr<Config> cfg, const Interrupt* = nullptr) {
+        return Build(dataset, std::move(cfg));
+    }
+#endif
+
     /**
      * @brief Trains the index model using the provided dataset and configuration.
      *
diff --git a/include/knowhere/index/interrupt.h b/include/knowhere/index/interrupt.h
new file mode 100644
index 000000000..20a62465f
--- /dev/null
+++ b/include/knowhere/index/interrupt.h
@@ -0,0 +1,73 @@
+// Copyright (C) 2019-2023 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+#ifndef KNOWHERE_INTERRUPT_H
+#define KNOWHERE_INTERRUPT_H
+#include <atomic>
+#include <chrono>
+#include <memory>
+
+#include "knowhere/expected.h"
+
+namespace folly {
+template <typename T>
+class Future;
+
+}
+
+namespace knowhere {
+/**
+ * @brief Handle to an index build scheduled by Index::BuildAsync().
+ *
+ * Owns the future of the build task. With KNOWHERE_WITH_CARDINAL it also carries a stop
+ * flag and a deadline. Keep the handle alive until Get() has returned: the build task
+ * holds a pointer to it.
+ */
+class Interrupt {
+ public:
+#ifdef KNOWHERE_WITH_CARDINAL
+    /// Starts the timeout budget at construction time.
+    explicit Interrupt(const std::chrono::seconds& timeout);
+
+    /// Raises the stop flag.
+    void
+    Stop();
+
+    /// @return true once Stop() has been called.
+    bool
+    Flag() const;
+
+    /// @return true when the whole seconds elapsed since construction exceed the timeout.
+    bool
+    IsTimeout() const;
+#else
+    Interrupt();
+#endif
+
+    /// Blocks until the build task has finished and returns its status; call at most once.
+    Status
+    Get();
+
+    /// Takes ownership of the future of the build task.
+    void
+    Set(folly::Future<Status>&& future);
+
+    ~Interrupt();
+
+ private:
+#ifdef KNOWHERE_WITH_CARDINAL
+    std::chrono::steady_clock::time_point start;
+    std::chrono::seconds timeout;
+    mutable std::atomic_bool flag = false;
+#endif
+    std::unique_ptr<folly::Future<Status>> future = nullptr;
+};
+}  // namespace knowhere
+#endif /* KNOWHERE_INTERRUPT_H */
diff --git a/src/index/index.cc b/src/index/index.cc
index 78b826571..b20500537 100644
--- a/src/index/index.cc
+++ b/src/index/index.cc
@@ -12,6 +12,8 @@
 #include "knowhere/index/index.h"
 
 #include "fmt/format.h"
+#include "folly/futures/Future.h"
+#include "knowhere/comp/thread_pool.h"
 #include "knowhere/comp/time_recorder.h"
 #include "knowhere/dataset.h"
 #include "knowhere/expected.h"
@@ -34,6 +36,50 @@ LoadConfig(BaseConfig* cfg, const Json& json, knowhere::PARAM_TYPE param_type, c
     return Config::Load(*cfg, json_, param_type, msg);
 }
 
+#ifdef KNOWHERE_WITH_CARDINAL
+// Schedules the build on the global build pool and returns the handle that owns its future.
+template <typename T1>
+inline std::unique_ptr<Interrupt>
+Index<T1>::BuildAsync(const DataSetPtr dataset, const Json& json, const std::chrono::seconds timeout) {
+    auto pool = ThreadPool::GetGlobalBuildThreadPool();
+    auto interrupt = std::make_unique<Interrupt>(timeout);
+    // The task may run after this function has returned, so it captures its inputs by value
+    // and the heap-allocated handle by pointer instead of referencing locals.
+    const Interrupt* handle = interrupt.get();
+    interrupt->Set(pool->push([this, dataset, json, handle]() {
+        auto cfg = this->node->CreateConfig();
+        RETURN_IF_ERROR(LoadConfig(cfg.get(), json, knowhere::TRAIN, "Build"));
+
+#if defined(NOT_COMPILE_FOR_SWIG) && !defined(KNOWHERE_WITH_LIGHT)
+        TimeRecorder rc("BuildAsync index ", 2);
+        auto res = this->node->BuildAsync(dataset, std::move(cfg), handle);
+        auto time = rc.ElapseFromBegin("done");
+        time *= 0.000001;  // convert to s
+        knowhere_build_latency.Observe(time);
+#else
+        auto res = this->node->BuildAsync(dataset, std::move(cfg), handle);
+#endif
+        // Judge stop/timeout when the build finishes rather than when Get() is called.
+        if (handle->Flag() || handle->IsTimeout()) {
+            return Status::timeout;
+        }
+        return res;
+    }));
+    return interrupt;
+}
+#else
+// Schedules a plain Build() on the global build pool and returns the handle that owns its future.
+template <typename T1>
+inline std::unique_ptr<Interrupt>
+Index<T1>::BuildAsync(const DataSetPtr dataset, const Json& json) {
+    auto pool = ThreadPool::GetGlobalBuildThreadPool();
+    auto interrupt = std::make_unique<Interrupt>();
+    // By-value captures: the task may run after this function has returned.
+    interrupt->Set(pool->push([this, dataset, json]() { return this->Build(dataset, json); }));
+    return interrupt;
+}
+#endif
+
 template <typename T1>
 inline Status
 Index<T1>::Build(const DataSetPtr dataset, const Json& json) {
diff --git a/src/index/interrupt.cc b/src/index/interrupt.cc
new file mode 100644
index 000000000..d1e801084
--- /dev/null
+++ b/src/index/interrupt.cc
@@ -0,0 +1,58 @@
+// Copyright (C) 2019-2023 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+#include "knowhere/index/interrupt.h"
+
+#include "folly/futures/Future.h"
+namespace knowhere {
+#ifdef KNOWHERE_WITH_CARDINAL
+Interrupt::Interrupt(const std::chrono::seconds& timeout) : start(std::chrono::steady_clock::now()), timeout(timeout) {
+}
+#else
+Interrupt::Interrupt() = default;
+#endif
+
+#ifdef KNOWHERE_WITH_CARDINAL
+void
+Interrupt::Stop() {
+    this->flag.store(true);
+}
+
+bool
+Interrupt::Flag() const {
+    return this->flag.load();
+}
+
+bool
+Interrupt::IsTimeout() const {
+    auto now = std::chrono::steady_clock::now();
+    auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - start);
+    return dur.count() > timeout.count();
+}
+#endif
+
+// Stop/timeout is folded into the task's own status by Index::BuildAsync, so this compiles
+// and behaves the same with and without KNOWHERE_WITH_CARDINAL.
+Status
+Interrupt::Get() {
+    future->wait();
+    return std::move(*future).get();
+}
+
+void
+Interrupt::Set(folly::Future<Status>&& future) {
+    this->future = std::make_unique<folly::Future<Status>>(std::move(future));
+}
+
+// Defined here, where folly::Future is a complete type for the unique_ptr member.
+Interrupt::~Interrupt() = default;
+
+}  // namespace knowhere