From eae2c8f3e66d30f18fdd12273fc3fab3946d1253 Mon Sep 17 00:00:00 2001 From: Noah Overcash Date: Thu, 28 Sep 2023 15:49:21 -0400 Subject: [PATCH] Add file splitting code and tests --- .gitignore | 2 + .../processing/split/AsyncInputStream.java | 144 +++++++ .../processing/split/FileSplitService.java | 136 +++++++ .../processing/split/FileSplitUtilities.java | 100 +++++ .../processing/split/FileSplitWriter.java | 282 +++++++++++++ .../split/FileSplitWriterOptions.java | 50 +++ .../split/AsyncInputStreamTest.java | 355 ++++++++++++++++ .../split/FileSplitServiceTest.java | 181 +++++++++ .../split/FileSplitUtilitiesChunkKeyTest.java | 52 +++ .../split/FileSplitUtilitiesCountTest.java | 79 ++++ .../FileSplitUtilitiesIsBinaryMarcTest.java | 55 +++ .../split/FileSplitUtilitiesTempDirTest.java | 26 ++ .../split/FileSplitWriterDeleteLocalTest.java | 83 ++++ .../split/FileSplitWriterExceptionalTest.java | 129 ++++++ .../split/FileSplitWriterRegularTest.java | 380 ++++++++++++++++++ .../FileSplitWriterS3ExceptionalTest.java | 117 ++++++ .../split/FileSplitWriterS3Test.java | 163 ++++++++ .../FileSplitWriterUnusedMethodTest.java | 43 ++ 18 files changed, 2377 insertions(+) create mode 100644 src/main/java/org/folio/service/processing/split/AsyncInputStream.java create mode 100644 src/main/java/org/folio/service/processing/split/FileSplitService.java create mode 100644 src/main/java/org/folio/service/processing/split/FileSplitUtilities.java create mode 100644 src/main/java/org/folio/service/processing/split/FileSplitWriter.java create mode 100644 src/main/java/org/folio/service/processing/split/FileSplitWriterOptions.java create mode 100644 src/test/java/org/folio/service/processing/split/AsyncInputStreamTest.java create mode 100644 src/test/java/org/folio/service/processing/split/FileSplitServiceTest.java create mode 100644 src/test/java/org/folio/service/processing/split/FileSplitUtilitiesChunkKeyTest.java create mode 100644 src/test/java/org/folio/service/processing/split/FileSplitUtilitiesCountTest.java create mode 100644 src/test/java/org/folio/service/processing/split/FileSplitUtilitiesIsBinaryMarcTest.java create mode 100644 src/test/java/org/folio/service/processing/split/FileSplitUtilitiesTempDirTest.java create mode 100644 src/test/java/org/folio/service/processing/split/FileSplitWriterDeleteLocalTest.java create mode 100644 src/test/java/org/folio/service/processing/split/FileSplitWriterExceptionalTest.java create mode 100644 src/test/java/org/folio/service/processing/split/FileSplitWriterRegularTest.java create mode 100644 src/test/java/org/folio/service/processing/split/FileSplitWriterS3ExceptionalTest.java create mode 100644 src/test/java/org/folio/service/processing/split/FileSplitWriterS3Test.java create mode 100644 src/test/java/org/folio/service/processing/split/FileSplitWriterUnusedMethodTest.java diff --git a/.gitignore b/.gitignore index 3c36c290..56917d33 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +.DS_Store +.vscode .idea/ *.iml node_modules/ diff --git a/src/main/java/org/folio/service/processing/split/AsyncInputStream.java b/src/main/java/org/folio/service/processing/split/AsyncInputStream.java new file mode 100644 index 00000000..4bdd38bc --- /dev/null +++ b/src/main/java/org/folio/service/processing/split/AsyncInputStream.java @@ -0,0 +1,144 @@ +package org.folio.service.processing.split; + +import io.vertx.core.Context; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.ReadStream; +import java.io.IOException; 
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import javax.annotation.CheckForNull;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@Accessors(chain = true, fluent = true)
+public class AsyncInputStream implements ReadStream<Buffer> {
+
+  private static final Logger LOGGER = LogManager.getLogger();
+
+  public static final int READ_BUFFER_SIZE = 8192;
+
+  private final ReadableByteChannel channel;
+  private final Context context;
+
+  @Getter
+  private boolean active = false;
+
+  @Getter
+  private boolean closed = false;
+
+  @Setter
+  @CheckForNull
+  private Handler<Buffer> handler;
+
+  @Setter
+  @CheckForNull
+  private Handler<Void> endHandler;
+
+  @Setter
+  @CheckForNull
+  private Handler<Throwable> exceptionHandler;
+
+  /**
+   * Create a new AsyncInputStream to wrap a regular {@link InputStream}
+   */
+  public AsyncInputStream(Context context, InputStream in) {
+    this.context = context;
+    this.channel = Channels.newChannel(in);
+  }
+
+  @Override
+  public ReadStream<Buffer> pause() {
+    active = false;
+
+    return this;
+  }
+
+  @Override
+  public ReadStream<Buffer> resume() {
+    read();
+
+    return this;
+  }
+
+  public void read() {
+    fetch(1L);
+  }
+
+  /**
+   * Fetch the specified number of elements. If the ReadStream has been paused, reading will
+   * recommence.
+   *
+   * Note: the {@code amount} parameter is currently ignored.
+   *
+   * @param amount has no effect; retained for compatibility with {@link ReadStream#fetch(long)}
+   */
+  @Override
+  public ReadStream<Buffer> fetch(long amount) {
+    if (!closed) {
+      active = true;
+      doRead();
+    }
+
+    return this;
+  }
+
+  private void doRead() {
+    context.runOnContext((Void v) -> {
+      int bytesRead;
+      ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
+
+      try {
+        bytesRead = channel.read(byteBuffer);
+
+        if (bytesRead > 0) {
+          byteBuffer.flip();
+          Buffer buffer = Buffer.buffer(bytesRead);
+          buffer.setBytes(0, byteBuffer);
+
+          if (this.handler != null) {
+            handler.handle(buffer);
+          }
+
+          doRead();
+        } else {
+          close();
+        }
+      } catch (IOException e) {
+        LOGGER.error("Unable to read from channel:", e);
+        close();
+        reportException(e);
+      }
+    });
+  }
+
+  public void close() {
+    if (!closed) {
+      closed = true;
+      active = false;
+
+      if (this.endHandler != null) {
+        context.runOnContext(vv -> this.endHandler.handle(null));
+      }
+
+      try {
+        channel.close();
+      } catch (IOException e) {
+        reportException(e);
+      }
+    }
+  }
+
+  private void reportException(Exception e) {
+    LOGGER.error("Received exception:", e);
+    if (this.exceptionHandler != null) {
+      context.runOnContext(vv -> this.exceptionHandler.handle(e));
+    }
+  }
+}
diff --git a/src/main/java/org/folio/service/processing/split/FileSplitService.java b/src/main/java/org/folio/service/processing/split/FileSplitService.java
new file mode 100644
index 00000000..4adbe508
--- /dev/null
+++ b/src/main/java/org/folio/service/processing/split/FileSplitService.java
@@ -0,0 +1,136 @@
+package org.folio.service.processing.split;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.folio.service.s3storage.MinioStorageService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+@Service
+public class FileSplitService {
+
+  private static final Logger LOGGER = LogManager.getLogger();
+
+  private Vertx vertx;
+
+  private MinioStorageService minioStorageService;
+
+  private int maxRecordsPerChunk;
+
+  @Autowired
+  public FileSplitService(
+    Vertx vertx,
+    MinioStorageService minioStorageService,
+    @Value("${RECORDS_PER_SPLIT_FILE:1000}") int maxRecordsPerChunk
+  ) {
+    this.vertx = vertx;
+    this.minioStorageService = minioStorageService;
+    this.maxRecordsPerChunk = maxRecordsPerChunk;
+  }
+
+  /**
+   * Read a file from S3 and split it into parts.
+   *
+   * @return a {@link Future} that wraps a list of string keys and will resolve
+   *         once every split chunk has been uploaded to MinIO/S3.
+   * @throws UncheckedIOException if the file cannot be read or if temporary
+   *         files cannot be created
+   */
+  public Future<List<String>> splitFileFromS3(Context context, String key) {
+    return minioStorageService
+      .readFile(key)
+      .compose((InputStream stream) -> {
+        // this stream will be closed as part of splitStream
+        try {
+          return splitStream(context, stream, key)
+            .compose((List<String> result) -> {
+              LOGGER.info("Split from S3 completed...deleting original file");
+              return minioStorageService.remove(key).map(v -> result);
+            });
+        } catch (IOException e) {
+          LOGGER.error("Unable to split file", e);
+          throw new UncheckedIOException(e);
+        }
+      });
+  }
+
+  /**
+   * Take a file, as an {@link InputStream}, split it into parts, and close it after.
+   *
+   * @return a {@link Future} which will resolve with a list of strings once every
+   *         split chunk has been uploaded to MinIO/S3.
+   * @throws IOException if the stream cannot be read or if temporary files cannot
+   *         be created
+   */
+  public Future<List<String>> splitStream(
+    Context context,
+    InputStream stream,
+    String key
+  ) throws IOException {
+    Promise<CompositeFuture> promise = Promise.promise();
+
+    Path tempDir = FileSplitUtilities.createTemporaryDir(key);
+
+    LOGGER.info(
+      "Streaming file with key={} to writer, temporary folder={}...",
+      key,
+      tempDir
+    );
+
+    FileSplitWriter writer = new FileSplitWriter(
+      FileSplitWriterOptions
+        .builder()
+        .vertxContext(context)
+        .minioStorageService(minioStorageService)
+        .chunkUploadingCompositeFuturePromise(promise)
+        .outputKey(key)
+        .chunkFolder(tempDir.toString())
+        .maxRecordsPerChunk(maxRecordsPerChunk)
+        .uploadFilesToS3(true)
+        .deleteLocalFiles(true)
+        .build()
+    );
+
+    AsyncInputStream asyncStream = new AsyncInputStream(context, stream);
+    asyncStream
+      .pipeTo(writer)
+      .onComplete(ar1 -> LOGGER.info("File split for key={} completed", key));
+
+    return promise
+      // original future resolves once the chunks are split, but NOT uploaded
+      .future()
+      // this composite future resolves once all are uploaded
+      .compose(cf -> cf)
+      // now let's turn this back into a List<String>
+      .map(cf -> cf.list())
+      .map(list ->
+        list.stream().map(String.class::cast).collect(Collectors.toList())
+      )
+      // and since we're all done, we can delete the temporary folder
+      .compose((List<String> innerResult) -> {
+        LOGGER.info("Deleting temporary folder={}", tempDir);
+
+        return vertx
+          .fileSystem()
+          .deleteRecursive(tempDir.toString(), true)
+          .map(v -> innerResult);
+      })
+      .onSuccess(result ->
+        LOGGER.info("All done splitting! Got chunks {}", result)
+      )
+      .onFailure(err -> LOGGER.error("Unable to split file: ", err))
+      .onComplete(v -> asyncStream.close());
+  }
+}
diff --git a/src/main/java/org/folio/service/processing/split/FileSplitUtilities.java b/src/main/java/org/folio/service/processing/split/FileSplitUtilities.java
new file mode 100644
index 00000000..b0f4a1dc
--- /dev/null
+++ b/src/main/java/org/folio/service/processing/split/FileSplitUtilities.java
@@ -0,0 +1,100 @@
+package org.folio.service.processing.split;
+
+import static org.folio.service.processing.reader.MarcJsonReader.JSON_EXTENSION;
+import static org.folio.service.processing.reader.MarcXmlReader.XML_EXTENSION;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermissions;
+import lombok.experimental.UtilityClass;
+import org.apache.commons.io.FilenameUtils;
+import org.folio.rest.jaxrs.model.JobProfileInfo;
+import org.folio.service.processing.ParallelFileChunkingProcessor;
+
+@UtilityClass
+public class FileSplitUtilities {
+
+  public static final byte MARC_RECORD_TERMINATOR = (byte) 0x1d;
+
+  /**
+   * Creates the S3 key for a split chunk within a larger file
+   */
+  public static String buildChunkKey(String baseKey, int partNumber) {
+    String[] keyNameParts = baseKey.split("\\.");
+
+    if (keyNameParts.length > 1) {
+      String partUpdate = String.format(
+        "%s_%s",
+        keyNameParts[keyNameParts.length - 2],
+        partNumber
+      );
+      keyNameParts[keyNameParts.length - 2] = partUpdate;
+      return String.join(".", keyNameParts);
+    }
+
+    return String.format("%s_%s", baseKey, partNumber);
+  }
+
+  /**
+   * Counts records in a given {@link InputStream}, closing it afterwards.
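+   * <p>For example (hypothetical usage; {@code marcProfile} stands in for any
+   * MARC {@code JobProfileInfo}): for a binary MARC file containing ten
+   * records, {@code countRecordsInFile("10.mrc", stream, marcProfile)}
+   * returns 10, and {@code stream} is closed on return.</p>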
+   *
+   * @throws IOException if the stream cannot be read or a temp file cannot be created
+   */
+  public static int countRecordsInFile(
+    String filename,
+    InputStream inStream,
+    JobProfileInfo profile
+  ) throws IOException {
+    File tempFile = Files
+      .createTempFile(
+        "di-tmp-",
+        // later stage requires correct file extension
+        Path.of(filename).getFileName().toString(),
+        PosixFilePermissions.asFileAttribute(
+          PosixFilePermissions.fromString("rwx------")
+        )
+      )
+      .toFile();
+
+    try (
+      InputStream autoCloseMe = inStream;
+      OutputStream fileOutputStream = new FileOutputStream(tempFile)
+    ) {
+      inStream.transferTo(fileOutputStream);
+      fileOutputStream.flush();
+
+      return ParallelFileChunkingProcessor.countTotalRecordsInFile(
+        tempFile,
+        profile
+      );
+    } finally {
+      Files.deleteIfExists(tempFile.toPath());
+    }
+  }
+
+  public static Path createTemporaryDir(String key) throws IOException {
+    return Files.createTempDirectory(
+      String.format("di-split-%s", key.replace('/', '-')),
+      PosixFilePermissions.asFileAttribute(
+        PosixFilePermissions.fromString("rwx------")
+      )
+    );
+  }
+
+  public boolean isMarcBinary(String path, JobProfileInfo profile) {
+    if (profile.getDataType() != JobProfileInfo.DataType.MARC) {
+      return false;
+    }
+
+    String extension = FilenameUtils.getExtension(path);
+
+    return (
+      !JSON_EXTENSION.equals(extension) && !XML_EXTENSION.equals(extension)
+    );
+  }
+}
diff --git a/src/main/java/org/folio/service/processing/split/FileSplitWriter.java b/src/main/java/org/folio/service/processing/split/FileSplitWriter.java
new file mode 100644
index 00000000..e79970d6
--- /dev/null
+++ b/src/main/java/org/folio/service/processing/split/FileSplitWriter.java
@@ -0,0 +1,282 @@
+package org.folio.service.processing.split;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+
+import io.vertx.codegen.annotations.Nullable;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.WriteStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.folio.service.s3storage.MinioStorageService;
+
+public class FileSplitWriter implements WriteStream<Buffer> {
+
+  private static final Logger LOGGER = LogManager.getLogger();
+
+  private final Context vertxContext;
+  private final MinioStorageService minioStorageService;
+  private final Promise<CompositeFuture> chunkUploadingCompositeFuturePromise;
+  private final List<Future<String>> chunkProcessingFutures;
+  private Handler<Throwable> exceptionHandler;
+
+  private String chunkFolder;
+  private String outputKey;
+  private boolean uploadFilesToS3;
+  private boolean deleteLocalFiles;
+
+  private final byte recordTerminator;
+
+  private int maxRecordsPerChunk;
+
+  private ByteArrayOutputStream currentChunkStream;
+  private String currentChunkPath;
+  private String currentChunkKey;
+
+  private int chunkIndex = 1;
+  private int recordCount = 0;
+
+  // used to hint the buffer size for the next chunks
+  private int lastChunkSize;
+
+  public FileSplitWriter(FileSplitWriterOptions options) {
+    this.vertxContext = options.getVertxContext();
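+    // the chunk-upload promise is completed in end(), once every chunk's
+    // upload future has been collected into a single CompositeFuture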
+    this.minioStorageService = options.getMinioStorageService();
+    this.chunkUploadingCompositeFuturePromise =
+      options.getChunkUploadingCompositeFuturePromise();
+
+    this.outputKey = options.getOutputKey();
+    this.chunkFolder = options.getChunkFolder();
+
+    this.maxRecordsPerChunk = options.getMaxRecordsPerChunk();
+    this.uploadFilesToS3 = options.isUploadFilesToS3();
+    this.deleteLocalFiles = options.isDeleteLocalFiles();
+
+    // Per https://www.loc.gov/marc/makrbrkr.html, the average size of a MARC
+    // record is 800-1500 chars (bytes). So, we will use 800/record as the
+    // first buffer's size (then future chunks can be the same size as previous).
+    // This allows some responsiveness in the first chunk and later ones, to
+    // ensure we don't waste too much time resizing the buffer.
+    this.lastChunkSize = 800 * maxRecordsPerChunk;
+
+    this.recordTerminator = options.getRecordTerminator();
+
+    this.chunkProcessingFutures = new ArrayList<>();
+
+    startChunk();
+  }
+
+  @Override
+  public WriteStream<Buffer> exceptionHandler(
+    @Nullable Handler<Throwable> handler
+  ) {
+    exceptionHandler = handler;
+    return this;
+  }
+
+  @Override
+  public Future<Void> write(Buffer data) {
+    Promise<Void> promise = Promise.promise();
+    write(data, promise);
+    return promise.future();
+  }
+
+  @Override
+  public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
+    byte[] bytes = data.getBytes();
+    int start = 0;
+    int len = 0;
+
+    for (int i = 0; i < bytes.length; i++) {
+      if (
+        bytes[i] == recordTerminator && (++recordCount == maxRecordsPerChunk)
+      ) {
+        len = i + 1 - start;
+
+        try {
+          if (currentChunkStream == null) {
+            startChunk();
+          }
+          currentChunkStream.write(bytes, start, len);
+          endChunk();
+        } catch (IOException e) {
+          handleWriteException(handler, e);
+        }
+
+        start = i + 1;
+      }
+    }
+
+    if (start < bytes.length) {
+      len = bytes.length - start;
+
+      if (currentChunkStream == null) {
+        startChunk();
+      }
+      currentChunkStream.write(bytes, start, len);
+    }
+  }
+
+  private void handleWriteException(
+    Handler<AsyncResult<Void>> handler,
+    Exception e
+  ) {
+    LOGGER.error("Error writing file chunk: ", e);
+    if (handler != null) {
+      handler.handle(Future.failedFuture(e));
+    }
+    if (exceptionHandler != null) {
+      exceptionHandler.handle(e);
+    }
+    chunkUploadingCompositeFuturePromise.fail(e);
+  }
+
+  @Override
+  public void end(Handler<AsyncResult<Void>> handler) {
+    try {
+      endChunk();
+      handler.handle(Future.succeededFuture());
+      // Future.all is not available, CompositeFuture.all is broken,
+      // and we're on an older Vert.x, so we must resort to this ugly re-encapsulation
+      // https://github.com/eclipse-vertx/vert.x/issues/2627
+      chunkUploadingCompositeFuturePromise.complete(
+        CompositeFuture.all(
+          Arrays.asList(
+            chunkProcessingFutures.toArray(
+              new Future[chunkProcessingFutures.size()]
+            )
+          )
+        )
+      );
+    } catch (IOException e) {
+      handler.handle(Future.failedFuture(e));
+      chunkUploadingCompositeFuturePromise.fail(e);
+    }
+  }
+
+  // unused
+  @Override
+  public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
+    return this;
+  }
+
+  // unused
+  @Override
+  public boolean writeQueueFull() {
+    return false;
+  }
+
+  // unused
+  @Override
+  public WriteStream<Buffer> drainHandler(@Nullable Handler<Void> handler) {
+    return this;
+  }
+
+  /** Start processing a new chunk */
+  private void startChunk() {
+    String fileName = FileSplitUtilities.buildChunkKey(outputKey, chunkIndex++);
+    if (!deleteLocalFiles) {
+      currentChunkPath =
+        Path.of(chunkFolder, new File(fileName).getName()).toString();
+    } else {
+      currentChunkPath = new File(fileName).getName();
+    }
+    currentChunkKey = fileName;
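+    // size the new buffer from the previous chunk's actual size so the
+    // backing array rarely needs to grow mid-chunk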
+    currentChunkStream = new ByteArrayOutputStream(lastChunkSize);
+    LOGGER.debug("starting chunk {}", currentChunkKey);
+  }
+
+  /** Finalize the current chunk */
+  private void endChunk() throws IOException {
+    if (currentChunkStream != null) {
+      currentChunkStream.close();
+      if (!deleteLocalFiles) {
+        // close the file stream promptly so the chunk is fully on disk
+        // before it is handed off
+        try (
+          OutputStream out = Files.newOutputStream(
+            Path.of(currentChunkPath),
+            CREATE
+          )
+        ) {
+          currentChunkStream.writeTo(out);
+        }
+      }
+
+      // avoid intermediate file-writing
+      uploadChunkAsync(
+        new ByteArrayInputStream(currentChunkStream.toByteArray()),
+        currentChunkKey,
+        currentChunkPath
+      );
+
+      lastChunkSize = currentChunkStream.size();
+
+      // this will trigger a startChunk when more data is received
+      currentChunkStream = null;
+      recordCount = 0;
+
+      LOGGER.debug(
+        "finished chunk of size {} written to {}",
+        lastChunkSize,
+        currentChunkKey
+      );
+    }
+  }
+
+  private void uploadChunkAsync(
+    InputStream is,
+    String chunkKey,
+    String chunkPath
+  ) {
+    Promise<String> chunkPromise = Promise.promise();
+    chunkProcessingFutures.add(chunkPromise.future());
+    vertxContext.executeBlocking(
+      event -> {
+        // chunk file uploading to S3
+        if (uploadFilesToS3) {
+          LOGGER.debug("Uploading file {} to S3", chunkKey);
+
+          try {
+            minioStorageService
+              .write(chunkKey, is)
+              .onComplete(ar -> {
+                if (ar.failed()) {
+                  LOGGER.error(
+                    "Failed uploading file {}:",
+                    chunkKey,
+                    ar.cause()
+                  );
+
+                  chunkPromise.fail(ar.cause());
+                } else if (ar.succeeded()) {
+                  LOGGER.info("Successfully uploaded file {} to S3", chunkKey);
+
+                  chunkPromise.complete(chunkKey);
+                }
+              });
+          } catch (IOException e) {
+            LOGGER.error("Exception uploading file {} to S3", chunkKey);
+            LOGGER.error(e);
+            event.fail(e);
+            chunkPromise.fail(e);
+            return;
+          }
+        } else {
+          event.complete();
+          chunkPromise.complete(chunkPath);
+        }
+        LOGGER.debug("Finished processing chunk: {}", chunkKey);
+      },
+      false
+    );
+  }
+}
diff --git a/src/main/java/org/folio/service/processing/split/FileSplitWriterOptions.java b/src/main/java/org/folio/service/processing/split/FileSplitWriterOptions.java
new file mode 100644
index 00000000..f532ee78
--- /dev/null
+++ b/src/main/java/org/folio/service/processing/split/FileSplitWriterOptions.java
@@ -0,0 +1,50 @@
+package org.folio.service.processing.split;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Context;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import javax.annotation.Nonnull;
+import javax.validation.constraints.Min;
+import lombok.Builder;
+import lombok.Data;
+import org.folio.service.s3storage.MinioStorageService;
+
+@Data
+@Builder
+public class FileSplitWriterOptions {
+
+  private MinioStorageService minioStorageService;
+
+  @Nonnull
+  private Context vertxContext;
+
+  /**
+   * A promise that will resolve with a CompositeFuture containing either S3
+   * keys or file paths to each chunk
+   */
+  @Nonnull
+  private final Promise<CompositeFuture> chunkUploadingCompositeFuturePromise;
+
+  @Builder.Default
+  private Handler<Throwable> exceptionHandler = null;
+
+  @Nonnull
+  private String outputKey;
+
+  /** Where temporary files should be stored */
+  @Nonnull
+  private String chunkFolder;
+
+  @Min(1)
+  private int maxRecordsPerChunk;
+
+  @Builder.Default
+  private boolean uploadFilesToS3 = true;
+
+  @Builder.Default
+  private boolean deleteLocalFiles = true;
+
+  @Builder.Default
+  private byte recordTerminator = FileSplitUtilities.MARC_RECORD_TERMINATOR;
+}
diff --git a/src/test/java/org/folio/service/processing/split/AsyncInputStreamTest.java b/src/test/java/org/folio/service/processing/split/AsyncInputStreamTest.java
new file mode 100644
index 00000000..99a8812c
--- /dev/null
+++ b/src/test/java/org/folio/service/processing/split/AsyncInputStreamTest.java
@@ -0,0 +1,355 @@
+package org.folio.service.processing.split;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import io.vertx.core.Vertx;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(VertxUnitRunner.class)
+public class AsyncInputStreamTest {
+
+  protected static Vertx vertx = Vertx.vertx();
+
+  // 0 chunks
+  protected static byte[] emptyBuff = new byte[0];
+  // 0.5 chunks
+  protected static byte[] smallBuff = new byte[8192 / 2];
+  // 1.0 chunks
+  protected static byte[] mediumBuff = new byte[8192];
+  // 2.5 chunks
+  protected static byte[] largeBuff = new byte[8192 * 2 + 8192 / 2];
+
+  static {
+    for (int i = 0; i < smallBuff.length; i++) {
+      smallBuff[i] = (byte) (i / 32);
+    }
+    for (int i = 0; i < mediumBuff.length; i++) {
+      mediumBuff[i] = (byte) (i / 32);
+    }
+    for (int i = 0; i < largeBuff.length; i++) {
+      largeBuff[i] = (byte) (i / 32);
+    }
+  }
+
+  @Test
+  public void testHandlerEmpty(TestContext context) {
+    Async async = context.async();
+    AsyncInputStream stream = new AsyncInputStream(
+      vertx.getOrCreateContext(),
+      new ByteArrayInputStream(emptyBuff)
+    );
+
+    stream.endHandler(v ->
+      context.verify(vv -> {
+        async.complete();
+        assertThat(stream.closed(), is(true));
+      })
+    );
+    stream.exceptionHandler(err -> context.fail(err));
+    stream.handler(buff -> context.fail("No data should have been read"));
+
+    stream.read();
+  }
+
+  @Test
+  public void testHandlerOutOfOrder(TestContext context) {
+    Async async = context.async();
+    AsyncInputStream stream = new AsyncInputStream(
+      vertx.getOrCreateContext(),
+      new ByteArrayInputStream(emptyBuff)
+    );
+
+    stream.handler(buff -> context.fail("No data should have been read"));
+
+    stream.read();
+
+    vertx.setTimer(
+      100,
+      _v -> {
+        stream.endHandler(v ->
+          context.fail(
+            "End handler should not be called after stream is consumed"
+          )
+        );
+        stream.exceptionHandler(err -> context.fail(err));
+
+        // make sure neither are called, then complete
+        vertx.setTimer(100, __v -> async.complete());
+      }
+    );
+  }
+
+  @Test
+  @SuppressWarnings("java:S2699")
+  public void testHandlerSmall(TestContext context) {
+    Async async = context.async();
+    AsyncInputStream stream = new AsyncInputStream(
+      vertx.getOrCreateContext(),
+      new ByteArrayInputStream(smallBuff)
+    );
+
+    List<byte[]> receivedData = new ArrayList<>();
+
+    stream.endHandler(v ->
+      context.verify(_v -> {
+        assertThat(receivedData, hasSize(1));
+        assertThat(receivedData.get(0), is(smallBuff));
+        async.complete();
+      })
+    );
+    stream.exceptionHandler(err -> context.fail(err));
+    stream.handler(buff -> receivedData.add(buff.getBytes()));
+
+    stream.read();
+  }
+
+  @Test
+  @SuppressWarnings("java:S2699")
+  public void testHandlerMedium(TestContext context) {
+    Async async = context.async();
+    AsyncInputStream stream = new AsyncInputStream(
+      vertx.getOrCreateContext(),
+      new ByteArrayInputStream(mediumBuff)
+    );
+
+    List<byte[]> receivedData = new ArrayList<>();
+
+    stream.exceptionHandler(err -> context.fail(err));
+    stream.endHandler(v ->
+      context.verify(_v -> {
+        assertThat(receivedData, hasSize(1));
+        assertThat(receivedData.get(0), is(mediumBuff));
+        async.complete();
+      })
+    );
+    stream.handler(buff -> receivedData.add(buff.getBytes()));
+
+    stream.read();
+  }
+
+  @Test
+  @SuppressWarnings("java:S2699")
+  public void testHandlerLarge(TestContext context) {
+    Async async = context.async();
+    AsyncInputStream stream = new AsyncInputStream(
+      vertx.getOrCreateContext(),
+      new ByteArrayInputStream(largeBuff)
+    );
+
+    List<byte[]> receivedData = new ArrayList<>();
+
+    stream.exceptionHandler(err -> context.fail(err));
+    stream.endHandler(v ->
+      context.verify(_v -> {
+        assertThat(receivedData, hasSize(3));
+        assertThat(
+          receivedData.get(0),
+          is(Arrays.copyOfRange(largeBuff, 0, 8192))
+        );
+        assertThat(
+          receivedData.get(1),
+          is(Arrays.copyOfRange(largeBuff, 8192, 8192 * 2))
+        );
+        assertThat(
+          receivedData.get(2),
+          is(Arrays.copyOfRange(largeBuff, 8192 * 2, 8192 * 2 + 4096))
+        );
+        async.complete();
+      })
+    );
+    stream.handler(buff -> receivedData.add(buff.getBytes()));
+
+    stream.read();
+  }
+
+  @Test
+  public void testPauseResume(TestContext context) {
+    Async async = context.async();
+    AsyncInputStream stream = new AsyncInputStream(
+      vertx.getOrCreateContext(),
+      new ByteArrayInputStream(largeBuff)
+    );
+
+    stream.handler(buff -> {
+      stream.pause();
+      assertThat(stream.active(), is(false));
+
+      stream.resume();
+      assertThat(stream.active(), is(true));
+
+      async.complete();
+    });
+
+    stream.read();
+  }
+
+  @Test
+  public void testPauseFetchResumeForConsumed(TestContext context) {
+    Async async = context.async();
+    AsyncInputStream stream = new AsyncInputStream(
+      vertx.getOrCreateContext(),
+      new ByteArrayInputStream(smallBuff)
+    );
+
+    List<byte[]> receivedData = new ArrayList<>();
+    AtomicBoolean isPaused = new AtomicBoolean(false);
+
+    stream.handler(buff -> {
+      if (isPaused.get()) {
+        context.fail("Should not have received data while paused");
+      }
+
+      receivedData.add(buff.getBytes());
+
+      stream.pause();
+      isPaused.set(true);
+
+      vertx.setTimer(
+        100,
+        v -> {
+          // consumed
+          stream.resume();
+          assertThat(stream.active(), is(false));
+
+          async.complete();
+        }
+      );
+    });
+
+    stream.read();
+  }
+
+  @Test
+  public void testResumeClosed(TestContext context) {
+    Async async = context.async();
+    AsyncInputStream stream = new AsyncInputStream(
+      vertx.getOrCreateContext(),
+      new ByteArrayInputStream(largeBuff)
+    );
+
+    AtomicBoolean isPaused = new AtomicBoolean(false);
+
+    stream.handler(buff ->
+      context.verify(v -> {
+        if (isPaused.get()) {
+          context.fail("Should not have received data while paused");
+        }
+
+        assertThat(stream.active(), is(true));
+
+        stream.pause();
+        assertThat(stream.active(), is(false));
+
+        isPaused.set(true);
+
+        assertThat(stream.closed(), is(false));
+        stream.close();
+        assertThat(stream.closed(), is(true));
+
+        // stays paused after closure
+        stream.resume();
+        stream.read();
+        assertThat(stream.active(), is(false));
+
+        // give time for additional chunks to be read,
+        // to ensure no more are
+        vertx.setTimer(100L, vv -> async.complete());
+      })
+    );
+
+    stream.read();
+  }
+
+  @Test
+  @SuppressWarnings("java:S2699")
+  public void testHandlerRemoval(TestContext context) {
+    Async async = context.async();
+    AsyncInputStream stream = new AsyncInputStream(
+      vertx.getOrCreateContext(),
+      new ByteArrayInputStream(largeBuff)
+    );
+
+    List<byte[]> receivedData = new ArrayList<>();
+
+    stream.endHandler(v ->
+      context.verify(_v -> {
+        assertThat(receivedData, hasSize(1));
+        assertThat(
+          receivedData.get(0),
+          is(Arrays.copyOfRange(largeBuff, 0, 8192))
+        );
+        async.complete();
+      })
+    );
+
+    stream.handler(buff -> {
+      receivedData.add(buff.getBytes());
+
+      // deregister handler; no more chunks should be sent and it should end
+      stream.handler(null);
+    });
+
+    stream.read();
+  }
+
+  @Test
+  public void testCloseFailure(TestContext context) {
+    AsyncInputStream stream = new AsyncInputStream(
+      vertx.getOrCreateContext(),
+      new ByteArrayInputStream(smallBuff) {
+        @Override
+        public void close() throws IOException {
+          throw new IOException("test");
+        }
+      }
+    );
+
+    Async async = context.strictAsync(2);
+
+    stream.endHandler(v -> async.countDown());
+    stream.exceptionHandler(t -> async.countDown());
+    stream.handler(buff -> context.fail("Should not have received data"));
+
+    stream.close();
+  }
+
+  @Test
+  public void testReadFailure(TestContext context) {
+    AsyncInputStream stream = new AsyncInputStream(
+      vertx.getOrCreateContext(),
+      new InputStream() {
+        @Override
+        public int read() throws IOException {
+          throw new IOException("test");
+        }
+      }
+    );
+
+    Async async = context.strictAsync(2);
+
+    stream.endHandler(v -> async.countDown());
+    stream.exceptionHandler(t -> async.countDown());
+    stream.handler(buff -> context.fail("Should not have received data"));
+
+    stream.read();
+  }
+}
diff --git a/src/test/java/org/folio/service/processing/split/FileSplitServiceTest.java b/src/test/java/org/folio/service/processing/split/FileSplitServiceTest.java
new file mode 100644
index 00000000..479b5a11
--- /dev/null
+++ b/src/test/java/org/folio/service/processing/split/FileSplitServiceTest.java
@@ -0,0 +1,181 @@
+package org.folio.service.processing.split;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.folio.service.s3storage.MinioStorageService;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+@RunWith(VertxUnitRunner.class)
+public class FileSplitServiceTest {
+
+  protected static final Vertx vertx = Vertx.vertx();
+
+  @Mock
+  MinioStorageService minioStorageService;
+
+  FileSplitService fileSplitService;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException {
+    MockitoAnnotations.openMocks(this);
+
+    this.fileSplitService =
+      new FileSplitService(vertx, minioStorageService, 1000);
+
+    when(minioStorageService.write(any(), any()))
+      .thenReturn(Future.succeededFuture());
+  }
+
+  @Test
+  public void testSplitFileFromS3(TestContext context) throws IOException {
+    when(minioStorageService.readFile("test-key"))
+      .thenReturn(
+        Future.succeededFuture(
+          new ByteArrayInputStream(
+            Files.readAllBytes(Path.of("src/test/resources/10000.mrc"))
+          )
+        )
+      );
+    when(minioStorageService.remove("test-key"))
+      .thenReturn(Future.succeededFuture());
+
+    fileSplitService
+      .splitFileFromS3(vertx.getOrCreateContext(), "test-key")
+      .onComplete(
+        context.asyncAssertSuccess(result -> {
+          try {
+            assertThat(
+              result,
+              containsInAnyOrder(
+                "test-key_1",
+                "test-key_2",
+                "test-key_3",
+                "test-key_4",
+                "test-key_5",
+                "test-key_6",
+                "test-key_7",
+                "test-key_8",
+                "test-key_9",
+                "test-key_10"
+              )
+            );
+
+            verify(minioStorageService, times(1)).readFile("test-key");
+            verify(minioStorageService, times(10)).write(any(), any());
+            verify(minioStorageService, times(1)).remove("test-key");
+
+            verifyNoMoreInteractions(minioStorageService);
+          } catch (IOException e) {
+            context.fail(e);
+          }
+        })
+      );
+  }
+
+  @Test
+  @SuppressWarnings("java:S2699")
+  public void testSplitFileFromS3Exceptional(TestContext context)
+    throws IOException {
+    when(minioStorageService.readFile("test-key"))
+      .thenReturn(
+        Future.succeededFuture(new ByteArrayInputStream(new byte[1]))
+      );
+
+    try (
+      MockedStatic<FileSplitUtilities> mock = Mockito.mockStatic(
+        FileSplitUtilities.class,
+        Mockito.CALLS_REAL_METHODS
+      )
+    ) {
+      mock
+        .when(() -> FileSplitUtilities.createTemporaryDir(anyString()))
+        .thenThrow(IOException.class);
+
+      fileSplitService
+        .splitFileFromS3(vertx.getOrCreateContext(), "test-key")
+        .onComplete(
+          context.asyncAssertFailure(result -> {
+            assertThat(result, is(instanceOf(UncheckedIOException.class)));
+
+            verify(minioStorageService, times(1)).readFile("test-key");
+
+            verifyNoMoreInteractions(minioStorageService);
+          })
+        );
+    }
+  }
+
+  @Test
+  public void testSplitStream(TestContext context) throws IOException {
+    fileSplitService
+      .splitStream(
+        vertx.getOrCreateContext(),
+        new ByteArrayInputStream(new byte[1]),
+        "test-key"
+      )
+      .onComplete(
+        context.asyncAssertSuccess(result ->
+          assertThat(result, containsInAnyOrder("test-key_1"))
+        )
+      );
+  }
+
+  @Test
+  public void testBadTemporaryDirectory(TestContext context)
+    throws IOException {
+    File test = temporaryFolder.newFolder();
+    System.out.println(test.toString());
+
+    // mock FileSplitUtilities.createTemporaryDir(String key) to return our folder
+    try (
+      MockedStatic<FileSplitUtilities> mock = Mockito.mockStatic(
+        FileSplitUtilities.class,
+        Mockito.CALLS_REAL_METHODS
+      )
+    ) {
+      mock
+        .when(() -> FileSplitUtilities.createTemporaryDir(anyString()))
+        .thenReturn(test.toPath());
+
+      // can't be deleted after completion if there's a file in there
+      Files.createFile(Path.of(test.getAbsolutePath(), "test-file"));
+
+      fileSplitService
+        .splitStream(
+          vertx.getOrCreateContext(),
+          new ByteArrayInputStream(new byte[1]),
+          "test-key"
+        )
+        .onComplete(context.asyncAssertSuccess());
+    }
+  }
+}
diff --git a/src/test/java/org/folio/service/processing/split/FileSplitUtilitiesChunkKeyTest.java b/src/test/java/org/folio/service/processing/split/FileSplitUtilitiesChunkKeyTest.java
new file mode 100644
index 00000000..580ce385
--- /dev/null
+++ b/src/test/java/org/folio/service/processing/split/FileSplitUtilitiesChunkKeyTest.java
@@ -0,0 +1,52 @@
+package org.folio.service.processing.split;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class FileSplitUtilitiesChunkKeyTest {
+
+  // tuples of [key, part number, expected]
+  @Parameters
+  public static Collection<Object[]> getCases() {
+    return Arrays.asList(
+      new Object[] { "test", 1, "test_1" },
+      new Object[] { "test.mrc", 1, "test_1.mrc" },
+      new Object[] { "test.mrc", 234, "test_234.mrc" },
+      new Object[] { "test.foo.mrc", 1, "test.foo_1.mrc" },
+      new Object[] { "test.foo_12.mrc", 2, "test.foo_12_2.mrc" },
+      new Object[] {
+        "a/really.long/and_..complex.path",
+        15,
+        "a/really.long/and_..complex_15.path",
+      },
+      new Object[] { "windows\\style.path", 128, "windows\\style_128.path" }
+    );
+  }
+
+  private String key;
+  private int partNumber;
+  private String expected;
+
+  public FileSplitUtilitiesChunkKeyTest(
+    String key,
+    int partNumber,
+    String expected
+  ) {
+    this.key = key;
+    this.partNumber = partNumber;
+    this.expected = expected;
+  }
+
+  @Test
+  public void testChunkKeyGeneration() {
+    assertThat(FileSplitUtilities.buildChunkKey(key, partNumber), is(expected));
+  }
+}
diff --git a/src/test/java/org/folio/service/processing/split/FileSplitUtilitiesCountTest.java b/src/test/java/org/folio/service/processing/split/FileSplitUtilitiesCountTest.java
new file mode 100644
index 00000000..89b47511
--- /dev/null
+++ b/src/test/java/org/folio/service/processing/split/FileSplitUtilitiesCountTest.java
@@ -0,0 +1,79 @@
+package org.folio.service.processing.split;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThrows;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import org.folio.rest.jaxrs.model.JobProfileInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class FileSplitUtilitiesCountTest {
+
+  private static final JobProfileInfo MARC_PROFILE = new JobProfileInfo()
+    .withDataType(JobProfileInfo.DataType.MARC);
+  private static final JobProfileInfo EDIFACT_PROFILE = new JobProfileInfo()
+    .withDataType(JobProfileInfo.DataType.EDIFACT);
+
+  // tuples of [path, number of records, profile]
+  @Parameters
+  public static Collection<Object[]> getCases() {
+    return Arrays.asList(
+      new Object[] { "0.mrc", 0, MARC_PROFILE },
+      // 1 buffer read
+      new Object[] { "1.mrc", 1, MARC_PROFILE },
+      // multiple buffer reads
+      new Object[] { "100.mrc", 100, MARC_PROFILE },
+      new Object[] { "2500.mrc", 2500, MARC_PROFILE },
+      new Object[] { "5000.mrc", 5000, MARC_PROFILE },
+      new Object[] { "10000.mrc", 10000, MARC_PROFILE },
+      new Object[] { "22778.mrc", 22778, MARC_PROFILE },
+      new Object[] { "50000.mrc", 50000, MARC_PROFILE },
+      new Object[] { "invalidMarcFile.mrc", 0, MARC_PROFILE },
+      // MARC XML
+      new Object[] { "UChicago_SampleBibs.xml", 62, MARC_PROFILE },
+      // MARC JSON
+      new Object[] { "ChalmersFOLIOExamples.json", 62, MARC_PROFILE },
+      // Edifact
+      new Object[] { "edifact/TAMU-HRSW20200808072013.EDI", 7, EDIFACT_PROFILE }
+    );
+  }
+
+  private String path;
+  private int count;
+  private JobProfileInfo profile;
+
+  public FileSplitUtilitiesCountTest(
+    String path,
+    int count,
+    JobProfileInfo profile
+  ) {
+    this.path = "src/test/resources/" + path;
+    this.count = count;
+    this.profile = profile;
+  }
+
+  @Test
+  public void testCountAndStreamClose() throws IOException {
+    BufferedInputStream inputStream = new BufferedInputStream(
+      new FileInputStream(path)
+    );
+
+    assertThat(
+      FileSplitUtilities.countRecordsInFile(path, inputStream, profile),
+      is(count)
+    );
+
+    // countRecordsInFile closes the stream, so it should be unavailable after
+    // the call, throwing an IOException
+    assertThrows(IOException.class, () -> inputStream.available());
+  }
+}
diff --git a/src/test/java/org/folio/service/processing/split/FileSplitUtilitiesIsBinaryMarcTest.java b/src/test/java/org/folio/service/processing/split/FileSplitUtilitiesIsBinaryMarcTest.java
new file mode 100644
index 00000000..50028589
--- /dev/null
+++ b/src/test/java/org/folio/service/processing/split/FileSplitUtilitiesIsBinaryMarcTest.java
@@ -0,0 +1,55 @@
+package org.folio.service.processing.split;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import org.folio.rest.jaxrs.model.JobProfileInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class FileSplitUtilitiesIsBinaryMarcTest {
+
+  private static final JobProfileInfo MARC_PROFILE = new JobProfileInfo()
+    .withDataType(JobProfileInfo.DataType.MARC);
+  private static final JobProfileInfo EDIFACT_PROFILE = new JobProfileInfo()
+    .withDataType(JobProfileInfo.DataType.EDIFACT);
+
+  // tuples of [path, profile, expected]
+  @Parameters
+  public static Collection<Object[]> getCases() {
+    return Arrays.asList(
+      new Object[] { "test.mrc", MARC_PROFILE, true },
+      new Object[] { "test.mrc21", MARC_PROFILE, true },
+      // specifically excludes MARC JSON/XML
+      new Object[] { "test.json", MARC_PROFILE, false },
+      new Object[] { "test.xml", MARC_PROFILE, false },
+      // non-marc profile takes precedence
+      new Object[] { "test.mrc", EDIFACT_PROFILE, false }
+    );
+  }
+
+  private String path;
+  private JobProfileInfo profile;
+  private boolean expected;
+
+  public FileSplitUtilitiesIsBinaryMarcTest(
+    String path,
+    JobProfileInfo profile,
+    boolean expected
+  ) {
+    this.path = "src/test/resources/" + path;
+    this.profile = profile;
+    this.expected = expected;
+  }
+
+  @Test
+  public void testIsBinaryMarcFile() throws IOException {
+    assertThat(FileSplitUtilities.isMarcBinary(path, profile), is(expected));
+  }
+}
diff --git a/src/test/java/org/folio/service/processing/split/FileSplitUtilitiesTempDirTest.java b/src/test/java/org/folio/service/processing/split/FileSplitUtilitiesTempDirTest.java
new file mode 100644
index 00000000..64f274a2
--- /dev/null
+++ b/src/test/java/org/folio/service/processing/split/FileSplitUtilitiesTempDirTest.java
@@ -0,0 +1,26 @@
+package org.folio.service.processing.split;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+
+import java.io.File;
+import java.io.IOException;
+import org.junit.Test;
+
+public class FileSplitUtilitiesTempDirTest {
+
+  @Test
+  public void testTemporaryDirectory() throws IOException {
+    File tempDir = FileSplitUtilities.createTemporaryDir("test-key").toFile();
+
+    // in case assertions fail
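+    // (deleteOnExit guarantees cleanup even if an assertThat below throws)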
+    tempDir.deleteOnExit();
+
+    assertThat(tempDir.exists(), is(true));
+    assertThat(tempDir.isDirectory(), is(true));
+    assertThat(tempDir.getPath(), containsString("test-key"));
+
+    tempDir.delete();
+  }
+}
diff --git a/src/test/java/org/folio/service/processing/split/FileSplitWriterDeleteLocalTest.java b/src/test/java/org/folio/service/processing/split/FileSplitWriterDeleteLocalTest.java
new file mode 100644
index 00000000..2d047ada
--- /dev/null
+++ b/src/test/java/org/folio/service/processing/split/FileSplitWriterDeleteLocalTest.java
@@ -0,0 +1,83 @@
+package org.folio.service.processing.split;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import java.io.File;
+import java.io.IOException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+@RunWith(VertxUnitRunner.class)
+public class FileSplitWriterDeleteLocalTest {
+
+  protected static Vertx vertx = Vertx.vertx();
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private static final String TEST_FILE = "src/test/resources/10.mrc";
+  private static final String TEST_KEY = "10.mrc";
+
+  @Test
+  public void testCleanup(TestContext context) throws IOException {
+    vertx
+      .getOrCreateContext()
+      .owner()
+      .fileSystem()
+      .open(TEST_FILE, new OpenOptions().setRead(true))
+      .onComplete(
+        context.asyncAssertSuccess(file -> {
+          Promise<CompositeFuture> chunkUploadingCompositeFuturePromise = Promise.promise();
+
+          try {
+            File folder = temporaryFolder.newFolder();
+
+            FileSplitWriter writer = new FileSplitWriter(
+              FileSplitWriterOptions
+                .builder()
+                .vertxContext(vertx.getOrCreateContext())
+                .chunkUploadingCompositeFuturePromise(
+                  chunkUploadingCompositeFuturePromise
+                )
+                .outputKey(TEST_KEY)
+                .chunkFolder(folder.toString())
+                .maxRecordsPerChunk(3)
+                .uploadFilesToS3(false)
+                .deleteLocalFiles(true)
+                .build()
+            );
+
+            file.pipeTo(writer).onComplete(context.asyncAssertSuccess());
+            chunkUploadingCompositeFuturePromise
+              .future()
+              .onComplete(
+                context.asyncAssertSuccess(result -> {
+                  assertThat(result.list(), hasSize(4));
+                  // need to add a small delay since the actual deletion of files can be async
+                  // depending on OS implementations
+                  vertx.setTimer(
+                    100,
+                    _v ->
+                      context.verify(__v ->
+                        assertThat(folder.listFiles().length, is(0))
+                      )
+                  );
+                })
+              );
+          } catch (IOException err) {
+            context.fail(err);
+          }
+        })
+      );
+  }
+}
diff --git a/src/test/java/org/folio/service/processing/split/FileSplitWriterExceptionalTest.java b/src/test/java/org/folio/service/processing/split/FileSplitWriterExceptionalTest.java
new file mode 100644
index 00000000..d84384fb
--- /dev/null
+++ b/src/test/java/org/folio/service/processing/split/FileSplitWriterExceptionalTest.java
@@ -0,0 +1,129 @@
+package org.folio.service.processing.split;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+@RunWith(VertxUnitRunner.class)
+public class FileSplitWriterExceptionalTest {
+
+  protected static Vertx vertx = Vertx.vertx();
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private static final String TEST_FILE = "src/test/resources/10.mrc";
+  private static final String TEST_KEY = "10.mrc";
+
+  @Test
+  public void testInvalidDirectory(TestContext context) throws IOException {
+    Async async = context.strictAsync(1); // ensure only one exception
+
+    vertx
+      .getOrCreateContext()
+      .owner()
+      .fileSystem()
+      .open(TEST_FILE, new OpenOptions().setRead(true))
+      .onComplete(
+        context.asyncAssertSuccess(file -> {
+          Promise<CompositeFuture> chunkUploadingCompositeFuturePromise = Promise.promise();
+
+          try {
+            // we will delete this later, so writing will error
+            File folder = temporaryFolder.newFolder();
+            String path = folder.getPath();
+
+            FileSplitWriter writer = new FileSplitWriter(
+              FileSplitWriterOptions
+                .builder()
+                .vertxContext(vertx.getOrCreateContext())
+                .chunkUploadingCompositeFuturePromise(
+                  chunkUploadingCompositeFuturePromise
+                )
+                .outputKey(TEST_KEY)
+                .chunkFolder(path)
+                .maxRecordsPerChunk(1)
+                .uploadFilesToS3(false)
+                .deleteLocalFiles(false)
+                .build()
+            );
+
+            writer.exceptionHandler(err -> async.countDown());
+
+            for (File f : folder.listFiles()) {
+              Files.delete(Path.of(f.getPath()));
+            }
+            Files.delete(Path.of(folder.getPath()));
+
+            // should not be able to pipe, resulting in failure
+            file.pipeTo(writer).onComplete(context.asyncAssertFailure());
+          } catch (IOException err) {
+            context.fail(err);
+          }
+        })
+      );
+  }
+
+  @Test
+  public void testInvalidDirectoryNoHandler(TestContext context)
+    throws IOException {
+    Promise<CompositeFuture> chunkUploadingCompositeFuturePromise = Promise.promise();
+
+    try {
+      // we will delete this later, so writing will error
+      File folder = temporaryFolder.newFolder();
+      String path = folder.getPath();
+
+      FileSplitWriter writer = new FileSplitWriter(
+        FileSplitWriterOptions
+          .builder()
+          .vertxContext(vertx.getOrCreateContext())
+          .chunkUploadingCompositeFuturePromise(
+            chunkUploadingCompositeFuturePromise
+          )
+          .outputKey(TEST_KEY)
+          .chunkFolder(path)
+          .maxRecordsPerChunk(1)
+          .uploadFilesToS3(false)
+          .deleteLocalFiles(false)
+          .build()
+      );
+
+      for (File f : folder.listFiles()) {
+        Files.delete(Path.of(f.getPath()));
+      }
+      Files.delete(Path.of(folder.getPath()));
+
+      // should not be able to write, resulting in failure, but with no handler
+      // so it will be reported internally only
+      writer.write(
+        Buffer.buffer(
+          new byte[] {
+            FileSplitUtilities.MARC_RECORD_TERMINATOR,
+            FileSplitUtilities.MARC_RECORD_TERMINATOR,
+            FileSplitUtilities.MARC_RECORD_TERMINATOR,
+          }
+        )
+      );
+
+      chunkUploadingCompositeFuturePromise
+        .future()
+        .onComplete(context.asyncAssertFailure());
+    } catch (IOException err) {
+      context.fail(err);
+    }
+  }
+}
diff --git a/src/test/java/org/folio/service/processing/split/FileSplitWriterRegularTest.java b/src/test/java/org/folio/service/processing/split/FileSplitWriterRegularTest.java
new file mode 100644
index 00000000..eba7fc8b
--- /dev/null
+++ b/src/test/java/org/folio/service/processing/split/FileSplitWriterRegularTest.java
@@ -0,0 +1,380 @@
+package org.folio.service.processing.split;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunnerWithParametersFactory;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.folio.rest.jaxrs.model.JobProfileInfo;
+import org.folio.service.s3storage.MinioStorageService;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized.UseParametersRunnerFactory;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+@RunWith(Parameterized.class)
+@UseParametersRunnerFactory(VertxUnitRunnerWithParametersFactory.class)
+public class FileSplitWriterRegularTest {
+
+  protected static Vertx vertx = Vertx.vertx();
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Mock
+  private MinioStorageService minioStorageService;
+
+  Promise<CompositeFuture> chunkUploadingCompositeFuturePromise = Promise.promise();
+
+  FileSplitWriter writer;
+
+  private String sourceFile;
+  private String key;
+  private int chunkSize;
+  private String[] expectedChunkFiles;
+
+  // tuples of [source file, key, chunk size, expected chunk files[]]
+  @Parameters
+  public static Collection<Object[]> getCases() {
+    return Arrays.asList(
+      new Object[] {
+        "src/test/resources/10.mrc",
+        "out.mrc",
+        11,
+        new String[] { "out_1.mrc" },
+      },
+      new Object[] {
+        "src/test/resources/10.mrc",
+        "out.mrc",
+        10,
+        new String[] { "out_1.mrc" },
+      },
+      new Object[] {
+        "src/test/resources/10.mrc",
+        "out.mrc",
+        9,
+        new String[] { "out_1.mrc", "out_2.mrc" },
+      },
+      new Object[] {
+        "src/test/resources/10.mrc",
+        "out.mrc",
+        5,
+        new String[] { "out_1.mrc", "out_2.mrc" },
+      },
+      new Object[] {
+        "src/test/resources/10.mrc",
+        "out.mrc",
+        3,
+        new String[] { "out_1.mrc", "out_2.mrc", "out_3.mrc", "out_4.mrc" },
+      },
+      new Object[] {
+        "src/test/resources/10.mrc",
+        "out.mrc",
+        1,
+        new String[] {
+          "out_1.mrc",
+          "out_2.mrc",
+          "out_3.mrc",
+          "out_4.mrc",
+          "out_5.mrc",
+          "out_6.mrc",
+          "out_7.mrc",
+          "out_8.mrc",
+          "out_9.mrc",
+          "out_10.mrc",
+        },
+      },
+      new Object[] {
+        "src/test/resources/0.mrc",
+        "none.mrc",
+        1,
+        new String[] { "none_1.mrc" },
+      },
+      new Object[] {
+        "src/test/resources/1.mrc",
+        "single.mrc",
+        10,
+        new String[] { "single_1.mrc" },
+      },
+      new Object[] {
+        "src/test/resources/1.mrc",
+        "single.mrc",
+        1,
+        new String[] { "single_1.mrc" },
+      },
+      new Object[] {
+        "src/test/resources/100.mrc",
+        "big.mrc",
+        60,
+        new String[] { "big_1.mrc", "big_2.mrc" },
+      },
+      new Object[] {
+        "src/test/resources/5000.mrc",
+        "5000.mrc",
+        1000,
+        new String[] {
+          "5000_1.mrc",
+          "5000_2.mrc",
+          "5000_3.mrc",
+          "5000_4.mrc",
+          "5000_5.mrc",
+        },
+      },
+      new Object[] {
+        "src/test/resources/10000.mrc",
+        "10000.mrc",
+        1000,
+        new String[] {
+          "10000_1.mrc",
+          "10000_2.mrc",
+          "10000_3.mrc",
+          "10000_4.mrc",
+          "10000_5.mrc",
+          "10000_6.mrc",
+          "10000_7.mrc",
+          "10000_8.mrc",
+          "10000_9.mrc",
+          "10000_10.mrc",
+        },
+      },
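+      // larger inputs: 22778 does not divide evenly by 2300, so the last
+      // chunk carries the remainder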
+      new Object[] {
+        "src/test/resources/22778.mrc",
+        "22778.mrc",
+        2300,
+        new String[] {
+          "22778_1.mrc",
+          "22778_2.mrc",
+          "22778_3.mrc",
+          "22778_4.mrc",
+          "22778_5.mrc",
+          "22778_6.mrc",
+          "22778_7.mrc",
+          "22778_8.mrc",
+          "22778_9.mrc",
+          "22778_10.mrc",
+        },
+      },
+      new Object[] {
+        "src/test/resources/50000.mrc",
+        "50000.mrc",
+        5000,
+        new String[] {
+          "50000_1.mrc",
+          "50000_2.mrc",
+          "50000_3.mrc",
+          "50000_4.mrc",
+          "50000_5.mrc",
+          "50000_6.mrc",
+          "50000_7.mrc",
+          "50000_8.mrc",
+          "50000_9.mrc",
+          "50000_10.mrc",
+        },
+      }
+    );
+  }
+
+  public FileSplitWriterRegularTest(
+    String sourceFile,
+    String key,
+    int chunkSize,
+    String[] expectedChunkFiles
+  ) throws IOException {
+    this.sourceFile = sourceFile;
+    this.key = key;
+    this.chunkSize = chunkSize;
+    this.expectedChunkFiles = expectedChunkFiles;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.openMocks(this);
+
+    writer =
+      new FileSplitWriter(
+        FileSplitWriterOptions
+          .builder()
+          .vertxContext(vertx.getOrCreateContext())
+          .chunkUploadingCompositeFuturePromise(
+            chunkUploadingCompositeFuturePromise
+          )
+          .outputKey(key)
+          .chunkFolder(temporaryFolder.newFolder().getPath())
+          .maxRecordsPerChunk(chunkSize)
+          .uploadFilesToS3(false)
+          .deleteLocalFiles(false)
+          .build()
+      );
+  }
+
+  @Test
+  public void testSplit(TestContext context) throws IOException {
+    vertx
+      .getOrCreateContext()
+      .owner()
+      .fileSystem()
+      .open(sourceFile, new OpenOptions().setRead(true))
+      .onComplete(
+        context.asyncAssertSuccess(file -> {
+          // start the splitting
+          file
+            .pipeTo(writer)
+            .onComplete(
+              context.asyncAssertSuccess(v -> {
+                // splitting finished, "uploading" in progress
+                chunkUploadingCompositeFuturePromise
+                  .future()
+                  .onComplete(
+                    context.asyncAssertSuccess(cf ->
+                      cf.onComplete(
+                        context.asyncAssertSuccess(internalFuture -> {
+                          // "uploading" finished, check the results
+                          List<Path> paths = internalFuture
+                            .list()
+                            .stream()
+                            .map(obj -> Path.of((String) obj))
+                            .collect(Collectors.toList());
+                          List<String> fileNames = paths
+                            .stream()
+                            .map(path -> path.getFileName().toString())
+                            .collect(Collectors.toList());
+
+                          // number of chunks and chunk names
+                          assertThat(fileNames, contains(expectedChunkFiles));
+
+                          // read and verify chunking
+                          int totalSize = 0;
+                          List<byte[]> fileContents = new ArrayList<>();
+
+                          // get each file's contents
+                          for (Path path : paths) {
+                            File actualFile = path.toFile();
+                            totalSize += actualFile.length();
+                            try (
+                              FileInputStream fileStream = new FileInputStream(
+                                actualFile
+                              )
+                            ) {
+                              fileContents.add(fileStream.readAllBytes());
+                            } catch (IOException err) {
+                              context.fail(err);
+                            }
+                          }
+
+                          // recombine the split chunks
+                          byte[] actual = new byte[totalSize];
+                          int pos = 0;
+                          for (byte[] content : fileContents) {
+                            System.arraycopy(
+                              content,
+                              0,
+                              actual,
+                              pos,
+                              content.length
+                            );
+                            pos += content.length;
+                          }
+
+                          // verify end delimiter
+                          for (byte[] content : fileContents) {
+                            if (content.length > 0) { // don't check empty chunks (for empty starting file)
+                              assertThat(
+                                content[content.length - 1],
+                                is(FileSplitUtilities.MARC_RECORD_TERMINATOR)
+                              );
+                            }
+                          }
+
+                          // + 1 is sufficient in case the original file is larger since it will read in
+                          // some extra after the splits, which is enough to fail the test
+                          file
+                            .read(Buffer.buffer(), 0, 0, totalSize + 1)
+                            .onComplete(
+                              context.asyncAssertSuccess(expectedBuffer -> {
+                                try {
+                                  byte[] expected = expectedBuffer.getBytes();
+                                  // verify the chunks are equivalent
+                                  assertThat(actual, is(expected));
+                                  int totalRecords = countRecordsInMarcFile(
+                                    new ByteArrayInputStream(actual)
+                                  );
+
+                                  for (
+                                    int i = 0;
+                                    i < fileContents.size();
+                                    i++
+                                  ) {
+                                    if (i == fileContents.size() - 1) {
+                                      // the last slice should have all remaining records
+                                      assertThat(
+                                        countRecordsInMarcFile(
+                                          new ByteArrayInputStream(
+                                            fileContents.get(i)
+                                          )
+                                        ),
+                                        is(totalRecords)
+                                      );
+                                    } else {
+                                      // all other slices should have a full chunk
+                                      assertThat(
+                                        countRecordsInMarcFile(
+                                          new ByteArrayInputStream(
+                                            fileContents.get(i)
+                                          )
+                                        ),
+                                        is(chunkSize)
+                                      );
+                                      totalRecords -= chunkSize;
+                                    }
+                                  }
+
+                                  verifyNoInteractions(minioStorageService);
+                                } catch (IOException err) {
+                                  context.fail(err);
+                                }
+                              })
+                            );
+                        })
+                      )
+                    )
+                  );
+              })
+            );
+        })
+      );
+  }
+
+  private int countRecordsInMarcFile(InputStream stream) throws IOException {
+    return FileSplitUtilities.countRecordsInFile(
+      "placeholder.mrc",
+      stream,
+      new JobProfileInfo().withDataType(JobProfileInfo.DataType.MARC)
+    );
+  }
+}
diff --git a/src/test/java/org/folio/service/processing/split/FileSplitWriterS3ExceptionalTest.java b/src/test/java/org/folio/service/processing/split/FileSplitWriterS3ExceptionalTest.java
new file mode 100644
index 00000000..00139439
--- /dev/null
+++ b/src/test/java/org/folio/service/processing/split/FileSplitWriterS3ExceptionalTest.java
@@ -0,0 +1,117 @@
+package org.folio.service.processing.split;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import java.io.IOException;
+import java.io.InputStream;
+import org.folio.service.s3storage.MinioStorageService;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+@RunWith(VertxUnitRunner.class)
+public class FileSplitWriterS3ExceptionalTest {
+
+  private static final String TEST_FILE = "src/test/resources/10.mrc";
+  private static final String TEST_KEY = "10.mrc";
+
+  protected static Vertx vertx = Vertx.vertx();
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  Promise<CompositeFuture> chunkUploadingCompositeFuturePromise = Promise.promise();
+
+  @Mock
+  private MinioStorageService minioStorageService;
+
+  @Captor
+  private ArgumentCaptor<InputStream> captor;
+
+  private FileSplitWriter writer;
+
+  @Before
+  public void setUp() throws IOException {
+    MockitoAnnotations.openMocks(this);
+
+    writer =
+      new FileSplitWriter(
+        FileSplitWriterOptions
+          .builder()
+          .vertxContext(vertx.getOrCreateContext())
+          .minioStorageService(minioStorageService)
+          .chunkUploadingCompositeFuturePromise(
+            chunkUploadingCompositeFuturePromise
+          )
+          .outputKey(TEST_KEY)
+          .chunkFolder(temporaryFolder.newFolder().toString())
+          .maxRecordsPerChunk(3)
+          .uploadFilesToS3(true)
+          .deleteLocalFiles(false)
+          .build()
+      );
+  }
+
+  @Test
+  public void testExceptionBeforeUpload(TestContext context)
+    throws IOException {
+    when(minioStorageService.write(any(), any())).thenThrow(new IOException());
+
+    vertx
+      .getOrCreateContext()
+      .owner()
+      .fileSystem()
+      .open(TEST_FILE, new OpenOptions().setRead(true))
+      .onComplete(
+        context.asyncAssertSuccess(file -> {
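+          // the pipe itself succeeds; the stubbed write failure must surface
+          // through the composite upload future instead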
+          file.pipeTo(writer).onComplete(context.asyncAssertSuccess());
+          chunkUploadingCompositeFuturePromise
+            .future()
+            .onComplete(
+              context.asyncAssertSuccess(cf ->
+                cf.onComplete(context.asyncAssertFailure())
+              )
+            );
+        })
+      );
+  }
+
+  @Test
+  public void testExceptionDuringUpload(TestContext context)
+    throws IOException {
+    when(minioStorageService.write(any(), any()))
+      .thenReturn(Future.failedFuture("test"));
+
+    vertx
+      .getOrCreateContext()
+      .owner()
+      .fileSystem()
+      .open(TEST_FILE, new OpenOptions().setRead(true))
+      .onComplete(
+        context.asyncAssertSuccess(file -> {
+          file.pipeTo(writer).onComplete(context.asyncAssertSuccess());
+          chunkUploadingCompositeFuturePromise
+            .future()
+            .onComplete(
+              context.asyncAssertSuccess(cf ->
+                cf.onComplete(context.asyncAssertFailure())
+              )
+            );
+        })
+      );
+  }
+}
diff --git a/src/test/java/org/folio/service/processing/split/FileSplitWriterS3Test.java b/src/test/java/org/folio/service/processing/split/FileSplitWriterS3Test.java
new file mode 100644
index 00000000..7535d2f7
--- /dev/null
+++ b/src/test/java/org/folio/service/processing/split/FileSplitWriterS3Test.java
@@ -0,0 +1,163 @@
+package org.folio.service.processing.split;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunnerWithParametersFactory;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import org.folio.service.s3storage.MinioStorageService;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized.UseParametersRunnerFactory;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+@RunWith(Parameterized.class)
+@UseParametersRunnerFactory(VertxUnitRunnerWithParametersFactory.class)
+public class FileSplitWriterS3Test {
+
+  protected static Vertx vertx = Vertx.vertx();
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  File chunkDir;
+
+  Promise<CompositeFuture> chunkUploadingCompositeFuturePromise = Promise.promise();
+
+  @Mock
+  private MinioStorageService minioStorageService;
+
+  @Captor
+  private ArgumentCaptor<InputStream> captor;
+
+  private FileSplitWriter writer;
+
+  private String sourceFile;
+  private String key;
+  private int chunkSize;
+
+  // tuples of [source file, key, chunk size]
+  // expected chunk files are checked by FileSplitWriterRegularTest
+  @Parameters
+  public static Collection<Object[]> getCases() {
+    return Arrays.asList(
+      new Object[] { "src/test/resources/10.mrc", "out.mrc", 11 },
+      new Object[] { "src/test/resources/10.mrc", "out.mrc", 10 },
+      new Object[] { "src/test/resources/10.mrc", "out.mrc", 9 },
+      new Object[] { "src/test/resources/10.mrc", "out.mrc", 5 },
+      new Object[] { "src/test/resources/10.mrc", "out.mrc", 3 },
+      new Object[] { "src/test/resources/10.mrc", "out.mrc", 1 },
+      new Object[] { "src/test/resources/0.mrc", "none.mrc", 1 },
+      new Object[] { "src/test/resources/1.mrc", "single.mrc", 10 },
+      new Object[] { "src/test/resources/1.mrc", "single.mrc", 1 },
+      new Object[] { "src/test/resources/100.mrc", "big.mrc", 60 }
+    );
+  }
+
+  public FileSplitWriterS3Test(String sourceFile, String key, int chunkSize)
+    throws IOException {
+    this.sourceFile = sourceFile;
+    this.key = key;
+    this.chunkSize = chunkSize;
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    chunkDir = temporaryFolder.newFolder();
+
+    MockitoAnnotations.openMocks(this);
+
+    writer =
+      new FileSplitWriter(
+        FileSplitWriterOptions
+          .builder()
+          .vertxContext(vertx.getOrCreateContext())
+          .minioStorageService(minioStorageService)
+          .chunkUploadingCompositeFuturePromise(
+            chunkUploadingCompositeFuturePromise
+          )
+          .outputKey(key)
+          .chunkFolder(chunkDir.toString())
+          .maxRecordsPerChunk(chunkSize)
+          .uploadFilesToS3(true)
+          .deleteLocalFiles(false)
+          .build()
+      );
+  }
+
+  @Test
+  public void testUploadToS3(TestContext context) throws IOException {
+    when(minioStorageService.write(any(), any()))
+      .thenReturn(Future.succeededFuture("result"));
+
+    vertx
+      .getOrCreateContext()
+      .owner()
+      .fileSystem()
+      .open(sourceFile, new OpenOptions().setRead(true))
+      .onComplete(
+        context.asyncAssertSuccess(file -> {
+          file.pipeTo(writer).onComplete(context.asyncAssertSuccess());
+          chunkUploadingCompositeFuturePromise
+            .future()
+            .onComplete(
+              context.asyncAssertSuccess(cf ->
+                cf.onComplete(
+                  context.asyncAssertSuccess(result -> {
+                    for (Object obj : result.list()) {
+                      String path = Path
+                        .of(chunkDir.toString(), (String) obj)
+                        .toString();
+
+                      try (
+                        FileInputStream fileStream = new FileInputStream(path)
+                      ) {
+                        verify(minioStorageService)
+                          .write(
+                            eq(Path.of(path).getFileName().toString()),
+                            captor.capture()
+                          );
+                        assertThat(
+                          captor.getValue().readAllBytes(),
+                          is(equalTo(fileStream.readAllBytes()))
+                        );
+                      } catch (IOException err) {
+                        context.fail(err);
+                      }
+                    }
+
+                    verifyNoMoreInteractions(minioStorageService);
+                  })
+                )
+              )
+            );
+        })
+      );
+  }
+}
diff --git a/src/test/java/org/folio/service/processing/split/FileSplitWriterUnusedMethodTest.java b/src/test/java/org/folio/service/processing/split/FileSplitWriterUnusedMethodTest.java
new file mode 100644
index 00000000..c80e5bee
--- /dev/null
+++ b/src/test/java/org/folio/service/processing/split/FileSplitWriterUnusedMethodTest.java
@@ -0,0 +1,43 @@
+package org.folio.service.processing.split;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import java.io.IOException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+@RunWith(VertxUnitRunner.class)
+public class FileSplitWriterUnusedMethodTest {
+
+  protected static Vertx vertx = Vertx.vertx();
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void testUnusedMethods(TestContext context) throws IOException {
+    FileSplitWriter writer = new FileSplitWriter(
+      FileSplitWriterOptions
+        .builder()
+        .vertxContext(vertx.getOrCreateContext())
+        .chunkUploadingCompositeFuturePromise(Promise.promise())
+        // minimal configuration; only the write-queue delegate methods are
+        // exercised by this test
+        .outputKey("")
+        .chunkFolder(temporaryFolder.newFolder().toString())
+        .maxRecordsPerChunk(1)
+        .uploadFilesToS3(false)
+        .deleteLocalFiles(false)
+        .build()
+    );
+
+    assertThat(writer.setWriteQueueMaxSize(0), is(writer));
+    assertThat(writer.writeQueueFull(), is(false));
+    assertThat(writer.drainHandler(null), is(writer));
+  }
+}