diff --git a/build.gradle b/build.gradle index 9c02cfde4a..9f4fa4e3fa 100644 --- a/build.gradle +++ b/build.gradle @@ -40,8 +40,9 @@ if (project.hasProperty('overrideBuildEnvironment')) { def avroVersion = '1.10.2' def avroUtilVersion = '0.3.21' def grpcVersion = '1.49.2' -def kafkaGroup = 'com.linkedin.kafka' -def kafkaVersion = '2.4.1.78' +// N.B.: The build should also work when substituting Kafka from the Apache fork to LinkedIn's fork: +def kafkaGroup = 'org.apache.kafka' // 'com.linkedin.kafka' +def kafkaVersion = '2.4.1' // '2.4.1.65' def log4j2Version = '2.17.1' def pegasusVersion = '29.31.0' def protobufVersion = '3.21.7' @@ -52,14 +53,15 @@ def alpnAgentVersion = '2.0.10' def hadoopVersion = '2.10.2' def apacheSparkVersion = '3.3.3' def antlrVersion = '4.8' +def scala = '2.12' ext.libraries = [ alpnAgent: "org.mortbay.jetty.alpn:jetty-alpn-agent:${alpnAgentVersion}", antlr4: "org.antlr:antlr4:${antlrVersion}", antlr4Runtime: "org.antlr:antlr4-runtime:${antlrVersion}", - apacheSparkAvro: "org.apache.spark:spark-avro_2.12:${apacheSparkVersion}", - apacheSparkCore: "org.apache.spark:spark-core_2.12:${apacheSparkVersion}", - apacheSparkSql: "org.apache.spark:spark-sql_2.12:${apacheSparkVersion}", + apacheSparkAvro: "org.apache.spark:spark-avro_${scala}:${apacheSparkVersion}", + apacheSparkCore: "org.apache.spark:spark-core_${scala}:${apacheSparkVersion}", + apacheSparkSql: "org.apache.spark:spark-sql_${scala}:${apacheSparkVersion}", avro: "org.apache.avro:avro:${avroVersion}", avroCompiler: "org.apache.avro:avro-compiler:${avroVersion}", avroMapred: "org.apache.avro:avro-mapred:${avroVersion}", @@ -101,7 +103,7 @@ ext.libraries = [ jna: 'net.java.dev.jna:jna:4.5.1', jsr305: 'com.google.code.findbugs:jsr305:3.0.2', joptSimple: 'net.sf.jopt-simple:jopt-simple:3.2', - kafka: "${kafkaGroup}:kafka_2.12:${kafkaVersion}", + kafka: "${kafkaGroup}:kafka_${scala}:${kafkaVersion}", kafkaClients: "${kafkaGroup}:kafka-clients:${kafkaVersion}", kafkaClientsTest: "${kafkaGroup}:kafka-clients:${kafkaVersion}:test", log4j2api: "org.apache.logging.log4j:log4j-api:${log4j2Version}", @@ -194,6 +196,8 @@ subprojects { if (JavaVersion.current() >= JavaVersion.VERSION_1_9) { tasks.withType(JavaCompile) { + // Compiler arguments can be injected here... + // options.compilerArgs << '-Xlint:unchecked' options.release = 8 } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java index 097936c17b..d4e003ec65 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java @@ -27,6 +27,7 @@ import com.linkedin.venice.exceptions.ConfigurationException; import com.linkedin.venice.exceptions.UndefinedPropertyException; import com.linkedin.venice.meta.PersistenceType; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.utils.KafkaSSLUtils; import com.linkedin.venice.utils.RegionUtils; import com.linkedin.venice.utils.Utils; @@ -45,7 +46,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -90,8 +90,8 @@ public class VeniceClusterConfig { private final VeniceProperties clusterProperties; - private final SecurityProtocol kafkaSecurityProtocol; - private final Map kafkaBootstrapUrlToSecurityProtocol; + private final PubSubSecurityProtocol kafkaSecurityProtocol; + private final Map kafkaBootstrapUrlToSecurityProtocol; private final Optional sslConfig; public VeniceClusterConfig(VeniceProperties clusterProps, Map> kafkaClusterMap) @@ -136,17 +136,17 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map tmpKafkaClusterIdToUrlMap = new Int2ObjectOpenHashMap<>(); Object2IntMap tmpKafkaClusterUrlToIdMap = new Object2IntOpenHashMap<>(); Int2ObjectMap tmpKafkaClusterIdToAliasMap = new Int2ObjectOpenHashMap<>(); Object2IntMap tmpKafkaClusterAliasToIdMap = new Object2IntOpenHashMap<>(); - Map tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>(); + Map tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>(); Map tmpKafkaUrlResolution = new HashMap<>(); boolean foundBaseKafkaUrlInMappingIfItIsPopulated = kafkaClusterMap.isEmpty(); @@ -183,7 +183,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map sslConfig = serverConfig.getSslConfig(); if (!sslConfig.isPresent()) { @@ -1127,7 +1127,7 @@ private VeniceProperties getPubSubSSLPropertiesFromServerConfig(String kafkaBoot } properties.putAll(sslConfig.get().getKafkaSSLConfig()); } - properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name); + properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name()); return new VeniceProperties(properties); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index 37c664f45b..65ca79333e 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -44,6 +44,7 @@ import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.schema.SchemaEntry; @@ -63,7 +64,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.Function; import org.apache.avro.Schema; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.BeforeClass; @@ -129,7 +129,7 @@ private void setupMockConfig() { doReturn(VeniceProperties.empty()).when(mockVeniceServerConfig).getKafkaConsumerConfigsForLocalConsumption(); doReturn(getConsumerAssignmentStrategy()).when(mockVeniceServerConfig).getSharedConsumerAssignmentStrategy(); doReturn(1).when(mockVeniceServerConfig).getConsumerPoolSizePerKafkaCluster(); - doReturn(SecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl); + doReturn(PubSubSecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl); doReturn(10).when(mockVeniceServerConfig).getKafkaMaxPollRecords(); doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherConsumerPoolSize(); doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherThreadPoolSize(); diff --git a/clients/venice-admin-tool/build.gradle b/clients/venice-admin-tool/build.gradle index 32de0e29f8..a6f46156a6 100644 --- a/clients/venice-admin-tool/build.gradle +++ b/clients/venice-admin-tool/build.gradle @@ -28,6 +28,8 @@ dependencies { exclude group: 'org.apache.helix' } implementation('org.apache.helix:metrics-common:1.4.1:jdk8') + implementation libraries.zstd + testImplementation project(':internal:venice-common').sourceSets.test.output } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java new file mode 100644 index 0000000000..b09d63dabc --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java @@ -0,0 +1,25 @@ +package com.linkedin.venice.pubsub.api; + +import java.util.Locale; + + +/** + * This enum is equivalent to Kafka's SecurityProtocol enum. + * + * We need this abstraction because Kafka's enum is present in two different namespaces, which are different between + * LinkedIn's fork and the Apache fork. + */ +public enum PubSubSecurityProtocol { + /** Un-authenticated, non-encrypted channel */ + PLAINTEXT, + /** SSL channel */ + SSL, + /** SASL authenticated, non-encrypted channel */ + SASL_PLAINTEXT, + /** SASL authenticated, SSL channel */ + SASL_SSL; + + public static PubSubSecurityProtocol forName(String name) { + return PubSubSecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT)); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/KafkaSSLUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/KafkaSSLUtils.java index 9946842eed..19ed1a1a2c 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/KafkaSSLUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/KafkaSSLUtils.java @@ -1,12 +1,12 @@ package com.linkedin.venice.utils; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import java.util.Arrays; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.protocol.SecurityProtocol; public class KafkaSSLUtils { @@ -26,26 +26,20 @@ public class KafkaSSLUtils { SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG); - /** - * Right now, Venice only supports two Kafka protocols: - * {@link SecurityProtocol#PLAINTEXT} - * {@link SecurityProtocol#SSL} - * - * @param kafkaProtocol - * @return - */ public static boolean isKafkaProtocolValid(String kafkaProtocol) { - return kafkaProtocol.equals(SecurityProtocol.PLAINTEXT.name()) || kafkaProtocol.equals(SecurityProtocol.SSL.name()) - || kafkaProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name()) - || kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name()); + return kafkaProtocol.equals(PubSubSecurityProtocol.PLAINTEXT.name()) + || kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name()) + || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_PLAINTEXT.name()) + || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name()); } public static boolean isKafkaSSLProtocol(String kafkaProtocol) { - return kafkaProtocol.equals(SecurityProtocol.SSL.name()) || kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name()); + return kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name()) + || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name()); } - public static boolean isKafkaSSLProtocol(SecurityProtocol kafkaProtocol) { - return kafkaProtocol == SecurityProtocol.SSL || kafkaProtocol == SecurityProtocol.SASL_SSL; + public static boolean isKafkaSSLProtocol(PubSubSecurityProtocol kafkaProtocol) { + return kafkaProtocol == PubSubSecurityProtocol.SSL || kafkaProtocol == PubSubSecurityProtocol.SASL_SSL; } /** diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapterTest.java index ecfea7c98c..00e2a9da70 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapterTest.java @@ -27,6 +27,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -192,7 +193,7 @@ public void testDeleteTopicValidTopicDeletion() throws Exception { DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class); KafkaFuture topicDeletionFutureMock = mock(KafkaFuture.class); - when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock); + when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock); when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock); when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS))).thenReturn(null); @@ -208,7 +209,7 @@ public void testDeleteTopicThrowsException() throws Exception { DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class); KafkaFuture topicDeletionFutureMock = mock(KafkaFuture.class); - when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock); + when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock); when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock); when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS))) diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java index a0a197af02..1f2a998d7b 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java @@ -2,13 +2,13 @@ import static org.testng.Assert.*; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.utils.VeniceProperties; import java.util.Properties; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.testng.annotations.Test; @@ -20,23 +20,23 @@ public class ApacheKafkaAdminConfigTest { @Test public void testSetupSaslInKafkaAdminPlaintext() { - testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_PLAINTEXT); + testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_PLAINTEXT); } @Test public void testSetupSaslInKafkaAdminSSL() { - testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_SSL); + testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_SSL); } - private void testSetupSaslInKafkaAdmin(SecurityProtocol securityProtocol) { + private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol) { Properties properties = new Properties(); properties.put("cluster.name", "cluster"); properties.put("zookeeper.address", "localhost:2181"); properties.put("kafka.bootstrap.servers", "localhost:9092"); properties.put("kafka.sasl.jaas.config", SASL_JAAS_CONFIG); properties.put("kafka.sasl.mechanism", SASL_MECHANISM); - properties.put("kafka.security.protocol", securityProtocol.name); - if (securityProtocol.name.contains("SSL")) { + properties.put("kafka.security.protocol", securityProtocol.name()); + if (securityProtocol.name().contains("SSL")) { properties.put("ssl.truststore.location", "-"); properties.put("ssl.truststore.password", ""); properties.put("ssl.truststore.type", "JKS"); @@ -49,7 +49,7 @@ private void testSetupSaslInKafkaAdmin(SecurityProtocol securityProtocol) { Properties adminProperties = serverConfig.getAdminProperties(); assertEquals(SASL_JAAS_CONFIG, adminProperties.get("sasl.jaas.config")); assertEquals(SASL_MECHANISM, adminProperties.get("sasl.mechanism")); - assertEquals(securityProtocol.name, adminProperties.get("security.protocol")); + assertEquals(securityProtocol.name(), adminProperties.get("security.protocol")); } @Test diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java new file mode 100644 index 0000000000..5edbd018e2 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java @@ -0,0 +1,86 @@ +package com.linkedin.venice.pubsub.api; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import com.linkedin.venice.utils.ReflectUtils; +import java.util.HashSet; +import java.util.Set; +import org.testng.annotations.Test; + + +public class PubSubSecurityProtocolTest { + private static final Class SPECIFIC_ENUM_CLASS = getFQCN(); + + private static Class getFQCN() { + String simpleClassName = "SecurityProtocol"; + try { + String apacheKafkaFQCN = "org.apache.kafka.common.security.auth" + "." + simpleClassName; + Class apacheKafkaSecurityProtocol = ReflectUtils.loadClass(apacheKafkaFQCN); + if (apacheKafkaSecurityProtocol != null && apacheKafkaSecurityProtocol.isEnum()) { + return apacheKafkaSecurityProtocol; + } + } catch (Exception e) { + // Expected if not using Apache Kafka. + } + try { + String liKafkaFQCN = "org.apache.kafka.common.protocol" + "." + simpleClassName; + Class liKafkaSecurityProtocol = ReflectUtils.loadClass(liKafkaFQCN); + if (liKafkaSecurityProtocol != null && liKafkaSecurityProtocol.isEnum()) { + return liKafkaSecurityProtocol; + } + } catch (Exception e) { + // Expected if not using LI Kafka... + } + throw new IllegalStateException( + "Neither the Apache Kafka nor LinkedIn Kafka version of " + simpleClassName + " were found on the classpath!"); + } + + /** + * This is a proof of concept of how we could instantiate the specific enum, though as of now there is no need for it. + * + * @return an enum instance specific to the pub sub library present on the class path. + */ + private static Enum getPubSubSpecificEnum(String enumValueName) { + return Enum.valueOf(SPECIFIC_ENUM_CLASS, enumValueName); + } + + /** + * This test merely checks that our own PubSubSecurityProtocol contains only values which exist in Kafka's own enum. + * It can be run with the classpath containing either the Apache fork or LinkedIn's fork of Kafka, and succeed. + */ + @Test + public void testInstantiation() { + for (PubSubSecurityProtocol value: PubSubSecurityProtocol.values()) { + Enum specificEnum = getPubSubSpecificEnum(value.name()); + assertNotNull(specificEnum); + assertTrue(specificEnum.getClass().isEnum()); + assertEquals(specificEnum.getClass().getSimpleName(), "SecurityProtocol"); + Set expectedPackageNames = new HashSet<>(2); + expectedPackageNames.add("org.apache.kafka.common.security.auth"); + expectedPackageNames.add("org.apache.kafka.common.protocol"); + assertTrue(expectedPackageNames.contains(specificEnum.getClass().getPackage().getName())); + assertEquals(specificEnum.name(), value.name()); + } + } + + @Test + public void testForName() { + assertEquals(PubSubSecurityProtocol.forName("plaintext"), PubSubSecurityProtocol.PLAINTEXT); + assertEquals(PubSubSecurityProtocol.forName("PLAINTEXT"), PubSubSecurityProtocol.PLAINTEXT); + assertEquals(PubSubSecurityProtocol.forName("Plaintext"), PubSubSecurityProtocol.PLAINTEXT); + + assertEquals(PubSubSecurityProtocol.forName("ssl"), PubSubSecurityProtocol.SSL); + assertEquals(PubSubSecurityProtocol.forName("SSL"), PubSubSecurityProtocol.SSL); + assertEquals(PubSubSecurityProtocol.forName("Ssl"), PubSubSecurityProtocol.SSL); + + assertEquals(PubSubSecurityProtocol.forName("sasl_plaintext"), PubSubSecurityProtocol.SASL_PLAINTEXT); + assertEquals(PubSubSecurityProtocol.forName("SASL_PLAINTEXT"), PubSubSecurityProtocol.SASL_PLAINTEXT); + assertEquals(PubSubSecurityProtocol.forName("Sasl_Plaintext"), PubSubSecurityProtocol.SASL_PLAINTEXT); + + assertEquals(PubSubSecurityProtocol.forName("sasl_ssl"), PubSubSecurityProtocol.SASL_SSL); + assertEquals(PubSubSecurityProtocol.forName("SASL_SSL"), PubSubSecurityProtocol.SASL_SSL); + assertEquals(PubSubSecurityProtocol.forName("Sasl_Ssl"), PubSubSecurityProtocol.SASL_SSL); + } +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/KafkaSSLUtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/KafkaSSLUtilsTest.java index 0a36349c14..ef67ef05ae 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/KafkaSSLUtilsTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/KafkaSSLUtilsTest.java @@ -2,7 +2,7 @@ import static org.testng.Assert.*; -import org.apache.kafka.common.protocol.SecurityProtocol; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import org.testng.annotations.Test; @@ -25,9 +25,9 @@ public void testIsKafkaSSLProtocol() { @Test public void testTestIsKafkaSSLProtocol() { - assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.SSL)); - assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.PLAINTEXT)); - assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.SASL_SSL)); - assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.SASL_PLAINTEXT)); + assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SSL)); + assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.PLAINTEXT)); + assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SASL_SSL)); + assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SASL_PLAINTEXT)); } } diff --git a/internal/venice-test-common/build.gradle b/internal/venice-test-common/build.gradle index f19fb40b66..ace505c90a 100644 --- a/internal/venice-test-common/build.gradle +++ b/internal/venice-test-common/build.gradle @@ -36,7 +36,6 @@ configurations { } } implementation { - exclude group: 'org.apache.kafka' exclude group: 'org.mortbay.jetty', module: 'servlet-api' } integrationTestImplementation.extendsFrom testImplementation @@ -109,6 +108,7 @@ dependencies { implementation libraries.samzaApi implementation libraries.spark implementation libraries.testng + implementation libraries.zstd implementation (libraries.mapreduceClientJobClient) { exclude group: 'org.apache.avro' @@ -119,6 +119,7 @@ dependencies { testImplementation project(':internal:venice-common').sourceSets.test.output testImplementation libraries.log4j2core testImplementation libraries.log4j2api + testImplementation libraries.kafkaClients jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:' + jmh.jmhVersion.get() jmhImplementation project(path: ':internal:venice-test-common', configuration: 'integrationTestUtils') diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/KafkaTestUtils.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/KafkaTestUtils.java index aa0d6d7730..b65d2b1f9a 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/KafkaTestUtils.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/KafkaTestUtils.java @@ -1,12 +1,12 @@ package com.linkedin.venice.integration.utils; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.utils.SslUtils; import com.linkedin.venice.utils.SslUtils.VeniceTlsConfiguration; import java.util.Properties; import kafka.server.KafkaConfig; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.protocol.SecurityProtocol; /** @@ -43,7 +43,7 @@ public static Properties getLocalCommonKafkaSSLConfig(VeniceTlsConfiguration tls public static Properties getLocalKafkaClientSSLConfig() { Properties properties = new Properties(); - properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name()); + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, PubSubSecurityProtocol.SSL.name()); properties.putAll(getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); return properties; } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java index a13c8ba6eb..78e51df38b 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java @@ -65,6 +65,7 @@ import com.linkedin.venice.meta.PersistenceType; import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.servicediscovery.ServiceDiscoveryAnnouncer; import com.linkedin.venice.stats.TehutiUtils; import com.linkedin.venice.utils.PropertyBuilder; @@ -82,7 +83,6 @@ import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -228,7 +228,7 @@ static StatefulServiceProvider generateService(VeniceCo } if (options.isSslToKafka()) { - builder.put(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.SSL.name); + builder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name()); builder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java index 82c7393b4f..669abfa9fe 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java @@ -42,6 +42,7 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.AllowlistAccessor; import com.linkedin.venice.helix.ZkAllowlistAccessor; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.security.SSLFactory; import com.linkedin.venice.server.VeniceServer; import com.linkedin.venice.server.VeniceServerContext; @@ -72,7 +73,6 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.io.FileUtils; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -256,7 +256,7 @@ static StatefulServiceProvider generateService( .put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 5000) .put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true); if (sslToKafka) { - serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.SSL.name); + serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name()); serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java index 5065c3d52e..3c9e760c5c 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java @@ -18,6 +18,7 @@ import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARENT_DATA_CENTER_REGION_NAME; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import java.io.File; @@ -33,7 +34,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -257,8 +257,8 @@ private static Map> addKafkaClusterIDMappingToServer List regionNames, List kafkaBrokers) { if (serverProperties.isPresent()) { - SecurityProtocol baseSecurityProtocol = SecurityProtocol - .valueOf(serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name)); + PubSubSecurityProtocol baseSecurityProtocol = PubSubSecurityProtocol.valueOf( + serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name())); Map> kafkaClusterMap = new HashMap<>(); Map mapping; @@ -266,21 +266,21 @@ private static Map> addKafkaClusterIDMappingToServer mapping = new HashMap<>(); int clusterId = i - 1; mapping.put(KAFKA_CLUSTER_MAP_KEY_NAME, regionNames.get(clusterId)); - SecurityProtocol securityProtocol = baseSecurityProtocol; + PubSubSecurityProtocol securityProtocol = baseSecurityProtocol; if (clusterId > 0) { // Testing mixed security on any 2-layer setup with 2 or more DCs. - securityProtocol = SecurityProtocol.SSL; + securityProtocol = PubSubSecurityProtocol.SSL; } - mapping.put(KAFKA_CLUSTER_MAP_SECURITY_PROTOCOL, securityProtocol.name); + mapping.put(KAFKA_CLUSTER_MAP_SECURITY_PROTOCOL, securityProtocol.name()); // N.B. the first Kafka broker in the list is the parent, which we're excluding from the mapping, so this // is why the index here is offset by 1 compared to the cluster ID. PubSubBrokerWrapper pubSubBrokerWrapper = kafkaBrokers.get(i); - String kafkaAddress = securityProtocol == SecurityProtocol.SSL + String kafkaAddress = securityProtocol == PubSubSecurityProtocol.SSL ? pubSubBrokerWrapper.getSSLAddress() : pubSubBrokerWrapper.getAddress(); mapping.put(KAFKA_CLUSTER_MAP_KEY_URL, kafkaAddress); - String otherKafkaAddress = securityProtocol == SecurityProtocol.PLAINTEXT + String otherKafkaAddress = securityProtocol == PubSubSecurityProtocol.PLAINTEXT ? pubSubBrokerWrapper.getSSLAddress() : pubSubBrokerWrapper.getAddress(); mapping.put(KAFKA_CLUSTER_MAP_KEY_OTHER_URLS, otherKafkaAddress); diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/VeniceEnumValueTest.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/VeniceEnumValueTest.java index 05a0861ac6..fedfec47a8 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/VeniceEnumValueTest.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/VeniceEnumValueTest.java @@ -60,7 +60,9 @@ public void test() { Function valueOfFunction = value -> { try { - return (T) valueOfMethod.invoke(null, value); + @SuppressWarnings("unchecked") + T valueOfReturn = (T) valueOfMethod.invoke(null, value); + return valueOfReturn; } catch (Exception e) { if (e.getClass() == InvocationTargetException.class && e.getCause() instanceof VeniceException) { // Those are expected for invalid values, so we bubble them up. @@ -95,7 +97,8 @@ public void test() { // Check that no other enum values exist besides those that are expected Method valuesFunction = getPublicStaticFunction(this.enumClass, VALUES_METHOD_NAME, new Class[0]); try { - T[] types = (T[]) valuesFunction.invoke(null, new Class[0]); + @SuppressWarnings("unchecked") + T[] types = (T[]) valuesFunction.invoke(null, new Object[0]); for (T type: types) { assertTrue( expectedMapping.containsKey(type.getValue()), @@ -106,7 +109,7 @@ public void test() { } } - private static Method getPublicStaticFunction(Class klass, String functionName, Class... params) { + private static Method getPublicStaticFunction(Class klass, String functionName, Class... params) { try { Method function = klass.getDeclaredMethod(functionName, params); assertTrue( diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java index 2840f828cb..928ff1c0be 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java @@ -188,6 +188,7 @@ import com.linkedin.venice.meta.RoutingStrategy; import com.linkedin.venice.pubsub.PubSubAdminAdapterFactory; import com.linkedin.venice.pubsub.PubSubClientsFactory; +import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.pushmonitor.LeakedPushStatusCleanUpService; import com.linkedin.venice.status.BatchJobHeartbeatConfigs; import com.linkedin.venice.utils.HelixUtils; @@ -212,7 +213,6 @@ import org.apache.helix.cloud.constants.CloudProvider; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; import org.apache.helix.model.CloudConfig; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -599,7 +599,7 @@ public VeniceControllerClusterConfig(VeniceProperties props) { this.sslKafkaBootStrapServers = sslToKafka ? props.getString(SSL_KAFKA_BOOTSTRAP_SERVERS) : null; this.helixSendMessageTimeoutMilliseconds = props.getInt(HELIX_SEND_MESSAGE_TIMEOUT_MS, 10000); - this.kafkaSecurityProtocol = props.getString(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name()); + this.kafkaSecurityProtocol = props.getString(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name()); if (!KafkaSSLUtils.isKafkaProtocolValid(kafkaSecurityProtocol)) { throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocol); }