[MINOR] improvement(spark-client): put sparkConf as extra properties while client request accessCluster #2254
base: master
Changes from 5 commits
827ee04
f538b56
9b01609
1c93abf
9a3b25d
8d3f2ac
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle;

import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -32,6 +33,8 @@
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;

@@ -124,6 +127,16 @@ private boolean tryAccessCluster() {
    extraProperties.put(
        ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));

    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
    List<String> excludeProperties =
        rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
    // Put all Spark conf entries into extra properties, except values longer than
    // 100 characters, to keep the extra properties from getting too long.
    rssConf.getAll().stream()
        .filter(entry -> StringUtils.length((String) entry.getValue()) < 100)
Review comment: Should you remove …
Reply: Done
        .filter(entry -> !excludeProperties.contains(entry.getKey()))
        .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue()));

    Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
    try {
      if (coordinatorClient != null) {
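To see the reporting filter from the hunk above in isolation, here is a minimal, self-contained sketch of the same two-step filtering (drop values of 100 or more characters, drop keys on the exclude list). The property names and values are made up for illustration, and plain String.length is used instead of StringUtils, so this is not the actual Uniffle client code.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExtraPropertiesFilterSketch {
  public static void main(String[] args) {
    // Illustrative Spark settings as the client might see them (keys and values are made up).
    Map<String, String> sparkSettings = new HashMap<>();
    sparkSettings.put("spark.executor.memory", "4g");
    sparkSettings.put("spark.rss.storage.type", "MEMORY_LOCALFILE");
    sparkSettings.put("spark.executor.extraJavaOptions", "x".repeat(200)); // value too long, dropped
    sparkSettings.put("spark.app.secret", "do-not-report");                // excluded by key, dropped

    // Keys the user asked not to report, mirroring rss.client.reportExcludeProperties.
    List<String> excludeProperties = Arrays.asList("spark.app.secret");

    // Same two filters as the PR: skip long values, skip excluded keys, report the rest.
    Map<String, String> extraProperties = new HashMap<>();
    sparkSettings.entrySet().stream()
        .filter(entry -> entry.getValue() != null && entry.getValue().length() < 100)
        .filter(entry -> !excludeProperties.contains(entry.getKey()))
        .forEach(entry -> extraProperties.put(entry.getKey(), entry.getValue()));

    // Only spark.executor.memory and spark.rss.storage.type survive the filters.
    extraProperties.forEach((key, value) -> System.out.println(key + " = " + value));
  }
}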
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle;

import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -32,6 +33,8 @@
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;

@@ -124,6 +127,16 @@ private boolean tryAccessCluster() {
    extraProperties.put(
        ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));

    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
    List<String> excludeProperties =
        rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
Review comment: The default is null? Will it cause an error?
Reply: Thanks for the tip; I set a default value.
    // Put all Spark conf entries into extra properties, except values longer than
    // 100 characters, to keep the extra properties from getting too long.
    rssConf.getAll().stream()
        .filter(entry -> StringUtils.length((String) entry.getValue()) < 100)
Review comment: This is too tricky....
Reply: Em... Thanks for your review! I'm sorry for this. I would greatly appreciate it if you could give me some advice.
Review comment: You can implement a white list or a configuration-driven filter. The white list could be configured in the configuration, and the filter could have a configurable length limit, with the length itself also set in the configuration. (See the sketch after this file's diff for what such a filter could look like.)
Reply: Do you think using a black list instead of a white list would be possible?
Review comment: OK for me, too.
Reply: done
        .filter(entry -> !excludeProperties.contains(entry.getKey()))
        .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue()));

    Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
    try {
      if (coordinatorClient != null) {
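The review thread above asks for the hard-coded 100-character cutoff to become configurable. As a rough sketch of that suggestion only, the class below takes both limits from constructor arguments; the name ReportPropertyFilterSketch and a rss.client.reportMaxValueLength-style option are hypothetical and not part of this PR.

import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative only: a reporting filter whose length limit and exclude list both come from config
// (for example a hypothetical rss.client.reportMaxValueLength plus rss.client.reportExcludeProperties).
public class ReportPropertyFilterSketch {
  private final int maxValueLength;
  private final List<String> excludedKeys;

  public ReportPropertyFilterSketch(int maxValueLength, List<String> excludedKeys) {
    this.maxValueLength = maxValueLength;
    this.excludedKeys = excludedKeys;
  }

  // Returns true for entries that should be reported to the coordinator.
  public Predicate<Map.Entry<String, String>> shouldReport() {
    return entry ->
        entry.getValue() != null
            && entry.getValue().length() < maxValueLength
            && !excludedKeys.contains(entry.getKey());
  }
}

With something like this, tryAccessCluster would build one predicate from configuration and pass it to the stream, so neither the cutoff nor the exclude list is hard-coded.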
@@ -303,4 +303,10 @@ public class RssClientConf {
          .withDescription(
              "The block id manager class of server for this application, "
                  + "the implementation of this interface to manage the shuffle block ids");

  public static final ConfigOption<List<String>> RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES =
Review comment: Add a blank line.
      ConfigOptions.key("rss.client.reportExcludeProperties")
          .stringType()
          .asList()
          .defaultValues()
          .withDescription("The properties excluded from the client report can be configured with this option");
}
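For a user-facing view of the new option, here is a hedged usage sketch from the Spark side. It assumes that RssSparkConfig.toRssConf maps a spark.-prefixed key onto rss.client.reportExcludeProperties and that list-typed options accept a comma-separated string; both assumptions should be checked against the actual config parsing, and the key names in the value are only examples.

import org.apache.spark.SparkConf;

public class ExcludePropertiesUsageSketch {
  public static void main(String[] args) {
    // Assumption: the "spark." prefix is stripped by RssSparkConfig.toRssConf, so this entry
    // surfaces as rss.client.reportExcludeProperties; list options are assumed to be comma-separated.
    SparkConf sparkConf = new SparkConf()
        .setAppName("uniffle-exclude-properties-demo")
        .set("spark.rss.client.reportExcludeProperties",
            "spark.executor.extraJavaOptions,spark.yarn.dist.files");

    // With this setting, those two keys would be left out of the extra properties that the
    // client sends along with its accessCluster request.
    System.out.println(sparkConf.get("spark.rss.client.reportExcludeProperties"));
  }
}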
Review comment: Should you remove the comment?
Reply: Done