diff --git a/modules/commons/src/main/java/org/apache/synapse/commons/vfs/VFSConstants.java b/modules/commons/src/main/java/org/apache/synapse/commons/vfs/VFSConstants.java index 57b947b7ab..ffe13214ad 100644 --- a/modules/commons/src/main/java/org/apache/synapse/commons/vfs/VFSConstants.java +++ b/modules/commons/src/main/java/org/apache/synapse/commons/vfs/VFSConstants.java @@ -47,6 +47,9 @@ public final class VFSConstants { public static final String TRANSPORT_FILE_LOCKING_ENABLED = "enable"; public static final String TRANSPORT_FILE_LOCKING_DISABLED = "disable"; + public static final String TRANSPORT_CHECK_SIZE_INTERVAL = "transport.vfs.CheckSizeInterval"; + public static final String TRANSPORT_CHECK_SIZE_IGNORE_EMPTY = "transport.vfs.CheckSizeIgnoreEmpty"; + /** * This parameter is used decide whether the resolving hostname IP of URIs are done at deployment or dynamically. * At usage default id 'false' which lead hostname resolution at deployment diff --git a/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java b/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java index 8953838fa1..c9c94b58c3 100644 --- a/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java +++ b/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java @@ -80,6 +80,12 @@ public class PollTableEntry extends AbstractPollTableEntry { /** moved file will have this formatted timestamp prefix */ private DateFormat moveTimestampFormat; + /** containing the time in [ms] between the size check on files (to avoid reading files which are currently written) */ + private String checkSizeInterval = null; + /** does the checkSize Lock mechanisme take empty files or not, default = false */ + private String checkSizeIgnoreEmpty = "false"; + + private boolean streaming; private int maxRetryCount; @@ -311,6 +317,30 @@ private void setMoveAfterFailure(String moveAfterFailure) throws AxisFault { } } + public void setCheckSizeInterval(String checkSizeInterval) { + this.checkSizeInterval = checkSizeInterval; + } + + public String getCheckSizeInterval() { + return checkSizeInterval; + } + + public boolean hasCheckSizeInterval() { + return (checkSizeInterval != null && checkSizeInterval.length() > 0); + } + + public void setCheckSizeIgnoreEmpty(String checkSizeIgnoreEmpty) { + this.checkSizeIgnoreEmpty = checkSizeIgnoreEmpty; + } + + public String getCheckSizeIgnoreEmpty() { + return checkSizeIgnoreEmpty; + } + + public boolean isCheckSizeIgnoreEmpty() { + return "true".equals(checkSizeIgnoreEmpty); + } + public boolean isStreaming() { return streaming; } @@ -609,6 +639,17 @@ protected boolean loadConfigurationsFromService(ParameterInclude params) throws Map schemeFileOptions = VFSUtils.parseSchemeFileOptions(fileURI, params); setVfsSchemeProperties(schemeFileOptions); + //get check size intervall for locking + String checkSizeIntervalString = ParamUtils.getOptionalParam(params, VFSConstants.TRANSPORT_CHECK_SIZE_INTERVAL); + setCheckSizeInterval(checkSizeIntervalString); + + //get check size ignore emtpy for locking + String checkSizeIgnoreEmptyString = ParamUtils.getOptionalParam(params, VFSConstants.TRANSPORT_CHECK_SIZE_IGNORE_EMPTY); + //set parameter only if it is set, default value is true + if (checkSizeIgnoreEmptyString != null) { + setCheckSizeIgnoreEmpty(checkSizeIgnoreEmptyString); + } + String strStreaming = ParamUtils.getOptionalParam(params, VFSConstants.STREAMING); if (strStreaming != null) { streaming = Boolean.parseBoolean(strStreaming); diff --git a/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java b/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java index 5e799166e9..693d64a5f3 100644 --- a/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java +++ b/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java @@ -18,7 +18,6 @@ */ package org.apache.synapse.transport.vfs; -import org.apache.axiom.om.OMAttribute; import org.apache.axiom.om.OMElement; import org.apache.axis2.AxisFault; import org.apache.axis2.Constants; @@ -41,6 +40,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.io.input.AutoCloseInputStream; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.vfs2.FileContent; import org.apache.commons.vfs2.FileNotFolderException; import org.apache.commons.vfs2.FileNotFoundException; @@ -65,6 +65,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.security.MessageDigest; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Comparator; @@ -140,6 +141,8 @@ public class VFSTransportListener extends AbstractPollingTransportListener 0) { + complete.update(buffer, 0, numRead); + } + } while (numRead != -1); + fis.close(); + return complete.digest(); + } + + private boolean isFileStillUploading(PollTableEntry entry, FileObject child) { + if(!entry.isCheckSizeIgnoreEmpty() && !entry.hasCheckSizeInterval()) { + //CheckEmpty and CheckSize are not active - return false (file is not uploading) + return false; + } + InputStream inputStream = null; + try { + //get first MD5 + log.debug("Create MD5 Checksum of File: "+VFSUtils.maskURLPassword(child.getName().toString())); + inputStream = child.getContent().getInputStream(); + String md5 = getMD5Checksum(inputStream); + return isFileEmpty(entry, md5) || isFileStillChangingSize(entry, child, md5); + } catch (Exception e) { + try { + if (ExceptionUtils.getStackTrace(e).contains("The file is being used by another process")) { + return true; + } + } catch (Exception e2) { + } + //return true when any exception occurs + return true; + } finally { + if(inputStream != null){ + try { + inputStream.close(); + } catch (Exception ex) { + } + } + } + } }