Skip to content

Commit

Permalink
Add file splitting code and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ncovercash committed Sep 28, 2023
1 parent 932bf67 commit eae2c8f
Show file tree
Hide file tree
Showing 18 changed files with 2,377 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.DS_Store
.vscode
.idea/
*.iml
node_modules/
Expand Down
144 changes: 144 additions & 0 deletions src/main/java/org/folio/service/processing/split/AsyncInputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package org.folio.service.processing.split;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import javax.annotation.CheckForNull;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@Accessors(chain = true, fluent = true)
public class AsyncInputStream implements ReadStream<Buffer> {

  private static final Logger LOGGER = LogManager.getLogger();

  public static final int READ_BUFFER_SIZE = 8192;

  // Channel wrapping the source InputStream; closed exactly once in close().
  private final ReadableByteChannel channel;
  // Vert.x context on which reads are performed and handlers are invoked.
  private final Context context;

  // True while the stream is actively reading; cleared by pause()/close().
  @Getter
  private boolean active = false;

  // Set once the underlying channel has been closed; no reads happen after.
  @Getter
  private boolean closed = false;

  @Setter
  @CheckForNull
  private Handler<Buffer> handler;

  @Setter
  @CheckForNull
  private Handler<Void> endHandler;

  @Setter
  @CheckForNull
  private Handler<Throwable> exceptionHandler;

  /**
   * Create a new AsyncInputStream to wrap a regular {@link InputStream}.
   *
   * @param context the Vert.x context used to schedule reads and callbacks
   * @param in the stream to consume; it is read through a channel and closed
   *           when this stream is closed
   */
  public AsyncInputStream(Context context, InputStream in) {
    this.context = context;
    this.channel = Channels.newChannel(in);
  }

  @Override
  public ReadStream<Buffer> pause() {
    active = false;

    return this;
  }

  @Override
  public ReadStream<Buffer> resume() {
    read();

    return this;
  }

  public void read() {
    fetch(1L);
  }

  /**
   * Fetch the specified amount of elements. If the ReadStream has been paused, reading will
   * recommence.
   *
   * <strong>Note: the {@code amount} parameter is currently ignored.</strong>
   *
   * @param amount has no effect; retained for compatibility with {@link ReadStream#fetch(long)}
   */
  @Override
  public ReadStream<Buffer> fetch(long amount) {
    if (!closed) {
      active = true;
      doRead();
    }

    return this;
  }

  /**
   * Perform one read from the channel on the Vert.x context, deliver the bytes
   * to the data handler, and reschedule itself until EOF, pause, or close.
   */
  private void doRead() {
    context.runOnContext((Void v) -> {
      // BUGFIX: honor pause()/close() requested after this read was scheduled;
      // previously the loop kept reading and delivering data even when paused,
      // violating the ReadStream contract.
      if (!active || closed) {
        return;
      }

      ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);

      try {
        int bytesRead = channel.read(byteBuffer);

        if (bytesRead > 0) {
          byteBuffer.flip();
          Buffer buffer = Buffer.buffer(bytesRead);
          buffer.setBytes(0, byteBuffer);

          if (this.handler != null) {
            handler.handle(buffer);
          }

          // Schedule the next read; the guard above re-checks active/closed,
          // so a data handler that called pause() stops the loop here.
          doRead();
        } else {
          // bytesRead is -1 (end of stream) or 0; a channel over a blocking
          // InputStream only returns 0 for a full buffer, so treat both as
          // end of data.
          close();
        }
      } catch (IOException e) {
        LOGGER.error("Unable to read from channel:", e);
        close();
        reportException(e);
      }
    });
  }

  /**
   * Close the underlying channel (at most once) and notify the end handler.
   */
  public void close() {
    if (!closed) {
      closed = true;
      active = false;

      if (this.endHandler != null) {
        context.runOnContext(vv -> this.endHandler.handle(null));
      }

      try {
        channel.close();
      } catch (IOException e) {
        reportException(e);
      }
    }
  }

  /**
   * Log an exception and forward it to the exception handler, if one is set.
   */
  private void reportException(Exception e) {
    LOGGER.error("Received exception:", e);
    if (this.exceptionHandler != null) {
      context.runOnContext(vv -> this.exceptionHandler.handle(e));
    }
  }
}
136 changes: 136 additions & 0 deletions src/main/java/org/folio/service/processing/split/FileSplitService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package org.folio.service.processing.split;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.service.s3storage.MinioStorageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class FileSplitService {

  private static final Logger LOGGER = LogManager.getLogger();

  // All collaborators are injected once and never reassigned.
  private final Vertx vertx;

  private final MinioStorageService minioStorageService;

  // Maximum number of records written to each split chunk.
  private final int maxRecordsPerChunk;

  @Autowired
  public FileSplitService(
    Vertx vertx,
    MinioStorageService minioStorageService,
    @Value("${RECORDS_PER_SPLIT_FILE:1000}") int maxRecordsPerChunk
  ) {
    this.vertx = vertx;
    this.minioStorageService = minioStorageService;
    this.maxRecordsPerChunk = maxRecordsPerChunk;
  }

  /**
   * Read a file from S3, split it into parts, and delete the original file
   * once the split has completed.
   *
   * @param context the Vert.x context to run the split on
   * @param key the S3 key of the file to split
   * @return a {@link Future} wrapping the list of chunk keys that resolves
   *         once every split chunk has been uploaded to MinIO/S3; the future
   *         fails with {@link UncheckedIOException} if the file cannot be
   *         read or temporary files cannot be created
   */
  public Future<List<String>> splitFileFromS3(Context context, String key) {
    return minioStorageService
      .readFile(key)
      .compose((InputStream stream) -> {
        // this stream will be closed as part of splitStream
        try {
          return splitStream(context, stream, key)
            .compose((List<String> result) -> {
              LOGGER.info("Split from S3 completed...deleting original file");
              return minioStorageService.remove(key).map(v -> result);
            });
        } catch (IOException e) {
          LOGGER.error("Unable to split file", e);
          // compose requires a Future; rethrowing unchecked fails the chain
          throw new UncheckedIOException(e);
        }
      });
  }

  /**
   * Take a file, as an {@link InputStream}, split it into parts, and close it after.
   *
   * @param context the Vert.x context to run the split on
   * @param stream the stream to split; closed when the returned future completes
   * @param key the S3 key used to derive chunk keys and the temp folder name
   * @return a {@link Future} which will resolve with a list of chunk keys once
   *         every split chunk has been uploaded to MinIO/S3
   * @throws IOException if the temporary directory cannot be created
   */
  public Future<List<String>> splitStream(
    Context context,
    InputStream stream,
    String key
  ) throws IOException {
    Promise<CompositeFuture> promise = Promise.promise();

    Path tempDir = FileSplitUtilities.createTemporaryDir(key);

    LOGGER.info(
      "Streaming stream with key={} to writer, temporary folder={}...",
      key,
      tempDir
    );

    FileSplitWriter writer = new FileSplitWriter(
      FileSplitWriterOptions
        .builder()
        .vertxContext(context)
        .minioStorageService(minioStorageService)
        .chunkUploadingCompositeFuturePromise(promise)
        .outputKey(key)
        .chunkFolder(tempDir.toString())
        .maxRecordsPerChunk(maxRecordsPerChunk)
        .uploadFilesToS3(true)
        .deleteLocalFiles(true)
        .build()
    );

    AsyncInputStream asyncStream = new AsyncInputStream(context, stream);
    asyncStream
      .pipeTo(writer)
      .onComplete(ar1 -> LOGGER.info("File split for key={} completed", key));

    return promise
      // original future resolves once the chunks are split, but NOT uploaded
      .future()
      // this composite future resolves once all are uploaded
      .compose(cf -> cf)
      // now let's turn this back into a List<String>
      .map(cf -> cf.list())
      .map(list ->
        list.stream().map(String.class::cast).collect(Collectors.toList())
      )
      // and since we're all done, we can delete the temporary folder
      .compose((List<String> innerResult) -> {
        LOGGER.info("Deleting temporary folder={}", tempDir);

        return vertx
          .fileSystem()
          .deleteRecursive(tempDir.toString(), true)
          .map(v -> innerResult);
      })
      .onSuccess(result ->
        LOGGER.info("All done splitting! Got chunks {}", result)
      )
      .onFailure(err -> LOGGER.error("Unable to split file: ", err))
      .onComplete(v -> asyncStream.close());
  }
}
100 changes: 100 additions & 0 deletions src/main/java/org/folio/service/processing/split/FileSplitUtilities.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.folio.service.processing.split;

import static org.folio.service.processing.reader.MarcJsonReader.JSON_EXTENSION;
import static org.folio.service.processing.reader.MarcXmlReader.XML_EXTENSION;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;
import lombok.experimental.UtilityClass;
import org.apache.commons.io.FilenameUtils;
import org.folio.rest.jaxrs.model.JobProfileInfo;
import org.folio.service.processing.ParallelFileChunkingProcessor;

@UtilityClass
public class FileSplitUtilities {

  /** Byte that terminates every binary MARC record. */
  public static final byte MARC_RECORD_TERMINATOR = (byte) 0x1d;

  /**
   * Creates the S3 key for a split chunk within a larger file.
   *
   * <p>For example, {@code path/file.mrc} with part 3 becomes
   * {@code path/file_3.mrc}; a key with no extension gets the part number
   * appended ({@code file} becomes {@code file_3}).
   *
   * @param baseKey the key of the original (unsplit) file
   * @param partNumber the number of this chunk
   * @return the key for this chunk
   */
  public static String buildChunkKey(String baseKey, int partNumber) {
    String[] keyNameParts = baseKey.split("\\.");

    if (keyNameParts.length > 1) {
      // insert the part number before the final extension
      String partUpdate = String.format(
        "%s_%s",
        keyNameParts[keyNameParts.length - 2],
        partNumber
      );
      keyNameParts[keyNameParts.length - 2] = partUpdate;
      return String.join(".", keyNameParts);
    }

    return String.format("%s_%s", baseKey, partNumber);
  }

  /**
   * Counts records in a given {@link InputStream}, <strong>closing it afterwards</strong>.
   *
   * @param filename the original file name; its extension is preserved on the
   *                 temp file since a later stage requires it
   * @param inStream the stream to count records in; always closed on return
   * @param profile the job profile describing the record format
   * @return the number of records in the stream
   * @throws IOException if the stream cannot be read or a temp file cannot be created
   */
  public static int countRecordsInFile(
    String filename,
    InputStream inStream,
    JobProfileInfo profile
  ) throws IOException {
    // The record counter works on files, so spill the stream into an
    // owner-only temporary file first; it is always deleted afterwards.
    File tempFile = Files
      .createTempFile(
        "di-tmp-",
        // later stage requires correct file extension
        Path.of(filename).getFileName().toString(),
        PosixFilePermissions.asFileAttribute(
          PosixFilePermissions.fromString("rwx------")
        )
      )
      .toFile();

    try (
      InputStream autoCloseMe = inStream;
      OutputStream fileOutputStream = new FileOutputStream(tempFile)
    ) {
      // use the resource-managed reference so the named resource is not unused
      autoCloseMe.transferTo(fileOutputStream);
      fileOutputStream.flush();

      return ParallelFileChunkingProcessor.countTotalRecordsInFile(
        tempFile,
        profile
      );
    } finally {
      Files.deleteIfExists(tempFile.toPath());
    }
  }

  /**
   * Creates an owner-only ({@code rwx------}) temporary directory for the
   * split chunks of {@code key}; slashes in the key are flattened to dashes
   * so the key is usable as a directory name.
   *
   * @throws IOException if the directory cannot be created
   */
  public static Path createTemporaryDir(String key) throws IOException {
    return Files.createTempDirectory(
      String.format("di-split-%s", key.replace('/', '-')),
      PosixFilePermissions.asFileAttribute(
        PosixFilePermissions.fromString("rwx------")
      )
    );
  }

  /**
   * Determines if a file should be treated as binary MARC: the profile's data
   * type is MARC and the extension is neither the JSON nor the XML one.
   *
   * <p>Declared {@code static} explicitly for consistency with the other
   * members (Lombok's {@code @UtilityClass} would make it static regardless).
   */
  public static boolean isMarcBinary(String path, JobProfileInfo profile) {
    if (profile.getDataType() != JobProfileInfo.DataType.MARC) {
      return false;
    }

    String extension = FilenameUtils.getExtension(path);

    // MARC JSON and MARC XML have dedicated readers; anything else with the
    // MARC data type is assumed to be binary MARC.
    return (
      !JSON_EXTENSION.equals(extension) && !XML_EXTENSION.equals(extension)
    );
  }
}
Loading

0 comments on commit eae2c8f

Please sign in to comment.