Skip to content

Commit

Permalink
[shortfin] Make TimelineResource hold scopes alive.
Browse files Browse the repository at this point in the history
Prior to this, TimelineResource held a raw C++ reference to the scope but did not hold a shared_ptr reference to it. The storage class, which keeps buffers alive, was keeping the raw IREE buffer and device alive but not the shortfin object hierarchy. On the happy path this does not cause issues, but during abnormal termination and under other race conditions it can cause lifetime problems.

Here we:

* Make TimelineResource hold a std::shared_ptr<Scope>, which transitively keeps everything alive that can be accessed.
* Clean up incorrect ordering of fields in key classes, which could cause dependent objects to be destroyed out of order.
* Remove cases in iree::object_ptr which could have resulted in incorrect accounting in certain scenarios.
* Add compile-time flags to perform verbose shortfin lifetime logging and application-side IREE reference checks.
* Change System::Shutdown() to not clear object references that still may be live (leave that for the destructor).
* Correct an issue where drivers could be retained forever, which was then masking another lifetime issue during abnormal termination.
* Find a low-rate shutdown race triggering use-after-free in the BlockingExecutor and fix it (found by lifetime logging).
* Ensure that all tests are ASAN and LSAN clean, even with a new abnormal termination case added.
  • Loading branch information
stellaraccident committed Aug 27, 2024
1 parent 686d9a8 commit 037e2ab
Show file tree
Hide file tree
Showing 20 changed files with 443 additions and 121 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci_linux_x64-libshortfin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ jobs:
- name: Test libshortfin (full)
run: |
cd ${{ env.LIBSHORTFIN_DIR }}/build
cmake --build . --target test
ctest --timeout 30 --output-on-failure
cd ${{ env.LIBSHORTFIN_DIR }}
pytest -s -v -m "not requires_amd_gpu"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci_linux_x64_asan-libshortfin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ jobs:
CTEST_OUTPUT_ON_FAILURE: 1
run: |
cd ${{ env.LIBSHORTFIN_DIR }}/build
cmake --build . --target test
ctest --timeout 30 --output-on-failure
- name: Run pytest
if: ${{ !cancelled() }}
Expand Down
3 changes: 3 additions & 0 deletions libshortfin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ option(SHORTFIN_ENABLE_ASAN "Enable ASAN" OFF)
if(SHORTFIN_ENABLE_ASAN)
add_compile_options(-fsanitize=address)
add_link_options(-fsanitize=address)

# Enable more ASAN checks.
add_compile_definitions(IREE_SANITIZER_ADDRESS)
endif()

option(SHORTFIN_SYSTEMS_AMDGPU "Builds for AMD GPU systems" ON)
Expand Down
14 changes: 7 additions & 7 deletions libshortfin/bindings/python/array_binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ void BindArray(py::module_ &m) {
PyBufferReleaser py_view_releaser(py_view);
self.Fill(py_view.buf, py_view.len);
})
.def("copy_from", [](storage &self, storage &src) { self.CopyFrom(src); })
.def(
"map",
[](storage &self, bool read, bool write, bool discard) {
Expand Down Expand Up @@ -232,13 +233,12 @@ void BindArray(py::module_ &m) {
py::type<device_array>(), /*keep_alive=*/device.scope(),
device_array::for_host(device, shape, dtype));
})
.def_static("for_transfer",
[](device_array &existing) {
return custom_new_keep_alive<device_array>(
py::type<device_array>(),
/*keep_alive=*/existing.device().scope(),
device_array::for_transfer(existing));
})
.def("for_transfer",
[](device_array &self) {
return custom_new_keep_alive<device_array>(
py::type<device_array>(),
/*keep_alive=*/self.device().scope(), self.for_transfer());
})
.def_prop_ro("device", &device_array::device,
py::rv_policy::reference_internal)
.def_prop_ro("storage", &device_array::storage,
Expand Down
20 changes: 14 additions & 6 deletions libshortfin/examples/python/mobilenet_server/inference_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import shortfin as sf
import shortfin.array as sfnp

MAX_BATCH = 8
MAX_BATCH = 1


class InferenceRequest(sf.Message):
Expand All @@ -27,18 +27,23 @@ def __init__(self, program, request_queue, **kwargs):
self.program = program
self.request_reader = request_queue.reader()
self.device = self.scope.device(0)
self.host_staging = sfnp.host_array(
self.device, [MAX_BATCH, 3, 224, 224], sfnp.float32
)
self.device_input = sfnp.device_array(
self.device, [MAX_BATCH, 3, 224, 224], sfnp.float32
)
self.host_staging = self.device_input.for_transfer()

async def run(self):
print(f"Inference process: {self.pid}")
while request := await self.request_reader():
print(f"[{self.pid}] Got request {request}")
# self.host_staging.data = self.raw_image_data
# TODO: Should really be taking a slice and writing that. For now,
# just writing to the backing storage is the best we have API
# support for. Generally, APIs on storage should be mirrored onto
# the array.
self.host_staging.storage.data = request.raw_image_data
print(self.host_staging)
self.device_input.storage.copy_from(self.host_staging.storage)
print(self.device_input)


class Main:
Expand Down Expand Up @@ -95,7 +100,10 @@ def client():
# Dumb way to prepare some data to feed [1, 3, 224, 224] f32.
import array

dummy_data = array.array("f", [0.2] * (3 * 224 * 224))
dummy_data = array.array(
"f", ([0.2] * (224 * 224)) + ([0.4] * (224 * 224)) + ([-0.2] * (224 * 224))
)
# dummy_data = array.array("f", [0.2] * (3 * 224 * 224))
message = InferenceRequest(dummy_data)
writer(message)

Expand Down
7 changes: 3 additions & 4 deletions libshortfin/src/shortfin/array/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,9 @@ class SHORTFIN_API device_array
shape, dtype);
}

// Allocates a host array for transfer to/from the given device array.
static device_array for_transfer(device_array &with_device_array) {
return for_host(with_device_array.storage().device(),
with_device_array.shape(), with_device_array.dtype());
// Allocates a host array for transfer to/from this array. The result is
// produced by for_host() using this array's storage device, shape and dtype.
device_array for_transfer() {
return for_host(storage().device(), shape(), dtype());
}

// Untyped access to the backing data. The array must be mappable. Specific
Expand Down
47 changes: 41 additions & 6 deletions libshortfin/src/shortfin/array/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ void ThrowIllegalDeviceAffinity(Device *first, Device *second) {
}
} // namespace detail

// Constructs a storage for `buffer` on `device`. The buffer and the timeline
// resource reference are moved into the new object; `device` is copied.
// The construction is reported through logging::construct.
storage::storage(local::ScopedDevice device, iree::hal_buffer_ptr buffer,
local::detail::TimelineResource::Ref timeline_resource)
// Initializers are listed timeline_resource_, buffer_, device_; keep this in
// sync with the member declaration order, which is what C++ actually uses.
: timeline_resource_(std::move(timeline_resource)),
buffer_(std::move(buffer)),
device_(device) {
logging::construct("array::storage", this);
}
// Reports the destruction through logging::destruct; members are then
// destroyed implicitly.
storage::~storage() { logging::destruct("array::storage", this); }

storage storage::AllocateDevice(ScopedDevice &device,
iree_device_size_t allocation_size) {
if (!device.raw_device()) {
Expand Down Expand Up @@ -103,7 +112,28 @@ void storage::Fill(const void *pattern, iree_host_size_t pattern_length) {
}

void storage::CopyFrom(storage &source_storage) {
throw std::logic_error("CopyFrom NYI");
device_.scope().scheduler().AppendCommandBuffer(
device_, TransactionType::TRANSFER, [&](Account &account) {
// Must depend on the source's mutation dependencies to avoid
// read-before-write hazard.
account.active_deps_extend(
source_storage.timeline_resource_->mutation_barrier());
// And depend on our own use and mutations dependencies.
account.active_deps_extend(timeline_resource_->use_barrier());
account.active_deps_extend(timeline_resource_->mutation_barrier());

SHORTFIN_THROW_IF_ERROR(iree_hal_command_buffer_copy_buffer(
account.active_command_buffer(),
/*source_ref=*/
iree_hal_make_buffer_ref(source_storage.buffer_, 0, byte_length()),
/*target_ref=*/
iree_hal_make_buffer_ref(buffer_, 0, byte_length())));

// And move our own mutation barrier to the current pending timeline
// value.
timeline_resource_->set_mutation_barrier(
account.timeline_sem(), account.timeline_idle_timepoint());
});
}

bool storage::is_mappable_for_read() const {
Expand All @@ -127,8 +157,7 @@ void storage::MapExplicit(mapping &mapping, iree_hal_memory_access_t access) {
buffer_, IREE_HAL_MAPPING_MODE_SCOPED, access,
/*byte_offset=*/0, byte_length(), &mapping.mapping_));
mapping.access_ = access;
mapping.hal_device_ownership_baton_ =
iree::hal_device_ptr::borrow_reference(hal_device_ownership_baton_);
mapping.timeline_resource_ = timeline_resource_;
}

iree_hal_memory_type_t storage::memory_type() const {
Expand Down Expand Up @@ -169,16 +198,22 @@ std::string storage::to_s() const {
// mapping
// -------------------------------------------------------------------------- //

mapping::mapping() { std::memset(&mapping_, 0, sizeof(mapping_)); }
// Creates an empty mapping: reports the construction through
// logging::construct and zero-fills the raw mapping_ struct.
mapping::mapping() {
logging::construct("array::mapping", this);
std::memset(&mapping_, 0, sizeof(mapping_));
}

mapping::~mapping() noexcept { reset(); }
// Reports the destruction through logging::destruct, then calls reset() to
// release whatever this mapping still holds.
mapping::~mapping() noexcept {
logging::destruct("array::mapping", this);
reset();
}

// Releases the mapping if this object currently converts to true; otherwise
// does nothing. On release the buffer range is unmapped, access_ is cleared
// and the held references are reset.
void mapping::reset() noexcept {
if (*this) {
// Crash the process on failure to unmap: we don't have a good mitigation
// for a failed unmap at this point.
IREE_CHECK_OK(iree_hal_buffer_unmap_range(&mapping_));
access_ = IREE_HAL_MEMORY_ACCESS_NONE;
hal_device_ownership_baton_.reset();
timeline_resource_.reset();
}
}

Expand Down
33 changes: 12 additions & 21 deletions libshortfin/src/shortfin/array/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ class SHORTFIN_API mapping {
mapping(const mapping &) = delete;
mapping &operator=(const mapping &) = delete;
mapping &operator=(mapping &&other) {
timeline_resource_ = std::move(other.timeline_resource_);
access_ = other.access_;
mapping_ = other.mapping_;
hal_device_ownership_baton_ = std::move(other.hal_device_ownership_baton_);
other.access_ = IREE_HAL_MEMORY_ACCESS_NONE;
std::memset(&other.mapping_, 0, sizeof(other.mapping_));
return *this;
}
mapping(mapping &&other)
: access_(other.access_),
mapping_(other.mapping_),
hal_device_ownership_baton_(
std::move(other.hal_device_ownership_baton_)) {
: timeline_resource_(std::move(other.timeline_resource_)),
access_(other.access_),
mapping_(other.mapping_) {
other.access_ = IREE_HAL_MEMORY_ACCESS_NONE;
std::memset(&other.mapping_, 0, sizeof(other.mapping_));
}
Expand Down Expand Up @@ -63,15 +62,17 @@ class SHORTFIN_API mapping {
bool writable() const { return access_ & IREE_HAL_MEMORY_ACCESS_WRITE; }

private:
// See note on storage::timeline_resource_. Must be declared first.
local::detail::TimelineResource::Ref timeline_resource_;
iree_hal_memory_access_t access_ = IREE_HAL_MEMORY_ACCESS_NONE;
iree_hal_buffer_mapping_t mapping_;
iree::hal_device_ptr hal_device_ownership_baton_;
friend class storage;
};

// Array storage backed by an IREE buffer of some form.
class SHORTFIN_API storage {
public:
~storage();
local::ScopedDevice &device() { return device_; }
local::Scope &scope() { return device_.scope(); }
const local::ScopedDevice &device() const { return device_; }
Expand Down Expand Up @@ -162,23 +163,13 @@ class SHORTFIN_API storage {

private:
storage(local::ScopedDevice device, iree::hal_buffer_ptr buffer,
local::detail::TimelineResource::Ref timeline_resource)
: hal_device_ownership_baton_(iree::hal_device_ptr::borrow_reference(
device.raw_device()->hal_device())),
buffer_(std::move(buffer)),
device_(device),
timeline_resource_(std::move(timeline_resource)) {}
// TODO(ownership): Since storage is a free-standing object in the system,
// it needs an ownership baton that keeps the device/driver alive.
// Otherwise, it can outlive the backing device and then crash on
// buffer deallocation. For now, we stash an RAII hal_device_ptr, which
// keeps everything alive. This isn't quite what we want but keeps us going
// for now. When fixing, add a test that creates an array, destroys the
// System, and then frees the array.
iree::hal_device_ptr hal_device_ownership_baton_;
local::detail::TimelineResource::Ref timeline_resource);
// The timeline resource holds the back reference to the owning scope,
// which keeps all devices alive. Buffers must be destroyed before devices,
// so this must be declared first.
local::detail::TimelineResource::Ref timeline_resource_;
iree::hal_buffer_ptr buffer_;
local::ScopedDevice device_;
local::detail::TimelineResource::Ref timeline_resource_;
};

// Wraps an untyped mapping, providing typed access.
Expand Down
31 changes: 27 additions & 4 deletions libshortfin/src/shortfin/local/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ void Account::Initialize() {

// Returns the account to the "no active transaction" state: sets the
// transaction type to NONE and resets active_command_buffer_.
void Account::Reset() {
active_tx_type_ = TransactionType::NONE;
// NOTE(review): the explicit iree_hal_command_buffer_end() call below is
// left disabled; the reason is not visible here — confirm before enabling.
// if (active_command_buffer_) {
// iree_hal_command_buffer_end(active_command_buffer_);
// }
active_command_buffer_.reset();
}

Expand Down Expand Up @@ -67,10 +70,17 @@ CompletionEvent Account::OnSync() {
// TimelineResource
// -------------------------------------------------------------------------- //

TimelineResource::TimelineResource(iree_allocator_t host_allocator,
size_t semaphore_capacity) {
SHORTFIN_THROW_IF_ERROR(iree_hal_fence_create(
semaphore_capacity, host_allocator, use_barrier_fence_.for_output()));
// Takes shared ownership of `scope` (moved into scope_) and creates the use
// barrier fence with `semaphore_capacity` as its capacity, using the scope's
// host allocator. A failure from iree_hal_fence_create is surfaced through
// SHORTFIN_THROW_IF_ERROR.
TimelineResource::TimelineResource(std::shared_ptr<Scope> scope,
size_t semaphore_capacity)
: scope_(std::move(scope)) {
logging::construct("TimelineResource", this);
// Read the allocator through scope_: the `scope` parameter was moved from
// in the initializer list above.
SHORTFIN_THROW_IF_ERROR(
iree_hal_fence_create(semaphore_capacity, scope_->host_allocator(),
use_barrier_fence_.for_output()));
}

// Reports the destruction through logging::destruct; members are then
// destroyed implicitly.
TimelineResource::~TimelineResource() {
logging::destruct("TimelineResource", this);
}

void TimelineResource::use_barrier_insert(iree_hal_semaphore_t *sem,
Expand All @@ -83,6 +93,19 @@ void TimelineResource::use_barrier_insert(iree_hal_semaphore_t *sem,
// Scheduler
// -------------------------------------------------------------------------- //

// Binds the scheduler to `system` (stored in system_) and reports the
// construction through logging::construct. Accounts are not created here;
// they are added by Initialize().
Scheduler::Scheduler(System &system) : system_(system) {
logging::construct("Scheduler", this);
}

// Reports the destruction through logging::destruct and then resets every
// account, so per-account state is cleared here rather than during the
// implicit destruction of members.
Scheduler::~Scheduler() {
  logging::destruct("Scheduler", this);

  // Reset each account up front, ahead of the implicit member teardown.
  for (auto &acct : accounts_) {
    acct.Reset();
  }
}

void Scheduler::Initialize(std::span<Device *const> devices) {
for (Device *device : devices) {
accounts_.emplace_back(*this, device);
Expand Down
Loading

0 comments on commit 037e2ab

Please sign in to comment.