Skip to content

Commit

Permalink
Adding update of throttler to wdt service
Browse files Browse the repository at this point in the history
Summary: At times we might wanna update throttler in a running service. This is a temporary thing till we move to dynamic flags

Reviewed By: @ldemailly

Differential Revision: D2471488
  • Loading branch information
nikunjy authored and ldemailly committed Sep 25, 2015
1 parent 7d99005 commit 1607bd3
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 12 deletions.
14 changes: 14 additions & 0 deletions Throttler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ const double kPeakMultiplier = 1.2;
const int kBucketMultiplier = 2;
const double kTimeMultiplier = 0.25;

std::shared_ptr<Throttler> Throttler::makeThrottler(const WdtOptions& options) {
double avgRateBytesPerSec = options.avg_mbytes_per_sec * kMbToB;
double peakRateBytesPerSec = options.max_mbytes_per_sec * kMbToB;
double bucketLimitBytes = options.throttler_bucket_limit * kMbToB;
return Throttler::makeThrottler(avgRateBytesPerSec, peakRateBytesPerSec,
bucketLimitBytes,
options.throttler_log_time_millis);
}

std::shared_ptr<Throttler> Throttler::makeThrottler(
double avgRateBytesPerSec, double peakRateBytesPerSec,
double bucketLimitBytes, int64_t throttlerLogTimeMillis) {
Expand Down Expand Up @@ -226,6 +235,11 @@ void Throttler::deRegisterTransfer() {
refCount_--;
}

double Throttler::getBytesProgress() {
folly::SpinLockGuard lock(throttlerMutex_);
return bytesProgress_;
}

double Throttler::getAvgRateBytesPerSec() {
folly::SpinLockGuard lock(throttlerMutex_);
return avgRateBytesPerSec_;
Expand Down
6 changes: 6 additions & 0 deletions Throttler.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class Throttler {
double avgRateBytesPerSec, double peakRateBytesPerSec,
double bucketLimitBytes, int64_t throttlerLogTimeMillis);

/// Utility method that makes throttler using the wdt options
static std::shared_ptr<Throttler> makeThrottler(const WdtOptions& options);

/**
* Sometimes the options passed to throttler might not make sense so this
* method tries to auto configure them
Expand Down Expand Up @@ -109,6 +112,9 @@ class Throttler {
/// Set the throttler logging time in millis
void setThrottlerLogTimeMillis(int64_t throttlerLogTimeMillis);

/// Get the bytes processed till now
double getBytesProgress();

friend std::ostream& operator<<(std::ostream& stream,
const Throttler& throttler);

Expand Down
17 changes: 6 additions & 11 deletions WdtBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ ErrorCode WdtUri::process(const string& url) {
urlPiece = StringPiece(url, WDT_URL_PREFIX.size());
if (urlPiece.empty()) {
LOG(ERROR) << "Empty host name " << url;
return URI_PARSE_ERROR;;
return URI_PARSE_ERROR;
}
ErrorCode status = OK;
// Parse hot name
Expand All @@ -84,7 +84,7 @@ ErrorCode WdtUri::process(const string& url) {
urlPiece.advance(hostNameEnd + 1);
} else {
size_t urlIndex = 0;
for (;urlIndex < urlPiece.size(); ++urlIndex) {
for (; urlIndex < urlPiece.size(); ++urlIndex) {
if (urlPiece[urlIndex] == ':') {
break;
}
Expand Down Expand Up @@ -116,23 +116,23 @@ ErrorCode WdtUri::process(const string& url) {
string portStr;
portStr.assign(urlPiece.data(), 0, paramsIndex);
port_ = folly::to<int32_t>(portStr);
} catch(std::exception& e) {
} catch (std::exception& e) {
LOG(ERROR) << "Invalid port, can't be parsed " << url;
status = URI_PARSE_ERROR;
}
urlPiece.advance(paramsIndex);
}

if (urlPiece.empty()) {
return status;;
return status;
}

if (urlPiece[0] != '?') {
LOG(ERROR) << "Unexpected delimiter for params " << urlPiece[0];
return URI_PARSE_ERROR;
}
urlPiece.advance(1);
//parse params
// parse params
while (!urlPiece.empty()) {
StringPiece keyValuePair = urlPiece.split_step('&');
if (keyValuePair.empty()) {
Expand Down Expand Up @@ -431,12 +431,7 @@ void WdtBase::configureThrottler() {
WDT_CHECK(!throttler_);
VLOG(1) << "Configuring throttler options";
const auto& options = WdtOptions::get();
double avgRateBytesPerSec = options.avg_mbytes_per_sec * kMbToB;
double peakRateBytesPerSec = options.max_mbytes_per_sec * kMbToB;
double bucketLimitBytes = options.throttler_bucket_limit * kMbToB;
throttler_ = Throttler::makeThrottler(avgRateBytesPerSec, peakRateBytesPerSec,
bucketLimitBytes,
options.throttler_log_time_millis);
throttler_ = Throttler::makeThrottler(options);
if (throttler_) {
LOG(INFO) << "Enabling throttling " << *throttler_;
} else {
Expand Down
5 changes: 4 additions & 1 deletion WdtResourceController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ void WdtControllerBase::updateMaxSendersLimit(int64_t maxNumSenders) {

void WdtControllerBase::setThrottler(shared_ptr<Throttler> throttler) {
throttler_ = throttler;
LOG(INFO) << "Set the throttler for " << controllerName_ << " " << *throttler;
if (throttler) {
LOG(INFO) << "Set the throttler for " << controllerName_ << " "
<< *throttler;
}
}

shared_ptr<Throttler> WdtControllerBase::getThrottler() const {
Expand Down

0 comments on commit 1607bd3

Please sign in to comment.