Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

10977 Globus Upload optimization - batch file size lookups #11040

Merged
merged 12 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions doc/release-notes/10977-globus-filesize-lookup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
## A new Globus optimization setting

An optimization has been added for the Globus upload workflow, with a corresponding new database setting: `:GlobusBatchLookupSize`


See the [Database Settings](https://guides.dataverse.org/en/6.5/installation/config.html#GlobusBatchLookupSize) section of the Guides for more information.
7 changes: 7 additions & 0 deletions doc/sphinx-guides/source/installation/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4849,6 +4849,13 @@ The URL where the `dataverse-globus <https://github.com/scholarsportal/dataverse

The interval in seconds between Dataverse calls to Globus to check on upload progress. Defaults to 50 seconds (or to 10 minutes, when the ``globus-use-experimental-async-framework`` feature flag is enabled). See :ref:`globus-support` for details.

.. _:GlobusBatchLookupSize:

:GlobusBatchLookupSize
++++++++++++++++++++++

In the initial implementation, when files were added to the dataset upon completion of a Globus upload task, Dataverse would make a separate Globus API call to look up the size of every new file. This proved to be a significant bottleneck at Harvard Dataverse with users transferring batches of many thousands of files (this in turn was made possible by the Globus improvements in v6.4). An optimized lookup mechanism was added in response, where the Globus Service makes a listing API call on the entire remote folder, then populates the file sizes for all the new file entries before passing them to the Ingest service. This approach however may in fact slow things down in a scenario where there are already thousands of files in the Globus folder for the dataset, and only a small number of new files are being added. To address this, the number of files in a batch for which this method should be used was made configurable. If not set, it will default to 50 (a completely arbitrary number). Setting it to 0 will always use this method with Globus uploads. Setting it to some very large number will disable it completely. This was made a database setting, as opposed to a JVM option, in order to make it configurable in real time.

:GlobusSingleFileTransfer
+++++++++++++++++++++++++

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public Boolean sendNotificationEmail(UserNotification notification, String comme
if (objectOfNotification != null){
String messageText = getMessageTextBasedOnNotification(notification, objectOfNotification, comment, requestor);
String subjectText = MailUtil.getSubjectTextBasedOnNotification(notification, objectOfNotification);
if (!(messageText.isEmpty() || subjectText.isEmpty())){
if (!(StringUtils.isEmpty(messageText) || StringUtils.isEmpty(subjectText))){
retval = sendSystemEmail(emailAddress, subjectText, messageText, isHtmlContent);
} else {
logger.warning("Skipping " + notification.getType() + " notification, because couldn't get valid message");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public long retrieveSizeFromMedia() {
JsonArray dataArray = responseJson.getJsonArray("DATA");
if (dataArray != null && dataArray.size() != 0) {
//File found
return (long) responseJson.getJsonArray("DATA").getJsonObject(0).getJsonNumber("size").longValueExact();
return (long) dataArray.getJsonObject(0).getJsonNumber("size").longValueExact();
}
} else {
logger.warning("Response from " + get.getURI().toString() + " was "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ public class AddReplaceFileHelper{
private String newFileName; // step 30
private String newFileContentType; // step 30
private String newStorageIdentifier; // step 30
private String newCheckSum; // step 30
private ChecksumType newCheckSumType; //step 30

// -- Optional
private DataFile fileToReplace; // step 25
Expand All @@ -146,6 +144,7 @@ public class AddReplaceFileHelper{
private DatasetVersion clone;
List<DataFile> initialFileList;
List<DataFile> finalFileList;
private boolean trustSuppliedFileSizes;

// -----------------------------------
// Ingested files
Expand Down Expand Up @@ -610,15 +609,9 @@ private boolean runAddReplacePhase1(Dataset owner,
return false;

}
if(optionalFileParams != null) {
if(optionalFileParams.hasCheckSum()) {
newCheckSum = optionalFileParams.getCheckSum();
newCheckSumType = optionalFileParams.getCheckSumType();
}
}

msgt("step_030_createNewFilesViaIngest");
if (!this.step_030_createNewFilesViaIngest()){
if (!this.step_030_createNewFilesViaIngest(optionalFileParams)){
return false;

}
Expand Down Expand Up @@ -1191,7 +1184,7 @@ private boolean step_007_auto_isReplacementInLatestVersion(DataFile existingFile
}


private boolean step_030_createNewFilesViaIngest(){
private boolean step_030_createNewFilesViaIngest(OptionalFileParams optionalFileParams){

if (this.hasError()){
return false;
Expand All @@ -1203,21 +1196,28 @@ private boolean step_030_createNewFilesViaIngest(){
//Don't repeatedly update the clone (losing changes) in multifile case
clone = workingVersion.cloneDatasetVersion();
}

Long suppliedFileSize = null;
String newCheckSum = null;
ChecksumType newCheckSumType = null;


if (optionalFileParams != null) {
if (optionalFileParams.hasCheckSum()) {
newCheckSum = optionalFileParams.getCheckSum();
newCheckSumType = optionalFileParams.getCheckSumType();
}
if (trustSuppliedFileSizes && optionalFileParams.hasFileSize()) {
suppliedFileSize = optionalFileParams.getFileSize();
}
}

try {
/*CreateDataFileResult result = FileUtil.createDataFiles(workingVersion,
this.newFileInputStream,
this.newFileName,
this.newFileContentType,
this.newStorageIdentifier,
this.newCheckSum,
this.newCheckSumType,
this.systemConfig);*/

UploadSessionQuotaLimit quota = null;
if (systemConfig.isStorageQuotasEnforced()) {
quota = fileService.getUploadSessionQuotaLimit(dataset);
}
Command<CreateDataFileResult> cmd = new CreateNewDataFilesCommand(dvRequest, workingVersion, newFileInputStream, newFileName, newFileContentType, newStorageIdentifier, quota, newCheckSum, newCheckSumType);
Command<CreateDataFileResult> cmd = new CreateNewDataFilesCommand(dvRequest, workingVersion, newFileInputStream, newFileName, newFileContentType, newStorageIdentifier, quota, newCheckSum, newCheckSumType, suppliedFileSize);
CreateDataFileResult createDataFilesResult = commandEngine.submit(cmd);
initialFileList = createDataFilesResult.getDataFiles();

Expand Down Expand Up @@ -2033,9 +2033,15 @@ public void setDuplicateFileWarning(String duplicateFileWarning) {
* @param jsonData - an array of jsonData entries (one per file) using the single add file jsonData format
* @param dataset
* @param authUser
* @param trustSuppliedSizes - whether to accept the fileSize values passed
* in jsonData (we don't want to trust the users of the S3 direct
* upload API with that information - we will verify the status of
* the files in the S3 bucket and confirm the sizes in the process.
* we do want GlobusService to be able to pass the file sizes, since
* they are obtained and verified via a Globus API lookup).
* @return
*/
public Response addFiles(String jsonData, Dataset dataset, User authUser) {
public Response addFiles(String jsonData, Dataset dataset, User authUser, boolean trustSuppliedFileSizes) {
msgt("(addFilesToDataset) jsonData: " + jsonData.toString());

JsonArrayBuilder jarr = Json.createArrayBuilder();
Expand All @@ -2044,6 +2050,7 @@ public Response addFiles(String jsonData, Dataset dataset, User authUser) {

int totalNumberofFiles = 0;
int successNumberofFiles = 0;
this.trustSuppliedFileSizes = trustSuppliedFileSizes;
// -----------------------------------------------------------
// Read jsonData and Parse files information from jsondata :
// -----------------------------------------------------------
Expand Down Expand Up @@ -2176,6 +2183,10 @@ public Response addFiles(String jsonData, Dataset dataset, User authUser) {
.add("data", Json.createObjectBuilder().add("Files", jarr).add("Result", result)).build() ).build();
}

public Response addFiles(String jsonData, Dataset dataset, User authUser) {
return addFiles(jsonData, dataset, authUser, false);
}

/**
* Replace multiple files with prepositioned replacements as listed in the
* jsonData. Works with direct upload, Globus, and other out-of-band methods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@
* - Provenance related information
*
* @author rmp553
* @todo (?) We may want to consider renaming this class to DataFileParams or
* DataFileInfo... it was originally created to encode some bits of info -
* the file "tags" specifically, that didn't fit in elsewhere in the normal
* workflow; but it's been expanded to cover pretty much everything else associated
* with DataFiles and it's not really "optional" anymore when, for example, used
* in the direct upload workflow. (?)
*/
public class OptionalFileParams {

Expand Down Expand Up @@ -76,6 +82,8 @@ public class OptionalFileParams {
public static final String MIME_TYPE_ATTR_NAME = "mimeType";
private String checkSumValue;
private ChecksumType checkSumType;
public static final String FILE_SIZE_ATTR_NAME = "fileSize";
private Long fileSize;
public static final String LEGACY_CHECKSUM_ATTR_NAME = "md5Hash";
public static final String CHECKSUM_OBJECT_NAME = "checksum";
public static final String CHECKSUM_OBJECT_TYPE = "@type";
Expand Down Expand Up @@ -268,6 +276,18 @@ public String getCheckSum() {
public ChecksumType getCheckSumType() {
return checkSumType;
}

public boolean hasFileSize() {
return fileSize != null;
}

public Long getFileSize() {
return fileSize;
}

public void setFileSize(long fileSize) {
this.fileSize = fileSize;
}

/**
* Set tags
Expand Down Expand Up @@ -416,7 +436,13 @@ else if ((jsonObj.has(CHECKSUM_OBJECT_NAME)) && (!jsonObj.get(CHECKSUM_OBJECT_NA
this.checkSumType = ChecksumType.fromString(((JsonObject) jsonObj.get(CHECKSUM_OBJECT_NAME)).get(CHECKSUM_OBJECT_TYPE).getAsString());

}

// -------------------------------
// get file size as a Long, if supplied
// -------------------------------
if ((jsonObj.has(FILE_SIZE_ATTR_NAME)) && (!jsonObj.get(FILE_SIZE_ATTR_NAME).isJsonNull())){

this.fileSize = jsonObj.get(FILE_SIZE_ATTR_NAME).getAsLong();
}
// -------------------------------
// get tags
// -------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public CreateNewDataFilesCommand(DataverseRequest aRequest, DatasetVersion versi
this(aRequest, version, inputStream, fileName, suppliedContentType, newStorageIdentifier, quota, newCheckSum, newCheckSumType, null, null);
}

public CreateNewDataFilesCommand(DataverseRequest aRequest, DatasetVersion version, InputStream inputStream, String fileName, String suppliedContentType, String newStorageIdentifier, UploadSessionQuotaLimit quota, String newCheckSum, DataFile.ChecksumType newCheckSumType, Long newFileSize) {
this(aRequest, version, inputStream, fileName, suppliedContentType, newStorageIdentifier, quota, newCheckSum, newCheckSumType, newFileSize, null);
}

// This version of the command must be used when files are created in the
// context of creating a brand new dataset (from the Add Dataset page):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import edu.harvard.iq.dataverse.util.URLTokenUtil;
import edu.harvard.iq.dataverse.util.UrlSignerUtil;
import edu.harvard.iq.dataverse.util.json.JsonUtil;
import jakarta.json.JsonNumber;
import jakarta.json.JsonReader;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
Expand Down Expand Up @@ -284,6 +285,52 @@ private int makeDir(GlobusEndpoint endpoint, String dir) {
return result.status;
}

private Map<String, Long> lookupFileSizes(GlobusEndpoint endpoint, String dir) {
MakeRequestResponse result;

try {
logger.fine("Attempting to look up the contents of the Globus folder "+dir);
URL url = new URL(
"https://transfer.api.globusonline.org/v0.10/operation/endpoint/" + endpoint.getId()
+ "/ls?path=" + dir);
result = makeRequest(url, "Bearer", endpoint.getClientToken(), "GET", null);

switch (result.status) {
case 200:
logger.fine("Looked up directory " + dir + " successfully.");
break;
default:
logger.warning("Status " + result.status + " received when looking up dir " + dir);
logger.fine("Response: " + result.jsonResponse);
return null;
}
} catch (MalformedURLException ex) {
// Misconfiguration
logger.warning("Failed to list the contents of the directory "+ dir + " on endpoint " + endpoint.getId());
return null;
}

Map<String, Long> ret = new HashMap<>();

JsonObject listObject = JsonUtil.getJsonObject(result.jsonResponse);
JsonArray dataArray = listObject.getJsonArray("DATA");

if (dataArray != null && !dataArray.isEmpty()) {
for (int i = 0; i < dataArray.size(); i++) {
String dataType = dataArray.getJsonObject(i).getString("DATA_TYPE", null);
if (dataType != null && dataType.equals("file")) {
// is it safe to assume that any entry with a valid "DATA_TYPE": "file"
// will also have valid "name" and "size" entries?
String fileName = dataArray.getJsonObject(i).getString("name");
long fileSize = dataArray.getJsonObject(i).getJsonNumber("size").longValueExact();
ret.put(fileName, fileSize);
}
}
}

return ret;
}

private int requestPermission(GlobusEndpoint endpoint, Dataset dataset, Permissions permissions) {
Gson gson = new GsonBuilder().create();
MakeRequestResponse result = null;
Expand Down Expand Up @@ -938,9 +985,20 @@ private void processUploadedFiles(JsonArray filesJsonArray, Dataset dataset, Aut

inputList.add(fileId + "IDsplit" + fullPath + "IDsplit" + fileName);
}

Map<String, Long> fileSizeMap = null;

if (filesJsonArray.size() >= systemConfig.getGlobusBatchLookupSize()) {
// Look up the sizes of all the files in the dataset folder, to avoid
// looking them up one by one later:
// @todo: we should only be doing this if this is a managed store, probably (?)
GlobusEndpoint endpoint = getGlobusEndpoint(dataset);
fileSizeMap = lookupFileSizes(endpoint, endpoint.getBasePath());
}

// calculateMissingMetadataFields: checksum, mimetype
JsonObject newfilesJsonObject = calculateMissingMetadataFields(inputList, myLogger);

JsonArray newfilesJsonArray = newfilesJsonObject.getJsonArray("files");
logger.fine("Size: " + newfilesJsonArray.size());
logger.fine("Val: " + JsonUtil.prettyPrint(newfilesJsonArray.getJsonObject(0)));
Expand All @@ -964,20 +1022,26 @@ private void processUploadedFiles(JsonArray filesJsonArray, Dataset dataset, Aut
if (newfileJsonObject != null) {
logger.fine("List Size: " + newfileJsonObject.size());
// if (!newfileJsonObject.get(0).getString("hash").equalsIgnoreCase("null")) {
JsonPatch path = Json.createPatchBuilder()
JsonPatch patch = Json.createPatchBuilder()
.add("/md5Hash", newfileJsonObject.get(0).getString("hash")).build();
fileJsonObject = path.apply(fileJsonObject);
path = Json.createPatchBuilder()
fileJsonObject = patch.apply(fileJsonObject);
patch = Json.createPatchBuilder()
.add("/mimeType", newfileJsonObject.get(0).getString("mime")).build();
fileJsonObject = path.apply(fileJsonObject);
fileJsonObject = patch.apply(fileJsonObject);
// If we already know the size of this file on the Globus end,
// we'll pass it to /addFiles, to avoid looking up file sizes
// one by one:
if (fileSizeMap != null && fileSizeMap.get(fileId) != null) {
Long uploadedFileSize = fileSizeMap.get(fileId);
myLogger.info("Found size for file " + fileId + ": " + uploadedFileSize + " bytes");
patch = Json.createPatchBuilder()
.add("/fileSize", Json.createValue(uploadedFileSize)).build();
fileJsonObject = patch.apply(fileJsonObject);
} else {
logger.fine("No file size entry found for file "+fileId);
}
addFilesJsonData.add(fileJsonObject);
countSuccess++;
// } else {
// globusLogger.info(fileName
// + " will be skipped from adding to dataset by second API due to missing
// values ");
// countError++;
// }
} else {
myLogger.info(fileName
+ " will be skipped from adding to dataset in the final AddReplaceFileHelper.addFiles() call. ");
Expand Down Expand Up @@ -1029,7 +1093,7 @@ private void processUploadedFiles(JsonArray filesJsonArray, Dataset dataset, Aut
// The old code had 2 sec. of sleep, so ...
Thread.sleep(2000);

Response addFilesResponse = addFileHelper.addFiles(newjsonData, dataset, authUser);
Response addFilesResponse = addFileHelper.addFiles(newjsonData, dataset, authUser, true);

if (addFilesResponse == null) {
logger.info("null response from addFiles call");
Expand Down Expand Up @@ -1211,7 +1275,7 @@ private GlobusTaskState globusStatusCheck(GlobusEndpoint endpoint, String taskId
return task;
}

public JsonObject calculateMissingMetadataFields(List<String> inputList, Logger globusLogger)
private JsonObject calculateMissingMetadataFields(List<String> inputList, Logger globusLogger)
throws InterruptedException, ExecutionException, IOException {

List<CompletableFuture<FileDetailsHolder>> hashvalueCompletableFutures = inputList.stream()
Expand All @@ -1230,7 +1294,7 @@ public JsonObject calculateMissingMetadataFields(List<String> inputList, Logger
});

JsonArrayBuilder filesObject = (JsonArrayBuilder) completableFuture.get();

JsonObject output = Json.createObjectBuilder().add("files", filesObject).build();

return output;
Expand Down
Loading
Loading