Skip to content

Commit

Permalink
Improve the implementation of migrating an object with lots of blobs (#…
Browse files Browse the repository at this point in the history
…1914)

Fixes #1905

Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji authored Jun 19, 2024
1 parent 3ddde84 commit 76d4829
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 110 deletions.
4 changes: 2 additions & 2 deletions src/client/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ Status recv_bytes(int fd, void* data, size_t length) {
char* ptr = static_cast<char*>(data);

struct timeval timeout;
timeout.tv_sec = 60;
timeout.tv_sec = 300;
timeout.tv_usec = 0;

fd_set readfds;
Expand All @@ -214,7 +214,7 @@ Status recv_bytes(int fd, void* data, size_t length) {
if (errno == EINTR) {
FD_ZERO(&readfds);
FD_SET(fd, &readfds);
timeout.tv_sec = 60;
timeout.tv_sec = 300;
timeout.tv_usec = 0;
continue;
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/server/async/socket_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) {
auto callback = [self, this, compress,
object](const Status& status) -> Status {
ReceiveRemoteBuffers(
socket_, {object}, 0, 0, compress,
socket_, {object}, compress,
[self, object](const Status& status) -> Status {
std::string message_out;
if (status.ok()) {
Expand Down Expand Up @@ -725,7 +725,7 @@ bool SocketConnection::doCreateRemoteBuffers(const json& root) {
auto callback = [self, this, compress, object_ids,
objects](const Status& status) -> Status {
ReceiveRemoteBuffers(
socket_, objects, 0, 0, compress,
socket_, objects, compress,
[self, object_ids, objects](const Status& status) -> Status {
std::string message_out;
if (status.ok()) {
Expand Down
39 changes: 35 additions & 4 deletions src/server/server/vineyard_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ limitations under the License.

#include "server/server/vineyard_server.h"

#include <future>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "gulrak/filesystem.hpp"
Expand Down Expand Up @@ -1034,20 +1036,49 @@ Status VineyardServer::MigrateObject(const ObjectID object_id,

std::string remote_endpoint =
(*instance)["rpc_endpoint"].get_ref<std::string const&>();

// check if the object is already migrated
// also ensure the migration is atomic with the same object_id
std::lock_guard<std::mutex> lock(self->migration_mutex_);
auto it = self->migrations_.find(object_id);
if (it != self->migrations_.end()) {
auto& shared_future = it->second;
auto migration_result = shared_future.get();
if (migration_result.first.ok()) {
return callback(migration_result.first, migration_result.second);
} else {
self->migrations_.erase(object_id);
}
}

auto promise_ptr =
std::make_shared<std::promise<std::pair<Status, ObjectID>>>();
std::shared_future<std::pair<Status, ObjectID>> shared_future =
promise_ptr->get_future().share();
self->migrations_[object_id] = shared_future;

// push to the async queues
boost::asio::post(
self->GetIOContext(),
[self, callback, remote_endpoint, object_id, metadata]() {
self->GetIOContext(), [self, callback, remote_endpoint, object_id,
metadata, promise_ptr]() mutable {
auto remote = std::make_shared<RemoteClient>(self);
RETURN_ON_ERROR(
remote->Connect(remote_endpoint, self->session_id()));
return remote->MigrateObject(
object_id, metadata,
[self, remote, callback](const Status& status,
const ObjectID result) {
[self, remote, callback, promise_ptr](
const Status& status, const ObjectID result) {
if (status.ok()) {
promise_ptr->set_value(
std::make_pair(Status::OK(), result));
} else {
promise_ptr->set_value(
std::make_pair(status, InvalidObjectID()));
}
return callback(status, result);
});
});

return Status::OK();
} else {
VLOG(100) << "Error: " << status.ToString();
Expand Down
8 changes: 8 additions & 0 deletions src/server/server/vineyard_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ limitations under the License.
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/util/asio.h" // IWYU pragma: keep
Expand Down Expand Up @@ -273,6 +276,11 @@ class VineyardServer : public std::enable_shared_from_this<VineyardServer> {
std::string instance_name_;
std::string hostname_;
std::string nodename_;

std::mutex migration_mutex_;
// Record the migration status of objects to avoid duplicated migration.
std::unordered_map<ObjectID, std::shared_future<std::pair<Status, ObjectID>>>
migrations_;
};

} // namespace vineyard
Expand Down
Loading

0 comments on commit 76d4829

Please sign in to comment.