Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dockerfile_modify' into dockerfi…
Browse files Browse the repository at this point in the history
…le_modify
  • Loading branch information
catpineapple committed Apr 30, 2024
2 parents e9c8688 + ea6f1e9 commit efee566
Show file tree
Hide file tree
Showing 983 changed files with 28,490 additions and 16,860 deletions.
2 changes: 1 addition & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ github:
- shuke987
- wm1581066
- KassieZ
- gavinchou
- yujun777
- gavinchou

notifications:
pullrequests_status: [email protected]
Expand Down
8 changes: 1 addition & 7 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ option(USE_LIBCPP "Use libc++" OFF)
option(USE_MEM_TRACKER, "Use memory tracker" ON)
option(USE_UNWIND "Use libunwind" ON)
option(USE_JEMALLOC "Use jemalloc" ON)
option(USE_JEMALLOC_HOOK "Use jemalloc hook" ON)
if (OS_MACOSX)
set(GLIBC_COMPATIBILITY OFF)
set(USE_LIBCPP ON)
Expand All @@ -88,7 +87,6 @@ message(STATUS "GLIBC_COMPATIBILITY is ${GLIBC_COMPATIBILITY}")
message(STATUS "USE_LIBCPP is ${USE_LIBCPP}")
message(STATUS "USE_MEM_TRACKER is ${USE_MEM_TRACKER}")
message(STATUS "USE_JEMALLOC is ${USE_JEMALLOC}")
message(STATUS "USE_JEMALLOC_HOOK is ${USE_JEMALLOC_HOOK}")
message(STATUS "USE_UNWIND is ${USE_UNWIND}")
message(STATUS "ENABLE_PCH is ${ENABLE_PCH}")

Expand Down Expand Up @@ -284,8 +282,7 @@ if (COMPILER_CLANG)
-Wunused-member-function
-Wunused-macros
-Wconversion)
add_compile_options(-Wno-vla-extension
-Wno-gnu-statement-expression
add_compile_options( -Wno-gnu-statement-expression
-Wno-implicit-float-conversion
-Wno-implicit-int-conversion
-Wno-sign-conversion
Expand Down Expand Up @@ -348,9 +345,6 @@ endif()
if (USE_JEMALLOC)
add_definitions(-DUSE_JEMALLOC)
endif()
if (USE_JEMALLOC_HOOK)
add_definitions(-DUSE_JEMALLOC_HOOK)
endif()

# Compile with libunwind
if (USE_UNWIND)
Expand Down
5 changes: 4 additions & 1 deletion be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
_workers[TTaskType::CLEAN_TRASH] = std::make_unique<TaskWorkerPool>(
"CLEAN_TRASH", 1, [&engine](auto&& task) {return clean_trash_callback(engine, task); });

_workers[TTaskType::UPDATE_VISIBLE_VERSION] = std::make_unique<TaskWorkerPool>(
"UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); });

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds, [&master_info = _master_info] { report_task_callback(master_info); }));

Expand All @@ -203,7 +206,7 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_

_workers[TTaskType::CALCULATE_DELETE_BITMAP] = std::make_unique<TaskWorkerPool>(
"CALC_DBM_TASK", config::calc_delete_bitmap_worker_count,
[&engine](auto&& task) { return calc_delete_bimtap_callback(engine, task); });
[&engine](auto&& task) { return calc_delete_bitmap_callback(engine, task); });

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds,
Expand Down
23 changes: 19 additions & 4 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE");
bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");

void add_task_count(const TAgentTaskRequest& task, int n) {
// clang-format off
Expand All @@ -452,6 +453,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(CLONE)
ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
ADD_TASK_COUNT(GC_BINLOG)
ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
#undef ADD_TASK_COUNT
case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
Expand Down Expand Up @@ -522,7 +524,7 @@ Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
}

PriorTaskWorkerPool::PriorTaskWorkerPool(
std::string_view name, int normal_worker_count, int high_prior_worker_conut,
std::string_view name, int normal_worker_count, int high_prior_worker_count,
std::function<void(const TAgentTaskRequest& task)> callback)
: _callback(std::move(callback)) {
auto st = ThreadPoolBuilder(fmt::format("TaskWP_.{}", name))
Expand All @@ -535,8 +537,8 @@ PriorTaskWorkerPool::PriorTaskWorkerPool(
CHECK(st.ok()) << name << ": " << st;

st = ThreadPoolBuilder(fmt::format("HighPriorPool.{}", name))
.set_min_threads(high_prior_worker_conut)
.set_max_threads(high_prior_worker_conut)
.set_min_threads(high_prior_worker_count)
.set_max_threads(high_prior_worker_count)
.build(&_high_prior_pool);
CHECK(st.ok()) << name << ": " << st;

Expand Down Expand Up @@ -1077,6 +1079,11 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
return;
}

std::map<int64_t, int64_t> partitions_version;
engine.tablet_manager()->get_partitions_visible_version(&partitions_version);
request.__set_partitions_version(std::move(partitions_version));

int64_t max_compaction_score =
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
Expand Down Expand Up @@ -1365,6 +1372,7 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
.region = param.s3_storage_param.region,
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
.token = param.s3_storage_param.token,
.max_connections = param.s3_storage_param.max_conn,
.request_timeout_ms = param.s3_storage_param.request_timeout_ms,
.connect_timeout_ms = param.s3_storage_param.conn_timeout_ms,
Expand All @@ -1384,6 +1392,7 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
S3ClientConf conf {
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
.token = param.s3_storage_param.token,
};
st = client->reset(conf);
fs = std::move(existed_fs);
Expand Down Expand Up @@ -1925,6 +1934,12 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
engine.gc_binlogs(gc_tablet_infos);
}

void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
const TVisibleVersionReq& visible_version_req = req.visible_version_req;
engine.tablet_manager()->update_partitions_visible_version(
visible_version_req.partition_version);
}

void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
const TAgentTaskRequest& req) {
const auto& clone_req = req.clone_req;
Expand Down Expand Up @@ -1998,7 +2013,7 @@ void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequ
remove_task_info(req.task_type, req.signature);
}

void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
std::vector<TTabletId> error_tablet_ids;
std::vector<TTabletId> succ_tablet_ids;
Status status;
Expand Down
6 changes: 4 additions & 2 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class PublishVersionWorkerPool final : public TaskWorkerPool {

class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
public:
PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int high_prior_worker_conut,
PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int high_prior_worker_count,
std::function<void(const TAgentTaskRequest& task)> callback);

~PriorTaskWorkerPool() override;
Expand Down Expand Up @@ -176,6 +176,8 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void report_task_callback(const TMasterInfo& master_info);

void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info);
Expand All @@ -184,6 +186,6 @@ void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& master_

void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info);

void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);
void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);

} // namespace doris
2 changes: 1 addition & 1 deletion be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
}
is_set_workload_group_info = true;

// 1 parse topicinfo to group info
// 1 parse topic info to group info
WorkloadGroupInfo workload_group_info;
Status ret = WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info,
&workload_group_info);
Expand Down
26 changes: 11 additions & 15 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,26 @@ void CloudBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse&,
}
});
};
static_cast<void>(_exec_env->sync_load_for_tablets_thread_pool()->submit_func(std::move(f)));
static_cast<void>(_engine.sync_load_for_tablets_thread_pool().submit_func(std::move(f)));
}

void CloudBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& response,
const TGetTopNHotPartitionsRequest& request) {
TabletHotspot::instance()->get_top_n_hot_partition(&response.hot_tables);
_engine.tablet_hotspot().get_top_n_hot_partition(&response.hot_tables);
response.file_cache_size = io::FileCacheFactory::instance()->get_capacity();
response.__isset.hot_tables = !response.hot_tables.empty();
}

void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
const TWarmUpTabletsRequest& request) {
Status st;
auto* manager = CloudWarmUpManager::instance();
auto& manager = _engine.cloud_warm_up_manager();
switch (request.type) {
case TWarmUpTabletsRequestType::SET_JOB: {
LOG_INFO("receive the warm up request.")
.tag("request_type", "SET_JOB")
.tag("job_id", request.job_id);
st = manager->check_and_set_job_id(request.job_id);
st = manager.check_and_set_job_id(request.job_id);
if (!st) {
LOG_WARNING("SET_JOB failed.").error(st);
break;
Expand All @@ -105,9 +105,9 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
.tag("batch_id", request.batch_id)
.tag("jobs size", request.job_metas.size());
bool retry = false;
st = manager->check_and_set_batch_id(request.job_id, request.batch_id, &retry);
st = manager.check_and_set_batch_id(request.job_id, request.batch_id, &retry);
if (!retry && st) {
manager->add_job(request.job_metas);
manager.add_job(request.job_metas);
} else {
if (retry) {
LOG_WARNING("retry the job.")
Expand All @@ -121,7 +121,7 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
}
case TWarmUpTabletsRequestType::GET_CURRENT_JOB_STATE_AND_LEASE: {
auto [job_id, batch_id, pending_job_size, finish_job_size] =
manager->get_current_job_state();
manager.get_current_job_state();
LOG_INFO("receive the warm up request.")
.tag("request_type", "GET_CURRENT_JOB_STATE_AND_LEASE")
.tag("job_id", job_id)
Expand All @@ -138,7 +138,7 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
LOG_INFO("receive the warm up request.")
.tag("request_type", "CLEAR_JOB")
.tag("job_id", request.job_id);
st = manager->clear_job(request.job_id);
st = manager.clear_job(request.job_id);
break;
}
default:
Expand All @@ -165,12 +165,8 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
PGetFileCacheMetaResponse brpc_response;
brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr);
if (!cntl.Failed()) {
std::vector<FileCacheBlockMeta> metas;
std::transform(brpc_response.file_cache_block_metas().cbegin(),
brpc_response.file_cache_block_metas().cend(), std::back_inserter(metas),
[](const FileCacheBlockMeta& meta) { return meta; });
io::DownloadTask download_task(std::move(metas));
io::FileCacheBlockDownloader::instance()->submit_download_task(download_task);
_engine.file_cache_block_downloader().submit_download_task(
std::move(*brpc_response.mutable_file_cache_block_metas()));
} else {
st = Status::RpcError("{} isn't connected", brpc_addr);
}
Expand All @@ -181,7 +177,7 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
const TCheckWarmUpCacheAsyncRequest& request) {
std::map<int64_t, bool> task_done;
io::FileCacheBlockDownloader::instance()->check_download_task(request.tablets, &task_done);
_engine.file_cache_block_downloader().check_download_task(request.tablets, &task_done);
response.__set_task_done(task_done);

Status st = Status::OK();
Expand Down
4 changes: 1 addition & 3 deletions be/src/cloud/cloud_backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ class CloudBackendService final : public BaseBackendService {

~CloudBackendService() override;

// TODO(plat1ko): cloud backend functions

// If another cluster load, FE need to notify the cluster to sync the load data
void sync_load_for_tablets(TSyncLoadForTabletsResponse& response,
const TSyncLoadForTabletsRequest& request) override;
Expand All @@ -56,7 +54,7 @@ class CloudBackendService final : public BaseBackendService {
const TCheckWarmUpCacheAsyncRequest& request) override;

private:
[[maybe_unused]] CloudStorageEngine& _engine;
CloudStorageEngine& _engine;
};

} // namespace doris
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ Status CloudBaseCompaction::pick_rowsets_to_compact() {
_filter_input_rowset();
if (_input_rowsets.size() <= 1) {
return Status::Error<BE_NO_SUITABLE_VERSION>(
"insuffient compation input rowset, #rowsets={}", _input_rowsets.size());
"insufficent compaction input rowset, #rowsets={}", _input_rowsets.size());
}

if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) {
Expand Down Expand Up @@ -283,7 +283,7 @@ Status CloudBaseCompaction::modify_rowsets() {
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaciton(
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap));
compaction_job->set_delete_bitmap_lock_initiator(initiator);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaciton(
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap));
compaction_job->set_delete_bitmap_lock_initiator(initiator);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_cumulative_compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
int64_t _promotion_min_size;
/// lower bound size to do compaction compaction.
int64_t _compaction_min_size;
// cululative compaction promotion version count, only works for unique key MoW table
// cumulative compaction promotion version count, only works for unique key MoW table
int64_t _promotion_version_count;
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
OlapStopWatch watch;
VLOG_NOTICE << "begin to calculate delete bitmap. transaction_id=" << transaction_id;
std::unique_ptr<ThreadPoolToken> token =
_engine.calc_tablet_delete_bitmap_task_thread_pool()->new_token(
_engine.calc_tablet_delete_bitmap_task_thread_pool().new_token(
ThreadPool::ExecutionMode::CONCURRENT);

for (const auto& partition : _cal_delete_bitmap_req.partitions) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Status CloudFullCompaction::prepare_compact() {
return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
}

// always sync lastest rowset for full compaction
// always sync latest rowset for full compaction
RETURN_IF_ERROR(cloud_tablet()->sync_rowsets());

RETURN_IF_ERROR(pick_rowsets_to_compact());
Expand Down Expand Up @@ -124,7 +124,7 @@ Status CloudFullCompaction::pick_rowsets_to_compact() {
}
if (_input_rowsets.size() <= 1) {
return Status::Error<BE_NO_SUITABLE_VERSION>(
"insuffient compation input rowset, #rowsets={}", _input_rowsets.size());
"insufficent compaction input rowset, #rowsets={}", _input_rowsets.size());
}

if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) {
Expand Down
4 changes: 1 addition & 3 deletions be/src/cloud/cloud_internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ class CloudInternalServiceImpl final : public PInternalService {

~CloudInternalServiceImpl() override;

// TODO(plat1ko): cloud internal service functions

void alter_vault_sync(google::protobuf::RpcController* controller,
const doris::PAlterVaultSyncRequest* request,
PAlterVaultSyncResponse* response,
Expand All @@ -43,7 +41,7 @@ class CloudInternalServiceImpl final : public PInternalService {
google::protobuf::Closure* done) override;

private:
[[maybe_unused]] CloudStorageEngine& _engine;
CloudStorageEngine& _engine;
};

} // namespace doris
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ bvar::LatencyRecorder g_cloud_commit_txn_resp_redirect_latency("cloud_table_stat
class MetaServiceProxy {
public:
static Status get_client(std::shared_ptr<MetaService_Stub>* stub) {
SYNC_POINT_RETURN_WITH_VALUE("MetaServiceProxy::get_client", Status::OK(), stub);
TEST_SYNC_POINT_RETURN_WITH_VALUE("MetaServiceProxy::get_client", Status::OK(), stub);
return get_pooled_client(stub);
}

Expand Down Expand Up @@ -458,7 +458,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
continue;
}
if (!st.ok()) {
LOG_WARNING("failed to get delete bimtap")
LOG_WARNING("failed to get delete bitmap")
.tag("tablet", tablet->tablet_id())
.error(st);
return st;
Expand Down
Loading

0 comments on commit efee566

Please sign in to comment.