Skip to content

Commit

Permalink
feat: client sends routing cookie back to server (#1888)
Browse files Browse the repository at this point in the history
* feat: client sends retry cookie back to server

* udpate to use trailer instead of error info

* updating the header name

* address some comments

* udpate

* update tests and handling of retry cookie

* address comments

* address comments

* add cookie to readChangeStream

* also check headers and add a test

* simplify code

* clean up test

* clean up test

* update dependency

* test

* move MetadataSubject to a separate file

* add the file

* add license

* address comments

* close client

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
mutianf and gcf-owl-bot[bot] authored Nov 27, 2023
1 parent 0827252 commit 4c73abd
Show file tree
Hide file tree
Showing 8 changed files with 924 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.26.0')
implementation platform('com.google.cloud:libraries-bom:26.27.0')
implementation 'com.google.cloud:google-cloud-bigtable'
```
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import io.grpc.CallOptions;
import io.grpc.Metadata;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;

/** A cookie that holds information for retry or routing */
class CookiesHolder {

static final CallOptions.Key<CookiesHolder> COOKIES_HOLDER_KEY =
CallOptions.Key.create("bigtable-cookies");

/** Routing cookie key prefix. */
static final String COOKIE_KEY_PREFIX = "x-goog-cbt-cookie";

/** A map that stores all the routing cookies. */
private final Map<Metadata.Key<String>, String> cookies = new HashMap<>();

/** Returns CookiesHolder if presents in CallOptions, otherwise returns null. */
@Nullable
static CookiesHolder fromCallOptions(CallOptions options) {
// CookiesHolder should be added by CookiesServerStreamingCallable and
// CookiesUnaryCallable for most methods. However, methods like PingAndWarm
// doesn't support routing cookie, in which case this will return null.
return options.getOption(COOKIES_HOLDER_KEY);
}

/** Add all the routing cookies to headers if any. */
Metadata injectCookiesInRequestHeaders(Metadata headers) {
for (Metadata.Key<String> key : cookies.keySet()) {
headers.put(key, cookies.get(key));
}
return headers;
}

/**
* Iterate through all the keys in initial or trailing metadata, and add all the keys that match
* COOKIE_KEY_PREFIX to cookies. Values in trailers will override the value set in initial
* metadata for the same keys.
*/
void extractCookiesFromMetadata(@Nullable Metadata trailers) {
if (trailers == null) {
return;
}
for (String key : trailers.keys()) {
if (key.startsWith(COOKIE_KEY_PREFIX)) {
Metadata.Key<String> metadataKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
String value = trailers.get(metadataKey);
cookies.put(metadataKey, value);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* A cookie interceptor that checks the cookie value from returned trailer, updates the cookie
* holder, and inject it in the header of the next request.
*/
class CookiesInterceptor implements ClientInterceptor {

private static final Logger LOG = Logger.getLogger(CookiesInterceptor.class.getName());

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// Gets the CookiesHolder added from CookiesServerStreamingCallable and
// CookiesUnaryCallable.
// Add CookiesHolder content to request headers if there's any.
try {
CookiesHolder cookie = CookiesHolder.fromCallOptions(callOptions);
if (cookie != null) {
headers = cookie.injectCookiesInRequestHeaders(headers);
responseListener = new UpdateCookieListener<>(responseListener, cookie);
}
} catch (Throwable e) {
LOG.warning("Failed to inject cookie to request headers: " + e);
} finally {
super.start(responseListener, headers);
}
}
};
}

/** Add headers and trailers to CookiesHolder if there's any. * */
static class UpdateCookieListener<RespT>
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {

private final CookiesHolder cookie;

UpdateCookieListener(ClientCall.Listener<RespT> delegate, CookiesHolder cookiesHolder) {
super(delegate);
this.cookie = cookiesHolder;
}

@Override
public void onHeaders(Metadata headers) {
try {
cookie.extractCookiesFromMetadata(headers);
} catch (Throwable e) {
LOG.log(Level.WARNING, "Failed to extract cookie from response headers.", e);
} finally {
super.onHeaders(headers);
}
}

@Override
public void onClose(Status status, Metadata trailers) {
try {
cookie.extractCookiesFromMetadata(trailers);
} catch (Throwable e) {
LOG.log(Level.WARNING, "Failed to extract cookie from response trailers.", e);
} finally {
super.onClose(status, trailers);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.COOKIES_HOLDER_KEY;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;

/**
* The cookie holder will act as operation scoped storage for all retry attempts. Each attempt's
* cookies will be merged into the value holder and will be sent out with the next retry attempt.
*/
class CookiesServerStreamingCallable<RequestT, ResponseT>
extends ServerStreamingCallable<RequestT, ResponseT> {

private final ServerStreamingCallable<RequestT, ResponseT> callable;

CookiesServerStreamingCallable(ServerStreamingCallable<RequestT, ResponseT> innerCallable) {
this.callable = innerCallable;
}

@Override
public void call(
RequestT request, ResponseObserver<ResponseT> responseObserver, ApiCallContext context) {
GrpcCallContext grpcCallContext = (GrpcCallContext) context;
callable.call(
request,
responseObserver,
grpcCallContext.withCallOptions(
grpcCallContext.getCallOptions().withOption(COOKIES_HOLDER_KEY, new CookiesHolder())));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.COOKIES_HOLDER_KEY;

import com.google.api.core.ApiFuture;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;

/**
* The cookie holder will act as operation scoped storage for all retry attempts. Each attempt's
* cookies will be merged into the value holder and will be sent out with the next retry attempt.
*/
class CookiesUnaryCallable<RequestT, ResponseT> extends UnaryCallable<RequestT, ResponseT> {
private final UnaryCallable<RequestT, ResponseT> innerCallable;

CookiesUnaryCallable(UnaryCallable<RequestT, ResponseT> callable) {
this.innerCallable = callable;
}

@Override
public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context) {
GrpcCallContext grpcCallContext = (GrpcCallContext) context;
return innerCallable.futureCall(
request,
grpcCallContext.withCallOptions(
grpcCallContext.getCallOptions().withOption(COOKIES_HOLDER_KEY, new CookiesHolder())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ public static EnhancedBigtableStubSettings finalizeSettings(
// workaround JWT audience issues
patchCredentials(builder);

// patch cookies interceptor
InstantiatingGrpcChannelProvider.Builder transportProvider = null;
if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
transportProvider =
((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder();
transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor()));
}

// Inject channel priming
if (settings.isRefreshingChannel()) {
// Fix the credentials so that they can be shared
Expand All @@ -194,20 +202,18 @@ public static EnhancedBigtableStubSettings finalizeSettings(
}
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));

// Inject the primer
InstantiatingGrpcChannelProvider transportProvider =
(InstantiatingGrpcChannelProvider) settings.getTransportChannelProvider();

builder.setTransportChannelProvider(
transportProvider
.toBuilder()
.setChannelPrimer(
BigtableChannelPrimer.create(
credentials,
settings.getProjectId(),
settings.getInstanceId(),
settings.getAppProfileId()))
.build());
if (transportProvider != null) {
transportProvider.setChannelPrimer(
BigtableChannelPrimer.create(
credentials,
settings.getProjectId(),
settings.getInstanceId(),
settings.getAppProfileId()));
}
}

if (transportProvider != null) {
builder.setTransportChannelProvider(transportProvider.build());
}

ImmutableMap<TagKey, TagValue> attributes =
Expand Down Expand Up @@ -365,7 +371,11 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
new TracedServerStreamingCallable<>(
readRowsUserCallable, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
ServerStreamingCallable<Query, RowT> withCookie = new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -401,7 +411,9 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
UnaryCallable<Query, RowT> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -642,7 +654,9 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
UnaryCallable<BulkMutation, Void> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -924,7 +938,10 @@ public Map<String, String> extract(
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
ServerStreamingCallable<String, ByteStringRange> withCookie =
new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -1004,7 +1021,10 @@ public Map<String, String> extract(
new TracedServerStreamingCallable<>(
readChangeStreamUserCallable, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT> withCookie =
new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand All @@ -1017,7 +1037,11 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
UnaryCallable<RequestT, ResponseT> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
Expand Down
Loading

0 comments on commit 4c73abd

Please sign in to comment.