Skip to content

Commit

Permalink
[Refactor] Add more log in cross-cluster replication (backport #53054) (
Browse files Browse the repository at this point in the history
#53075)

Co-authored-by: xiangguangyxg <[email protected]>
  • Loading branch information
mergify[bot] and xiangguangyxg authored Nov 21, 2024
1 parent 03bf0ca commit 8a8c24d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 36 deletions.
103 changes: 67 additions & 36 deletions be/src/storage/replication_txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,12 @@ Status ReplicationTxnManager::publish_txn(TTransactionId transaction_id, TPartit
std::string snapshot_dir_path =
get_tablet_snapshot_dir_path(tablet->data_dir(), transaction_id, partition_id, tablet->tablet_id());

return publish_snapshot(tablet.get(), snapshot_dir_path, version, txn_meta_pb.incremental_snapshot());
auto status = publish_snapshot(tablet.get(), snapshot_dir_path, version, txn_meta_pb.incremental_snapshot());
if (!status.ok()) {
LOG(WARNING) << "Failed to publish snapshot: " << snapshot_dir_path << ", tablet_id: " << tablet->tablet_id()
<< ", partition_id: " << partition_id << ", txn_id: " << transaction_id << ", status: " << status;
};
return status;
}

void ReplicationTxnManager::clear_expired_snapshots() {
Expand Down Expand Up @@ -695,7 +700,7 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna
TabletMeta cloned_tablet_meta;
res = cloned_tablet_meta.create_from_file(header_file);
if (!res.ok()) {
LOG(WARNING) << "Failed to load load tablet meta from " << header_file;
LOG(WARNING) << "Failed to load load tablet meta from " << header_file << ", status: " << res;
break;
}

Expand All @@ -704,15 +709,15 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna
if (has_dcgs_snapshot_file) {
res = DeltaColumnGroupListHelper::parse_snapshot(dcgs_snapshot_file, dcg_snapshot_pb);
if (!res.ok()) {
LOG(WARNING) << "Failed to load load dcg snapshot from " << dcgs_snapshot_file;
LOG(WARNING) << "Failed to load load dcg snapshot from " << dcgs_snapshot_file << ", status: " << res;
break;
}
}

std::set<std::string> clone_files;
res = fs::list_dirs_files(snapshot_dir, nullptr, &clone_files);
if (!res.ok()) {
LOG(WARNING) << "Failed to list directory " << snapshot_dir << ": " << res;
LOG(WARNING) << "Failed to list directory " << snapshot_dir << ", status: " << res;
break;
}

Expand All @@ -725,14 +730,14 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna
std::string tablet_dir = tablet->schema_hash_path();
res = fs::list_dirs_files(tablet_dir, nullptr, &local_files);
if (!res.ok()) {
LOG(WARNING) << "Failed to list tablet directory " << tablet_dir << ": " << res;
LOG(WARNING) << "Failed to list tablet directory " << tablet_dir << ", status: " << res;
break;
}

// link files from clone dir, if file exists, skip it
for (const string& clone_file : clone_files) {
if (local_files.find(clone_file) != local_files.end()) {
VLOG(3) << "find same file when clone, skip it. "
VLOG(3) << "Find same file when clone, skip it. "
<< "tablet=" << tablet->full_name() << ", clone_file=" << clone_file;
continue;
}
Expand All @@ -741,7 +746,7 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna
std::string to = strings::Substitute("$0/$1", tablet_dir, clone_file);
res = FileSystem::Default()->link_file(from, to);
if (!res.ok()) {
LOG(WARNING) << "Failed to link " << from << " to " << to << ": " << res;
LOG(WARNING) << "Failed to link " << from << " to " << to << ", status: " << res;
break;
}
linked_success_files.emplace_back(std::move(to));
Expand All @@ -758,8 +763,12 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna
res = publish_full_meta(tablet, &cloned_tablet_meta, rs_to_clone);
}

if (!res.ok()) {
break;
}

// if full clone success, need to update cumulative layer point
if (!incremental_snapshot && res.ok()) {
if (!incremental_snapshot) {
tablet->set_cumulative_layer_point(-1);
}

Expand All @@ -777,26 +786,31 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna
// dcgs for each segment
auto& dcg_list_pb = dcg_snapshot_pb.dcg_lists(idx);
DeltaColumnGroupList dcgs;
RETURN_IF_ERROR(
DeltaColumnGroupListSerializer::deserialize_delta_column_group_list(dcg_list_pb, &dcgs));
res = DeltaColumnGroupListSerializer::deserialize_delta_column_group_list(dcg_list_pb, &dcgs);
if (!res.ok()) {
LOG(WARNING) << "Failed to deserialize_delta_column_group_list, status: " << res;
break;
}

if (dcgs.size() == 0) {
++idx;
continue;
}

RETURN_IF_ERROR(TabletMetaManager::put_delta_column_group(
data_dir, &wb, dcg_snapshot_pb.tablet_id(idx), dcg_snapshot_pb.rowset_id(idx),
dcg_snapshot_pb.segment_id(idx), dcgs));
res = TabletMetaManager::put_delta_column_group(data_dir, &wb, dcg_snapshot_pb.tablet_id(idx),
dcg_snapshot_pb.rowset_id(idx),
dcg_snapshot_pb.segment_id(idx), dcgs);
if (!res.ok()) {
LOG(WARNING) << "Failed to put_delta_column_group, status: " << res;
break;
}
++idx;
}
}
res = data_dir->get_meta()->write_batch(&wb);
if (!res.ok()) {
std::stringstream ss;
ss << "save dcgs meta failed, tablet id: " << tablet->tablet_id();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
LOG(WARNING) << "Failed to save dcgs meta, tablet id: " << tablet->tablet_id() << ", status: " << res;
break;
}
}
} while (false);
Expand Down Expand Up @@ -830,7 +844,7 @@ Status ReplicationTxnManager::publish_snapshot_for_primary(Tablet* tablet, const
ASSIGN_OR_RETURN(auto md5sum1, fs::md5sum(snapshot_dir + "/" + fname));
ASSIGN_OR_RETURN(auto md5sum2, fs::md5sum(tablet_dir + "/" + fname));
if (md5sum1 != md5sum2) {
LOG(WARNING) << "duplicated file `" << fname << "` with different md5sum";
LOG(WARNING) << "Duplicated file `" << fname << "` with different md5sum";
return Status::InternalError("duplicate file with different md5");
}
clone_files.erase(fname);
Expand All @@ -849,42 +863,44 @@ Status ReplicationTxnManager::publish_snapshot_for_primary(Tablet* tablet, const

Status status = tablet->updates()->load_snapshot(snapshot_meta, false, true);
if (!status.ok()) {
LOG(WARNING) << "Failed to load snapshot of tablet " << tablet->tablet_id() << " from " << snapshot_dir;
Status clear_st;
for (const std::string& filename : tablet_files) {
clear_st = fs::delete_file(filename);
if (!clear_st.ok()) {
LOG(WARNING) << "remove tablet file: " << filename << " failed, status: " << clear_st;
LOG(WARNING) << "Failed to remove tablet file: " << filename << ", status: " << clear_st;
}
}
} else {
int64_t expired_stale_sweep_endtime = UnixSeconds() - config::tablet_rowset_stale_sweep_time_sec;
tablet->updates()->remove_expired_versions(expired_stale_sweep_endtime);
LOG(INFO) << "Loaded snapshot of tablet " << tablet->tablet_id() << " from " << snapshot_dir;
}

int64_t expired_stale_sweep_endtime = UnixSeconds() - config::tablet_rowset_stale_sweep_time_sec;
tablet->updates()->remove_expired_versions(expired_stale_sweep_endtime);
LOG(INFO) << "Loaded snapshot of tablet " << tablet->tablet_id() << " from " << snapshot_dir;

return status;
}

Status ReplicationTxnManager::publish_incremental_meta(Tablet* tablet, const TabletMeta& cloned_tablet_meta,
int64_t snapshot_version) {
LOG(INFO) << "begin to publish incremental meta. tablet=" << tablet->full_name()
<< ", snapshot_version=" << snapshot_version;
LOG(INFO) << "Begin to publish incremental meta. tablet: " << tablet->full_name()
<< ", snapshot_version: " << snapshot_version;

std::vector<Version> missed_versions;
tablet->calc_missed_versions_unlocked(snapshot_version, &missed_versions);

std::vector<Version> versions_to_delete;
std::vector<RowsetMetaSharedPtr> rowsets_to_clone;

VLOG(3) << "get missed versions again when publish incremental meta. "
VLOG(3) << "Get missed versions again when publish incremental meta. "
<< "tablet=" << tablet->full_name() << ", snapshot_version=" << snapshot_version
<< ", missed_versions_size=" << missed_versions.size();

// check missing versions exist in clone src
for (Version version : missed_versions) {
RowsetMetaSharedPtr inc_rs_meta = cloned_tablet_meta.acquire_inc_rs_meta_by_version(version);
if (inc_rs_meta == nullptr) {
LOG(WARNING) << "missed version is not found in cloned tablet meta."
LOG(WARNING) << "Missed version is not found in cloned incremental tablet meta. tablet="
<< tablet->full_name() << ", snapshot_version=" << snapshot_version
<< ", missed_version=" << version.first << "-" << version.second;
return Status::NotFound(strings::Substitute("version not found"));
}
Expand All @@ -894,21 +910,28 @@ Status ReplicationTxnManager::publish_incremental_meta(Tablet* tablet, const Tab

// clone_data to tablet
Status st = tablet->revise_tablet_meta(rowsets_to_clone, versions_to_delete);
LOG(INFO) << "finish to publish incremental meta. [tablet=" << tablet->full_name() << ", status=" << st << "]";
return st;
if (!st.ok()) {
LOG(WARNING) << "Failed to publish incremental meta. tablet: " << tablet->full_name()
<< ", snapshot_version: " << snapshot_version << ", status: " << st;
return st;
}

LOG(INFO) << "Finish to publish incremental meta. tablet: " << tablet->full_name()
<< ", snapshot_version: " << snapshot_version;
return Status::OK();
}

Status ReplicationTxnManager::publish_full_meta(Tablet* tablet, TabletMeta* cloned_tablet_meta,
std::vector<RowsetMetaSharedPtr>& rs_to_clone) {
Version cloned_max_version = cloned_tablet_meta->max_version();
LOG(INFO) << "begin to publish full meta. tablet=" << tablet->full_name()
LOG(INFO) << "Begin to publish full meta. tablet=" << tablet->full_name()
<< ", cloned_max_version=" << cloned_max_version.first << "-" << cloned_max_version.second;
std::vector<Version> versions_to_delete;
std::vector<RowsetMetaSharedPtr> rs_metas_found_in_src;
// check local versions
for (auto& rs_meta : tablet->tablet_meta()->all_rs_metas()) {
Version local_version(rs_meta->start_version(), rs_meta->end_version());
LOG(INFO) << "check local delta when publish full snapshot."
LOG(INFO) << "Check local delta when publish full snapshot."
<< "tablet=" << tablet->full_name() << ", local_version=" << local_version.first << "-"
<< local_version.second;

Expand All @@ -919,7 +942,7 @@ Status ReplicationTxnManager::publish_full_meta(Tablet* tablet, TabletMeta* clon
// It should not happen because if there is a hole, the following delta will not
// do compaction.
if (local_version.first <= cloned_max_version.second && local_version.second > cloned_max_version.second) {
LOG(WARNING) << "stop to publish full snapshot, version cross src latest."
LOG(WARNING) << "Stop to publish full snapshot, version cross src latest."
<< "tablet=" << tablet->full_name() << ", local_version=" << local_version.first << "-"
<< local_version.second;
return Status::InternalError("clone version conflict with local version");
Expand Down Expand Up @@ -967,19 +990,27 @@ Status ReplicationTxnManager::publish_full_meta(Tablet* tablet, TabletMeta* clon

// clone_data to tablet
Status st = tablet->revise_tablet_meta(rowsets_to_clone, versions_to_delete);
LOG(INFO) << "finish to full clone. tablet=" << tablet->full_name() << ", res=" << st;
if (!st.ok()) {
LOG(WARNING) << "Failed to publish full meta. tablet: " << tablet->full_name()
<< ", cloned_max_version: " << cloned_max_version.first << "-" << cloned_max_version.second
<< ", status: " << st;
} else {
LOG(INFO) << "Finish to publish full meta. tablet: " << tablet->full_name()
<< ", cloned_max_version: " << cloned_max_version.first << "-" << cloned_max_version.second;
}

// in previous step, copy all files from CLONE_DIR to tablet dir
// but some rowset is useless, so that remove them here
for (auto& rs_meta_ptr : rs_metas_found_in_src) {
RowsetSharedPtr rowset_to_remove;
if (auto s = RowsetFactory::create_rowset(cloned_tablet_meta->tablet_schema_ptr(), tablet->schema_hash_path(),
rs_meta_ptr, &rowset_to_remove);
!s.ok()) {
LOG(WARNING) << "failed to init rowset to remove: " << rs_meta_ptr->rowset_id().to_string();
LOG(WARNING) << "Failed to init rowset to remove: " << rs_meta_ptr->rowset_id().to_string();
continue;
}
if (auto ost = rowset_to_remove->remove(); !ost.ok()) {
LOG(WARNING) << "failed to remove rowset " << rs_meta_ptr->rowset_id().to_string() << ", res=" << ost;
LOG(WARNING) << "Failed to remove rowset " << rs_meta_ptr->rowset_id().to_string() << ", status: " << ost;
}
}
return st;
Expand Down Expand Up @@ -1125,7 +1156,7 @@ StatusOr<TabletSharedPtr> ReplicationTxnManager::get_tablet(TTabletId tablet_id)
std::string error_msg;
auto tablet = tablet_manager->get_tablet(tablet_id, false, &error_msg);
if (tablet == nullptr) {
LOG(WARNING) << "Cannot get tablet " << tablet_id << ", error: " << error_msg;
LOG(WARNING) << "Cannot get tablet " << tablet_id << ", status: " << error_msg;
return Status::NotFound(error_msg);
}
return tablet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,10 @@ private boolean isAllTaskFinished() {
return true;
}

if (runningTasks.size() < 10) {
LOG.info("Unfinished tasks: {}, details: {}", runningTasks.size(), runningTasks.values());
}

return false;
}

Expand Down

0 comments on commit 8a8c24d

Please sign in to comment.