Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Client side foundations and refactor to support gRPC transport for controller APIs #1259

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
<Class name="com.linkedin.venice.utils.RedundantExceptionFilter">
<Field name="singleton"/>
</Class>
<Class name="com.linkedin.venice.controllerapi.ControllerTransport">
<Class name="com.linkedin.venice.controllerapi.ControllerHttpTransportClient">
<Field name="OBJECT_MAPPER"/>
</Class>
<Class name="com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.linkedin.venice.controller.converters;

import static com.linkedin.venice.controllerapi.ControllerApiConstants.*;

import com.linkedin.venice.controller.requests.ControllerHttpRequest;
import com.linkedin.venice.controller.requests.CreateStoreRequest;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.QueryParams;
import com.linkedin.venice.protocols.CreateStoreGrpcRequest;
import com.linkedin.venice.protocols.CreateStoreGrpcResponse;
import java.util.Optional;


/**
* A utility class to hold all the conversion logic from {@link com.linkedin.venice.controller.requests.ControllerRequest}
* to transport specific request and transport specific responses back to {@link com.linkedin.venice.controllerapi.ControllerResponse}.
* The utility methods are registered with {@link GrpcConvertersRegistry} and {@link HttpConvertersRegistry} for the
* runtime to perform appropriate conversions. Refer to the documentation of the above converter registries for more
* information.
*/
public final class ConverterUtil {

// store related converters
public static CreateStoreGrpcRequest convertCreateStoreToGrpcRequest(CreateStoreRequest request) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this code be a function of CreateStoreRequest? Perhaps the parent class ControllerRequest can become ControllerRequest<GRPC> and define a public abstract GRPC getGRPC() API. Then CreateStoreRequest extends ControllerRequest<CreateStoreGrpcRequest>? Would that eliminate the need for the whole registry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This goes back to my earlier comment - #1259 (comment)

Controller models (responses and requests) taking dependencies on the underlying transport through methods like toGrpc and fromGrpc would make the consumers of these models exposed to some underlying details (although we could control the access modifier) this also potential brings in the dependencies of the underlying transport dependencies for consumers of models.

Hence the choice to invert the dependency and have these concrete transport implementations depend on the models of controller abstraction and the conversion residing outside these models.

CreateStoreGrpcRequest.Builder builder = CreateStoreGrpcRequest.newBuilder()
.setStoreName(request.getStoreName())
.setKeySchema(request.getKeySchema())
.setValueSchema(request.getValueSchema())
.setIsSystemStore(request.isSystemStore());

Optional.ofNullable(request.getOwner()).ifPresent(builder::setOwner);
Optional.ofNullable(request.getAccessPermissions()).ifPresent(builder::setAccessPermission);

return builder.build();
}

public static ControllerHttpRequest convertCreateStoreToHttpRequest(CreateStoreRequest request) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And similarly, the function to convert into a ControllerHttpRequest could likewise live over there?

QueryParams params = new QueryParams();

params.add(NAME, request.getStoreName())
.add(OWNER, request.getOwner())
.add(KEY_SCHEMA, request.getKeySchema())
.add(VALUE_SCHEMA, request.getValueSchema())
.add(IS_SYSTEM_STORE, request.isSystemStore())
.add(CLUSTER, request.getClusterName());

Optional.ofNullable(request.getAccessPermissions()).ifPresent(perm -> params.add(ACCESS_PERMISSION, perm));

return ControllerHttpRequest.newBuilder().setParam(params).build();
}

public static NewStoreResponse convertCreateStoreGrpcResponse(CreateStoreGrpcResponse grpcResponse) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And similarly to the idea I mentioned in my last review... can this become a function of NewStoreResponse? We could have ControllerResponse<GRPC, R extends ControllerResponse >, with NewStoreResponse extends ControllerResponse<CreateStoreGrpcResponse, NewStoreResponse>? Although the API would be the reverse, e.g. public R fromGRPC(GRPC) {} or similar?

NewStoreResponse response = new NewStoreResponse();
response.setOwner(grpcResponse.getOwner());
return response;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.linkedin.venice.controller.converters;

import com.linkedin.venice.controller.requests.ControllerRequest;
import com.linkedin.venice.controller.requests.CreateStoreRequest;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.protocols.CreateStoreGrpcResponse;
import java.util.HashMap;
import java.util.Map;


/**
* A registry to hold all the converters for {@link ControllerRequest} and {@link ControllerResponse} to and back
* to gRPC specific request and responses. New addition of converters need to be registered below under the
* {@link #initialize()}. e.g.,
* <pre>
* // Register sample request converter
* registerRequestConverter(NewApiRequest.class, ConverterUtil::convertNewApiToGrpcRequest)
*
* // Register sample response converter
* registerResponseConverter(NewApiResponse.class, ConverterUtil::converterNewApiGrpcResponse)
* </pre>
*/
public class GrpcConvertersRegistry {
private final Map<Class<?>, RequestConverter<? extends ControllerRequest, ?>> requestRegistry = new HashMap<>();
private final Map<Class<?>, ResponseConverter<?, ? extends ControllerResponse>> responseRegistry = new HashMap<>();

public GrpcConvertersRegistry() {
initialize();
}

public void initialize() {
registerRequestConverter(CreateStoreRequest.class, ConverterUtil::convertCreateStoreToGrpcRequest);
registerResponseConverter(CreateStoreGrpcResponse.class, ConverterUtil::convertCreateStoreGrpcResponse);
}

private <T extends ControllerRequest, R> void registerRequestConverter(
Class<T> requestType,
RequestConverter<T, R> converter) {
requestRegistry.computeIfAbsent(requestType, k -> converter);
}

@SuppressWarnings("unchecked")
public <T extends ControllerRequest, R> RequestConverter<T, R> getRequestConverter(Class<T> requestType) {
return (RequestConverter<T, R>) requestRegistry.get(requestType);
}

private <T, R extends ControllerResponse> void registerResponseConverter(
Class<T> requestType,
ResponseConverter<T, R> converter) {
responseRegistry.computeIfAbsent(requestType, k -> converter);
}

@SuppressWarnings("unchecked")
public <T, R extends ControllerResponse> ResponseConverter<T, R> getResponseConverter(Class<T> requestType) {
return (ResponseConverter<T, R>) responseRegistry.get(requestType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.linkedin.venice.controller.converters;

import com.linkedin.venice.controller.requests.ControllerHttpRequest;
import com.linkedin.venice.controller.requests.ControllerRequest;
import com.linkedin.venice.controller.requests.CreateStoreRequest;
import java.util.HashMap;
import java.util.Map;


/**
* A registry to hold all the converters for {@link ControllerRequest} to Http specific request.
* New addition of converters need to be registered below under the
* {@link #initialize()}. e.g.,
* <pre>
* // Register sample request converter
* registerRequestConverter(NewApiRequest.class, ConverterUtil::convertNewApiToHttpRequest)
* </pre>
*/
public class HttpConvertersRegistry {
private final Map<Class<?>, RequestConverter<? extends ControllerRequest, ControllerHttpRequest>> requestRegistry =
new HashMap<>();
Comment on lines +20 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GrpcConvertersRegistry has both request and response converters. Does this one likewise need to have both?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our current response classes are in a much better shape due to its decoupling from the transport details. It was only our request that had a strong coupling the underlying transport to begin with and hence the need to introduce a request abstraction that is purely driven and interacted by the controller client.

As a result, the current http flow already returns this response abstraction defined by the controller (ControllerResponse and its extensions); So I decided to piggy back on that abstraction and have GRPC ones convert their responses into the controller abstracted response.

Additionally, I didn't want to refactor the deserialization handling piece within the ControllerHttpTransport and even if we want to extract it, each ones don't need a concrete converter as the jackson way of deserializing things already converts the response into concrete ControllerReponse objects.

Let me know if that answers your question.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I just don't understand why we need it for one but not the other...?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is because, we decided to use a separate response data model for gRPC and ControllerResponse POJO isn't suitable, unless, we loose the typing information on the gRPC path and have responses represent byte[] in which case we can reuse all of it;

We decided as a team to follow gRPC convention and make the new request, response strongly typed.


public HttpConvertersRegistry() {
initialize();
}

public void initialize() {
registerRequestConverter(CreateStoreRequest.class, ConverterUtil::convertCreateStoreToHttpRequest);
}

private <T extends ControllerRequest> void registerRequestConverter(
Class<T> requestType,
RequestConverter<T, ControllerHttpRequest> converter) {
requestRegistry.computeIfAbsent(requestType, k -> converter);
}

@SuppressWarnings("unchecked")
public <T extends ControllerRequest> RequestConverter<T, ControllerHttpRequest> getRequestConverter(
Class<T> requestType) {
return (RequestConverter<T, ControllerHttpRequest>) requestRegistry.get(requestType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.linkedin.venice.controller.converters;

import com.linkedin.venice.controller.requests.ControllerRequest;


/**
* A general interface to represent request converters that operates on {@Link T} to produce a transport specific
* request {@link D}
* @param <T> Source controller request type
* @param <D> Transport specific request type
Comment on lines +9 to +10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do T and D stand for? Although it is fine to use single letters for the generic type names, I think it's important to try to make these names as self-explanatory as possible. For example, I/O which stand for Input/Output might be fine. It is also fine to spell out whole words instead of using single letters, e.g. SOURCE/DESTINATION, or SRC/DEST.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

T -> The controller request type (e.g., storeCreateRequest, jobStatusRequest)
D -> controller transport specific request type (e.g., http, grpc etc)

I did use some of elaborative naming for generic types in few places in the code where I saw no ambiguity. I thought a bit and then wasn't fully convinced if SRC and DEST was going convey its intent; I even wondered if SRCREQ to DESTREQ would help.

Given this converter interface is not a general converter interface rather specific to the controller requests, I found the above choices too generic and hence resorted write the parameter java document to provide some clarity. Do we have any convention on naming generic types within our code base?

For e.g., Can we have something like ControllerRequestType TransportRequestType?

Copy link
Contributor

@FelixGV FelixGV Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

T usually stands for "Type", which is fine if there is just a single thing to maintain a generic "type" for. In this case there are two things, however, so T seems ambiguous. D might stand for "Destination" in this context, though I'm not sure?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see; T is your concern as it by default has the type connotation. Let me update ControllerRequestType and TransportRequestType if you have no objections. If not, I can fallback to SRC and DEST which i think is okay along with the java docs

*/
@FunctionalInterface
public interface RequestConverter<T extends ControllerRequest, D> {
D convert(T request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.linkedin.venice.controller.converters;

import com.linkedin.venice.controllerapi.ControllerResponse;


/**
* A general interface to represent response converters that operates on transport specific response {@Link T} to produce
* a controller specific responesn {@link D}
* @param <T> Source transport specific response type
* @param <D> Controller response type
mynameborat marked this conversation as resolved.
Show resolved Hide resolved
*/
public interface ResponseConverter<T, D extends ControllerResponse> {
D convert(T response);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.linkedin.venice.controller.requests;

import com.linkedin.venice.controllerapi.QueryParams;
import com.linkedin.venice.utils.Time;


/**
* A general class to represent Http specific transport request that embodies all the Http requests
* send to the controller as part of controller APIs. This is a container class defined to help with the
* refactoring of ControllerClient to become transport protocol agnostic.
*/
public class ControllerHttpRequest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the relationship between this class and ControllerRequest? Should this one extend the other?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ControllerRequest abstraction is meant to decouple the actual transport related information. ControllerHttpRequest however, describes how ControllerRequest will look like if http was chosen as the medium of transport.

Originally, I thought about having some coupling between ControllerRequest, ControllerHttpRequest and GeneratedV3.. (grpc general request type)in the form of having ControllerRequest host few abstract methods like

  public abstract ControllerRequest {
      ...
      abstract ControllerHttpRequest convertToHttpRequest();

      abstract <T extends GeneratedV3..> T convertToGrpcRequest();
 }

However, the above spills some of the transport details into the model and hence chose against it;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... but isn't it Protobuf's design goal to provide a one stop shop for models and their transport? And isn't that also what we do with Jackson (i.e. the model classes have transport-related annotations sprinkled over them). What is the interest in decoupling model from transport? (I understand the purpose of decoupling in the abstract, but I am asking specifically in this context, what does it buy us?)

private static final int DEFAULT_REQUEST_TIMEOUT_MS = 600 * Time.MS_PER_SECOND;
private static final int DEFAULT_MAX_ATTEMPTS = 10;
private final QueryParams params;
private final byte[] data;

private final int timeoutMs;

private final int maxRetries;

public ControllerHttpRequest(QueryParams params, byte[] data, int timeoutMs, int maxRetries) {
this.params = params;
this.data = data;
this.timeoutMs = timeoutMs;
this.maxRetries = maxRetries;
}

public QueryParams getParams() {
return params;
}

public byte[] getData() {
return data;
}

public int getTimeoutMs() {
return timeoutMs;
}

public int getMaxRetries() {
return maxRetries;
}

public static Builder newBuilder() {
return new Builder();
}

public static class Builder {
private QueryParams params;
private byte[] data;
private int timeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;

private int maxRetries = DEFAULT_MAX_ATTEMPTS;

public Builder setParam(QueryParams params) {
this.params = params;
return this;
}

public Builder setData(byte[] data) {
this.data = data;
return this;
}

public Builder setTimeoutMs(int timeoutMs) {
this.timeoutMs = timeoutMs;
return this;
}

public Builder setMaxRetries(int retries) {
this.maxRetries = retries;
return this;
}

public ControllerHttpRequest build() {
return new ControllerHttpRequest(params, data, timeoutMs, maxRetries);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.linkedin.venice.controller.requests;

import com.linkedin.venice.controllerapi.ControllerRoute;


/**
* A data model to represent controller requests that are exposed through CLIs and Rest Spec to end users. Authors
* of new controller APIs will need to extend this class and plug in appropropriate fields needed for the new API.
*
* Additionally, authors need to define converters for the new APIs to be able to work with newer transport agnostic
* request model. Refer to {@link com.linkedin.venice.controller.converters.GrpcConvertersRegistry} and
* {@link com.linkedin.venice.controller.converters.HttpConvertersRegistry} to register converters for new request types
*
* For routing, the newer APIs need to define routes in {@link ControllerRoute} and
* {@link com.linkedin.venice.controller.transport.GrpcRoute} for the {@link com.linkedin.venice.controllerapi.ControllerClient}
* to dispatch requests in transport agnostic model.
*/
public abstract class ControllerRequest {
private static final long DEFAULT_TIMEOUT_MS = 30000L; // 10 seconds
private static final int DEFAULT_MAX_RETRIES = 5;
private final ControllerRoute route;
private final String clusterName;

private final long timeoutMs;

private final int maxRetries;

public ControllerRequest(String clusterName, ControllerRoute route) {
this(clusterName, route, DEFAULT_TIMEOUT_MS, DEFAULT_MAX_RETRIES);
}

public ControllerRequest(String clusterName, ControllerRoute route, long timeoutMs, int maxRetries) {
this.clusterName = clusterName;
this.route = route;
this.timeoutMs = timeoutMs;
this.maxRetries = maxRetries;
}

public String getClusterName() {
return clusterName;
}

public ControllerRoute getRoute() {
return route;
}

public long getTimeoutMs() {
return timeoutMs;
}

public int getMaxRetries() {
return maxRetries;
}

static abstract class Builder<T extends Builder<?>> {
String clusterName;

public T setClusterName(String clusterName) {
this.clusterName = clusterName;
return getThis();
}

abstract T getThis();
}
}
Loading
Loading