diff --git a/python/client.cc b/python/client.cc index c05812b4..a5d99b5c 100644 --- a/python/client.cc +++ b/python/client.cc @@ -625,7 +625,8 @@ void bind_client(py::module& mod) { "target"_a) .def("is_shared_memory", [](Client* self, const uintptr_t target) -> bool { - return self->IsSharedMemory(target); + ObjectID object_id = InvalidObjectID(); + return self->IsSharedMemory(target, object_id); }) .def("find_shared_memory", [](Client* self, const uintptr_t target) -> py::object { diff --git a/python/vineyard/core/resolver.py b/python/vineyard/core/resolver.py index 242792d5..035633da 100644 --- a/python/vineyard/core/resolver.py +++ b/python/vineyard/core/resolver.py @@ -55,11 +55,11 @@ def run(self, obj, **kw): value = resolver(obj, resolver=self) else: value = resolver(obj) - except Exception: + except Exception as e: raise RuntimeError( # pylint: disable=raise-missing-from 'Unable to construct the object using resolver: ' 'typename is %s, resolver is %s' % (obj.meta.typename, resolver) - ) + ) from e if value is None: # if the obj has been resolved by pybind types, and there's no proper # resolver, it shouldn't be an error diff --git a/python/vineyard/data/tensor.py b/python/vineyard/data/tensor.py index 73f5f464..b0dbfd51 100644 --- a/python/vineyard/data/tensor.py +++ b/python/vineyard/data/tensor.py @@ -68,12 +68,12 @@ def numpy_ndarray_builder(client, value, **kw): def numpy_ndarray_resolver(obj): meta = obj.meta - value_name = meta['value_type_'] - if value_name == 'object': + value_type_name = meta['value_type_'] + if value_type_name == 'object': view = memoryview(obj.member('buffer_')) return pickle.loads(view, fix_imports=True) - value_type = normalize_dtype(value_name, meta.get('value_type_meta_', None)) + value_type = normalize_dtype(value_type_name, meta.get('value_type_meta_', None)) shape = from_json(meta['shape_']) if 'order_' in meta: order = from_json(meta['order_']) @@ -81,9 +81,10 @@ def numpy_ndarray_resolver(obj): order = 'C' if np.prod(shape) == 0: return np.zeros(shape, dtype=value_type) - c_array = np.frombuffer( - memoryview(obj.member('buffer_')), dtype=value_type - ).reshape(shape) + mem = memoryview(obj.member('buffer_'))[ + 0 : int(np.prod(shape)) * np.dtype(value_type).itemsize + ] + c_array = np.frombuffer(mem, dtype=value_type).reshape(shape) # TODO: revise the memory copy of asfortranarray array = c_array if order == 'C' else np.asfortranarray(c_array) return array.view(ndarray) diff --git a/src/client/client.cc b/src/client/client.cc index 3916d0c8..45ef1fd4 100644 --- a/src/client/client.cc +++ b/src/client/client.cc @@ -494,19 +494,27 @@ std::vector> Client::ListObjects( } bool Client::IsSharedMemory(const void* target) const { - return shm_->Exists(target); + ObjectID object_id = InvalidObjectID(); + return IsSharedMemory(target, object_id); } bool Client::IsSharedMemory(const uintptr_t target) const { - return shm_->Exists(target); + ObjectID object_id = InvalidObjectID(); + return IsSharedMemory(target, object_id); } bool Client::IsSharedMemory(const void* target, ObjectID& object_id) const { - return shm_->Exists(target, object_id); + return IsSharedMemory(reinterpret_cast(target), object_id); } bool Client::IsSharedMemory(const uintptr_t target, ObjectID& object_id) const { - return shm_->Exists(target, object_id); + if (shm_->Exists(target, object_id)) { + // verify that the blob is not deleted on the server side + json tree; + Client* mutable_this = const_cast(this); + return mutable_this->GetData(object_id, tree, false, false).ok(); + } + return false; } Status Client::AllocatedSize(const ObjectID id, size_t& size) { @@ -1346,8 +1354,9 @@ Status SharedMemoryManager::Mmap(int fd, ObjectID id, int64_t map_size, uint8_t* pointer, bool readonly, bool realign, uint8_t** ptr) { RETURN_ON_ERROR(this->Mmap(fd, map_size, pointer, readonly, realign, ptr)); - segments_.emplace(reinterpret_cast(*ptr) + data_offset, - std::make_pair(data_size, id)); + // override deleted blobs + segments_[reinterpret_cast(*ptr) + data_offset] = + std::make_pair(data_size, id); return Status::OK(); } @@ -1366,12 +1375,12 @@ void SharedMemoryManager::PreMmap(int fd, std::vector& fds, } bool SharedMemoryManager::Exists(const uintptr_t target) { - ObjectID id; + ObjectID id = InvalidObjectID(); return Exists(target, id); } bool SharedMemoryManager::Exists(const void* target) { - ObjectID id; + ObjectID id = InvalidObjectID(); return Exists(target, id); } @@ -1386,7 +1395,8 @@ bool SharedMemoryManager::Exists(const uintptr_t target, ObjectID& object_id) { std::clog << "[trace] pointer that been queried: " << reinterpret_cast(target) << std::endl; for (auto const& item : segments_) { - std::clog << "[trace] [" << reinterpret_cast(item.first) << ", " + std::clog << "[trace] " << ObjectIDToString(item.second.second) << ": [" + << reinterpret_cast(item.first) << ", " << reinterpret_cast(item.first + item.second.first) << ")" << std::endl; } @@ -1420,7 +1430,11 @@ ObjectID SharedMemoryManager::resolveObjectID(const uintptr_t target, const uintptr_t key, const uintptr_t data_size, const ObjectID object_id) { - if (key <= target && target < key + data_size) { + // With a more strict constraint: the target pointer must be starts froms the + // given blob (key), as blob slicing is not supported yet. + // + // if (key <= target && target < key + data_size) { + if (key == target) { #if defined(WITH_VERBOSE) std::clog << "[trace] resuing blob " << ObjectIDToString(object_id) << " for pointer " << reinterpret_cast(target) diff --git a/src/client/client.h b/src/client/client.h index 37e09123..cc43be4f 100644 --- a/src/client/client.h +++ b/src/client/client.h @@ -107,9 +107,11 @@ class SharedMemoryManager { // compute the set of fds that needs to `recv` from the server void PreMmap(int fd, std::vector& fds, std::set& dedup); - bool Exists(const uintptr_t target); + bool Exists(const uintptr_t target) + __attribute__((deprecated("Use Exists(target, object_id) instead."))); - bool Exists(const void* target); + bool Exists(const void* target) + __attribute__((deprecated("Use Exists(target, object_id) instead."))); bool Exists(const uintptr_t target, ObjectID& object_id); @@ -638,7 +640,8 @@ class Client final : public BasicIPCClient, * Return true if the address (client-side address) comes from the vineyard * server. */ - bool IsSharedMemory(const void* target) const; + bool IsSharedMemory(const void* target) const __attribute__(( + deprecated("Use IsSharedMemory(target, object_id) instead."))); /** * Check if the given address belongs to the shared memory region. @@ -648,7 +651,8 @@ class Client final : public BasicIPCClient, * Return true if the address (client-side address) comes from the vineyard * server. */ - bool IsSharedMemory(const uintptr_t target) const; + bool IsSharedMemory(const uintptr_t target) const __attribute__(( + deprecated("Use IsSharedMemory(target, object_id) instead."))); /** * Check if the given address belongs to the shared memory region. diff --git a/src/client/client_base.cc b/src/client/client_base.cc index 40260b05..5f7bedd7 100644 --- a/src/client/client_base.cc +++ b/src/client/client_base.cc @@ -41,8 +41,9 @@ Status ClientBase::GetData(const ObjectID id, json& tree, RETURN_ON_ERROR(doWrite(message_out)); json message_in; RETURN_ON_ERROR(doRead(message_in)); - RETURN_ON_ERROR(ReadGetDataReply(message_in, tree)); - return Status::OK(); + auto status = ReadGetDataReply(message_in, tree); + return Status::Wrap( + status, "failed to get metadata for '" + ObjectIDToString(id) + "'"); } Status ClientBase::GetData(const std::vector& ids, diff --git a/src/common/util/status.h b/src/common/util/status.h index a3f8d317..79aa95a3 100644 --- a/src/common/util/status.h +++ b/src/common/util/status.h @@ -321,6 +321,14 @@ class VINEYARD_MUST_USE_TYPE Status { /// Return a success status inline static Status OK() { return Status(); } + /// Wrap a status with customized extra message + inline static Status Wrap(const Status& s, const std::string& message) { + if (s.ok()) { + return s; + } + return Status(s.code(), message + ": " + s.message()); + } + /// Return an error status for invalid data (for example a string that /// fails parsing). static Status Invalid() { return Status(StatusCode::kInvalid, ""); }