diff --git a/Throttler.cpp b/Throttler.cpp index e82d8986..a5303482 100644 --- a/Throttler.cpp +++ b/Throttler.cpp @@ -20,6 +20,15 @@ const double kPeakMultiplier = 1.2; const int kBucketMultiplier = 2; const double kTimeMultiplier = 0.25; +std::shared_ptr Throttler::makeThrottler(const WdtOptions& options) { + double avgRateBytesPerSec = options.avg_mbytes_per_sec * kMbToB; + double peakRateBytesPerSec = options.max_mbytes_per_sec * kMbToB; + double bucketLimitBytes = options.throttler_bucket_limit * kMbToB; + return Throttler::makeThrottler(avgRateBytesPerSec, peakRateBytesPerSec, + bucketLimitBytes, + options.throttler_log_time_millis); +} + std::shared_ptr Throttler::makeThrottler( double avgRateBytesPerSec, double peakRateBytesPerSec, double bucketLimitBytes, int64_t throttlerLogTimeMillis) { @@ -226,6 +235,11 @@ void Throttler::deRegisterTransfer() { refCount_--; } +double Throttler::getBytesProgress() { + folly::SpinLockGuard lock(throttlerMutex_); + return bytesProgress_; +} + double Throttler::getAvgRateBytesPerSec() { folly::SpinLockGuard lock(throttlerMutex_); return avgRateBytesPerSec_; diff --git a/Throttler.h b/Throttler.h index 3b2c6afb..40a97975 100644 --- a/Throttler.h +++ b/Throttler.h @@ -37,6 +37,9 @@ class Throttler { double avgRateBytesPerSec, double peakRateBytesPerSec, double bucketLimitBytes, int64_t throttlerLogTimeMillis); + /// Utility method that makes throttler using the wdt options + static std::shared_ptr makeThrottler(const WdtOptions& options); + /** * Sometimes the options passed to throttler might not make sense so this * method tries to auto configure them @@ -109,6 +112,9 @@ class Throttler { /// Set the throttler logging time in millis void setThrottlerLogTimeMillis(int64_t throttlerLogTimeMillis); + /// Get the bytes processed till now + double getBytesProgress(); + friend std::ostream& operator<<(std::ostream& stream, const Throttler& throttler); diff --git a/WdtBase.cpp b/WdtBase.cpp index 8de386e4..8af26dd4 100644 --- a/WdtBase.cpp +++ b/WdtBase.cpp @@ -69,7 +69,7 @@ ErrorCode WdtUri::process(const string& url) { urlPiece = StringPiece(url, WDT_URL_PREFIX.size()); if (urlPiece.empty()) { LOG(ERROR) << "Empty host name " << url; - return URI_PARSE_ERROR;; + return URI_PARSE_ERROR; } ErrorCode status = OK; // Parse hot name @@ -84,7 +84,7 @@ ErrorCode WdtUri::process(const string& url) { urlPiece.advance(hostNameEnd + 1); } else { size_t urlIndex = 0; - for (;urlIndex < urlPiece.size(); ++urlIndex) { + for (; urlIndex < urlPiece.size(); ++urlIndex) { if (urlPiece[urlIndex] == ':') { break; } @@ -116,7 +116,7 @@ ErrorCode WdtUri::process(const string& url) { string portStr; portStr.assign(urlPiece.data(), 0, paramsIndex); port_ = folly::to(portStr); - } catch(std::exception& e) { + } catch (std::exception& e) { LOG(ERROR) << "Invalid port, can't be parsed " << url; status = URI_PARSE_ERROR; } @@ -124,7 +124,7 @@ ErrorCode WdtUri::process(const string& url) { } if (urlPiece.empty()) { - return status;; + return status; } if (urlPiece[0] != '?') { @@ -132,7 +132,7 @@ ErrorCode WdtUri::process(const string& url) { return URI_PARSE_ERROR; } urlPiece.advance(1); - //parse params + // parse params while (!urlPiece.empty()) { StringPiece keyValuePair = urlPiece.split_step('&'); if (keyValuePair.empty()) { @@ -431,12 +431,7 @@ void WdtBase::configureThrottler() { WDT_CHECK(!throttler_); VLOG(1) << "Configuring throttler options"; const auto& options = WdtOptions::get(); - double avgRateBytesPerSec = options.avg_mbytes_per_sec * kMbToB; - double peakRateBytesPerSec = options.max_mbytes_per_sec * kMbToB; - double bucketLimitBytes = options.throttler_bucket_limit * kMbToB; - throttler_ = Throttler::makeThrottler(avgRateBytesPerSec, peakRateBytesPerSec, - bucketLimitBytes, - options.throttler_log_time_millis); + throttler_ = Throttler::makeThrottler(options); if (throttler_) { LOG(INFO) << "Enabling throttling " << *throttler_; } else { diff --git a/WdtResourceController.cpp b/WdtResourceController.cpp index 78264636..bb8045ce 100644 --- a/WdtResourceController.cpp +++ b/WdtResourceController.cpp @@ -30,7 +30,10 @@ void WdtControllerBase::updateMaxSendersLimit(int64_t maxNumSenders) { void WdtControllerBase::setThrottler(shared_ptr throttler) { throttler_ = throttler; - LOG(INFO) << "Set the throttler for " << controllerName_ << " " << *throttler; + if (throttler) { + LOG(INFO) << "Set the throttler for " << controllerName_ << " " + << *throttler; + } } shared_ptr WdtControllerBase::getThrottler() const {