diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 734dedcccd..45950a42ec 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -322,6 +322,9 @@ public class RssSparkConfig {
   public static final ConfigEntry<String> RSS_ACCESS_ID =
       createStringBuilder(new ConfigBuilder("spark.rss.access.id")).createWithDefault("");
 
+  public static final ConfigEntry<String> RSS_ACCESS_ID_PROVIDER_KEY =
+      createStringBuilder(new ConfigBuilder("spark.rss.access.id.providerKey"))
+          .createWithDefault("");
   public static final ConfigEntry<Integer> RSS_ACCESS_TIMEOUT_MS =
       createIntegerBuilder(
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index af9295e559..657be54d1a 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -112,8 +112,15 @@ private ShuffleManager createShuffleManagerInDriver() throws RssException {
   private boolean tryAccessCluster() {
     String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
+    if (StringUtils.isEmpty(accessId)) {
+      String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "");
+      if (StringUtils.isNotEmpty(providerKey)) {
+        accessId = sparkConf.get(providerKey, "");
+        LOG.info("Get access id {} from provider key: {}", accessId, providerKey);
+      }
+    }
     if (StringUtils.isEmpty(accessId)) {
       LOG.warn("Access id key is empty");
       return false;
     }
     long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
     int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index bb8ed3a901..622451e149 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -111,6 +111,13 @@ private ShuffleManager createShuffleManagerInDriver() throws RssException {
   private boolean tryAccessCluster() {
     String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
+    if (StringUtils.isEmpty(accessId)) {
+      String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "");
+      if (StringUtils.isNotEmpty(providerKey)) {
+        accessId = sparkConf.get(providerKey, "");
+        LOG.info("Get access id {} from provider key: {}", accessId, providerKey);
+      }
+    }
     if (StringUtils.isEmpty(accessId)) {
       LOG.warn("Access id key is empty");
       return false;
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index c7011f27eb..092869e31d 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -47,15 +47,27 @@ public void testCreateInDriver() throws Exception {
     SparkConf conf = new SparkConf();
     assertCreateSortShuffleManager(conf);
+    conf = new SparkConf();
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+    conf.set("spark.foo.bar.key", "mockId");
+    conf.set(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "spark.foo.bar.key");
+    assertCreateSortShuffleManager(conf);
+
+    conf = new SparkConf();
     conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
     conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
     assertCreateSortShuffleManager(conf);
+
+    conf = new SparkConf();
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+    conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
     conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
     conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
     conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
     assertCreateRssShuffleManager(conf);
 
     conf = new SparkConf();
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
     conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
     assertCreateSortShuffleManager(conf);
   }
diff --git a/docs/client_guide/spark_client_guide.md b/docs/client_guide/spark_client_guide.md
index 3e504b51e1..c189c064d0 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -87,6 +87,8 @@ The important configuration is listed as following.
 | spark.rss.client.off.heap.memory.enable | false | The client use off heap memory to process data |
 | spark.rss.client.remote.storage.useLocalConfAsDefault | false | This option is only valid when the remote storage path is specified. If ture, the remote storage conf will use the client side hadoop configuration loaded from the classpath |
 | spark.rss.hadoop.* | - | The prefix key for Hadoop conf. For Spark like that: `spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1`, this will be as `fs.defaultFS=hdfs://rbf-x1` for Hadoop storage |
+| spark.rss.access.id | - | The access id used when requesting access to the RSS cluster. Used by DelegationRssShuffleManager |
+| spark.rss.access.id.providerKey | - | If spark.rss.access.id is not set, the access id is read from the Spark property named by this value. Used by DelegationRssShuffleManager |
 
 ### Block id bits
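
Reviewer note (not part of the patch): a minimal sketch of how the provider-key fallback above is expected to be wired up on the Spark side. The property name `spark.site.rss.accessId` is an invented example, and plain string keys stand in for the `RssSparkConfig` constants used in the actual code.

```java
import org.apache.spark.SparkConf;

public class AccessIdProviderKeyExample {
  public static void main(String[] args) {
    // Hypothetical job setup: the access id is published under a site-specific
    // Spark property, and spark.rss.access.id.providerKey points at that property.
    SparkConf conf = new SparkConf()
        .set("spark.site.rss.accessId", "team-a-access-id") // example property, not defined by this patch
        .set("spark.rss.access.id.providerKey", "spark.site.rss.accessId");

    // DelegationRssShuffleManager#tryAccessCluster resolves the id roughly like this:
    String accessId = conf.get("spark.rss.access.id", "").trim();
    if (accessId.isEmpty()) {
      String providerKey = conf.get("spark.rss.access.id.providerKey", "");
      if (!providerKey.isEmpty()) {
        // Look up the access id under the property named by the provider key.
        accessId = conf.get(providerKey, "");
      }
    }
    System.out.println("Resolved access id: " + accessId); // prints "team-a-access-id"
  }
}
```

The fallback only fires when `spark.rss.access.id` itself is empty, so jobs that already set the access id directly are unaffected.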