Skip to content

Commit

Permalink
[FLINK-21880][tests] Ignore incomplete checkpoints in UnalignedCheckp…
Browse files Browse the repository at this point in the history
…ointRescaleITCase
  • Loading branch information
rkhachatryan authored and AHeise committed Apr 7, 2021
1 parent 9cb1610 commit 2b1cf60
Showing 1 changed file with 36 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.LogLevelRule;
import org.apache.flink.util.TestLogger;

Expand All @@ -86,6 +87,7 @@
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -98,6 +100,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_DIR_PREFIX;
import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.METADATA_FILE_NAME;
import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT;
import static org.apache.flink.util.Preconditions.checkState;
Expand Down Expand Up @@ -163,13 +167,11 @@ protected File execute(UnalignedSettings settings) throws Exception {
.get()
.toJobExecutionResult(getClass().getClassLoader()));
} catch (Exception e) {
if (!ExceptionUtils.findThrowable(e, TestException.class).isPresent()) {
throw e;
}
if (settings.generateCheckpoint) {
return Files.find(
checkpointDir.toPath(),
2,
(file, attr) ->
attr.isDirectory()
&& file.getFileName().toString().startsWith("chk"))
return Files.find(checkpointDir.toPath(), 2, this::isCompletedCheckpoint)
.min(Comparator.comparing(Path::toString))
.map(Path::toFile)
.orElseThrow(
Expand All @@ -185,6 +187,27 @@ protected File execute(UnalignedSettings settings) throws Exception {
return null;
}

private boolean isCompletedCheckpoint(Path path, BasicFileAttributes attr) {
return attr.isDirectory()
&& path.getFileName().toString().startsWith(CHECKPOINT_DIR_PREFIX)
&& hasMetadata(path);
}

private boolean hasMetadata(Path file) {
try {
return Files.find(
file.toAbsolutePath(),
1,
(path, attrs) ->
path.getFileName().toString().equals(METADATA_FILE_NAME))
.findAny()
.isPresent();
} catch (IOException e) {
ExceptionUtils.rethrow(e);
return false; // should never happen
}
}

private StreamGraph getStreamGraph(UnalignedSettings settings, Configuration conf) {
// a dummy environment used to retrieve the DAG, mini cluster will be used later
final StreamExecutionEnvironment setupEnv =
Expand Down Expand Up @@ -849,7 +872,7 @@ public void checkFail(FilterFunction<FailingMapperState> failFunction, String de
}

private void failMapper(String description) throws Exception {
throw new Exception(
throw new TestException(
"Failing "
+ description
+ " @ "
Expand Down Expand Up @@ -1106,4 +1129,10 @@ protected static long checkHeader(long value) {
}
return value;
}

private static class TestException extends Exception {
public TestException(String s) {
super(s);
}
}
}

0 comments on commit 2b1cf60

Please sign in to comment.