Skip to content

Commit

Permalink
Fix the issue of restarting DataNode to clean up InvalidDataRegion (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
133tosakarin authored Sep 18, 2024
1 parent 07f1475 commit a322ab5
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,40 +133,27 @@ public void init() {
}
}

/**
* Scan the database and schema region directories to recover schema regions and return the
* collected local schema partition info for localSchemaPartitionTable recovery.
*/
@SuppressWarnings("java:S2142")
private void initSchemaRegion() {
public static Map<String, List<SchemaRegionId>> getLocalSchemaRegionInfo() {
final File schemaDir = new File(config.getSchemaDir());
final File[] sgDirList = schemaDir.listFiles();

final Map<String, List<SchemaRegionId>> localSchemaPartitionTable = new HashMap<>();
if (sgDirList == null) {
return;
return localSchemaPartitionTable;
}

// recover SchemaRegion concurrently
final ExecutorService schemaRegionRecoverPools =
IoTDBThreadPoolFactory.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
ThreadName.SCHEMA_REGION_RECOVER_TASK.getName());
final List<Future<ISchemaRegion>> futures = new ArrayList<>();

for (File file : sgDirList) {
if (!file.isDirectory()) {
continue;
}

final PartialPath storageGroup;
final PartialPath database;
try {
storageGroup = PartialPath.getDatabasePath(file.getName());
database = PartialPath.getDatabasePath(file.getName());
} catch (IllegalPathException illegalPathException) {
// not a legal sg dir
continue;
}

final File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath());
final File sgDir = new File(config.getSchemaDir(), database.getFullPath());

if (!sgDir.exists()) {
continue;
Expand All @@ -176,7 +163,7 @@ private void initSchemaRegion() {
if (schemaRegionDirs == null) {
continue;
}

List<SchemaRegionId> schemaRegionIds = new ArrayList<>();
for (final File schemaRegionDir : schemaRegionDirs) {
final SchemaRegionId schemaRegionId;
try {
Expand All @@ -185,11 +172,38 @@ private void initSchemaRegion() {
// the dir/file is not schemaRegionDir, ignore this.
continue;
}
futures.add(
schemaRegionRecoverPools.submit(recoverSchemaRegionTask(storageGroup, schemaRegionId)));
schemaRegionIds.add(schemaRegionId);
}
localSchemaPartitionTable.put(database.getFullPath(), schemaRegionIds);
}
return localSchemaPartitionTable;
}

/**
* Scan the database and schema region directories to recover schema regions and return the
* collected local schema partition info for localSchemaPartitionTable recovery.
*/
@SuppressWarnings("java:S2142")
private void initSchemaRegion() {
// recover SchemaRegion concurrently
Map<String, List<SchemaRegionId>> localSchemaRegionInfo = getLocalSchemaRegionInfo();
final ExecutorService schemaRegionRecoverPools =
IoTDBThreadPoolFactory.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
ThreadName.SCHEMA_REGION_RECOVER_TASK.getName());
final List<Future<ISchemaRegion>> futures = new ArrayList<>();
localSchemaRegionInfo.forEach(
(k, v) -> {
for (SchemaRegionId schemaRegionId : v) {
try {
futures.add(
schemaRegionRecoverPools.submit(
recoverSchemaRegionTask(new PartialPath(k), schemaRegionId)));
} catch (IllegalPathException e) {
throw new RuntimeException(e);
}
}
});
for (final Future<ISchemaRegion> future : futures) {
try {
final ISchemaRegion schemaRegion = future.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.StartupException;
Expand Down Expand Up @@ -123,6 +125,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -215,7 +218,6 @@ protected void start() {

// Pull and check system configurations from ConfigNode-leader
pullAndCheckSystemConfigurations();

if (isFirstStart) {
sendRegisterRequestToConfigNode(true);
IoTDBStartCheck.getInstance().generateOrOverwriteSystemPropertiesFile();
Expand Down Expand Up @@ -544,49 +546,92 @@ private void sendRegisterRequestToConfigNode(boolean isPreCheck)
}

private void removeInvalidRegions(List<ConsensusGroupId> dataNodeConsensusGroupIds) {
removeInvalidConsensusDataRegions(dataNodeConsensusGroupIds);
removeInvalidDataRegions(dataNodeConsensusGroupIds);
removeInvalidConsensusSchemaRegions(dataNodeConsensusGroupIds);
removeInvalidSchemaRegions(dataNodeConsensusGroupIds);
}

private void removeInvalidDataRegions(List<ConsensusGroupId> dataNodeConsensusGroupIds) {
Map<String, List<DataRegionId>> localDataRegionInfo =
StorageEngine.getInstance().getLocalDataRegionInfo();
List<String> allLocalFilesFolders = TierManager.getInstance().getAllLocalFilesFolders();
localDataRegionInfo.forEach(
(database, dataRegionIds) -> {
for (DataRegionId dataRegionId : dataRegionIds) {
if (!dataNodeConsensusGroupIds.contains(dataRegionId)) {
removeDataDirRegion(database, dataRegionId, allLocalFilesFolders);
}
}
});
}

private void removeInvalidConsensusDataRegions(List<ConsensusGroupId> dataNodeConsensusGroupIds) {
List<ConsensusGroupId> invalidDataRegionConsensusGroupIds =
DataRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
.filter(consensusGroupId -> !dataNodeConsensusGroupIds.contains(consensusGroupId))
.collect(Collectors.toList());

List<ConsensusGroupId> invalidSchemaRegionConsensusGroupIds =
SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
.filter(consensusGroupId -> !dataNodeConsensusGroupIds.contains(consensusGroupId))
.collect(Collectors.toList());
removeInvalidDataRegions(invalidDataRegionConsensusGroupIds);
removeInvalidSchemaRegions(invalidSchemaRegionConsensusGroupIds);
}

private void removeInvalidDataRegions(List<ConsensusGroupId> invalidConsensusGroupId) {
logger.info("Remove invalid dataRegion directories... {}", invalidConsensusGroupId);
for (ConsensusGroupId consensusGroupId : invalidConsensusGroupId) {
logger.info("Remove invalid dataRegion directories... {}", invalidDataRegionConsensusGroupIds);
for (ConsensusGroupId consensusGroupId : invalidDataRegionConsensusGroupIds) {
File oldDir =
new File(
DataRegionConsensusImpl.getInstance()
.getRegionDirFromConsensusGroupId(consensusGroupId));
removeRegionsDir(oldDir);
removeDir(oldDir);
}
}

private void removeInvalidSchemaRegions(List<ConsensusGroupId> invalidConsensusGroupId) {
logger.info("Remove invalid schemaRegion directories... {}", invalidConsensusGroupId);
for (ConsensusGroupId consensusGroupId : invalidConsensusGroupId) {
private void removeInvalidSchemaRegions(List<ConsensusGroupId> schemaConsensusGroupIds) {
Map<String, List<SchemaRegionId>> localSchemaRegionInfo =
SchemaEngine.getLocalSchemaRegionInfo();
localSchemaRegionInfo.forEach(
(database, schemaRegionIds) -> {
for (SchemaRegionId schemaRegionId : schemaRegionIds) {
if (!schemaConsensusGroupIds.contains(schemaRegionId)) {
removeInvalidSchemaDir(database, schemaRegionId);
}
}
});
}

private void removeDataDirRegion(
String database, DataRegionId dataRegionId, List<String> fileFolders) {
fileFolders.forEach(
folder -> {
String regionDir =
folder + File.separator + database + File.separator + dataRegionId.getId();
removeDir(new File(regionDir));
});
}

private void removeInvalidConsensusSchemaRegions(
List<ConsensusGroupId> dataNodeConsensusGroupIds) {
List<ConsensusGroupId> invalidSchemaRegionConsensusGroupIds =
SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
.filter(consensusGroupId -> !dataNodeConsensusGroupIds.contains(consensusGroupId))
.collect(Collectors.toList());
logger.info(
"Remove invalid schemaRegion directories... {}", invalidSchemaRegionConsensusGroupIds);

for (ConsensusGroupId consensusGroupId : invalidSchemaRegionConsensusGroupIds) {
File oldDir =
new File(
SchemaRegionConsensusImpl.getInstance()
.getRegionDirFromConsensusGroupId(consensusGroupId));
removeRegionsDir(oldDir);
removeDir(oldDir);
}
}

private void removeRegionsDir(File regionDir) {
private void removeInvalidSchemaDir(String database, SchemaRegionId schemaRegionId) {
String systemSchemaDir =
config.getSystemDir() + File.separator + database + File.separator + schemaRegionId.getId();
removeDir(new File(systemSchemaDir));
}

private void removeDir(File regionDir) {
if (regionDir.exists()) {
try {
FileUtils.recursivelyDeleteFolder(regionDir.getPath());
logger.info("delete {} succeed.", regionDir.getAbsolutePath());
} catch (IOException e) {
logger.error("delete {} failed.", regionDir.getAbsolutePath());
}
FileUtils.deleteDirectoryAndEmptyParent(regionDir);
logger.info("delete {} succeed.", regionDir.getAbsolutePath());
} else {
logger.info("delete {} failed, because it does not exist.", regionDir.getAbsolutePath());
}
Expand Down Expand Up @@ -645,12 +690,10 @@ private void sendRestartRequestToConfigNode() throws StartupException {
(endTime - startTime));

List<TConsensusGroupId> consensusGroupIds = dataNodeRestartResp.getConsensusGroupIds();
List<ConsensusGroupId> dataNodeConsensusGroupIds =
removeInvalidRegions(
consensusGroupIds.stream()
.map(ConsensusGroupId.Factory::createFromTConsensusGroupId)
.collect(Collectors.toList());

removeInvalidRegions(dataNodeConsensusGroupIds);
.collect(Collectors.toList()));
} else {
/* Throw exception when restart is rejected */
throw new StartupException(dataNodeRestartResp.getStatus().getMessage());
Expand Down

0 comments on commit a322ab5

Please sign in to comment.