From e2dfcc853f1c50d3ac22023db368d990a5dedc22 Mon Sep 17 00:00:00 2001 From: Dhyey Shah Date: Fri, 8 Nov 2024 19:10:20 -0500 Subject: [PATCH] [core] Minor cpp changes around core worker (#48262) Signed-off-by: dayshah --- doc/source/ray-contribute/debugging.rst | 4 +- src/mock/ray/rpc/worker/core_worker_client.h | 2 +- src/ray/core_worker/core_worker.cc | 4 +- src/ray/core_worker/core_worker.h | 2 +- .../memory_store/memory_store.cc | 1 + src/ray/core_worker/task_manager.cc | 38 +++++++++---------- src/ray/core_worker/task_manager.h | 14 +++---- .../test/direct_actor_transport_test.cc | 2 +- .../test/normal_task_submitter_test.cc | 3 +- .../transport/actor_task_submitter.cc | 19 ++++------ .../transport/dependency_resolver.cc | 8 ++-- .../transport/normal_task_submitter.cc | 38 +++++++++---------- .../transport/normal_task_submitter.h | 11 +++--- src/ray/rpc/worker/core_worker_client.h | 14 +++---- 14 files changed, 76 insertions(+), 84 deletions(-) diff --git a/doc/source/ray-contribute/debugging.rst b/doc/source/ray-contribute/debugging.rst index 66a602e891b0..5b31261ba34f 100644 --- a/doc/source/ray-contribute/debugging.rst +++ b/doc/source/ray-contribute/debugging.rst @@ -1,7 +1,7 @@ Debugging for Ray Developers ============================ -This debugging guide is for contributors to the Ray project. +This debugging guide is for contributors to the Ray project. Starting processes in a debugger -------------------------------- @@ -63,7 +63,7 @@ If it worked, you should see as the first line in ``raylet.err``: .. literalinclude:: /../../src/ray/util/logging.h :language: C - :lines: 52,54 + :lines: 113,120 Backend event stats ------------------- diff --git a/src/mock/ray/rpc/worker/core_worker_client.h b/src/mock/ray/rpc/worker/core_worker_client.h index abc82eb42999..78d36c22672e 100644 --- a/src/mock/ray/rpc/worker/core_worker_client.h +++ b/src/mock/ray/rpc/worker/core_worker_client.h @@ -22,7 +22,7 @@ class MockCoreWorkerClientInterface : public ray::pubsub::MockSubscriberClientIn PushActorTask, (std::unique_ptr request, bool skip_queue, - const ClientCallback &callback), + ClientCallback &&callback), (override)); MOCK_METHOD(void, PushNormalTask, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 85a253339cdd..a09d1e4afc0c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -4037,7 +4037,7 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request, RAY_LOG(INFO).WithField(task_id).WithField(current_actor_id) << "Cancel an actor task"; CancelActorTaskOnExecutor( - caller_worker_id, task_id, force_kill, recursive, on_cancel_callback); + caller_worker_id, task_id, force_kill, recursive, std::move(on_cancel_callback)); } else { RAY_CHECK(current_actor_id.IsNil()); RAY_LOG(INFO).WithField(task_id) << "Cancel a normal task"; @@ -4048,7 +4048,7 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request, void CoreWorker::CancelTaskOnExecutor(TaskID task_id, bool force_kill, bool recursive, - OnCanceledCallback on_canceled) { + const OnCanceledCallback &on_canceled) { bool requested_task_running; { absl::MutexLock lock(&mutex_); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 72b74774b43f..3357bad4cfd9 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1710,7 +1710,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void CancelTaskOnExecutor(TaskID intended_task_id, bool force_kill, bool recursive, - OnCanceledCallback on_canceled); + const OnCanceledCallback &on_canceled); /// Cancel an actor task queued or running in the current worker. /// diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 3d90c3c8fa36..812aae33f955 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -104,6 +104,7 @@ bool GetRequest::Wait(int64_t timeout_ms) { auto remaining_timeout_ms = timeout_ms; auto timeout_timestamp = current_time_ms() + timeout_ms; while (!is_ready_) { + // TODO (dayshah): see if using cv condition function instead of busy while helps. auto status = cv_.wait_for(lock, std::chrono::milliseconds(remaining_timeout_ms)); auto current_timestamp = current_time_ms(); remaining_timeout_ms = diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index dc3c723b20c0..bc5a78c7862e 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -16,8 +16,6 @@ #include "ray/common/buffer.h" #include "ray/common/common_protocol.h" -#include "ray/common/constants.h" -#include "ray/core_worker/common.h" #include "ray/gcs/pb_util.h" #include "ray/util/exponential_backoff.h" #include "ray/util/util.h" @@ -26,10 +24,10 @@ namespace ray { namespace core { // Start throttling task failure logs once we hit this threshold. -const int64_t kTaskFailureThrottlingThreshold = 50; +constexpr int64_t kTaskFailureThrottlingThreshold = 50; // Throttle task failure logs to once this interval. -const int64_t kTaskFailureLoggingFrequencyMillis = 5000; +constexpr int64_t kTaskFailureLoggingFrequencyMillis = 5000; absl::flat_hash_set ObjectRefStream::GetItemsUnconsumed() const { absl::flat_hash_set result; @@ -237,7 +235,9 @@ std::vector TaskManager::AddPendingTask( // Add new owned objects for the return values of the task. size_t num_returns = spec.NumReturns(); std::vector returned_refs; + returned_refs.reserve(num_returns); std::vector return_ids; + return_ids.reserve(num_returns); for (size_t i = 0; i < num_returns; i++) { auto return_id = spec.ReturnId(i); if (!spec.IsActorCreationTask()) { @@ -252,7 +252,7 @@ std::vector TaskManager::AddPendingTask( // language frontend. Note that the language bindings should set // skip_adding_local_ref=True to avoid double referencing the object. reference_counter_->AddOwnedObject(return_id, - /*inner_ids=*/{}, + /*contained_ids=*/{}, caller_address, call_site, -1, @@ -398,7 +398,7 @@ void TaskManager::DrainAndShutdown(std::function shutdown) { bool TaskManager::IsTaskSubmissible(const TaskID &task_id) const { absl::MutexLock lock(&mu_); - return submissible_tasks_.count(task_id) > 0; + return submissible_tasks_.contains(task_id); } bool TaskManager::IsTaskPending(const TaskID &task_id) const { @@ -707,7 +707,7 @@ bool TaskManager::HandleReportGeneratorItemReturns( HandleTaskReturn(object_id, return_object, NodeID::FromBinary(request.worker_addr().raylet_id()), - /*store_in_plasma*/ store_in_plasma_ids.count(object_id)); + /*store_in_plasma=*/store_in_plasma_ids.contains(object_id)); } // Handle backpressure if needed. @@ -807,7 +807,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, if (HandleTaskReturn(object_id, return_object, NodeID::FromBinary(worker_addr.raylet_id()), - store_in_plasma_ids.count(object_id))) { + store_in_plasma_ids.contains(object_id))) { direct_return_ids.push_back(object_id); } } @@ -933,7 +933,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, HandleTaskReturn(generator_return_id, return_object, NodeID::FromBinary(worker_addr.raylet_id()), - store_in_plasma_ids.count(generator_return_id)); + store_in_plasma_ids.contains(generator_return_id)); } } } @@ -1047,18 +1047,18 @@ void TaskManager::FailPendingTask(const TaskID &task_id, << "Tried to fail task that was not pending " << task_id; spec = it->second.spec; - if (status && status->IsIntentionalSystemExit()) { + if ((status != nullptr) && status->IsIntentionalSystemExit()) { // We don't mark intentional system exit as failures, such as tasks that // exit by exit_actor(), exit by ray.shutdown(), etc. These tasks are expected // to exit and not be marked as failure. SetTaskStatus(it->second, rpc::TaskStatus::FINISHED); } else { - SetTaskStatus( - it->second, - rpc::TaskStatus::FAILED, - (ray_error_info == nullptr - ? gcs::GetRayErrorInfo(error_type, (status ? status->ToString() : "")) - : *ray_error_info)); + SetTaskStatus(it->second, + rpc::TaskStatus::FAILED, + (ray_error_info == nullptr + ? gcs::GetRayErrorInfo( + error_type, (status != nullptr ? status->ToString() : "")) + : *ray_error_info)); } submissible_tasks_.erase(it); num_pending_tasks_--; @@ -1308,7 +1308,7 @@ void TaskManager::MarkTaskReturnObjectsFailed( int64_t num_returns = spec.NumReturns(); for (int i = 0; i < num_returns; i++) { const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1); - if (store_in_plasma_ids.count(object_id)) { + if (store_in_plasma_ids.contains(object_id)) { put_in_local_plasma_callback_(error, object_id); } else { in_memory_store_->Put(error, object_id); @@ -1316,7 +1316,7 @@ void TaskManager::MarkTaskReturnObjectsFailed( } if (spec.ReturnsDynamic()) { for (const auto &dynamic_return_id : spec.DynamicReturnIds()) { - if (store_in_plasma_ids.count(dynamic_return_id)) { + if (store_in_plasma_ids.contains(dynamic_return_id)) { put_in_local_plasma_callback_(error, dynamic_return_id); } else { in_memory_store_->Put(error, dynamic_return_id); @@ -1341,7 +1341,7 @@ void TaskManager::MarkTaskReturnObjectsFailed( auto num_streaming_generator_returns = spec.NumStreamingGeneratorReturns(); for (size_t i = 0; i < num_streaming_generator_returns; i++) { const auto generator_return_id = spec.StreamingGeneratorReturnId(i); - if (store_in_plasma_ids.count(generator_return_id)) { + if (store_in_plasma_ids.contains(generator_return_id)) { put_in_local_plasma_callback_(error, generator_return_id); } else { in_memory_store_->Put(error, generator_return_id); diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index eae9912ef152..df0b26f3d0d9 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -66,14 +66,14 @@ class TaskFinisherInterface { virtual absl::optional GetTaskSpec(const TaskID &task_id) const = 0; - virtual ~TaskFinisherInterface() {} + virtual ~TaskFinisherInterface() = default; }; class TaskResubmissionInterface { public: virtual bool ResubmitTask(const TaskID &task_id, std::vector *task_deps) = 0; - virtual ~TaskResubmissionInterface() {} + virtual ~TaskResubmissionInterface() = default; }; using TaskStatusCounter = CounterMap>; @@ -222,7 +222,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa max_lineage_bytes_(max_lineage_bytes), task_event_buffer_(task_event_buffer) { task_counter_.SetOnChangeCallback( - [this](const std::tuple key) + [this](const std::tuple &key) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) { ray::stats::STATS_tasks.Record( task_counter_.Get(key), @@ -621,13 +621,13 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa num_retries_left(num_retries_left_arg), counter(counter), num_oom_retries_left(num_oom_retries_left) { + reconstructable_return_ids.reserve(num_returns); for (size_t i = 0; i < num_returns; i++) { reconstructable_return_ids.insert(spec.ReturnId(i)); } - auto new_status = + status = std::make_tuple(spec.GetName(), rpc::TaskStatus::PENDING_ARGS_AVAIL, false); - counter.Increment(new_status); - status = new_status; + counter.Increment(status); } void SetStatus(rpc::TaskStatus new_status) { @@ -640,7 +640,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa // for FINISHED and FAILED tasks. counter.Increment(new_tuple); } - status = new_tuple; + status = std::move(new_tuple); } void MarkRetry() { is_retry_ = true; } diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index ce8eb530d7b5..b6b5d2278f03 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -80,7 +80,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { void PushActorTask(std::unique_ptr request, bool skip_queue, - const rpc::ClientCallback &callback) override { + rpc::ClientCallback &&callback) override { received_seq_nos.push_back(request->sequence_number()); callbacks.push_back(callback); } diff --git a/src/ray/core_worker/test/normal_task_submitter_test.cc b/src/ray/core_worker/test/normal_task_submitter_test.cc index dd3304901ae6..f7feb5ae26e0 100644 --- a/src/ray/core_worker/test/normal_task_submitter_test.cc +++ b/src/ray/core_worker/test/normal_task_submitter_test.cc @@ -147,6 +147,7 @@ class MockTaskFinisher : public TaskFinisherInterface { const Status *status, const rpc::RayErrorInfo *ray_error_info = nullptr) override { num_fail_pending_task_calls++; + num_tasks_failed++; } bool FailOrRetryPendingTask(const TaskID &task_id, @@ -2093,7 +2094,7 @@ TEST(NormalTaskSubmitterTest, TestKillPendingTask) { ASSERT_EQ(raylet_client->num_workers_returned, 0); ASSERT_EQ(raylet_client->num_workers_disconnected, 0); ASSERT_EQ(task_finisher->num_tasks_complete, 0); - ASSERT_EQ(task_finisher->num_tasks_failed, 0); + ASSERT_EQ(task_finisher->num_tasks_failed, 1); ASSERT_EQ(task_finisher->num_fail_pending_task_calls, 1); ASSERT_EQ(raylet_client->num_leases_canceled, 1); ASSERT_TRUE(raylet_client->ReplyCancelWorkerLease()); diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index babd1ba8dc6d..2f18f6299bb8 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -14,14 +14,8 @@ #include "ray/core_worker/transport/actor_task_submitter.h" -#include - -#include "ray/common/task/task.h" #include "ray/gcs/pb_util.h" -using ray::rpc::ActorTableData; -using namespace ray::gcs; - namespace ray { namespace core { @@ -235,7 +229,7 @@ Status ActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) { absl::MutexLock lock(&mu_); const auto queue_it = client_queues_.find(task_spec.ActorId()); const auto &death_cause = queue_it->second.death_cause; - error_info = GetErrorInfoFromActorDeathCause(death_cause); + error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause); error_type = error_info.error_type(); } auto status = Status::IOError("cancelling task of dead actor"); @@ -366,7 +360,7 @@ void ActorTaskSubmitter::DisconnectActor(const ActorID &actor_id, const rpc::ActorDeathCause &death_cause, bool is_restartable) { RAY_LOG(DEBUG).WithField(actor_id) << "Disconnecting from actor, death context type=" - << GetActorDeathCauseString(death_cause); + << gcs::GetActorDeathCauseString(death_cause); absl::flat_hash_map> inflight_task_callbacks; @@ -432,7 +426,7 @@ void ActorTaskSubmitter::DisconnectActor(const ActorID &actor_id, // Failing tasks has to be done without mu_ hold because the callback // might require holding mu_ which will lead to a deadlock. auto status = Status::IOError("cancelling all pending tasks of dead actor"); - const auto error_info = GetErrorInfoFromActorDeathCause(death_cause); + const auto error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause); const auto error_type = error_info.error_type(); for (auto &task_id : task_ids_to_fail) { @@ -639,7 +633,8 @@ void ActorTaskSubmitter::PushActorTask(ClientQueue &queue, task_finisher_.MarkTaskWaitingForExecution(task_id, NodeID::FromBinary(addr.raylet_id()), WorkerID::FromBinary(addr.worker_id())); - queue.rpc_client->PushActorTask(std::move(request), skip_queue, wrapped_callback); + queue.rpc_client->PushActorTask( + std::move(request), skip_queue, std::move(wrapped_callback)); } void ActorTaskSubmitter::HandlePushTaskReply(const Status &status, @@ -704,7 +699,7 @@ void ActorTaskSubmitter::HandlePushTaskReply(const Status &status, is_actor_dead = queue.state == rpc::ActorTableData::DEAD; if (is_actor_dead) { const auto &death_cause = queue.death_cause; - error_info = GetErrorInfoFromActorDeathCause(death_cause); + error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause); fail_immediately = error_info.has_actor_died_error() && error_info.actor_died_error().has_oom_context() && error_info.actor_died_error().oom_context().fail_immediately(); @@ -953,7 +948,7 @@ Status ActorTaskSubmitter::CancelTask(TaskSpecification task_spec, bool recursiv request.set_recursive(recursive); request.set_caller_worker_id(task_spec.CallerWorkerId().Binary()); client->CancelTask(request, - [this, task_spec, recursive, task_id]( + [this, task_spec = std::move(task_spec), recursive, task_id]( const Status &status, const rpc::CancelTaskReply &reply) { RAY_LOG(DEBUG).WithField(task_spec.TaskId()) << "CancelTask RPC response received with status " diff --git a/src/ray/core_worker/transport/dependency_resolver.cc b/src/ray/core_worker/transport/dependency_resolver.cc index 997f770ca1de..d4543e4b9ce7 100644 --- a/src/ray/core_worker/transport/dependency_resolver.cc +++ b/src/ray/core_worker/transport/dependency_resolver.cc @@ -91,8 +91,10 @@ void LocalDependencyResolver::ResolveDependencies( // This is deleted when the last dependency fetch callback finishes. auto inserted = pending_tasks_.emplace( task_id, - std::make_unique( - task, local_dependency_ids, actor_dependency_ids, on_dependencies_resolved)); + std::make_unique(task, + local_dependency_ids, + actor_dependency_ids, + std::move(on_dependencies_resolved))); RAY_CHECK(inserted.second); } @@ -137,7 +139,7 @@ void LocalDependencyResolver::ResolveDependencies( for (const auto &actor_id : actor_dependency_ids) { actor_creator_.AsyncWaitForActorRegisterFinish( - actor_id, [this, task_id, on_dependencies_resolved](const Status &status) { + actor_id, [this, task_id](const Status &status) { std::unique_ptr resolved_task_state = nullptr; { diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 84a1eb92f5ab..9e8ac970db1b 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -16,7 +16,6 @@ #include "ray/core_worker/transport/dependency_resolver.h" #include "ray/gcs/pb_util.h" -#include "ray/stats/metric_defs.h" namespace ray { namespace core { @@ -517,14 +516,13 @@ void NormalTaskSubmitter::RequestNewWorkerIfNeeded(const SchedulingKey &scheduli auto &task_spec = tasks_to_fail.front(); if (task_spec.IsActorCreationTask() && error_type == rpc::ErrorType::TASK_PLACEMENT_GROUP_REMOVED) { - RAY_UNUSED(task_finisher_->FailPendingTask( - task_spec.TaskId(), - rpc::ErrorType::ACTOR_PLACEMENT_GROUP_REMOVED, - &error_status, - &error_info)); + task_finisher_->FailPendingTask(task_spec.TaskId(), + rpc::ErrorType::ACTOR_PLACEMENT_GROUP_REMOVED, + &error_status, + &error_info); } else { - RAY_UNUSED(task_finisher_->FailPendingTask( - task_spec.TaskId(), error_type, &error_status, &error_info)); + task_finisher_->FailPendingTask( + task_spec.TaskId(), error_type, &error_status, &error_info); } tasks_to_fail.pop_front(); } @@ -630,8 +628,7 @@ void NormalTaskSubmitter::PushNormalTask( if (reply.was_cancelled_before_running()) { RAY_LOG(DEBUG) << "Task " << task_id << " was cancelled before it started running."; - RAY_UNUSED( - task_finisher_->FailPendingTask(task_id, rpc::ErrorType::TASK_CANCELLED)); + task_finisher_->FailPendingTask(task_id, rpc::ErrorType::TASK_CANCELLED); } else if (!task_spec.GetMessage().retry_exceptions() || !reply.is_retryable_error() || !task_finisher_->RetryTaskIfPossible( @@ -703,7 +700,7 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, bool recursive) { RAY_LOG(INFO) << "Cancelling a task: " << task_spec.TaskId() << " force_kill: " << force_kill << " recursive: " << recursive; - const SchedulingKey scheduling_key( + SchedulingKey scheduling_key( task_spec.GetSchedulingClass(), task_spec.GetDependencyIds(), task_spec.IsActorCreationTask() ? task_spec.ActorCreationId() : ActorID::Nil(), @@ -724,12 +721,9 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, for (auto spec = scheduling_tasks.begin(); spec != scheduling_tasks.end(); spec++) { if (spec->TaskId() == task_spec.TaskId()) { scheduling_tasks.erase(spec); - - if (scheduling_tasks.empty()) { - CancelWorkerLeaseIfNeeded(scheduling_key); - } - RAY_UNUSED(task_finisher_->FailPendingTask(task_spec.TaskId(), - rpc::ErrorType::TASK_CANCELLED)); + CancelWorkerLeaseIfNeeded(scheduling_key); + task_finisher_->FailPendingTask(task_spec.TaskId(), + rpc::ErrorType::TASK_CANCELLED); return Status::OK(); } } @@ -755,7 +749,6 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, } RAY_CHECK(client != nullptr); - auto request = rpc::CancelTaskRequest(); request.set_intended_task_id(task_spec.TaskId().Binary()); request.set_force_kill(force_kill); @@ -763,8 +756,11 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, request.set_caller_worker_id(task_spec.CallerWorkerId().Binary()); client->CancelTask( request, - [this, task_spec, scheduling_key, force_kill, recursive]( - const Status &status, const rpc::CancelTaskReply &reply) { + [this, + task_spec = std::move(task_spec), + scheduling_key = std::move(scheduling_key), + force_kill, + recursive](const Status &status, const rpc::CancelTaskReply &reply) mutable { absl::MutexLock lock(&mu_); RAY_LOG(DEBUG) << "CancelTask RPC response received for " << task_spec.TaskId() << " with status " << status.ToString(); @@ -789,7 +785,7 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, cancel_retry_timer_->async_wait( boost::bind(&NormalTaskSubmitter::CancelTask, this, - task_spec, + std::move(task_spec), force_kill, recursive)); } else { diff --git a/src/ray/core_worker/transport/normal_task_submitter.h b/src/ray/core_worker/transport/normal_task_submitter.h index b434324008f2..532e3cb85600 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.h +++ b/src/ray/core_worker/transport/normal_task_submitter.h @@ -34,9 +34,8 @@ namespace ray { namespace core { -typedef std::function(const std::string &ip_address, - int port)> - LeaseClientFactoryFn; +using LeaseClientFactoryFn = + std::function(const std::string &, int)>; // The task queues are keyed on resource shape & function descriptor // (encapsulated in SchedulingClass) to defer resource allocation decisions to the raylet @@ -49,7 +48,7 @@ typedef std::function(const std::string &i // be aware of the actor and is not able to manage it. It is also keyed on // RuntimeEnvHash, because a worker can only run a task if the worker's RuntimeEnvHash // matches the RuntimeEnvHash required by the task spec. -typedef int RuntimeEnvHash; +using RuntimeEnvHash = int; using SchedulingKey = std::tuple, ActorID, RuntimeEnvHash>; @@ -64,7 +63,7 @@ class LeaseRequestRateLimiter { // Lease request rate-limiter with fixed number. class StaticLeaseRequestRateLimiter : public LeaseRequestRateLimiter { public: - StaticLeaseRequestRateLimiter(size_t limit) : kLimit(limit) {} + explicit StaticLeaseRequestRateLimiter(size_t limit) : kLimit(limit) {} size_t GetMaxPendingLeaseRequestsPerSchedulingCategory() override { return kLimit; } private: @@ -89,7 +88,7 @@ class NormalTaskSubmitter { const JobID &job_id, std::shared_ptr lease_request_rate_limiter, absl::optional cancel_timer = absl::nullopt) - : rpc_address_(rpc_address), + : rpc_address_(std::move(rpc_address)), local_lease_client_(lease_client), lease_client_factory_(lease_client_factory), lease_policy_(std::move(lease_policy)), diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index db5df1c0ce15..add45a82b24d 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -47,10 +47,10 @@ namespace ray { namespace rpc { /// The maximum number of requests in flight per client. -const int64_t kMaxBytesInFlight = 16 * 1024 * 1024; +inline constexpr int64_t kMaxBytesInFlight = 16L * 1024 * 1024; /// The base size in bytes per request. -const int64_t kBaseRequestSize = 1024; +inline constexpr int64_t kBaseRequestSize = 1024; /// Get the estimated size in bytes of the given task. const static int64_t RequestSizeInBytes(const PushTaskRequest &request) { @@ -87,7 +87,7 @@ class CoreWorkerClientInterface : public pubsub::SubscriberClientInterface { /// \return if the rpc call succeeds virtual void PushActorTask(std::unique_ptr request, bool skip_queue, - const ClientCallback &callback) {} + ClientCallback &&callback) {} /// Similar to PushActorTask, but sets no ordering constraint. This is used to /// push non-actor tasks directly to a worker. @@ -196,7 +196,7 @@ class CoreWorkerClientInterface : public pubsub::SubscriberClientInterface { /// Returns the max acked sequence number, useful for checking on progress. virtual int64_t ClientProcessedUpToSeqno() { return -1; } - virtual ~CoreWorkerClientInterface(){}; + virtual ~CoreWorkerClientInterface() = default; }; /// Client used for communicating with a remote worker server. @@ -350,7 +350,7 @@ class CoreWorkerClient : public std::enable_shared_from_this, void PushActorTask(std::unique_ptr request, bool skip_queue, - const ClientCallback &callback) override { + ClientCallback &&callback) override { if (skip_queue) { // Set this value so that the actor does not skip any tasks when // processing this request. We could also set it to max_finished_seq_no_, @@ -367,9 +367,7 @@ class CoreWorkerClient : public std::enable_shared_from_this, { absl::MutexLock lock(&mutex_); - send_queue_.push_back(std::make_pair( - std::move(request), - std::move(const_cast &>(callback)))); + send_queue_.emplace_back(std::move(request), std::move(callback)); } SendRequests(); }