From 2ed122b787f673797a4e9e20b4af4e4ece149fcd Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Tue, 2 Jan 2024 15:28:27 +0800 Subject: [PATCH] [improvement](task exec context) add parent class HasTaskExecutionCtx to own the task ctx (#29388) --------- Co-authored-by: yiguolei --- be/src/pipeline/task_scheduler.cpp | 3 +-- be/src/runtime/task_execution_context.h | 26 +++++++++++++++++++++- be/src/vec/exec/scan/scanner_context.cpp | 10 ++++----- be/src/vec/exec/scan/scanner_context.h | 6 ++--- be/src/vec/exec/scan/scanner_scheduler.cpp | 4 ++-- 5 files changed, 34 insertions(+), 15 deletions(-) diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index e814e4cdf2d092..91980aeec7a8d8 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -345,8 +345,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) { // close_a_pipeline may delete fragment context and will core in some defer // code, because the defer code will access fragment context it self. - std::shared_ptr lock_for_context = - task->fragment_context()->shared_from_this(); + auto lock_for_context = task->fragment_context()->shared_from_this(); auto status = task->try_close(exec_status); auto cancel = [&]() { task->query_context()->cancel(true, status.to_string(), diff --git a/be/src/runtime/task_execution_context.h b/be/src/runtime/task_execution_context.h index 08564840f59c46..c876ed5cb0d7b9 100644 --- a/be/src/runtime/task_execution_context.h +++ b/be/src/runtime/task_execution_context.h @@ -21,11 +21,35 @@ namespace doris { -// This class act as a super class of all context like things +// This class act as a super class of all context like things such as +// plan fragment executor or pipelinefragmentcontext or pipelinexfragmentcontext class TaskExecutionContext : public std::enable_shared_from_this { public: TaskExecutionContext() = default; virtual ~TaskExecutionContext() = default; }; +using TaskExecutionContextSPtr = std::shared_ptr; + +// Task Execution Context maybe plan fragment executor or pipelinefragmentcontext or pipelinexfragmentcontext +// In multi thread scenario, the object is created in main thread (such as FragmentExecThread), but the object +// maybe used in other thread(such as scanner thread, brpc->sender queue). If the main thread stopped and destroy +// the object, then the other thread may core. So the other thread must lock the context to ensure the object exists. +struct HasTaskExecutionCtx { + using Weak = typename TaskExecutionContextSPtr::weak_type; + + HasTaskExecutionCtx(TaskExecutionContextSPtr task_exec_ctx) : task_exec_ctx_(task_exec_ctx) {} + + // Init task ctx from state, the state has to own a method named get_task_execution_context() + // like runtime state + template + HasTaskExecutionCtx(T* state) : task_exec_ctx_(state->get_task_execution_context()) {} + +public: + inline TaskExecutionContextSPtr task_exec_ctx() const { return task_exec_ctx_.lock(); } + +private: + Weak task_exec_ctx_; +}; + } // namespace doris diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 16bb1ce8487f4a..9c87967f505568 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -52,7 +52,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu pipeline::ScanLocalStateBase* local_state, std::shared_ptr dependency, std::shared_ptr finish_dependency) - : _state(state), + : HasTaskExecutionCtx(state), + _state(state), _parent(nullptr), _local_state(local_state), _output_tuple_desc(output_row_descriptor @@ -72,8 +73,6 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu _finish_dependency(finish_dependency) { DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); - // Use the task exec context as a lock between scanner threads and fragment exection threads - _task_exec_ctx = _state->get_task_execution_context(); _query_id = _state->get_query_ctx()->query_id(); ctx_id = UniqueId::gen_uid().to_string(); if (_scanners.empty()) { @@ -102,7 +101,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS const std::list& scanners, int64_t limit_, int64_t max_bytes_in_blocks_queue, const int num_parallel_instances, pipeline::ScanLocalStateBase* local_state) - : _state(state), + : HasTaskExecutionCtx(state), + _state(state), _parent(parent), _local_state(local_state), _output_tuple_desc(output_row_descriptor @@ -120,8 +120,6 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS _num_parallel_instances(num_parallel_instances) { DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); - // Use the task exec context as a lock between scanner threads and fragment exection threads - _task_exec_ctx = _state->get_task_execution_context(); _query_id = _state->get_query_ctx()->query_id(); ctx_id = UniqueId::gen_uid().to_string(); if (_scanners.empty()) { diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 035d396bf65660..8e840a47465882 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -65,7 +65,8 @@ class SimplifiedScanScheduler; // ScannerContext is also the scheduling unit of ScannerScheduler. // ScannerScheduler schedules a ScannerContext at a time, // and submits the Scanners to the scanner thread pool for data scanning. -class ScannerContext : public std::enable_shared_from_this { +class ScannerContext : public std::enable_shared_from_this, + public HasTaskExecutionCtx { ENABLE_FACTORY_CREATOR(ScannerContext); public: @@ -180,8 +181,6 @@ class ScannerContext : public std::enable_shared_from_this { bool _should_reset_thread_name = true; - std::weak_ptr get_task_execution_context() { return _task_exec_ctx; } - private: template Status _close_and_clear_scanners(Parent* parent, RuntimeState* state); @@ -199,7 +198,6 @@ class ScannerContext : public std::enable_shared_from_this { void _set_scanner_done(); RuntimeState* _state = nullptr; - std::weak_ptr _task_exec_ctx; VScanNode* _parent = nullptr; pipeline::ScanLocalStateBase* _local_state = nullptr; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 6ec83e8bd6aec8..29b53b39353bba 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -172,7 +172,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) { } void ScannerScheduler::_schedule_scanners(std::shared_ptr ctx) { - auto task_lock = ctx->get_task_execution_context().lock(); + auto task_lock = ctx->task_exec_ctx(); if (task_lock == nullptr) { // LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id) // << " maybe finished"; @@ -266,7 +266,7 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr ctx) { void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, std::shared_ptr ctx, VScannerSPtr scanner) { - auto task_lock = ctx->get_task_execution_context().lock(); + auto task_lock = ctx->task_exec_ctx(); if (task_lock == nullptr) { // LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id) // << " maybe finished";