From 209eceedf90b21af2c9ed1ab419c60895802aae1 Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Sat, 16 Nov 2024 12:47:40 +0800 Subject: [PATCH 1/3] Support set accessId by another config dynamically --- .../org/apache/spark/shuffle/RssSparkConfig.java | 3 +++ .../spark/shuffle/DelegationRssShuffleManager.java | 7 +++++-- .../spark/shuffle/DelegationRssShuffleManager.java | 7 +++++++ .../shuffle/DelegationRssShuffleManagerTest.java | 12 ++++++++++++ 4 files changed, 27 insertions(+), 2 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java index 734dedcccd..2bab543536 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java @@ -322,6 +322,9 @@ public class RssSparkConfig { public static final ConfigEntry RSS_ACCESS_ID = createStringBuilder(new ConfigBuilder("spark.rss.access.id")).createWithDefault(""); + public static final ConfigEntry RSS_ACCESS_ID_PROVIDER_KEY = + createStringBuilder(new ConfigBuilder("spark.rss.access.id.provider.key")) + .createWithDefault(""); public static final ConfigEntry RSS_ACCESS_TIMEOUT_MS = createIntegerBuilder( diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index af9295e559..657be54d1a 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -112,8 +112,11 @@ private ShuffleManager createShuffleManagerInDriver() throws RssException { private boolean tryAccessCluster() { String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim(); if (StringUtils.isEmpty(accessId)) { - LOG.warn("Access id key is empty"); - return false; + String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), ""); + if (StringUtils.isNotEmpty(accessId)) { + accessId = sparkConf.get(providerKey, ""); + LOG.info("Get access id {} from provider key: {}", accessId, providerKey); + } } long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS); int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES); diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index bb8ed3a901..1bb568d526 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -111,6 +111,13 @@ private ShuffleManager createShuffleManagerInDriver() throws RssException { private boolean tryAccessCluster() { String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim(); + if (StringUtils.isEmpty(accessId)) { + String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), ""); + if (StringUtils.isNotEmpty(accessId)) { + accessId = sparkConf.get(providerKey, ""); + LOG.info("Get access id {} from provider key: {}", accessId, providerKey); + } + } if (StringUtils.isEmpty(accessId)) { LOG.warn("Access id key is empty"); return false; diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java index c7011f27eb..092869e31d 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java +++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java @@ -47,15 +47,27 @@ public void testCreateInDriver() throws Exception { SparkConf conf = new SparkConf(); assertCreateSortShuffleManager(conf); + conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); + conf.set("spark.foo.bar.key", "mockId"); + conf.set(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "spark.foo.bar.key"); + assertCreateSortShuffleManager(conf); + + conf = new SparkConf(); conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId"); assertCreateSortShuffleManager(conf); + + conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); + conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId"); conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002"); conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name()); conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true); assertCreateRssShuffleManager(conf); conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002"); assertCreateSortShuffleManager(conf); } From 4950a88aa42f58fb8e450f8ac7a82ef8970bf691 Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Mon, 18 Nov 2024 14:20:00 +0800 Subject: [PATCH 2/3] Fix camel style config --- .../src/main/java/org/apache/spark/shuffle/RssSparkConfig.java | 2 +- .../org/apache/spark/shuffle/DelegationRssShuffleManager.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java index 2bab543536..45950a42ec 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java @@ -323,7 +323,7 @@ public class RssSparkConfig { public static final ConfigEntry RSS_ACCESS_ID = createStringBuilder(new ConfigBuilder("spark.rss.access.id")).createWithDefault(""); public static final ConfigEntry RSS_ACCESS_ID_PROVIDER_KEY = - createStringBuilder(new ConfigBuilder("spark.rss.access.id.provider.key")) + createStringBuilder(new ConfigBuilder("spark.rss.access.id.providerKey")) .createWithDefault(""); public static final ConfigEntry RSS_ACCESS_TIMEOUT_MS = diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index 1bb568d526..622451e149 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -112,7 +112,7 @@ private ShuffleManager createShuffleManagerInDriver() throws RssException { private boolean tryAccessCluster() { String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim(); if (StringUtils.isEmpty(accessId)) { - String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), ""); + String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), ""); if (StringUtils.isNotEmpty(accessId)) { accessId = sparkConf.get(providerKey, ""); LOG.info("Get access id {} from provider key: {}", accessId, providerKey); From 923a4106e998591b22f7e6cd3ab8fcdc76922b2d Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Mon, 18 Nov 2024 15:33:36 +0800 Subject: [PATCH 3/3] Add doc --- docs/client_guide/spark_client_guide.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/client_guide/spark_client_guide.md b/docs/client_guide/spark_client_guide.md index 3e504b51e1..c189c064d0 100644 --- a/docs/client_guide/spark_client_guide.md +++ b/docs/client_guide/spark_client_guide.md @@ -87,6 +87,8 @@ The important configuration is listed as following. | spark.rss.client.off.heap.memory.enable | false | The client use off heap memory to process data | | spark.rss.client.remote.storage.useLocalConfAsDefault | false | This option is only valid when the remote storage path is specified. If ture, the remote storage conf will use the client side hadoop configuration loaded from the classpath | | spark.rss.hadoop.* | - | The prefix key for Hadoop conf. For Spark like that: `spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1`, this will be as `fs.defaultFS=hdfs://rbf-x1` for Hadoop storage | +| spark.rss.access.id | - | The access id for request access rss cluster. This is used for DelegationRssShuffleManager | +| spark.rss.access.id.providerKey | - | Get access id from the value of the given provider key. This is used for DelegationRssShuffleManager | ### Block id bits