Skip to content

Commit

Permalink
[refactor](wal) move group commit load content length to runtime state (
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian authored Jan 2, 2024
1 parent 4692a62 commit b3f6921
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 61 deletions.
3 changes: 1 addition & 2 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,7 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
content_length *= 3;
}
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
ctx->id.to_thrift(), content_length));
ctx->put_result.params.__set_content_length(content_length);
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -644,8 +644,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
content_length *= 3;
}
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
ctx->id.to_thrift(), content_length));
ctx->put_result.params.__set_content_length(content_length);
}
}

Expand Down
48 changes: 3 additions & 45 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,33 +498,17 @@ Status LoadBlockQueue::close_wal() {
return Status::OK();
}

bool LoadBlockQueue::has_enough_wal_disk_space(
const std::vector<std::shared_ptr<vectorized::Block>>& blocks, const TUniqueId& load_id,
bool is_blocks_contain_all_load_data) {
size_t blocks_size = 0;
for (auto block : blocks) {
blocks_size += block->bytes();
}
size_t content_length = 0;
Status st = ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id, &content_length);
if (st.ok()) {
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id));
} else {
return Status::InternalError("can not find load id.");
}
size_t pre_allocated = is_blocks_contain_all_load_data
? blocks_size
: (blocks_size > content_length ? blocks_size : content_length);
bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
size_t available_bytes = 0;
{
st = wal_mgr->get_wal_dir_available_size(wal_base_path, &available_bytes);
Status st = wal_mgr->get_wal_dir_available_size(wal_base_path, &available_bytes);
if (!st.ok()) {
LOG(WARNING) << "get wal disk available size filed!";
}
}
if (pre_allocated < available_bytes) {
st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, pre_allocated, true);
Status st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, pre_allocated, true);
if (!st.ok()) {
LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << st.to_string();
}
Expand All @@ -534,30 +518,4 @@ bool LoadBlockQueue::has_enough_wal_disk_space(
return false;
}
}

Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t content_length) {
std::unique_lock l(_load_info_lock);
if (_load_id_to_content_length_map.find(load_id) == _load_id_to_content_length_map.end()) {
_load_id_to_content_length_map.insert(std::make_pair(load_id, content_length));
}
return Status::OK();
}

Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t* content_length) {
std::shared_lock l(_load_info_lock);
if (_load_id_to_content_length_map.find(load_id) != _load_id_to_content_length_map.end()) {
*content_length = _load_id_to_content_length_map[load_id];
return Status::OK();
}
return Status::InternalError("can not find load id!");
}

Status GroupCommitMgr::remove_load_info(TUniqueId load_id) {
std::unique_lock l(_load_info_lock);
if (_load_id_to_content_length_map.find(load_id) == _load_id_to_content_length_map.end()) {
return Status::InternalError("can not remove load id!");
}
_load_id_to_content_length_map.erase(load_id);
return Status::OK();
}
} // namespace doris
8 changes: 1 addition & 7 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ class LoadBlockQueue {
WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc,
int be_exe_version);
Status close_wal();
bool has_enough_wal_disk_space(const std::vector<std::shared_ptr<vectorized::Block>>& blocks,
const TUniqueId& load_id, bool is_blocks_contain_all_load_data);
bool has_enough_wal_disk_space(size_t pre_allocated);

// 1s
static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
Expand Down Expand Up @@ -157,9 +156,6 @@ class GroupCommitMgr {
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version);
Status update_load_info(TUniqueId load_id, size_t content_length);
Status get_load_info(TUniqueId load_id, size_t* content_length);
Status remove_load_info(TUniqueId load_id);

private:
ExecEnv* _exec_env = nullptr;
Expand All @@ -170,8 +166,6 @@ class GroupCommitMgr {
std::unique_ptr<doris::ThreadPool> _thread_pool;
// memory consumption of all tables' load block queues, used for back pressure.
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
std::shared_mutex _load_info_lock;
std::unordered_map<TUniqueId, size_t> _load_id_to_content_length_map;
};

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
if (request.__isset.wal_id) {
_runtime_state->set_wal_id(request.wal_id);
}
if (request.__isset.content_length) {
_runtime_state->set_content_length(request.content_length);
}

if (request.query_options.__isset.is_report_success) {
_is_report_success = request.query_options.is_report_success;
Expand Down
7 changes: 6 additions & 1 deletion be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,11 @@ class RuntimeState {

void set_wal_id(int64_t wal_id) { _wal_id = wal_id; }

int64_t wal_id() { return _wal_id; }
int64_t wal_id() const { return _wal_id; }

void set_content_length(size_t content_length) { _content_length = content_length; }

size_t content_length() const { return _content_length; }

const std::string& import_label() { return _import_label; }

Expand Down Expand Up @@ -659,6 +663,7 @@ class RuntimeState {
std::string _load_dir;
int64_t _load_job_id;
int64_t _wal_id = -1;
size_t _content_length = 0;

// mini load
int64_t _normal_row_number;
Expand Down
17 changes: 13 additions & 4 deletions be/src/vec/sink/group_commit_block_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {

Status GroupCommitBlockSink::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::prepare(state));
RETURN_IF_ERROR(
ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(_load_id.to_thrift(), 0));
_state = state;

// profile must add to state's object pool
Expand Down Expand Up @@ -240,8 +238,8 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* state,
_db_id, _table_id, _base_schema_version, load_id, _load_block_queue,
_state->be_exec_version()));
if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
_group_commit_mode = _load_block_queue->has_enough_wal_disk_space(
_blocks, load_id, is_blocks_contain_all_load_data)
size_t pre_allocated = _pre_allocated(is_blocks_contain_all_load_data);
_group_commit_mode = _load_block_queue->has_enough_wal_disk_space(pre_allocated)
? TGroupCommitMode::ASYNC_MODE
: TGroupCommitMode::SYNC_MODE;
if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
Expand All @@ -265,5 +263,16 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* state,
return Status::OK();
}

size_t GroupCommitBlockSink::_pre_allocated(bool is_blocks_contain_all_load_data) {
size_t blocks_size = 0;
for (auto block : _blocks) {
blocks_size += block->bytes();
}
return is_blocks_contain_all_load_data
? blocks_size
: (blocks_size > _state->content_length() ? blocks_size
: _state->content_length());
}

} // namespace vectorized
} // namespace doris
1 change: 1 addition & 0 deletions be/src/vec/sink/group_commit_block_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class GroupCommitBlockSink : public DataSink {
private:
Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block);
Status _add_blocks(RuntimeState* state, bool is_blocks_contain_all_load_data);
size_t _pre_allocated(bool is_blocks_contain_all_load_data);

vectorized::VExprContextSPtrs _output_vexpr_ctxs;

Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,8 @@ struct TExecPlanFragmentParams {
27: optional i32 total_load_streams

28: optional i32 num_local_sink

29: optional i64 content_length
}

struct TExecPlanFragmentParamsList {
Expand Down

0 comments on commit b3f6921

Please sign in to comment.