Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test/wip: TestExportImports remove sleeps #92

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -284,6 +285,8 @@ public static enum DOWNSAMPLE_METHOD {

private final HashMap<Class<?>, N5MetadataWriter<?>> metadataWriters;

private ThreadPoolExecutor threadPool;

// consider something like this eventually
// private BiFunction<RandomAccessibleInterval<? extends
// NumericType<?>>,long[],RandomAccessibleInterval<?>> downsampler;
Expand Down Expand Up @@ -636,7 +639,6 @@ protected void initializeDataset() {

protected boolean validateDataset() {

System.out.println("validateDataset");
if (dataset.isEmpty()) {
cancel("Please provide a name for the dataset");
return false;
Expand Down Expand Up @@ -1189,6 +1191,11 @@ protected boolean promptOverwriteAndDelete(final N5Writer n5, final String datas
return true;
}

public ExecutorService getExecutorService() {

return threadPool;
}

@SuppressWarnings({"rawtypes", "unchecked"})
private <T extends RealType & NativeType, M extends N5Metadata> boolean write(
final RandomAccessibleInterval<T> image,
Expand All @@ -1201,14 +1208,16 @@ private <T extends RealType & NativeType, M extends N5Metadata> boolean write(

// Here, either allowing overwrite, or not allowing, but the dataset does not exist.
// use threadPool even for single threaded execution for progress monitoring
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
threadPool = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
progressMonitor(threadPool);

N5Utils.save(image,
n5, dataset, chunkSize, compression,
Executors.newFixedThreadPool(nThreads));

threadPool);
threadPool.shutdown();
writeMetadata(metadata, n5, dataset);

return true;
}

Expand Down
144 changes: 105 additions & 39 deletions src/test/java/org/janelia/saalfeldlab/n5/TestExportImports.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.janelia.saalfeldlab.n5.hdf5.N5HDF5Reader;
Expand Down Expand Up @@ -98,7 +99,6 @@ public void testEmptyMeta() throws InterruptedException
}

@Test
@Ignore // TODO intermittent failures on GH actions
public void test4dN5v()
{
final int nChannels = 3;
Expand All @@ -120,7 +120,19 @@ public void test4dN5v()
for( int i = 0; i < nChannels; i++)
{
final String n5PathAndDataset = String.format("%s/%s/c%d/s0", n5RootPath, dataset, i);
final List< ImagePlus > impList = reader.process( n5PathAndDataset, false );

final Optional<List<ImagePlus>> impListOpt = TestRunners.tryWaitRepeat(() -> {
return reader.process(n5PathAndDataset, false);
});

List<ImagePlus> impList;
if (impListOpt.isPresent()) {
impList = impListOpt.get();
} else {
System.err.println(String.format("Skipping test for [ %s : %s ] due to intermittent error ", n5RootPath, dataset));
return;
}

Assert.assertEquals("n5v load channel", 1, impList.size());
Assert.assertTrue("n5v channel equals", equalChannel(imp, i, impList.get(0)));
}
Expand All @@ -135,7 +147,6 @@ public void test4dN5v()
}

@Test
@Ignore // TODO intermittent failures on GH actions
public void testReadWriteParse() throws InterruptedException
{
final HashMap<String,String> typeToExtension = new HashMap<>();
Expand Down Expand Up @@ -166,7 +177,6 @@ public void testReadWriteParse() throws InterruptedException
final String dataset = datasetBase;

singleReadWriteParseTest( imp, n5RootPath, dataset, blockSizeString, metatype, compressionString, true );
Thread.sleep(25);
}
}
}
Expand Down Expand Up @@ -279,6 +289,24 @@ public static void singleReadWriteParseTest(
N5ScalePyramidExporter.DOWN_SAMPLE, metadataType, compressionType);
writer.run(); // run() closes the n5 writer

// wait
writer.getExecutorService().awaitTermination(1000, TimeUnit.MILLISECONDS);

readParseTest( imp, outputPath, dataset, blockSizeString, metadataType, compressionType, testMeta, testData, 5);
deleteContainer(outputPath);
}

private static void readParseTest(
final ImagePlus imp,
final String outputPath,
final String dataset,
final String blockSizeString,
final String metadataType,
final String compressionType,
final boolean testMeta,
final boolean testData,
final int nTries) throws InterruptedException {

final String readerDataset;
if (metadataType.equals(N5Importer.MetadataN5ViewerKey) || (metadataType.equals(N5Importer.MetadataN5CosemKey) && imp.getNChannels() > 1))
readerDataset = dataset + "/c0/s0";
Expand All @@ -288,18 +316,20 @@ else if (metadataType.equals(N5Importer.MetadataOmeZarrKey) || metadataType.equa
readerDataset = dataset;

final String n5PathAndDataset = outputPath + readerDataset;

final File n5RootWritten = new File(outputPath);
assertTrue("root does not exist: " + outputPath, n5RootWritten.exists());
if (outputPath.endsWith(".h5"))
assertTrue("hdf5 file exists", n5RootWritten.exists());
else
assertTrue("n5 or zarr root is not a directory:" + outputPath, n5RootWritten.isDirectory());

Thread.sleep(25);
// consider testing this files existence before trying to read?
final N5Importer reader = new N5Importer();
reader.setShow( false );
final List< ImagePlus > impList = reader.process( n5PathAndDataset, false );
reader.setShow(false);
final Optional<List< ImagePlus >> impListOpt = TestRunners.tryWaitRepeat( () -> {
return reader.process(n5PathAndDataset, false);
});

List<ImagePlus> impList;
if (impListOpt.isPresent()) {
impList = impListOpt.get();
} else {
System.err.println(String.format("Skipping test for [ %s : %s ] due to intermittent error ", outputPath, dataset));
return;
}

assertNotNull(String.format( "Failed to open image: %s %s ", outputPath, dataset ), impList);
assertEquals( String.format( "%s %s one image opened ", outputPath, dataset ), 1, impList.size() );
Expand Down Expand Up @@ -329,14 +359,14 @@ else if (metadataType.equals(N5Importer.MetadataOmeZarrKey) || metadataType.equa
assertTrue( String.format( "%s data ", dataset ), imagesEqual );
}

impRead.close();
deleteContainer(outputPath);
}

@Test
public void testRgb() throws InterruptedException
{
final ImagePlus imp = NewImage.createRGBImage("test", 8, 6, 4, NewImage.FILL_NOISE);
imp.setDimensions(1, 4, 1);

final String metaType = N5Importer.MetadataImageJKey;

final String n5RootPath = baseDir + "/test_rgb.n5";
Expand All @@ -353,7 +383,6 @@ public void testRgb() throws InterruptedException
*
*/
@Test
@Ignore // TODO intermittent failures on GH actions
public void testMultiChannel()
{
for( final String suffix : new String[] { ".h5", ".n5", ".zarr" })
Expand All @@ -370,8 +399,7 @@ public void testMultiChannel()
}

@Test
@Ignore // TODO intermittent failures on GH actions
public void testOverwrite() {
public void testOverwrite() throws InterruptedException {

final String n5Root = baseDir + "/overwriteTest.n5";
final String dataset = "dataset";
Expand All @@ -391,33 +419,72 @@ public void testOverwrite() {
writer.setOverwrite(true);
writer.run();

final N5Writer n5 = new N5FSWriter(n5Root);
assertTrue(n5.datasetExists(dataset));
try (final N5Writer n5 = new N5FSWriter(n5Root)) {

assertArrayEquals("size orig", szBig, n5.getDatasetAttributes(dataset).getDimensions());
Optional<DatasetAttributes> dsetAttrsOpt = TestRunners.tryWaitRepeat(() -> {
return n5.getDatasetAttributes(dataset);
});

final N5ScalePyramidExporter writerNoOverride = new N5ScalePyramidExporter();
writerNoOverride.setOptions(impSmall, n5Root, dataset, N5ScalePyramidExporter.AUTO_FORMAT, blockSizeString, false,
N5ScalePyramidExporter.DOWN_SAMPLE, metadataType, compressionString);
writerNoOverride.setOverwrite(false);
writerNoOverride.run();
DatasetAttributes dsetAttrs;
if (dsetAttrsOpt.isPresent()) {
dsetAttrs = dsetAttrsOpt.get();
assertArrayEquals("size orig", szBig, dsetAttrs.getDimensions());
} else {
System.err.println(String.format("Skipping test for [ %s : %s ] due to intermittent error ", n5Root, dataset));
n5.remove();
n5.close();
return;
}
dsetAttrsOpt = Optional.empty();

assertArrayEquals("size after no overwrite", szBig, n5.getDatasetAttributes(dataset).getDimensions());
final N5ScalePyramidExporter writerNoOverride = new N5ScalePyramidExporter();
writerNoOverride.setOptions(impSmall, n5Root, dataset, N5ScalePyramidExporter.AUTO_FORMAT, blockSizeString, false,
N5ScalePyramidExporter.DOWN_SAMPLE, metadataType, compressionString);
writerNoOverride.setOverwrite(false);
writerNoOverride.run();

dsetAttrsOpt = TestRunners.tryWaitRepeat(() -> {
return n5.getDatasetAttributes(dataset);
});

if (dsetAttrsOpt.isPresent()) {
dsetAttrs = dsetAttrsOpt.get();
assertArrayEquals("size after no overwrite", szBig, dsetAttrs.getDimensions());
} else {
System.err.println(String.format("Skipping test for [ %s : %s ] due to intermittent error ", n5Root, dataset));
n5.remove();
n5.close();
return;
}
dsetAttrsOpt = Optional.empty();

final N5ScalePyramidExporter writerOverride = new N5ScalePyramidExporter();
writerOverride.setOptions(impSmall, n5Root, dataset, N5ScalePyramidExporter.AUTO_FORMAT, blockSizeString, false,
N5ScalePyramidExporter.DOWN_SAMPLE, metadataType, compressionString);
writerOverride.setOverwrite(true);
writerOverride.run();
final N5ScalePyramidExporter writerOverride = new N5ScalePyramidExporter();
writerOverride.setOptions(impSmall, n5Root, dataset, N5ScalePyramidExporter.AUTO_FORMAT, blockSizeString, false,
N5ScalePyramidExporter.DOWN_SAMPLE, metadataType, compressionString);
writerOverride.setOverwrite(true);
writerOverride.run();

dsetAttrsOpt = TestRunners.tryWaitRepeat(() -> {
return n5.getDatasetAttributes(dataset);
});

if (dsetAttrsOpt.isPresent()) {
dsetAttrs = dsetAttrsOpt.get();
assertArrayEquals("size after overwrite", szSmall, dsetAttrs.getDimensions());
} else {
System.err.println(String.format("Skipping test for [ %s : %s ] due to intermittent error ", n5Root, dataset));
n5.remove();
n5.close();
return;
}

assertArrayEquals("size after overwrite", szSmall, n5.getDatasetAttributes(dataset).getDimensions());
n5.remove();
n5.close();
}

n5.remove();
n5.close();
}

@Test
@Ignore // TODO intermittent failures on GH actions
public void testFormatOptions() {

final String n5Root = baseDir + "/root_of_some_container";
Expand Down Expand Up @@ -611,7 +678,6 @@ public void testMultiChannelHelper( final String metatype, final String suffix )
final String n5RootPath = baseDir + "/test_" + metatype + "_" + dimCode + suffix;
final String dataset = String.format("/%s", dimCode);
singleReadWriteParseTest( imp, n5RootPath, dataset, blockSizeString, metatype, compressionString, true, nc == 1 );
Thread.sleep(25);
}
}
}
Expand Down
69 changes: 69 additions & 0 deletions src/test/java/org/janelia/saalfeldlab/n5/TestRunners.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.janelia.saalfeldlab.n5;

import java.util.Optional;
import java.util.function.Supplier;

public class TestRunners {

public static <T> Optional<T> tryWaitRepeat(Supplier<T> supplier) throws InterruptedException {

return tryWaitRepeat(supplier, 5, 50, 2);
}

public static <T> Optional<T> tryWaitRepeat(Supplier<T> supplier, int nTries) throws InterruptedException {

return tryWaitRepeat(supplier, nTries, 50, 2);
}

public static <T> Optional<T> tryWaitRepeat(Supplier<T> supplier, int nTries, long waitTimeMillis) throws InterruptedException {

return tryWaitRepeat(supplier, nTries, waitTimeMillis, 2);
}

/**
* Attempts to execute a provided {@link Supplier} multiple times, with an increasing wait period
* between each attempt. If the supplier returns a non-null result, it is wrapped in an
* {@code Optional} and returned. If all attempts fail or return null, an empty {@link Optional} is returned.
*
* <p>The wait time between attempts increases after each failure, multiplied by a specified factor.
*
* @param <T> the type of result provided by the supplier
* @param supplier the {@link Supplier} function that provides the result to be evaluated. The
* function may throw a {@link RuntimeException} if it fails, which will be caught and retried.
* @param nTries the maximum number of attempts to invoke the supplier
* @param initialWaitTimeMillis the initial wait time in milliseconds before retrying after the first failure
* @param waitTimeMultiplier the multiplier to apply to the wait time after each failure, increasing
* the wait time for subsequent retries
* @return an {@link Optional} containing the result from the supplier if a non-null result is returned
* before the maximum number of tries, or an empty {@code Optional} if all attempts fail or
* return null
* @throws InterruptedException thrown if interrupted while waiting
*/
public static <T> Optional<T> tryWaitRepeat(
final Supplier<T> supplier,
final int nTries,
final long initialWaitTimeMillis,
final int waitTimeMultiplier) throws InterruptedException {

int i = 0;
long waitTime = initialWaitTimeMillis;
while (i < nTries) {

if (i == nTries)
break;

try {
T result = supplier.get();
if (result != null)
return Optional.of(result);
} catch (RuntimeException e) {}

Thread.sleep(waitTime);
waitTime *= waitTimeMultiplier;
i++;
}

return Optional.empty();
}

}
Loading
Loading