Skip to content

Commit

Permalink
[core] Minor cpp changes around core worker (ray-project#48262)
Browse files Browse the repository at this point in the history
Signed-off-by: dayshah <[email protected]>
  • Loading branch information
dayshah authored Nov 9, 2024
1 parent ba22eaa commit e2dfcc8
Show file tree
Hide file tree
Showing 14 changed files with 76 additions and 84 deletions.
4 changes: 2 additions & 2 deletions doc/source/ray-contribute/debugging.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Debugging for Ray Developers
============================

This debugging guide is for contributors to the Ray project.
This debugging guide is for contributors to the Ray project.

Starting processes in a debugger
--------------------------------
Expand Down Expand Up @@ -63,7 +63,7 @@ If it worked, you should see as the first line in ``raylet.err``:

.. literalinclude:: /../../src/ray/util/logging.h
:language: C
:lines: 52,54
:lines: 113,120

Backend event stats
-------------------
Expand Down
2 changes: 1 addition & 1 deletion src/mock/ray/rpc/worker/core_worker_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class MockCoreWorkerClientInterface : public ray::pubsub::MockSubscriberClientIn
PushActorTask,
(std::unique_ptr<PushTaskRequest> request,
bool skip_queue,
const ClientCallback<PushTaskReply> &callback),
ClientCallback<PushTaskReply> &&callback),
(override));
MOCK_METHOD(void,
PushNormalTask,
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4037,7 +4037,7 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request,
RAY_LOG(INFO).WithField(task_id).WithField(current_actor_id)
<< "Cancel an actor task";
CancelActorTaskOnExecutor(
caller_worker_id, task_id, force_kill, recursive, on_cancel_callback);
caller_worker_id, task_id, force_kill, recursive, std::move(on_cancel_callback));
} else {
RAY_CHECK(current_actor_id.IsNil());
RAY_LOG(INFO).WithField(task_id) << "Cancel a normal task";
Expand All @@ -4048,7 +4048,7 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request,
void CoreWorker::CancelTaskOnExecutor(TaskID task_id,
bool force_kill,
bool recursive,
OnCanceledCallback on_canceled) {
const OnCanceledCallback &on_canceled) {
bool requested_task_running;
{
absl::MutexLock lock(&mutex_);
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -1710,7 +1710,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
void CancelTaskOnExecutor(TaskID intended_task_id,
bool force_kill,
bool recursive,
OnCanceledCallback on_canceled);
const OnCanceledCallback &on_canceled);

/// Cancel an actor task queued or running in the current worker.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ bool GetRequest::Wait(int64_t timeout_ms) {
auto remaining_timeout_ms = timeout_ms;
auto timeout_timestamp = current_time_ms() + timeout_ms;
while (!is_ready_) {
// TODO(dayshah): Check whether using the predicate overload of cv_.wait_for, instead of this manual while loop that re-checks is_ready_, helps.
auto status = cv_.wait_for(lock, std::chrono::milliseconds(remaining_timeout_ms));
auto current_timestamp = current_time_ms();
remaining_timeout_ms =
Expand Down
38 changes: 19 additions & 19 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#include "ray/common/buffer.h"
#include "ray/common/common_protocol.h"
#include "ray/common/constants.h"
#include "ray/core_worker/common.h"
#include "ray/gcs/pb_util.h"
#include "ray/util/exponential_backoff.h"
#include "ray/util/util.h"
Expand All @@ -26,10 +24,10 @@ namespace ray {
namespace core {

// Start throttling task failure logs once we hit this threshold.
const int64_t kTaskFailureThrottlingThreshold = 50;
constexpr int64_t kTaskFailureThrottlingThreshold = 50;

// Throttle task failure logs to once this interval.
const int64_t kTaskFailureLoggingFrequencyMillis = 5000;
constexpr int64_t kTaskFailureLoggingFrequencyMillis = 5000;

absl::flat_hash_set<ObjectID> ObjectRefStream::GetItemsUnconsumed() const {
absl::flat_hash_set<ObjectID> result;
Expand Down Expand Up @@ -237,7 +235,9 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
// Add new owned objects for the return values of the task.
size_t num_returns = spec.NumReturns();
std::vector<rpc::ObjectReference> returned_refs;
returned_refs.reserve(num_returns);
std::vector<ObjectID> return_ids;
return_ids.reserve(num_returns);
for (size_t i = 0; i < num_returns; i++) {
auto return_id = spec.ReturnId(i);
if (!spec.IsActorCreationTask()) {
Expand All @@ -252,7 +252,7 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
// language frontend. Note that the language bindings should set
// skip_adding_local_ref=True to avoid double referencing the object.
reference_counter_->AddOwnedObject(return_id,
/*inner_ids=*/{},
/*contained_ids=*/{},
caller_address,
call_site,
-1,
Expand Down Expand Up @@ -398,7 +398,7 @@ void TaskManager::DrainAndShutdown(std::function<void()> shutdown) {

bool TaskManager::IsTaskSubmissible(const TaskID &task_id) const {
absl::MutexLock lock(&mu_);
return submissible_tasks_.count(task_id) > 0;
return submissible_tasks_.contains(task_id);
}

bool TaskManager::IsTaskPending(const TaskID &task_id) const {
Expand Down Expand Up @@ -707,7 +707,7 @@ bool TaskManager::HandleReportGeneratorItemReturns(
HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(request.worker_addr().raylet_id()),
/*store_in_plasma*/ store_in_plasma_ids.count(object_id));
/*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
}

// Handle backpressure if needed.
Expand Down Expand Up @@ -807,7 +807,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
if (HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(worker_addr.raylet_id()),
store_in_plasma_ids.count(object_id))) {
store_in_plasma_ids.contains(object_id))) {
direct_return_ids.push_back(object_id);
}
}
Expand Down Expand Up @@ -933,7 +933,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
HandleTaskReturn(generator_return_id,
return_object,
NodeID::FromBinary(worker_addr.raylet_id()),
store_in_plasma_ids.count(generator_return_id));
store_in_plasma_ids.contains(generator_return_id));
}
}
}
Expand Down Expand Up @@ -1047,18 +1047,18 @@ void TaskManager::FailPendingTask(const TaskID &task_id,
<< "Tried to fail task that was not pending " << task_id;
spec = it->second.spec;

if (status && status->IsIntentionalSystemExit()) {
if ((status != nullptr) && status->IsIntentionalSystemExit()) {
// We don't mark intentional system exit as failures, such as tasks that
// exit by exit_actor(), exit by ray.shutdown(), etc. These tasks are expected
// to exit and not be marked as failure.
SetTaskStatus(it->second, rpc::TaskStatus::FINISHED);
} else {
SetTaskStatus(
it->second,
rpc::TaskStatus::FAILED,
(ray_error_info == nullptr
? gcs::GetRayErrorInfo(error_type, (status ? status->ToString() : ""))
: *ray_error_info));
SetTaskStatus(it->second,
rpc::TaskStatus::FAILED,
(ray_error_info == nullptr
? gcs::GetRayErrorInfo(
error_type, (status != nullptr ? status->ToString() : ""))
: *ray_error_info));
}
submissible_tasks_.erase(it);
num_pending_tasks_--;
Expand Down Expand Up @@ -1308,15 +1308,15 @@ void TaskManager::MarkTaskReturnObjectsFailed(
int64_t num_returns = spec.NumReturns();
for (int i = 0; i < num_returns; i++) {
const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1);
if (store_in_plasma_ids.count(object_id)) {
if (store_in_plasma_ids.contains(object_id)) {
put_in_local_plasma_callback_(error, object_id);
} else {
in_memory_store_->Put(error, object_id);
}
}
if (spec.ReturnsDynamic()) {
for (const auto &dynamic_return_id : spec.DynamicReturnIds()) {
if (store_in_plasma_ids.count(dynamic_return_id)) {
if (store_in_plasma_ids.contains(dynamic_return_id)) {
put_in_local_plasma_callback_(error, dynamic_return_id);
} else {
in_memory_store_->Put(error, dynamic_return_id);
Expand All @@ -1341,7 +1341,7 @@ void TaskManager::MarkTaskReturnObjectsFailed(
auto num_streaming_generator_returns = spec.NumStreamingGeneratorReturns();
for (size_t i = 0; i < num_streaming_generator_returns; i++) {
const auto generator_return_id = spec.StreamingGeneratorReturnId(i);
if (store_in_plasma_ids.count(generator_return_id)) {
if (store_in_plasma_ids.contains(generator_return_id)) {
put_in_local_plasma_callback_(error, generator_return_id);
} else {
in_memory_store_->Put(error, generator_return_id);
Expand Down
14 changes: 7 additions & 7 deletions src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ class TaskFinisherInterface {

virtual absl::optional<TaskSpecification> GetTaskSpec(const TaskID &task_id) const = 0;

virtual ~TaskFinisherInterface() {}
virtual ~TaskFinisherInterface() = default;
};

class TaskResubmissionInterface {
public:
virtual bool ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) = 0;

virtual ~TaskResubmissionInterface() {}
virtual ~TaskResubmissionInterface() = default;
};

using TaskStatusCounter = CounterMap<std::tuple<std::string, rpc::TaskStatus, bool>>;
Expand Down Expand Up @@ -222,7 +222,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
max_lineage_bytes_(max_lineage_bytes),
task_event_buffer_(task_event_buffer) {
task_counter_.SetOnChangeCallback(
[this](const std::tuple<std::string, rpc::TaskStatus, bool> key)
[this](const std::tuple<std::string, rpc::TaskStatus, bool> &key)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) {
ray::stats::STATS_tasks.Record(
task_counter_.Get(key),
Expand Down Expand Up @@ -621,13 +621,13 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
num_retries_left(num_retries_left_arg),
counter(counter),
num_oom_retries_left(num_oom_retries_left) {
reconstructable_return_ids.reserve(num_returns);
for (size_t i = 0; i < num_returns; i++) {
reconstructable_return_ids.insert(spec.ReturnId(i));
}
auto new_status =
status =
std::make_tuple(spec.GetName(), rpc::TaskStatus::PENDING_ARGS_AVAIL, false);
counter.Increment(new_status);
status = new_status;
counter.Increment(status);
}

void SetStatus(rpc::TaskStatus new_status) {
Expand All @@ -640,7 +640,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
// for FINISHED and FAILED tasks.
counter.Increment(new_tuple);
}
status = new_tuple;
status = std::move(new_tuple);
}

void MarkRetry() { is_retry_ = true; }
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/test/direct_actor_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {

void PushActorTask(std::unique_ptr<rpc::PushTaskRequest> request,
bool skip_queue,
const rpc::ClientCallback<rpc::PushTaskReply> &callback) override {
rpc::ClientCallback<rpc::PushTaskReply> &&callback) override {
received_seq_nos.push_back(request->sequence_number());
callbacks.push_back(callback);
}
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/test/normal_task_submitter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class MockTaskFinisher : public TaskFinisherInterface {
const Status *status,
const rpc::RayErrorInfo *ray_error_info = nullptr) override {
num_fail_pending_task_calls++;
num_tasks_failed++;
}

bool FailOrRetryPendingTask(const TaskID &task_id,
Expand Down Expand Up @@ -2093,7 +2094,7 @@ TEST(NormalTaskSubmitterTest, TestKillPendingTask) {
ASSERT_EQ(raylet_client->num_workers_returned, 0);
ASSERT_EQ(raylet_client->num_workers_disconnected, 0);
ASSERT_EQ(task_finisher->num_tasks_complete, 0);
ASSERT_EQ(task_finisher->num_tasks_failed, 0);
ASSERT_EQ(task_finisher->num_tasks_failed, 1);
ASSERT_EQ(task_finisher->num_fail_pending_task_calls, 1);
ASSERT_EQ(raylet_client->num_leases_canceled, 1);
ASSERT_TRUE(raylet_client->ReplyCancelWorkerLease());
Expand Down
19 changes: 7 additions & 12 deletions src/ray/core_worker/transport/actor_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,8 @@

#include "ray/core_worker/transport/actor_task_submitter.h"

#include <thread>

#include "ray/common/task/task.h"
#include "ray/gcs/pb_util.h"

using ray::rpc::ActorTableData;
using namespace ray::gcs;

namespace ray {
namespace core {

Expand Down Expand Up @@ -235,7 +229,7 @@ Status ActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
absl::MutexLock lock(&mu_);
const auto queue_it = client_queues_.find(task_spec.ActorId());
const auto &death_cause = queue_it->second.death_cause;
error_info = GetErrorInfoFromActorDeathCause(death_cause);
error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause);
error_type = error_info.error_type();
}
auto status = Status::IOError("cancelling task of dead actor");
Expand Down Expand Up @@ -366,7 +360,7 @@ void ActorTaskSubmitter::DisconnectActor(const ActorID &actor_id,
const rpc::ActorDeathCause &death_cause,
bool is_restartable) {
RAY_LOG(DEBUG).WithField(actor_id) << "Disconnecting from actor, death context type="
<< GetActorDeathCauseString(death_cause);
<< gcs::GetActorDeathCauseString(death_cause);

absl::flat_hash_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
inflight_task_callbacks;
Expand Down Expand Up @@ -432,7 +426,7 @@ void ActorTaskSubmitter::DisconnectActor(const ActorID &actor_id,
// Failing tasks has to be done without mu_ hold because the callback
// might require holding mu_ which will lead to a deadlock.
auto status = Status::IOError("cancelling all pending tasks of dead actor");
const auto error_info = GetErrorInfoFromActorDeathCause(death_cause);
const auto error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause);
const auto error_type = error_info.error_type();

for (auto &task_id : task_ids_to_fail) {
Expand Down Expand Up @@ -639,7 +633,8 @@ void ActorTaskSubmitter::PushActorTask(ClientQueue &queue,
task_finisher_.MarkTaskWaitingForExecution(task_id,
NodeID::FromBinary(addr.raylet_id()),
WorkerID::FromBinary(addr.worker_id()));
queue.rpc_client->PushActorTask(std::move(request), skip_queue, wrapped_callback);
queue.rpc_client->PushActorTask(
std::move(request), skip_queue, std::move(wrapped_callback));
}

void ActorTaskSubmitter::HandlePushTaskReply(const Status &status,
Expand Down Expand Up @@ -704,7 +699,7 @@ void ActorTaskSubmitter::HandlePushTaskReply(const Status &status,
is_actor_dead = queue.state == rpc::ActorTableData::DEAD;
if (is_actor_dead) {
const auto &death_cause = queue.death_cause;
error_info = GetErrorInfoFromActorDeathCause(death_cause);
error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause);
fail_immediately = error_info.has_actor_died_error() &&
error_info.actor_died_error().has_oom_context() &&
error_info.actor_died_error().oom_context().fail_immediately();
Expand Down Expand Up @@ -953,7 +948,7 @@ Status ActorTaskSubmitter::CancelTask(TaskSpecification task_spec, bool recursiv
request.set_recursive(recursive);
request.set_caller_worker_id(task_spec.CallerWorkerId().Binary());
client->CancelTask(request,
[this, task_spec, recursive, task_id](
[this, task_spec = std::move(task_spec), recursive, task_id](
const Status &status, const rpc::CancelTaskReply &reply) {
RAY_LOG(DEBUG).WithField(task_spec.TaskId())
<< "CancelTask RPC response received with status "
Expand Down
8 changes: 5 additions & 3 deletions src/ray/core_worker/transport/dependency_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ void LocalDependencyResolver::ResolveDependencies(
// This is deleted when the last dependency fetch callback finishes.
auto inserted = pending_tasks_.emplace(
task_id,
std::make_unique<TaskState>(
task, local_dependency_ids, actor_dependency_ids, on_dependencies_resolved));
std::make_unique<TaskState>(task,
local_dependency_ids,
actor_dependency_ids,
std::move(on_dependencies_resolved)));
RAY_CHECK(inserted.second);
}

Expand Down Expand Up @@ -137,7 +139,7 @@ void LocalDependencyResolver::ResolveDependencies(

for (const auto &actor_id : actor_dependency_ids) {
actor_creator_.AsyncWaitForActorRegisterFinish(
actor_id, [this, task_id, on_dependencies_resolved](const Status &status) {
actor_id, [this, task_id](const Status &status) {
std::unique_ptr<TaskState> resolved_task_state = nullptr;

{
Expand Down
Loading

0 comments on commit e2dfcc8

Please sign in to comment.