Skip to content

Commit

Permalink
Support set accessId by another config dynamically
Browse files Browse the repository at this point in the history
  • Loading branch information
maobaolong committed Nov 16, 2024
1 parent 4eebbd4 commit 209ecee
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ public class RssSparkConfig {

public static final ConfigEntry<String> RSS_ACCESS_ID =
createStringBuilder(new ConfigBuilder("spark.rss.access.id")).createWithDefault("");
public static final ConfigEntry<String> RSS_ACCESS_ID_PROVIDER_KEY =
createStringBuilder(new ConfigBuilder("spark.rss.access.id.provider.key"))
.createWithDefault("");

public static final ConfigEntry<Integer> RSS_ACCESS_TIMEOUT_MS =
createIntegerBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ private ShuffleManager createShuffleManagerInDriver() throws RssException {
private boolean tryAccessCluster() {
String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
if (StringUtils.isEmpty(accessId)) {
LOG.warn("Access id key is empty");
return false;
String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "");
if (StringUtils.isNotEmpty(accessId)) {
accessId = sparkConf.get(providerKey, "");
LOG.info("Get access id {} from provider key: {}", accessId, providerKey);
}
}
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ private ShuffleManager createShuffleManagerInDriver() throws RssException {

private boolean tryAccessCluster() {
String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
if (StringUtils.isEmpty(accessId)) {
String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "");
if (StringUtils.isNotEmpty(accessId)) {
accessId = sparkConf.get(providerKey, "");
LOG.info("Get access id {} from provider key: {}", accessId, providerKey);
}
}
if (StringUtils.isEmpty(accessId)) {
LOG.warn("Access id key is empty");
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,27 @@ public void testCreateInDriver() throws Exception {

SparkConf conf = new SparkConf();
assertCreateSortShuffleManager(conf);
conf = new SparkConf();
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
conf.set("spark.foo.bar.key", "mockId");
conf.set(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "spark.foo.bar.key");
assertCreateSortShuffleManager(conf);

conf = new SparkConf();
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
assertCreateSortShuffleManager(conf);

conf = new SparkConf();
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
assertCreateRssShuffleManager(conf);

conf = new SparkConf();
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
assertCreateSortShuffleManager(conf);
}
Expand Down

0 comments on commit 209ecee

Please sign in to comment.