Skip to content

Commit

Permalink
add async build api
Browse files Browse the repository at this point in the history
Signed-off-by: yusheng.ma <[email protected]>
  • Loading branch information
Presburger committed Oct 11, 2024
1 parent 4534162 commit 6167ead
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 1 deletion.
1 change: 1 addition & 0 deletions include/knowhere/expected.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ enum class Status {
invalid_index_error = 23,
invalid_cluster_error = 24,
cluster_inner_error = 25,
timeout = 26,
};

inline std::string
Expand Down
12 changes: 11 additions & 1 deletion include/knowhere/index/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
#include "knowhere/dataset.h"
#include "knowhere/expected.h"
#include "knowhere/index/index_node.h"

#include "knowhere/index/interrupt.h"
namespace knowhere {

template <typename T1>
class Index {
public:
Expand Down Expand Up @@ -141,6 +142,15 @@ class Index {
Status
Build(const DataSetPtr dataset, const Json& json);

#ifdef KNOWHERE_WITH_CARDINAL
const std::unique_ptr<Interrupt>
BuildAsync(const DataSetPtr dataset, const Json& json,
const std::chrono::seconds timeout = std::chrono::seconds::max());
#else
const std::unique_ptr<Interrupt>
BuildAsync(const DataSetPtr dataset, const Json& json);
#endif

Status
Train(const DataSetPtr dataset, const Json& json);

Expand Down
12 changes: 12 additions & 0 deletions include/knowhere/index/index_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

namespace knowhere {

class Interrupt;

class IndexNode : public Object {
public:
IndexNode(const int32_t ver) : version_(ver) {
Expand Down Expand Up @@ -71,6 +73,16 @@ class IndexNode : public Object {
return Add(dataset, std::move(cfg));
}

/*
*@ @brief Builds the index using the provided dataset,configuration and handle.
*/
#ifdef KNOWHERE_WITH_CARDINAL
virtual Status
BuildAsync(const DataSetPtr dataset, std::shared_ptr<Config> cfg, const Interrupt* = nullptr) {
return Build(dataset, std::move(cfg));
}
#endif

/**
* @brief Trains the index model using the provided dataset and configuration.
*
Expand Down
60 changes: 60 additions & 0 deletions include/knowhere/index/interrupt.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (C) 2019-2023 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifndef KNOWHERE_INTERRUPT_H
#define KNOWHERE_INTERRUPT_H
#include <atomic>
#include <chrono>
#include <memory>

#include "knowhere/expected.h"

namespace folly {
template <typename T>
class Future;

}

namespace knowhere {
class Interrupt {
public:
#ifdef KNOWHERE_WITH_CARDINAL
explicit Interrupt(const std::chrono::seconds& timeout);

void
Stop();

bool
Flag() const;

bool
IsTimeout() const;
#else
Interrupt();
#endif

Status
Get();

void
Set(folly::Future<Status>&& future);

~Interrupt();

private:
#ifdef KNOWHERE_WITH_CARDINAL
std::chrono::steady_clock::time_point start;
std::chrono::seconds timeout;
mutable std::atomic_bool flag = false;
#endif
std::unique_ptr<folly::Future<Status>> future = nullptr;
};
} // namespace knowhere
#endif /* INTERRUPT_H */
36 changes: 36 additions & 0 deletions src/index/index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "knowhere/index/index.h"

#include "fmt/format.h"
#include "folly/futures/Future.h"
#include "knowhere/comp/thread_pool.h"
#include "knowhere/comp/time_recorder.h"
#include "knowhere/dataset.h"
#include "knowhere/expected.h"
Expand All @@ -34,6 +36,40 @@ LoadConfig(BaseConfig* cfg, const Json& json, knowhere::PARAM_TYPE param_type, c
return Config::Load(*cfg, json_, param_type, msg);
}

#ifdef KNOWHERE_WITH_CARDINAL
template <typename T>
inline const std::unique_ptr<Interrupt>
Index<T>::BuildAsync(const DataSetPtr dataset, const Json& json, const std::chrono::seconds timeout) {
auto pool = ThreadPool::GetGlobalBuildThreadPool();
auto interrupt = std::make_unique<Interrupt>(timeout);
interrupt->Set(pool->push([this, dataset, &json, &interrupt]() {
auto cfg = this->node->CreateConfig();
RETURN_IF_ERROR(LoadConfig(cfg.get(), json, knowhere::TRAIN, "Build"));

#if defined(NOT_COMPILE_FOR_SWIG) && !defined(KNOWHERE_WITH_LIGHT)
TimeRecorder rc("BuildAsync index ", 2);
auto res = this->node->BuildAsync(dataset, std::move(cfg), interrupt.get());
auto time = rc.ElapseFromBegin("done");
time *= 0.000001; // convert to s
knowhere_build_latency.Observe(time);
#else
auto res = this->node->BuildAsync(dataset, std::move(cfg), Interrupt.get());
#endif
return res;
}));
return interrupt;
}
#else
template <typename T>
inline const std::unique_ptr<Interrupt>
Index<T>::BuildAsync(const DataSetPtr dataset, const Json& json) {
auto pool = ThreadPool::GetGlobalBuildThreadPool();
auto interrupt = std::make_unique<Interrupt>();
interrupt->Set(pool->push([this, &dataset, &json]() { return this->Build(dataset, json); }));
return interrupt;
}
#endif

template <typename T>
inline Status
Index<T>::Build(const DataSetPtr dataset, const Json& json) {
Expand Down
57 changes: 57 additions & 0 deletions src/index/interrupt.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (C) 2019-2023 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include "knowhere/index/interrupt.h"

#include "folly/futures/Future.h"
namespace knowhere {
#ifdef KNOWHERE_WITH_CARDINAL
Interrupt::Interrupt(const std::chrono::seconds& timeout) : start(std::chrono::steady_clock::now()), timeout(timeout) {
}
#else
Interrupt::Interrupt() = default;
#endif

#ifdef KNOWHERE_WITH_CARDINAL
void
Interrupt::Stop() {
this->flag.store(true);
};

bool
Interrupt::Flag() const {
return this->flag.load();
}

bool
Interrupt::IsTimeout() const {
auto now = std::chrono::steady_clock::now();
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - start);
return dur.count() > timeout.count();
}
#endif
Status
Interrupt::Get() {
future->wait();
if (this->Flag() || this->IsTimeout())
return Status::timeout;
return std::move(*future).get();
}

void
Interrupt::Set(folly::Future<Status>&& future) {
this->future = std::make_unique<folly::Future<Status>>(std::move(future));
}

Interrupt::~Interrupt() {
}

} // namespace knowhere

0 comments on commit 6167ead

Please sign in to comment.