From 8784f3e4df3131ac4b535375c6a9daeac0ef5228 Mon Sep 17 00:00:00 2001 From: xiangguangyxg <110401425+xiangguangyxg@users.noreply.github.com> Date: Thu, 21 Nov 2024 11:30:34 +0800 Subject: [PATCH] [Refactor] Add more log in cross-cluster replication (#53054) Signed-off-by: xiangguangyxg (cherry picked from commit 06dc6c80f3556d2401f87de0f7ced11b161b88ef) --- be/src/storage/replication_txn_manager.cpp | 103 ++++++++++++------ .../starrocks/replication/ReplicationJob.java | 4 + 2 files changed, 71 insertions(+), 36 deletions(-) diff --git a/be/src/storage/replication_txn_manager.cpp b/be/src/storage/replication_txn_manager.cpp index 29261a396a855..caeb7811aeaf9 100644 --- a/be/src/storage/replication_txn_manager.cpp +++ b/be/src/storage/replication_txn_manager.cpp @@ -352,7 +352,12 @@ Status ReplicationTxnManager::publish_txn(TTransactionId transaction_id, TPartit std::string snapshot_dir_path = get_tablet_snapshot_dir_path(tablet->data_dir(), transaction_id, partition_id, tablet->tablet_id()); - return publish_snapshot(tablet.get(), snapshot_dir_path, version, txn_meta_pb.incremental_snapshot()); + auto status = publish_snapshot(tablet.get(), snapshot_dir_path, version, txn_meta_pb.incremental_snapshot()); + if (!status.ok()) { + LOG(WARNING) << "Failed to publish snapshot: " << snapshot_dir_path << ", tablet_id: " << tablet->tablet_id() + << ", partition_id: " << partition_id << ", txn_id: " << transaction_id << ", status: " << status; + }; + return status; } void ReplicationTxnManager::clear_expired_snapshots() { @@ -695,7 +700,7 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna TabletMeta cloned_tablet_meta; res = cloned_tablet_meta.create_from_file(header_file); if (!res.ok()) { - LOG(WARNING) << "Failed to load load tablet meta from " << header_file; + LOG(WARNING) << "Failed to load load tablet meta from " << header_file << ", status: " << res; break; } @@ -704,7 +709,7 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna if (has_dcgs_snapshot_file) { res = DeltaColumnGroupListHelper::parse_snapshot(dcgs_snapshot_file, dcg_snapshot_pb); if (!res.ok()) { - LOG(WARNING) << "Failed to load load dcg snapshot from " << dcgs_snapshot_file; + LOG(WARNING) << "Failed to load load dcg snapshot from " << dcgs_snapshot_file << ", status: " << res; break; } } @@ -712,7 +717,7 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna std::set clone_files; res = fs::list_dirs_files(snapshot_dir, nullptr, &clone_files); if (!res.ok()) { - LOG(WARNING) << "Failed to list directory " << snapshot_dir << ": " << res; + LOG(WARNING) << "Failed to list directory " << snapshot_dir << ", status: " << res; break; } @@ -725,14 +730,14 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna std::string tablet_dir = tablet->schema_hash_path(); res = fs::list_dirs_files(tablet_dir, nullptr, &local_files); if (!res.ok()) { - LOG(WARNING) << "Failed to list tablet directory " << tablet_dir << ": " << res; + LOG(WARNING) << "Failed to list tablet directory " << tablet_dir << ", status: " << res; break; } // link files from clone dir, if file exists, skip it for (const string& clone_file : clone_files) { if (local_files.find(clone_file) != local_files.end()) { - VLOG(3) << "find same file when clone, skip it. " + VLOG(3) << "Find same file when clone, skip it. " << "tablet=" << tablet->full_name() << ", clone_file=" << clone_file; continue; } @@ -741,7 +746,7 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna std::string to = strings::Substitute("$0/$1", tablet_dir, clone_file); res = FileSystem::Default()->link_file(from, to); if (!res.ok()) { - LOG(WARNING) << "Failed to link " << from << " to " << to << ": " << res; + LOG(WARNING) << "Failed to link " << from << " to " << to << ", status: " << res; break; } linked_success_files.emplace_back(std::move(to)); @@ -758,8 +763,12 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna res = publish_full_meta(tablet, &cloned_tablet_meta, rs_to_clone); } + if (!res.ok()) { + break; + } + // if full clone success, need to update cumulative layer point - if (!incremental_snapshot && res.ok()) { + if (!incremental_snapshot) { tablet->set_cumulative_layer_point(-1); } @@ -777,26 +786,31 @@ Status ReplicationTxnManager::publish_snapshot(Tablet* tablet, const string& sna // dcgs for each segment auto& dcg_list_pb = dcg_snapshot_pb.dcg_lists(idx); DeltaColumnGroupList dcgs; - RETURN_IF_ERROR( - DeltaColumnGroupListSerializer::deserialize_delta_column_group_list(dcg_list_pb, &dcgs)); + res = DeltaColumnGroupListSerializer::deserialize_delta_column_group_list(dcg_list_pb, &dcgs); + if (!res.ok()) { + LOG(WARNING) << "Failed to deserialize_delta_column_group_list, status: " << res; + break; + } if (dcgs.size() == 0) { ++idx; continue; } - RETURN_IF_ERROR(TabletMetaManager::put_delta_column_group( - data_dir, &wb, dcg_snapshot_pb.tablet_id(idx), dcg_snapshot_pb.rowset_id(idx), - dcg_snapshot_pb.segment_id(idx), dcgs)); + res = TabletMetaManager::put_delta_column_group(data_dir, &wb, dcg_snapshot_pb.tablet_id(idx), + dcg_snapshot_pb.rowset_id(idx), + dcg_snapshot_pb.segment_id(idx), dcgs); + if (!res.ok()) { + LOG(WARNING) << "Failed to put_delta_column_group, status: " << res; + break; + } ++idx; } } res = data_dir->get_meta()->write_batch(&wb); if (!res.ok()) { - std::stringstream ss; - ss << "save dcgs meta failed, tablet id: " << tablet->tablet_id(); - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); + LOG(WARNING) << "Failed to save dcgs meta, tablet id: " << tablet->tablet_id() << ", status: " << res; + break; } } } while (false); @@ -830,7 +844,7 @@ Status ReplicationTxnManager::publish_snapshot_for_primary(Tablet* tablet, const ASSIGN_OR_RETURN(auto md5sum1, fs::md5sum(snapshot_dir + "/" + fname)); ASSIGN_OR_RETURN(auto md5sum2, fs::md5sum(tablet_dir + "/" + fname)); if (md5sum1 != md5sum2) { - LOG(WARNING) << "duplicated file `" << fname << "` with different md5sum"; + LOG(WARNING) << "Duplicated file `" << fname << "` with different md5sum"; return Status::InternalError("duplicate file with different md5"); } clone_files.erase(fname); @@ -849,26 +863,27 @@ Status ReplicationTxnManager::publish_snapshot_for_primary(Tablet* tablet, const Status status = tablet->updates()->load_snapshot(snapshot_meta, false, true); if (!status.ok()) { + LOG(WARNING) << "Failed to load snapshot of tablet " << tablet->tablet_id() << " from " << snapshot_dir; Status clear_st; for (const std::string& filename : tablet_files) { clear_st = fs::delete_file(filename); if (!clear_st.ok()) { - LOG(WARNING) << "remove tablet file: " << filename << " failed, status: " << clear_st; + LOG(WARNING) << "Failed to remove tablet file: " << filename << ", status: " << clear_st; } } + } else { + int64_t expired_stale_sweep_endtime = UnixSeconds() - config::tablet_rowset_stale_sweep_time_sec; + tablet->updates()->remove_expired_versions(expired_stale_sweep_endtime); + LOG(INFO) << "Loaded snapshot of tablet " << tablet->tablet_id() << " from " << snapshot_dir; } - int64_t expired_stale_sweep_endtime = UnixSeconds() - config::tablet_rowset_stale_sweep_time_sec; - tablet->updates()->remove_expired_versions(expired_stale_sweep_endtime); - LOG(INFO) << "Loaded snapshot of tablet " << tablet->tablet_id() << " from " << snapshot_dir; - return status; } Status ReplicationTxnManager::publish_incremental_meta(Tablet* tablet, const TabletMeta& cloned_tablet_meta, int64_t snapshot_version) { - LOG(INFO) << "begin to publish incremental meta. tablet=" << tablet->full_name() - << ", snapshot_version=" << snapshot_version; + LOG(INFO) << "Begin to publish incremental meta. tablet: " << tablet->full_name() + << ", snapshot_version: " << snapshot_version; std::vector missed_versions; tablet->calc_missed_versions_unlocked(snapshot_version, &missed_versions); @@ -876,7 +891,7 @@ Status ReplicationTxnManager::publish_incremental_meta(Tablet* tablet, const Tab std::vector versions_to_delete; std::vector rowsets_to_clone; - VLOG(3) << "get missed versions again when publish incremental meta. " + VLOG(3) << "Get missed versions again when publish incremental meta. " << "tablet=" << tablet->full_name() << ", snapshot_version=" << snapshot_version << ", missed_versions_size=" << missed_versions.size(); @@ -884,7 +899,8 @@ Status ReplicationTxnManager::publish_incremental_meta(Tablet* tablet, const Tab for (Version version : missed_versions) { RowsetMetaSharedPtr inc_rs_meta = cloned_tablet_meta.acquire_inc_rs_meta_by_version(version); if (inc_rs_meta == nullptr) { - LOG(WARNING) << "missed version is not found in cloned tablet meta." + LOG(WARNING) << "Missed version is not found in cloned incremental tablet meta. tablet=" + << tablet->full_name() << ", snapshot_version=" << snapshot_version << ", missed_version=" << version.first << "-" << version.second; return Status::NotFound(strings::Substitute("version not found")); } @@ -894,21 +910,28 @@ Status ReplicationTxnManager::publish_incremental_meta(Tablet* tablet, const Tab // clone_data to tablet Status st = tablet->revise_tablet_meta(rowsets_to_clone, versions_to_delete); - LOG(INFO) << "finish to publish incremental meta. [tablet=" << tablet->full_name() << ", status=" << st << "]"; - return st; + if (!st.ok()) { + LOG(WARNING) << "Failed to publish incremental meta. tablet: " << tablet->full_name() + << ", snapshot_version: " << snapshot_version << ", status: " << st; + return st; + } + + LOG(INFO) << "Finish to publish incremental meta. tablet: " << tablet->full_name() + << ", snapshot_version: " << snapshot_version; + return Status::OK(); } Status ReplicationTxnManager::publish_full_meta(Tablet* tablet, TabletMeta* cloned_tablet_meta, std::vector& rs_to_clone) { Version cloned_max_version = cloned_tablet_meta->max_version(); - LOG(INFO) << "begin to publish full meta. tablet=" << tablet->full_name() + LOG(INFO) << "Begin to publish full meta. tablet=" << tablet->full_name() << ", cloned_max_version=" << cloned_max_version.first << "-" << cloned_max_version.second; std::vector versions_to_delete; std::vector rs_metas_found_in_src; // check local versions for (auto& rs_meta : tablet->tablet_meta()->all_rs_metas()) { Version local_version(rs_meta->start_version(), rs_meta->end_version()); - LOG(INFO) << "check local delta when publish full snapshot." + LOG(INFO) << "Check local delta when publish full snapshot." << "tablet=" << tablet->full_name() << ", local_version=" << local_version.first << "-" << local_version.second; @@ -919,7 +942,7 @@ Status ReplicationTxnManager::publish_full_meta(Tablet* tablet, TabletMeta* clon // It should not happen because if there is a hole, the following delta will not // do compaction. if (local_version.first <= cloned_max_version.second && local_version.second > cloned_max_version.second) { - LOG(WARNING) << "stop to publish full snapshot, version cross src latest." + LOG(WARNING) << "Stop to publish full snapshot, version cross src latest." << "tablet=" << tablet->full_name() << ", local_version=" << local_version.first << "-" << local_version.second; return Status::InternalError("clone version conflict with local version"); @@ -967,7 +990,15 @@ Status ReplicationTxnManager::publish_full_meta(Tablet* tablet, TabletMeta* clon // clone_data to tablet Status st = tablet->revise_tablet_meta(rowsets_to_clone, versions_to_delete); - LOG(INFO) << "finish to full clone. tablet=" << tablet->full_name() << ", res=" << st; + if (!st.ok()) { + LOG(WARNING) << "Failed to publish full meta. tablet: " << tablet->full_name() + << ", cloned_max_version: " << cloned_max_version.first << "-" << cloned_max_version.second + << ", status: " << st; + } else { + LOG(INFO) << "Finish to publish full meta. tablet: " << tablet->full_name() + << ", cloned_max_version: " << cloned_max_version.first << "-" << cloned_max_version.second; + } + // in previous step, copy all files from CLONE_DIR to tablet dir // but some rowset is useless, so that remove them here for (auto& rs_meta_ptr : rs_metas_found_in_src) { @@ -975,11 +1006,11 @@ Status ReplicationTxnManager::publish_full_meta(Tablet* tablet, TabletMeta* clon if (auto s = RowsetFactory::create_rowset(cloned_tablet_meta->tablet_schema_ptr(), tablet->schema_hash_path(), rs_meta_ptr, &rowset_to_remove); !s.ok()) { - LOG(WARNING) << "failed to init rowset to remove: " << rs_meta_ptr->rowset_id().to_string(); + LOG(WARNING) << "Failed to init rowset to remove: " << rs_meta_ptr->rowset_id().to_string(); continue; } if (auto ost = rowset_to_remove->remove(); !ost.ok()) { - LOG(WARNING) << "failed to remove rowset " << rs_meta_ptr->rowset_id().to_string() << ", res=" << ost; + LOG(WARNING) << "Failed to remove rowset " << rs_meta_ptr->rowset_id().to_string() << ", status: " << ost; } } return st; @@ -1125,7 +1156,7 @@ StatusOr ReplicationTxnManager::get_tablet(TTabletId tablet_id) std::string error_msg; auto tablet = tablet_manager->get_tablet(tablet_id, false, &error_msg); if (tablet == nullptr) { - LOG(WARNING) << "Cannot get tablet " << tablet_id << ", error: " << error_msg; + LOG(WARNING) << "Cannot get tablet " << tablet_id << ", status: " << error_msg; return Status::NotFound(error_msg); } return tablet; diff --git a/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationJob.java b/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationJob.java index 5055e1b7cf28e..d7e66e05ad442 100644 --- a/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationJob.java @@ -988,6 +988,10 @@ private boolean isAllTaskFinished() { return true; } + if (runningTasks.size() < 10) { + LOG.info("Unfinished tasks: {}, details: {}", runningTasks.size(), runningTasks.values()); + } + return false; }