Skip to content

Commit

Permalink
[improvement](task exec context) add parent class HasTaskExecutionCtx…
Browse files Browse the repository at this point in the history
… to own the task ctx (apache#29388)



---------

Co-authored-by: yiguolei <[email protected]>
  • Loading branch information
yiguolei and Doris-Extras authored Jan 2, 2024
1 parent 4581618 commit 2ed122b
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 15 deletions.
3 changes: 1 addition & 2 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
Status exec_status) {
// close_a_pipeline may delete fragment context and will core in some defer
// code, because the defer code will access fragment context it self.
std::shared_ptr<TaskExecutionContext> lock_for_context =
task->fragment_context()->shared_from_this();
auto lock_for_context = task->fragment_context()->shared_from_this();
auto status = task->try_close(exec_status);
auto cancel = [&]() {
task->query_context()->cancel(true, status.to_string(),
Expand Down
26 changes: 25 additions & 1 deletion be/src/runtime/task_execution_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,35 @@

namespace doris {

// This class act as a super class of all context like things
// This class act as a super class of all context like things such as
// plan fragment executor or pipelinefragmentcontext or pipelinexfragmentcontext
class TaskExecutionContext : public std::enable_shared_from_this<TaskExecutionContext> {
public:
TaskExecutionContext() = default;
virtual ~TaskExecutionContext() = default;
};

using TaskExecutionContextSPtr = std::shared_ptr<TaskExecutionContext>;

// Task Execution Context maybe plan fragment executor or pipelinefragmentcontext or pipelinexfragmentcontext
// In multi thread scenario, the object is created in main thread (such as FragmentExecThread), but the object
// maybe used in other thread(such as scanner thread, brpc->sender queue). If the main thread stopped and destroy
// the object, then the other thread may core. So the other thread must lock the context to ensure the object exists.
struct HasTaskExecutionCtx {
using Weak = typename TaskExecutionContextSPtr::weak_type;

HasTaskExecutionCtx(TaskExecutionContextSPtr task_exec_ctx) : task_exec_ctx_(task_exec_ctx) {}

// Init task ctx from state, the state has to own a method named get_task_execution_context()
// like runtime state
template <typename T>
HasTaskExecutionCtx(T* state) : task_exec_ctx_(state->get_task_execution_context()) {}

public:
inline TaskExecutionContextSPtr task_exec_ctx() const { return task_exec_ctx_.lock(); }

private:
Weak task_exec_ctx_;
};

} // namespace doris
10 changes: 4 additions & 6 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
pipeline::ScanLocalStateBase* local_state,
std::shared_ptr<pipeline::ScanDependency> dependency,
std::shared_ptr<pipeline::Dependency> finish_dependency)
: _state(state),
: HasTaskExecutionCtx(state),
_state(state),
_parent(nullptr),
_local_state(local_state),
_output_tuple_desc(output_row_descriptor
Expand All @@ -72,8 +73,6 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
_finish_dependency(finish_dependency) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
// Use the task exec context as a lock between scanner threads and fragment exection threads
_task_exec_ctx = _state->get_task_execution_context();
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
if (_scanners.empty()) {
Expand Down Expand Up @@ -102,7 +101,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS
const std::list<VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const int num_parallel_instances,
pipeline::ScanLocalStateBase* local_state)
: _state(state),
: HasTaskExecutionCtx(state),
_state(state),
_parent(parent),
_local_state(local_state),
_output_tuple_desc(output_row_descriptor
Expand All @@ -120,8 +120,6 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS
_num_parallel_instances(num_parallel_instances) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
// Use the task exec context as a lock between scanner threads and fragment exection threads
_task_exec_ctx = _state->get_task_execution_context();
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
if (_scanners.empty()) {
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class SimplifiedScanScheduler;
// ScannerContext is also the scheduling unit of ScannerScheduler.
// ScannerScheduler schedules a ScannerContext at a time,
// and submits the Scanners to the scanner thread pool for data scanning.
class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
public HasTaskExecutionCtx {
ENABLE_FACTORY_CREATOR(ScannerContext);

public:
Expand Down Expand Up @@ -180,8 +181,6 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {

bool _should_reset_thread_name = true;

std::weak_ptr<TaskExecutionContext> get_task_execution_context() { return _task_exec_ctx; }

private:
template <typename Parent>
Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
Expand All @@ -199,7 +198,6 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
void _set_scanner_done();

RuntimeState* _state = nullptr;
std::weak_ptr<TaskExecutionContext> _task_exec_ctx;
VScanNode* _parent = nullptr;
pipeline::ScanLocalStateBase* _local_state = nullptr;

Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
}

void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
auto task_lock = ctx->get_task_execution_context().lock();
auto task_lock = ctx->task_exec_ctx();
if (task_lock == nullptr) {
// LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id)
// << " maybe finished";
Expand Down Expand Up @@ -266,7 +266,7 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {

void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
std::shared_ptr<ScannerContext> ctx, VScannerSPtr scanner) {
auto task_lock = ctx->get_task_execution_context().lock();
auto task_lock = ctx->task_exec_ctx();
if (task_lock == nullptr) {
// LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id)
// << " maybe finished";
Expand Down

0 comments on commit 2ed122b

Please sign in to comment.