Skip to content

Commit

Permalink
Client change for using request priority to enable flow control
Browse files Browse the repository at this point in the history
In this stage, client needs to do 3 things:
1. Always create flow control callable regardless of client flag
2. Make sure the entire callable behave like no-op if RateLimitInfo is not present.
3. Make sure client runs flow control as long as RateLimitInfo is present, regardless of client flag.

Meanwhile, setting client flag would still set the feature flag.

On server side, we'll compute and return RateLimitInfo if server side sees priority is low.
  • Loading branch information
kongweihan committed Sep 11, 2023
1 parent 7cc8a28 commit f29295e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -728,19 +728,19 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());

ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable =
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> statsHeader =
new StatsHeadersServerStreamingCallable<>(base);

if (settings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) {
callable = new RateLimitingServerStreamingCallable(callable);
}
// Always create this callable because flow control will be enabled by the presence of RateLimitInfo, not the client flag
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> flowControl = new RateLimitingServerStreamingCallable(
statsHeader);

// Sometimes MutateRows connections are disconnected via an RST frame. This error is transient
// and
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> convertException =
new ConvertExceptionCallable<>(callable);
new ConvertExceptionCallable<>(flowControl);

ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,7 @@ public EnhancedBigtableStubSettings build() {
this.setTransportChannelProvider(channelProviderBuilder.build());
}

// Will be deprecated once we migrate flow control user to use request priority
if (this.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) {
// only set mutate rows feature flag when this feature is enabled
featureFlags.setMutateRowsRateLimit(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -64,6 +65,10 @@ class RateLimitingServerStreamingCallable
// as the server side cap
private static final double MAX_FACTOR = 1.3;

// Disabled by default, enabled if RateLimitInfo is present, which is set on server side when
// feature flag is present or low request priority is used.
private volatile boolean rateLimitEnabled = false;

private final RateLimiter limiter;

private final AtomicReference<Instant> lastQpsChangeTime = new AtomicReference<>(Instant.now());
Expand All @@ -81,32 +86,27 @@ public void call(
MutateRowsRequest request,
ResponseObserver<MutateRowsResponse> responseObserver,
ApiCallContext context) {
Stopwatch stopwatch = Stopwatch.createStarted();
limiter.acquire();
stopwatch.stop();
if (context.getTracer() instanceof BigtableTracer) {
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS));
if (rateLimitEnabled) {
Stopwatch stopwatch = Stopwatch.createStarted();
limiter.acquire();
stopwatch.stop();
if (context.getTracer() instanceof BigtableTracer) {
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
}
RateLimitingResponseObserver innerObserver =
new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver);
new RateLimitingResponseObserver(responseObserver);
innerCallable.call(request, innerObserver, context);
}

class RateLimitingResponseObserver extends SafeResponseObserver<MutateRowsResponse> {
private final ResponseObserver<MutateRowsResponse> outerObserver;
private final RateLimiter rateLimiter;

private final AtomicReference<Instant> lastQpsChangeTime;

RateLimitingResponseObserver(
RateLimiter rateLimiter,
AtomicReference<Instant> lastQpsChangeTime,
ResponseObserver<MutateRowsResponse> observer) {
super(observer);
this.outerObserver = observer;
this.rateLimiter = rateLimiter;
this.lastQpsChangeTime = lastQpsChangeTime;
}

@Override
Expand All @@ -116,7 +116,11 @@ protected void onStartImpl(StreamController controller) {

@Override
protected void onResponseImpl(MutateRowsResponse response) {
// Must not limit rate if RateLimitInfo is not present.
// Must limit rate and update QPS if RateLimitInfo is present, regardless of client side flag
// setting.
if (response.hasRateLimitInfo()) {
rateLimitEnabled = true;
RateLimitInfo info = response.getRateLimitInfo();
// RateLimitInfo is an optional field. However, proto3 sub-message field always
// have presence even thought it's marked as "optional". Check the factor and
Expand All @@ -126,6 +130,9 @@ protected void onResponseImpl(MutateRowsResponse response) {
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
}
} else {
// Disable in case customer switched from low to higher priorities.
rateLimitEnabled = false;
}
}

Expand Down

0 comments on commit f29295e

Please sign in to comment.