diff --git a/CMakeLists.txt b/CMakeLists.txt index e5297b2f..c4b6c5ba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 3.2) # There is no C per se in WDT but if you use CXX only here many checks fail # Version is Major.Minor.YYMMDDX for up to 10 releases per day # Minor currently is also the protocol version - has to match with Protocol.cpp -project("WDT" LANGUAGES C CXX VERSION 1.20.1509240) +project("WDT" LANGUAGES C CXX VERSION 1.20.1509241) # On MacOS this requires the latest (master) CMake (and/or CMake 3.1.1/3.2) set(CMAKE_CXX_STANDARD 11) @@ -297,4 +297,7 @@ if (BUILD_TESTING) add_test(NAME WdtFileListTest COMMAND "${CMAKE_CURRENT_SOURCE_DIR}/wdt_file_list_test.py") + add_test(NAME WdtOverwriteTest COMMAND + "${CMAKE_CURRENT_SOURCE_DIR}/wdt_overwrite_test.py") + endif(BUILD_TESTING) diff --git a/FileCreator.cpp b/FileCreator.cpp index 4b557b25..9ed63b9e 100644 --- a/FileCreator.cpp +++ b/FileCreator.cpp @@ -55,7 +55,14 @@ bool FileCreator::setFileSize(int fd, int64_t fileSize) { int FileCreator::openAndSetSize(BlockDetails const *blockDetails) { const auto &options = WdtOptions::get(); - int fd = createFile(blockDetails->fileName); + int fd; + const bool doCreate = blockDetails->allocationStatus == NOT_EXISTS; + const bool isTooLarge = (blockDetails->allocationStatus == EXISTS_TOO_LARGE); + if (doCreate) { + fd = createFile(blockDetails->fileName); + } else { + fd = openExistingFile(blockDetails->fileName); + } if (fd < 0) { return -1; } @@ -67,14 +74,13 @@ int FileCreator::openAndSetSize(BlockDetails const *blockDetails) { return -1; } if (options.isLogBasedResumption()) { - if (blockDetails->allocationStatus == EXISTS_TOO_LARGE) { + if (isTooLarge) { LOG(WARNING) << "File size smaller in the sender side " << blockDetails->fileName << ", marking previous transferred chunks as invalid"; transferLogManager_.addInvalidationEntry(blockDetails->prevSeqId); } - if (blockDetails->allocationStatus == EXISTS_TOO_LARGE || - blockDetails->allocationStatus == NOT_EXISTS) { + if (isTooLarge || doCreate) { transferLogManager_.addFileCreationEntry( blockDetails->fileName, blockDetails->seqId, blockDetails->fileSize); } else { @@ -147,11 +153,33 @@ int FileCreator::openForBlocks(int threadIndex, return -1; } } - return createFile(blockDetails->fileName); + return openExistingFile(blockDetails->fileName); } using std::string; +int FileCreator::openExistingFile(const string &relPathStr) { + // This should have been validated earlier and errored out + // instead of crashing here + WDT_CHECK(!relPathStr.empty()); + WDT_CHECK(relPathStr[0] != '/'); + WDT_CHECK(relPathStr.back() != '/'); + + string path(rootDir_); + path.append(relPathStr); + + int openFlags = O_WRONLY; + START_PERF_TIMER + int res = open(path.c_str(), openFlags, 0644); + RECORD_PERF_RESULT(PerfStatReport::FILE_OPEN) + if (res < 0) { + PLOG(ERROR) << "failed opening file " << path; + return -1; + } + VLOG(1) << "successfully opened file " << path; + return res; +} + int FileCreator::createFile(const string &relPathStr) { CHECK(!relPathStr.empty()); CHECK(relPathStr[0] != '/'); @@ -167,19 +195,39 @@ int FileCreator::createFile(const string &relPathStr) { std::string dir; if (p) { dir.assign(relPathStr.data(), p); - if (!createDirRecursively(dir)) { + START_PERF_TIMER + const bool dirSuccess1 = createDirRecursively(dir); + RECORD_PERF_RESULT(PerfStatReport::DIRECTORY_CREATE) + if (!dirSuccess1) { // retry with force LOG(ERROR) << "failed to create dir " << dir << " recursively, " << "trying to force directory creation"; - if (!createDirRecursively(dir, true /* force */)) { + START_PERF_TIMER + const bool dirSuccess2 = createDirRecursively(dir, true /* force */); + RECORD_PERF_RESULT(PerfStatReport::DIRECTORY_CREATE) + if (!dirSuccess2) { LOG(ERROR) << "failed to create dir " << dir << " recursively"; return -1; } } } int openFlags = O_CREAT | O_WRONLY; + auto &options = WdtOptions::get(); + // When doing download resumption we sometime open files that do already + // exist and we need to overwrite them anyway (files which have been + // discarded from the log for some reason) + if (options.overwrite || options.enable_download_resumption) { + // Make sure file size resumption will not get messed up if we + // expect to create this file + openFlags |= O_TRUNC; + } else { + // Make sure open will fail if we don't allow overwriting and + // the file happens to already exist + openFlags |= O_EXCL; + } START_PERF_TIMER int res = open(path.c_str(), openFlags, 0644); + RECORD_PERF_RESULT(PerfStatReport::FILE_OPEN) if (res < 0) { if (dir.empty()) { PLOG(ERROR) << "failed creating file " << path; @@ -187,19 +235,22 @@ int FileCreator::createFile(const string &relPathStr) { } PLOG(ERROR) << "failed creating file " << path << ", trying to " << "force directory creation"; - if (!createDirRecursively(dir, true /* force */)) { - LOG(ERROR) << "failed to create dir " << dir << " recursively"; - return -1; + { + START_PERF_TIMER + const bool dirSuccess = createDirRecursively(dir, true /* force */); + RECORD_PERF_RESULT(PerfStatReport::DIRECTORY_CREATE) + if (!dirSuccess) { + LOG(ERROR) << "failed to create dir " << dir << " recursively"; + return -1; + } } START_PERF_TIMER res = open(path.c_str(), openFlags, 0644); + RECORD_PERF_RESULT(PerfStatReport::FILE_OPEN) if (res < 0) { PLOG(ERROR) << "failed creating file " << path; return -1; } - RECORD_PERF_RESULT(PerfStatReport::FILE_OPEN) - } else { - RECORD_PERF_RESULT(PerfStatReport::FILE_OPEN) } VLOG(1) << "successfully created file " << path; return res; @@ -210,7 +261,7 @@ bool FileCreator::createDirRecursively(const std::string dir, bool force) { return true; } - CHECK(dir.back() == '/'); + WDT_CHECK(dir.back() == '/'); int64_t lastIndex = dir.size() - 1; while (lastIndex > 0 && dir[lastIndex - 1] != '/') { diff --git a/FileCreator.h b/FileCreator.h index 6fb7bfd3..33af2836 100644 --- a/FileCreator.h +++ b/FileCreator.h @@ -57,17 +57,6 @@ class FileCreator { delete[] threadConditionVariables_; } - /** - * Opens the file and sets its size. If the existing file size is greater than - * required size, the file is truncated using ftruncate. Space is - * allocated using posix_fallocate. - * - * @param blockDetails block-details - * - * @return file descriptor in case of success, -1 otherwise - */ - int openAndSetSize(BlockDetails const *blockDetails); - /** * This is used to open the file in block mode. If the current thread is the * first one to try to open the file, then it allocates space using @@ -94,18 +83,34 @@ class FileCreator { } private: + /** + * Opens the file and sets its size. If the existing file size is greater than + * required size, the file is truncated using ftruncate. Space is + * allocated using posix_fallocate. + * + * @param blockDetails block-details + * + * @return file descriptor in case of success, -1 otherwise + */ + int openAndSetSize(BlockDetails const *blockDetails); + /** * Create a file and open for writing, recursively create subdirs. * Subdirs are only created once due to createdDirs_ cache, but * if an open fails where we assumed the directory already exists * based on cache, we try creating the dir and open again before - * failing. + * failing. Will not overwrite existing files unless overwrite option + * is set. * * @param relPath path relative to root dir * * @return file descriptor or -1 on error */ int createFile(const std::string &relPath); + /** + * Open existing file + */ + int openExistingFile(const std::string &relPath); /** * sets the size of the file. If the size is greater then the diff --git a/FileWriter.cpp b/FileWriter.cpp index 9e371793..a6aa80b1 100644 --- a/FileWriter.cpp +++ b/FileWriter.cpp @@ -24,21 +24,15 @@ ErrorCode FileWriter::open() { if (options.skip_writes) { return OK; } - if (blockDetails_->fileSize == blockDetails_->dataSize) { - // single block file - WDT_CHECK(blockDetails_->offset == 0); - fd_ = fileCreator_->openAndSetSize(blockDetails_); - } else { - // multi block file - fd_ = fileCreator_->openForBlocks(threadIndex_, blockDetails_); - if (fd_ >= 0 && blockDetails_->offset > 0) { - START_PERF_TIMER - if (lseek(fd_, blockDetails_->offset, SEEK_SET) < 0) { - PLOG(ERROR) << "Unable to seek " << blockDetails_->fileName; - close(); - } else { - RECORD_PERF_RESULT(PerfStatReport::FILE_SEEK) - } + // TODO: consider a working optimization for small files + fd_ = fileCreator_->openForBlocks(threadIndex_, blockDetails_); + if (fd_ >= 0 && blockDetails_->offset > 0) { + START_PERF_TIMER + const int ret = lseek(fd_, blockDetails_->offset, SEEK_SET); + RECORD_PERF_RESULT(PerfStatReport::FILE_SEEK); + if (ret < 0) { + PLOG(ERROR) << "Unable to seek " << blockDetails_->fileName; + close(); } } if (fd_ == -1) { diff --git a/README.md b/README.md index f5193b55..7bc76976 100644 --- a/README.md +++ b/README.md @@ -274,7 +274,7 @@ Make sure to do the following, before "arc diff": fbconfig --clang --with-project-version clang:dev -r wdt - fbmake runtests --extended-tests + fbmake runtests --run-disabled --extended-tests fbmake runtests_opt fbmake opt diff --git a/Reporting.cpp b/Reporting.cpp index c516609c..f4301067 100644 --- a/Reporting.cpp +++ b/Reporting.cpp @@ -246,9 +246,12 @@ void ProgressReporter::logProgress(int64_t effectiveDataBytes, int progress, folly::ThreadLocalPtr perfStatReport; const std::string PerfStatReport::statTypeDescription_[] = { - "Socket Read", "Socket Write", "File Open", "File Close", - "File Read", "File Write", "Sync File Range", "fsync", - "File Seek", "Throttler Sleep", "Receiver Wait Sleep"}; + "Socket Read", "Socket Write", + "File Open", "File Close", + "File Read", "File Write", + "Sync File Range", "fsync", + "File Seek", "Throttler Sleep", + "Receiver Wait Sleep", "Directory creation"}; PerfStatReport::PerfStatReport() { static_assert( @@ -363,7 +366,7 @@ std::ostream& operator<<(std::ostream& os, const PerfStatReport& statReport) { os << "p95 " << time << " "; } if (p99Count > runningCount && p99Count <= runningCount + count) { - os << "p99 " << time << "\n"; + os << "p99 " << time; } runningCount += count; @@ -373,7 +376,7 @@ std::ostream& operator<<(std::ostream& os, const PerfStatReport& statReport) { } buckets[currentBucketIndex] += count; } - + os << '\n'; for (int i = 0; i < numBuckets; i++) { if (buckets[i] == 0) { continue; diff --git a/Reporting.h b/Reporting.h index f028ea7c..dfc2abf5 100644 --- a/Reporting.h +++ b/Reporting.h @@ -476,6 +476,7 @@ class PerfStatReport { RECEIVER_WAIT_SLEEP, // receiver sleep duration between sending wait cmd to // sender. A high sum for this suggests threads // were not properly load balanced + DIRECTORY_CREATE, END }; diff --git a/WdtBase.cpp b/WdtBase.cpp index 8af26dd4..6a953502 100644 --- a/WdtBase.cpp +++ b/WdtBase.cpp @@ -286,7 +286,7 @@ string WdtTransferRequest::generateUrl(bool genFull) const { wdtUri.setQueryParam(TRANSFER_ID_PARAM, transferId); wdtUri.setQueryParam(RECEIVER_PROTOCOL_VERSION_PARAM, folly::to(protocolVersion)); - const auto &options = WdtOptions::get(); + const auto& options = WdtOptions::get(); if (options.url_backward_compatibility) { wdtUri.setQueryParam(LEGACY_PROTOCOL_VERSION_PARAM, folly::to(LEGACY_PROTCOL_VERSION)); @@ -313,7 +313,7 @@ void WdtTransferRequest::serializePorts(WdtUri& wdtUri) const { } prevPort = ports[i]; } - const auto &options = WdtOptions::get(); + const auto& options = WdtOptions::get(); if (hasHoles || options.url_backward_compatibility) { wdtUri.setQueryParam(PORTS_PARAM, getSerializedPortsList()); } else { diff --git a/WdtConfig.h b/WdtConfig.h index 3ebeed91..0cd42788 100644 --- a/WdtConfig.h +++ b/WdtConfig.h @@ -8,9 +8,9 @@ #define WDT_VERSION_MAJOR 1 #define WDT_VERSION_MINOR 20 -#define WDT_VERSION_BUILD 1509240 +#define WDT_VERSION_BUILD 1509241 // Add -fbcode to version str -#define WDT_VERSION_STR "1.20.1509240-fbcode" +#define WDT_VERSION_STR "1.20.1509241-fbcode" // Tie minor and proto version #define WDT_PROTOCOL_VERSION WDT_VERSION_MINOR diff --git a/WdtFlags.cpp.inc b/WdtFlags.cpp.inc index 3bef79ca..a0c6b4ce 100644 --- a/WdtFlags.cpp.inc +++ b/WdtFlags.cpp.inc @@ -125,7 +125,7 @@ WDT_OPT(namespace_receiver_limit, int32, "A value of zero disables limits"); #ifdef WDT_SUPPORTS_ODIRECT -WDT_OPT(odirect_reads, bool, +WDT_OPT(odirect_reads, bool, "Wdt can read files in O_DIRECT mode, set this flag to true" " to make sender read all files in O_DIRECT"); #else @@ -150,3 +150,4 @@ WDT_OPT(open_files_during_discovery, bool, "If true, files are opened when they are discovered"); WDT_OPT(url_backward_compatibility, bool, "If true, we send url that works with older version(<19)"); +WDT_OPT(overwrite, bool, "Allow the receiver to overwrite existing files"); diff --git a/WdtOptions.cpp b/WdtOptions.cpp index f0bce52c..b977dc30 100644 --- a/WdtOptions.cpp +++ b/WdtOptions.cpp @@ -11,9 +11,20 @@ namespace facebook { namespace wdt { -#define CHANGE_IF_NOT_SPECIFIED(option, specifiedOptions, value) \ - if (specifiedOptions.find(#option) == specifiedOptions.end()) { \ - option = value; \ +/** + * Macro to change the default of some flags based on some other flag + * Example of usage: + * if (enable_download_resumption) { + * CHANGE_IF_NOT_SPECIFIED(overwrite, userSpecifiedOptions, true, + * "(download resumption)") + * } + */ +#define CHANGE_IF_NOT_SPECIFIED(option, specifiedOptions, value, msg) \ + if (specifiedOptions.find(#option) == specifiedOptions.end()) { \ + LOG(INFO) << "Setting " << #option << " to " << value << " " << msg; \ + option = value; \ + } else { \ + LOG(INFO) << "Not overwriting user specified " << #option << " " << msg; \ } const std::string WdtOptions::FLASH_OPTION_TYPE = "flash"; @@ -23,10 +34,13 @@ void WdtOptions::modifyOptions( const std::string& optionType, const std::set& userSpecifiedOptions) { if (optionType == DISK_OPTION_TYPE) { - CHANGE_IF_NOT_SPECIFIED(num_ports, userSpecifiedOptions, 1) - CHANGE_IF_NOT_SPECIFIED(block_size_mbytes, userSpecifiedOptions, -1) - CHANGE_IF_NOT_SPECIFIED(disable_preallocation, userSpecifiedOptions, true) - CHANGE_IF_NOT_SPECIFIED(resume_using_dir_tree, userSpecifiedOptions, true) + std::string msg("(disk option type)"); + CHANGE_IF_NOT_SPECIFIED(num_ports, userSpecifiedOptions, 1, msg) + CHANGE_IF_NOT_SPECIFIED(block_size_mbytes, userSpecifiedOptions, -1, msg) + CHANGE_IF_NOT_SPECIFIED(disable_preallocation, userSpecifiedOptions, true, + msg) + CHANGE_IF_NOT_SPECIFIED(resume_using_dir_tree, userSpecifiedOptions, true, + msg) return; } if (optionType != FLASH_OPTION_TYPE) { diff --git a/WdtOptions.h b/WdtOptions.h index c19959cb..2ff2124f 100644 --- a/WdtOptions.h +++ b/WdtOptions.h @@ -300,6 +300,11 @@ class WdtOptions { */ bool url_backward_compatibility{false}; + /** + * If true, wdt can overwrite existing files + */ + bool overwrite{false}; + /** * @return whether files should be pre-allocated or not */ diff --git a/common_functions.sh b/common_functions.sh index c7f77a5e..0b383530 100644 --- a/common_functions.sh +++ b/common_functions.sh @@ -8,7 +8,7 @@ acquireIptableLock() { if [ $STATUS -eq 0 ]; then break fi - echo "Failed to get iptable lock" + echo "Failed to get iptable lock $IPTABLE_LOCK_FILE" done } @@ -104,6 +104,9 @@ printServerLog() { wdtExit() { undoLastIpTableChange + if [ $1 -ne 0 ] ; then + echo "Failing test $0 : Test#${TEST_COUNT} - Logs in $DIR" + fi exit $1 } @@ -225,7 +228,7 @@ verifyTransferAndCleanup() { # treating PROTOCOL_ERROR as errors grep "PROTOCOL_ERROR" "$DIR/server${TEST_COUNT}.log" > /dev/null && STATUS=1 grep "PROTOCOL_ERROR" "$DIR/client${TEST_COUNT}.log" > /dev/null && STATUS=1 - + if [ $STATUS -eq 0 ] ; then echo "Test $TEST_COUNT succeeded" removeDestination diff --git a/common_utils.py b/common_utils.py index f1a4c9d9..c79994af 100644 --- a/common_utils.py +++ b/common_utils.py @@ -17,11 +17,14 @@ def start_receiver(receiver_cmd, root_dir, test_count): stdout=subprocess.PIPE, stderr=open(server_log, 'w')) connection_url = receiver_process.stdout.readline().strip() + if not connection_url: + print("ERR: Unable to get the connection url from receiver!") return (receiver_process, connection_url) def run_sender(sender_cmd, root_dir, test_count): - sender_cmd = sender_cmd + " 2>&1 | tee {0}/client{1}.log".format( - root_dir, test_count) + # TODO: fix this to not use tee, this is python... + sender_cmd = "sh -c \"set -o pipefail; " + sender_cmd \ + + " 2>&1 | tee {0}/client{1}.log\"".format(root_dir, test_count) print("Sender: " + sender_cmd) return os.system(sender_cmd) diff --git a/wdt_network_test.sh b/wdt_network_test.sh index 9bd13826..823b7f42 100755 --- a/wdt_network_test.sh +++ b/wdt_network_test.sh @@ -58,7 +58,7 @@ ERROR_COUNT=25 TEST_COUNT=0 WDTBIN_BASE="_bin/wdt/wdt --transfer_id $$" -WDTBIN_OPTS="-ipv6 -start_port=$STARTING_PORT \ +WDTBIN_OPTS="-enable_perf_stat_collection -ipv6 -start_port=$STARTING_PORT \ -avg_mbytes_per_sec=60 -max_mbytes_per_sec=65 -run_as_daemon=false \ -full_reporting -read_timeout_millis=495 -write_timeout_millis=495 \ -progress_report_interval_millis=-1 -abort_check_interval_millis=100 \ diff --git a/wdt_overwrite_test.py b/wdt_overwrite_test.py new file mode 100755 index 00000000..74d02da1 --- /dev/null +++ b/wdt_overwrite_test.py @@ -0,0 +1,68 @@ +#! /usr/bin/env python + +import os +from common_utils import * + +def run_test(name, receiver_extra_flags, sender_extra_flags): + global wdtbin, test_count, root_dir + print("{0}. Testing {1}".format(test_count, name)) + receiver_cmd = "{0} -start_port 0 {1}".format(wdtbin, receiver_extra_flags) + (receiver_process, connection_url) = start_receiver( + receiver_cmd, root_dir, test_count) + sender_cmd = ("{0} -connection_url \'{1}\' {2}").format( + wdtbin, connection_url, sender_extra_flags) + sender_status = run_sender(sender_cmd, root_dir, test_count) + print("status for sender {0}".format(sender_status)) + receiver_status = receiver_process.wait() + print("status for receiver {0}".format(receiver_status)) + test_count += 1 + return sender_status, receiver_status + +def error(what): + global broken + print("ERR {0}".format(what)) + broken += 1 + +wdtbin = os.getcwd() + "/_bin/wdt/wdt" # todo this should handled in common ! + +root_dir = create_test_directory("/tmp") +data_dir = root_dir + "dir1" +create_directory(data_dir) +os.chdir(data_dir) +fname = "existingfile" +f = open(fname, "w") +f.write("existing data") +f.close() +mtime = os.stat(fname).st_mtime +print("mtime is {0}".format(mtime)) + +test_count = 1 + +sender_st, rec_st = run_test("default - should fail", "", "") + +broken = 0 +if sender_st == 0: + error("sender should have failed") +if rec_st == 0: + error("receiver should have failed") +newmtime = os.stat(fname).st_mtime +if newmtime != mtime: + error("file shouldn't have changed {0} -> {1}".format(mtime, newmtime)) + +mtime = newmtime + +sender_st, rec_st = run_test("with -overwrite should succeed", "-overwrite", "") + +if sender_st != 0: + error("sender should have worked") +if rec_st != 0: + error("receiver should have worked") +newmtime = os.stat(fname).st_mtime +if newmtime == mtime: + error("file should have changed from {0}".format(mtime)) + +print("Total issues {0}".format(broken)) +if not broken: + print("Good run, deleting logs in " + root_dir) + shutil.rmtree(root_dir) +exit(broken)