Skip to content

Commit

Permalink
[core] Stop waiting for dependencies when cancelling tasks (ray-proje…
Browse files Browse the repository at this point in the history
…ct#48661)

Signed-off-by: dayshah <[email protected]>
  • Loading branch information
dayshah authored Nov 15, 2024
1 parent a1d4cb1 commit 4dc514e
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 8 deletions.
20 changes: 20 additions & 0 deletions python/ray/tests/test_cancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,26 @@ def verify():
wait_for_condition(verify)


@pytest.mark.parametrize("use_force", [True, False])
def test_cancel_with_dependency(shutdown_only, use_force):
    """Cancel a task whose argument is produced by a task that never finishes.

    ``square`` is submitted with the result of ``wait_forever_task`` as its
    argument, so its dependency never becomes available. The test cancels
    ``square`` (with and without ``force``) and expects ``ray.get`` on it to
    raise one of ``valid_exceptions(use_force)``.
    """
    ray.init(num_cpus=4)

    @ray.remote(num_cpus=1)
    def wait_forever_task():
        # Never returns: keeps the downstream task's dependency unresolved.
        while True:
            time.sleep(1000)

    @ray.remote(num_cpus=1)
    def square(x):
        return x * x

    never_ready_ref = wait_forever_task.remote()
    blocked_ref = square.remote(never_ready_ref)

    # Cancel the dependent task, not the one that is spinning.
    ray.cancel(blocked_ref, force=use_force)

    with pytest.raises(valid_exceptions(use_force)):
        ray.get(blocked_ref)


@pytest.mark.skip("Actor cancelation works now.")
def test_recursive_cancel_error_messages(shutdown_only, capsys):
"""
Expand Down
1 change: 1 addition & 0 deletions src/mock/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class MockTaskFinisherInterface : public TaskFinisherInterface {
MarkTaskWaitingForExecution,
(const TaskID &task_id, const NodeID &node_id, const WorkerID &worker_id),
(override));
// gMock override of TaskFinisherInterface::IsTaskPending (const member taking a TaskID).
MOCK_METHOD(bool, IsTaskPending, (const TaskID &task_id), (const, override));
};

} // namespace core
Expand Down
4 changes: 3 additions & 1 deletion src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class TaskFinisherInterface {

/// Get the task specification for the given task, if there is one.
///
/// \param[in] task_id ID of the task to query.
/// \return The task's specification wrapped in an optional.
/// NOTE(review): presumably empty when no spec is known for task_id — confirm
/// against the implementing classes.
virtual absl::optional<TaskSpecification> GetTaskSpec(const TaskID &task_id) const = 0;

/// Return whether the task is pending.
///
/// \param[in] task_id ID of the task to query.
/// \return Whether the task is pending.
virtual bool IsTaskPending(const TaskID &task_id) const = 0;

/// Virtual so that implementations can be destroyed through a
/// TaskFinisherInterface pointer or reference.
virtual ~TaskFinisherInterface() = default;
};

Expand Down Expand Up @@ -551,7 +553,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
///
/// \param[in] task_id ID of the task to query.
/// \return Whether the task is pending.
bool IsTaskPending(const TaskID &task_id) const;
bool IsTaskPending(const TaskID &task_id) const override;

/// Return whether the task is scheduled and waiting for execution.
///
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/test/dependency_resolver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class MockTaskFinisher : public TaskFinisherInterface {
const NodeID &node_id,
const WorkerID &worker_id) override {}

/// Test stub: unconditionally reports the queried task as pending.
bool IsTaskPending(const TaskID &task_id) const override {
  return true;
}

int num_tasks_complete = 0;
int num_tasks_failed = 0;
int num_inlined_dependencies = 0;
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/test/normal_task_submitter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ class MockTaskFinisher : public TaskFinisherInterface {
const NodeID &node_id,
const WorkerID &worker_id) override {}

/// Test stub: unconditionally reports the queried task as pending.
bool IsTaskPending(const TaskID &task_id) const override {
  return true;
}

int num_tasks_complete = 0;
int num_tasks_failed = 0;
int num_inlined_dependencies = 0;
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/transport/actor_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,8 @@ Status ActorTaskSubmitter::CancelTask(TaskSpecification task_spec, bool recursiv

// Shouldn't hold a lock while accessing task_finisher_.
// Task is already canceled or finished.
if (!GetTaskFinisherWithoutMu().MarkTaskCanceled(task_id)) {
if (!GetTaskFinisherWithoutMu().MarkTaskCanceled(task_id) ||
!GetTaskFinisherWithoutMu().IsTaskPending(task_id)) {
RAY_LOG(DEBUG).WithField(task_id) << "Task is already finished or canceled";
return Status::OK();
}
Expand Down
11 changes: 5 additions & 6 deletions src/ray/core_worker/transport/normal_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
RequestNewWorkerIfNeeded(scheduling_key);
}
}
if (!keep_executing) {
RAY_UNUSED(task_finisher_->FailOrRetryPendingTask(
task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr));
}
});
return Status::OK();
}
Expand Down Expand Up @@ -709,7 +705,8 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec,
{
absl::MutexLock lock(&mu_);
if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end() ||
!task_finisher_->MarkTaskCanceled(task_spec.TaskId())) {
!task_finisher_->MarkTaskCanceled(task_spec.TaskId()) ||
!task_finisher_->IsTaskPending(task_spec.TaskId())) {
return Status::OK();
}

Expand All @@ -736,7 +733,9 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec,

if (rpc_client == executing_tasks_.end()) {
// This case is reached for tasks that have unresolved dependencies.
// No executing tasks, so cancelling is a noop.
resolver_.CancelDependencyResolution(task_spec.TaskId());
RAY_UNUSED(task_finisher_->FailPendingTask(task_spec.TaskId(),
rpc::ErrorType::TASK_CANCELLED));
if (scheduling_key_entry.CanDelete()) {
// We can safely remove the entry keyed by scheduling_key from the
// scheduling_key_entries_ hashmap.
Expand Down

0 comments on commit 4dc514e

Please sign in to comment.