Skip to content

Commit

Permalink
Merge pull request ClickHouse#45650 from ClickHouse/backport/22.8/382…
Browse files Browse the repository at this point in the history
…62-fix-system-unfreeze-for-ordinary-database

Merge pull request ClickHouse#38262 from PolyProgrammist/fix-ordinary-system-un…
  • Loading branch information
alesapin authored Jan 27, 2023
2 parents ce99631 + aa9f39a commit e481794
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 88 deletions.
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ BlockIO InterpreterSystemQuery::execute()
{
getContext()->checkAccess(AccessType::SYSTEM_UNFREEZE);
/// The result contains information about deleted parts as a table. It is for compatibility with ALTER TABLE UNFREEZE query.
result = Unfreezer().unfreeze(query.backup_name, getContext());
result = Unfreezer(getContext()).systemUnfreeze(query.backup_name);
break;
}
default:
Expand Down
4 changes: 4 additions & 0 deletions src/Parsers/ASTSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
if (!filesystem_cache_path.empty())
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_path;
}
else if (type == Type::UNFREEZE)
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(backup_name);
}
}


Expand Down
123 changes: 85 additions & 38 deletions src/Storages/Freeze.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,29 @@
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>

/**
* When ClickHouse has frozen data on remote storage, UNFREEZE requires 'smart' data removal.
* For remote storage, what is actually frozen is not the remote data itself but the local metadata that refers to it.
* So the same remote data can be referenced from the working data set and a frozen one (or from two frozen ones) at the same time.
* In this case, UNFREEZE should remove only the local metadata and keep the remote data.
* But when the data has already been removed from the working data set, ClickHouse should remove the remote data too.
* To detect whether the data is still used somewhere else, ClickHouse uses
* - ref_count from metadata to check whether the data is used by other metadata on the same replica;
* - a Keeper record to check whether the data is used on another replica.
* StorageReplicatedMergeTree::removeSharedDetachedPart performs the required checks, so it is
* called here for each frozen part.
*/

namespace DB
{

namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
}

void FreezeMetaData::fill(const StorageReplicatedMergeTree & storage)
{
is_replicated = storage.supportsReplication();
is_remote = storage.isRemote();
replica_name = storage.getReplicaName();
zookeeper_name = storage.getZooKeeperName();
table_shared_id = storage.getTableSharedID();
Expand All @@ -26,11 +43,17 @@ void FreezeMetaData::save(DiskPtr data_disk, const String & path) const

writeIntText(version, buffer);
buffer.write("\n", 1);
writeBoolText(is_replicated, buffer);
buffer.write("\n", 1);
writeBoolText(is_remote, buffer);
buffer.write("\n", 1);
writeString(replica_name, buffer);
if (version == 1)
{
/// is_replicated and is_remote are not used
bool is_replicated = true;
writeBoolText(is_replicated, buffer);
buffer.write("\n", 1);
bool is_remote = true;
writeBoolText(is_remote, buffer);
buffer.write("\n", 1);
}
writeString(escapeForFileName(replica_name), buffer);
buffer.write("\n", 1);
writeString(zookeeper_name, buffer);
buffer.write("\n", 1);
Expand All @@ -51,17 +74,25 @@ bool FreezeMetaData::load(DiskPtr data_disk, const String & path)
auto metadata_str = metadata_storage->readFileToString(file_path);
ReadBufferFromString buffer(metadata_str);
readIntText(version, buffer);
if (version != 1)
if (version < 1 || version > 2)
{
LOG_ERROR(&Poco::Logger::get("FreezeMetaData"), "Unknown freezed metadata version: {}", version);
LOG_ERROR(&Poco::Logger::get("FreezeMetaData"), "Unknown frozen metadata version: {}", version);
return false;
}
DB::assertChar('\n', buffer);
readBoolText(is_replicated, buffer);
DB::assertChar('\n', buffer);
readBoolText(is_remote, buffer);
DB::assertChar('\n', buffer);
readString(replica_name, buffer);
if (version == 1)
{
/// is_replicated and is_remote are not used
bool is_replicated;
readBoolText(is_replicated, buffer);
DB::assertChar('\n', buffer);
bool is_remote;
readBoolText(is_remote, buffer);
DB::assertChar('\n', buffer);
}
std::string unescaped_replica_name;
readString(unescaped_replica_name, buffer);
replica_name = unescapeForFileName(unescaped_replica_name);
DB::assertChar('\n', buffer);
readString(zookeeper_name, buffer);
DB::assertChar('\n', buffer);
Expand All @@ -87,43 +118,62 @@ String FreezeMetaData::getFileName(const String & path)
return fs::path(path) / "frozen_metadata.txt";
}

BlockIO Unfreezer::unfreeze(const String & backup_name, ContextPtr local_context)
/// Builds an Unfreezer bound to the given query context.
/// The context is kept for later use by the unfreeze routines.
Unfreezer::Unfreezer(ContextPtr context)
    : local_context(context)
{
    /// Only ask the context for a ZooKeeper handle when it reports having one;
    /// otherwise `zookeeper` keeps its default-constructed value.
    if (!local_context->hasZooKeeper())
        return;

    zookeeper = local_context->getZooKeeper();
}

BlockIO Unfreezer::systemUnfreeze(const String & backup_name)
{
LOG_DEBUG(log, "Unfreezing backup {}", backup_name);
LOG_DEBUG(log, "Unfreezing backup {}", escapeForFileName(backup_name));

const auto & config = local_context->getConfigRef();
static constexpr auto config_key = "enable_system_unfreeze";
if (!config.getBool(config_key, false))
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Support for SYSTEM UNFREEZE query is disabled. You can enable it via '{}' server setting", config_key);
}

auto disks_map = local_context->getDisksMap();
Disks disks;
for (auto & [name, disk]: disks_map)
{
disks.push_back(disk);
}
auto backup_path = fs::path(backup_directory_prefix) / escapeForFileName(backup_name);
auto store_path = backup_path / "store";
auto store_paths = {backup_path / "store", backup_path / "data"};

PartitionCommandsResultInfo result_info;

for (const auto & disk: disks)
{
if (!disk->exists(store_path))
continue;
for (auto prefix_it = disk->iterateDirectory(store_path); prefix_it->isValid(); prefix_it->next())
for (const auto& store_path: store_paths)
{
auto prefix_directory = store_path / prefix_it->name();
for (auto table_it = disk->iterateDirectory(prefix_directory); table_it->isValid(); table_it->next())
if (!disk->exists(store_path))
continue;
for (auto prefix_it = disk->iterateDirectory(store_path); prefix_it->isValid(); prefix_it->next())
{
auto table_directory = prefix_directory / table_it->name();
auto current_result_info = unfreezePartitionsFromTableDirectory([] (const String &) { return true; }, backup_name, {disk}, table_directory, local_context);
for (auto & command_result : current_result_info)
auto prefix_directory = store_path / prefix_it->name();
for (auto table_it = disk->iterateDirectory(prefix_directory); table_it->isValid(); table_it->next())
{
command_result.command_type = "SYSTEM UNFREEZE";
auto table_directory = prefix_directory / table_it->name();
auto current_result_info = unfreezePartitionsFromTableDirectory(
[](const String &) { return true; }, backup_name, {disk}, table_directory);
for (auto & command_result : current_result_info)
{
command_result.command_type = "SYSTEM UNFREEZE";
}
result_info.insert(
result_info.end(),
std::make_move_iterator(current_result_info.begin()),
std::make_move_iterator(current_result_info.end()));
}
result_info.insert(
result_info.end(),
std::make_move_iterator(current_result_info.begin()),
std::make_move_iterator(current_result_info.end()));
}
}
if (disk->exists(backup_path))
{
/// After unfreezing we need to clear revision.txt file and empty directories
disk->removeRecursive(backup_path);
}
}
Expand All @@ -136,18 +186,15 @@ BlockIO Unfreezer::unfreeze(const String & backup_name, ContextPtr local_context
return result;
}

bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context)
bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context, zkutil::ZooKeeperPtr zookeeper)
{
if (disk->supportZeroCopyReplication())
{
FreezeMetaData meta;
if (meta.load(disk, path))
{
if (meta.is_replicated)
{
FreezeMetaData::clean(disk, path);
return StorageReplicatedMergeTree::removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", local_context);
}
FreezeMetaData::clean(disk, path);
return StorageReplicatedMergeTree::removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", local_context, zookeeper);
}
}

Expand All @@ -156,7 +203,7 @@ bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const Strin
return false;
}

PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory, ContextPtr local_context)
PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory)
{
PartitionCommandsResultInfo result;

Expand All @@ -180,7 +227,7 @@ PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(Merg

const auto & path = it->path();

bool keep_shared = removeFreezedPart(disk, path, partition_directory, local_context);
bool keep_shared = removeFreezedPart(disk, path, partition_directory, local_context, zookeeper);

result.push_back(PartitionCommandResultInfo{
.partition_id = partition_id,
Expand Down
13 changes: 7 additions & 6 deletions src/Storages/Freeze.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ struct FreezeMetaData
static String getFileName(const String & path);

public:
int version = 1;
bool is_replicated{false};
bool is_remote{false};
int version = 2;
String replica_name;
String zookeeper_name;
String table_shared_id;
Expand All @@ -34,12 +32,15 @@ struct FreezeMetaData
class Unfreezer
{
public:
PartitionCommandsResultInfo unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory, ContextPtr local_context);
BlockIO unfreeze(const String & backup_name, ContextPtr local_context);
Unfreezer(ContextPtr context);
PartitionCommandsResultInfo unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory);
BlockIO systemUnfreeze(const String & backup_name);
private:
ContextPtr local_context;
zkutil::ZooKeeperPtr zookeeper;
Poco::Logger * log = &Poco::Logger::get("Unfreezer");
static constexpr std::string_view backup_directory_prefix = "shadow";
static bool removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context);
static bool removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context, zkutil::ZooKeeperPtr zookeeper);
};

}
8 changes: 4 additions & 4 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1941,7 +1941,7 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirecory()

for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name, false);
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
LOG_DEBUG(log, "Removed broken detached part {} due to a timeout for broken detached parts", old_name);
old_name.clear();
}
Expand Down Expand Up @@ -4629,7 +4629,7 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr

for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name, false);
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
LOG_DEBUG(log, "Dropped detached part {}, keep shared data: {}", old_name, keep_shared);
old_name.clear();
}
Expand Down Expand Up @@ -6285,7 +6285,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezeAll(
return unfreezePartitionsByMatcher([] (const String &) { return true; }, backup_name, local_context);
}

bool MergeTreeData::removeDetachedPart(DiskPtr disk, const String & path, const String &, bool)
bool MergeTreeData::removeDetachedPart(DiskPtr disk, const String & path, const String &)
{
disk->removeRecursive(path);

Expand All @@ -6300,7 +6300,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn

auto disks = getStoragePolicy()->getDisks();

return Unfreezer().unfreezePartitionsFromTableDirectory(matcher, backup_name, disks, backup_path, local_context);
return Unfreezer(local_context).unfreezePartitionsFromTableDirectory(matcher, backup_name, disks, backup_path);
}

bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@ class MergeTreeData : public IStorage, public WithMutableContext

/// Check shared data usage on other replicas for a detached/frozen part.
/// Remove local files, and remote files too if needed.
virtual bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed);
virtual bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name);

virtual String getTableSharedID() const { return ""; }

Expand Down
22 changes: 4 additions & 18 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8291,25 +8291,12 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
}
}

bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed)
bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String & path, const String & part_name)
{
if (disk->supportZeroCopyReplication())
{
if (is_freezed)
{
FreezeMetaData meta;
if (meta.load(disk, path))
{
FreezeMetaData::clean(disk, path);
return removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", getContext());
}
}
else
{
String table_id = getTableSharedID();

return removeSharedDetachedPart(disk, path, part_name, table_id, zookeeper_name, replica_name, zookeeper_path, getContext());
}
String table_id = getTableSharedID();
return removeSharedDetachedPart(disk, path, part_name, table_id, zookeeper_name, replica_name, zookeeper_path, getContext(), current_zookeeper);
}

disk->removeRecursive(path);
Expand All @@ -8319,11 +8306,10 @@ bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String &


bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
const String &, const String & detached_replica_name, const String & detached_zookeeper_path, ContextPtr local_context)
const String &, const String & detached_replica_name, const String & detached_zookeeper_path, ContextPtr local_context, const zkutil::ZooKeeperPtr & zookeeper)
{
bool keep_shared = false;

zkutil::ZooKeeperPtr zookeeper = local_context->getZooKeeper();
NameSet files_not_to_remove;

fs::path checksums = fs::path(path) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK;
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/StorageReplicatedMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
void checkBrokenDisks();

static bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path, ContextPtr local_context);
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path, ContextPtr local_context, const zkutil::ZooKeeperPtr & zookeeper);

private:
std::atomic_bool are_restoring_replica {false};
Expand Down Expand Up @@ -824,7 +824,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});

bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed) override;
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override;

/// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled.
void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override;
Expand Down
4 changes: 4 additions & 0 deletions tests/config/config.d/system_unfreeze.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<?xml version="1.0"?>
<!-- Test server config override: sets the `enable_system_unfreeze` server setting to true.
     The SYSTEM UNFREEZE query throws SUPPORT_IS_DISABLED unless this setting is enabled (it defaults to false). -->
<clickhouse>
<enable_system_unfreeze>true</enable_system_unfreeze>
</clickhouse>
1 change: 1 addition & 0 deletions tests/config/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ln -sf $SRC_PATH/config.d/named_collection.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/ssl_certs.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/filesystem_cache_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/session_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/system_unfreeze.xml $DEST_SERVER_PATH/config.d/

ln -sf $SRC_PATH/users.d/log_queries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/readonly.xml $DEST_SERVER_PATH/users.d/
Expand Down
17 changes: 1 addition & 16 deletions tests/integration/test_merge_tree_s3/configs/config.xml
Original file line number Diff line number Diff line change
@@ -1,19 +1,4 @@
<?xml version="1.0"?>
<clickhouse>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>

<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>

<max_concurrent_queries>500</max_concurrent_queries>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
<enable_system_unfreeze>true</enable_system_unfreeze>
</clickhouse>
Loading

0 comments on commit e481794

Please sign in to comment.