diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java index 734dedcccd..2bab543536 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java @@ -322,6 +322,9 @@ public class RssSparkConfig { public static final ConfigEntry RSS_ACCESS_ID = createStringBuilder(new ConfigBuilder("spark.rss.access.id")).createWithDefault(""); + public static final ConfigEntry RSS_ACCESS_ID_PROVIDER_KEY = + createStringBuilder(new ConfigBuilder("spark.rss.access.id.provider.key")) + .createWithDefault(""); public static final ConfigEntry RSS_ACCESS_TIMEOUT_MS = createIntegerBuilder( diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index af9295e559..657be54d1a 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -112,8 +112,11 @@ private ShuffleManager createShuffleManagerInDriver() throws RssException { private boolean tryAccessCluster() { String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim(); if (StringUtils.isEmpty(accessId)) { - LOG.warn("Access id key is empty"); - return false; + String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), ""); + if (StringUtils.isNotEmpty(accessId)) { + accessId = sparkConf.get(providerKey, ""); + LOG.info("Get access id {} from provider key: {}", accessId, providerKey); + } } long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS); int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES); diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index bb8ed3a901..1bb568d526 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -111,6 +111,13 @@ private ShuffleManager createShuffleManagerInDriver() throws RssException { private boolean tryAccessCluster() { String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim(); + if (StringUtils.isEmpty(accessId)) { + String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), ""); + if (StringUtils.isNotEmpty(accessId)) { + accessId = sparkConf.get(providerKey, ""); + LOG.info("Get access id {} from provider key: {}", accessId, providerKey); + } + } if (StringUtils.isEmpty(accessId)) { LOG.warn("Access id key is empty"); return false; diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java index c7011f27eb..092869e31d 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java +++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java @@ -47,15 +47,27 @@ public void testCreateInDriver() throws Exception { SparkConf conf = new SparkConf(); assertCreateSortShuffleManager(conf); + conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); + conf.set("spark.foo.bar.key", "mockId"); + conf.set(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "spark.foo.bar.key"); + assertCreateSortShuffleManager(conf); + + conf = new SparkConf(); conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId"); assertCreateSortShuffleManager(conf); + + conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); + conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId"); conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002"); conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name()); conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true); assertCreateRssShuffleManager(conf); conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002"); assertCreateSortShuffleManager(conf); }