Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Add support for gRPC in venice-controller - Part I #1396

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ plugins {
id 'maven-publish'
id 'com.diffplug.spotless' version '6.12.0'
id 'com.dorongold.task-tree' version '2.1.0'
id 'com.github.johnrengelman.shadow' version '6.1.0' apply false
id 'com.github.johnrengelman.shadow' version '7.1.2' apply false
id 'com.github.spotbugs' version '4.8.0' apply false
id 'org.gradle.test-retry' version '1.5.0' apply false
id 'com.form.diff-coverage' version '0.9.5' apply false
Expand Down Expand Up @@ -87,6 +87,7 @@ ext.libraries = [
grpcProtobuf: "io.grpc:grpc-protobuf:${grpcVersion}",
grpcServices: "io.grpc:grpc-services:${grpcVersion}",
grpcStub: "io.grpc:grpc-stub:${grpcVersion}",
grpcTesting: "io.grpc:grpc-testing:${grpcVersion}",
hadoopCommon: "org.apache.hadoop:hadoop-common:${hadoopVersion}",
hadoopHdfs: "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}",
httpAsyncClient: 'org.apache.httpcomponents:httpasyncclient:4.1.5',
Expand Down Expand Up @@ -515,6 +516,12 @@ subprojects {
value = 'COVEREDRATIO'
minimum = threshold
}
// Ignore generate files
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect { fileTree(dir: it, exclude: [
'**/com/linkedin/venice/protocols/**',
])}))
}
}
}
}
Expand Down Expand Up @@ -818,4 +825,4 @@ task verifyJdkVersion {
gradle.taskGraph.whenReady {
// Ensure the JDK version is verified before any other tasks
verifyJdkVersion
}
}
5 changes: 5 additions & 0 deletions clients/venice-admin-tool/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ jar {
}
}

shadowJar {
// Enable merging service files from different dependencies. Required to make gRPC based clients work.
mergeServiceFiles()
}

ext {
jacocoCoverageThreshold = 0.00
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.grpc.ChannelCredentials;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.TlsChannelCredentials;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -74,7 +72,7 @@ public GrpcTransportClient(GrpcClientConfig grpcClientConfig) {
this.port = port;
this.serverGrpcChannels = new VeniceConcurrentHashMap<>();
this.stubCache = new VeniceConcurrentHashMap<>();
this.channelCredentials = buildChannelCredentials(sslFactory);
this.channelCredentials = GrpcUtils.buildChannelCredentials(sslFactory);
}

@Override
Expand All @@ -99,25 +97,6 @@ public void close() throws IOException {
r2TransportClientForNonStorageOps.close();
}

@VisibleForTesting
ChannelCredentials buildChannelCredentials(SSLFactory sslFactory) {
// TODO: Evaluate if this needs to fail instead since it depends on plain text support on server
if (sslFactory == null) {
return InsecureChannelCredentials.create();
}

try {
TlsChannelCredentials.Builder tlsBuilder = TlsChannelCredentials.newBuilder()
.keyManager(GrpcUtils.getKeyManagers(sslFactory))
.trustManager(GrpcUtils.getTrustManagers(sslFactory));
return tlsBuilder.build();
} catch (Exception e) {
throw new VeniceClientException(
"Failed to initialize SSL channel credentials for Venice gRPC Transport Client",
e);
}
}

@VisibleForTesting
VeniceClientRequest buildVeniceClientRequest(String[] requestParts, byte[] requestBody, boolean isSingleGet) {
VeniceClientRequest.Builder requestBuilder = VeniceClientRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package com.linkedin.venice.fastclient.transport;

import static org.mockito.Mockito.*;
import static org.testng.Assert.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.ImmutableMap;
import com.linkedin.r2.transport.common.Client;
import com.linkedin.venice.HttpMethod;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.fastclient.GrpcClientConfig;
import com.linkedin.venice.protocols.VeniceClientRequest;
import com.linkedin.venice.protocols.VeniceReadServiceGrpc;
import com.linkedin.venice.protocols.VeniceServerResponse;
import com.linkedin.venice.security.SSLFactory;
import io.grpc.ChannelCredentials;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -61,14 +68,6 @@ public void setUp() {
grpcTransportClient = new GrpcTransportClient(mockClientConfig);
}

@Test(expectedExceptions = VeniceClientException.class)
public void testBuildChannelCredentials() {
ChannelCredentials actualChannelCredentials = grpcTransportClient.buildChannelCredentials(null);
assertNotNull(actualChannelCredentials, "Null ssl factory should default to insecure channel credentials");

grpcTransportClient.buildChannelCredentials(mock(SSLFactory.class));
}

@Test
public void testBuildVeniceClientRequestForSingleGet() {
VeniceClientRequest clientRequest =
Expand Down
6 changes: 4 additions & 2 deletions gradle/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,10 @@
</Match>
<Match>
<!--Ignore SpotBug Checks for the following package as classes are generated by protoc compiler-->
<Package name="com.linkedin.venice.protocols"/>
<Bug pattern="MS_EXPOSE_REP"/>
<Or>
<Package name="com.linkedin.venice.protocols"/>
<Package name="com.linkedin.venice.protocols.controller"/>
</Or>
</Match>
<Match>
<Bug pattern="SE_INNER_CLASS"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,27 @@ private ConfigKeys() {
// What tags to assign to a controller instance
public static final String CONTROLLER_INSTANCE_TAG_LIST = "controller.instance.tag.list";

/**
* Whether to enable gRPC server in controller or not.
*/
public static final String CONTROLLER_GRPC_SERVER_ENABLED = "controller.grpc.server.enabled";

/**
* A port for the controller to listen on for incoming requests. On this port, the controller will
* server non-ssl requests.
*/
public static final String CONTROLLER_ADMIN_GRPC_PORT = "controller.admin.grpc.port";
/**
* A port for the controller to listen on for incoming requests. On this port, the controller will
* only serve ssl requests.
*/
public static final String CONTROLLER_ADMIN_SECURE_GRPC_PORT = "controller.admin.secure.grpc.port";

/**
* Number of threads to use for the gRPC server in controller.
*/
public static final String CONTROLLER_GRPC_SERVER_THREAD_COUNT = "controller.grpc.server.thread.count";

/** List of forbidden admin paths */
public static final String CONTROLLER_DISABLED_ROUTES = "controller.cluster.disabled.routes";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ public class ControllerApiConstants {
public static final String SOURCE_GRID_FABRIC = "source_grid_fabric";
public static final String BATCH_JOB_HEARTBEAT_ENABLED = "batch_job_heartbeat_enabled";

public static final String NAME = "store_name";
public static final String STORE_NAME = "store_name";
/**
* @deprecated Use {@link #STORE_NAME} instead.
*/
public static final String NAME = STORE_NAME;
public static final String STORE_PARTITION = "store_partition";
public static final String STORE_VERSION = "store_version";
public static final String OWNER = "owner";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ public class LeaderControllerResponse
private String cluster;
private String url;
private String secureUrl = null;
private String grpcUrl = null;
private String secureGrpcUrl = null;

public String getCluster() {
return cluster;
Expand All @@ -29,4 +31,20 @@ public String getSecureUrl() {
public void setSecureUrl(String url) {
this.secureUrl = url;
}

public void setGrpcUrl(String url) {
this.grpcUrl = url;
}

public String getGrpcUrl() {
return grpcUrl;
}

public void setSecureGrpcUrl(String url) {
this.secureGrpcUrl = url;
}

public String getSecureGrpcUrl() {
return secureGrpcUrl;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.linkedin.venice.controllerapi.request;

public class ClusterDiscoveryRequest extends ControllerRequest {
private static final String CLUSTER_NAME_PLACEHOLDER = "UNKNOWN";

public ClusterDiscoveryRequest(String storeName) {
super(CLUSTER_NAME_PLACEHOLDER, storeName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.linkedin.venice.controllerapi.request;

import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_NAME;


/**
* Base class for request objects used in controller endpoints.
*
* Extend this class to ensure required parameters are validated in the constructor of the extending class.
* This class is intended for use on both the client and server sides.
* All required parameters should be passed to and validated within the constructor of the extending class.
*/
public class ControllerRequest {
protected String clusterName;
protected String storeName;

public ControllerRequest(String clusterName) {
this.clusterName = validateParam(clusterName, CLUSTER);
this.storeName = null;
}

public ControllerRequest(String clusterName, String storeName) {
this.clusterName = validateParam(clusterName, CLUSTER);
this.storeName = validateParam(storeName, STORE_NAME);
}

public String getClusterName() {
return clusterName;
}

public String getStoreName() {
return storeName;
}

public static String validateParam(String param, String paramName) {
if (param == null || param.isEmpty()) {
throw new IllegalArgumentException("The request is missing the " + paramName + ", which is a mandatory field.");
}
return param;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.linkedin.venice.controllerapi.request;

/**
* Represents a request to create a new store in the specified Venice cluster with the provided parameters.
* This class encapsulates all necessary details for the creation of a store, including its name, owner,
* schema definitions, and access permissions.
*/
public class CreateNewStoreRequest extends ControllerRequest {
public static final String DEFAULT_STORE_OWNER = "";

private final String owner;
private final String keySchema;
private final String valueSchema;
private final boolean isSystemStore;

// a JSON string representing the access permissions for the store
private final String accessPermissions;

public CreateNewStoreRequest(
String clusterName,
String storeName,
String owner,
String keySchema,
String valueSchema,
String accessPermissions,
boolean isSystemStore) {
super(clusterName, storeName);
this.keySchema = validateParam(keySchema, "Key schema");
this.valueSchema = validateParam(valueSchema, "Value schema");
this.owner = owner == null ? DEFAULT_STORE_OWNER : owner;
this.accessPermissions = accessPermissions;
this.isSystemStore = isSystemStore;
}

public String getOwner() {
return owner;
}

public String getKeySchema() {
return keySchema;
}

public String getValueSchema() {
return valueSchema;
}

public String getAccessPermissions() {
return accessPermissions;
}

public boolean isSystemStore() {
return isSystemStore;
}
}
Loading
Loading