Propagate thread locals to Delta thread pools #2154
Mostly nits, otherwise it looks good, thanks :)
// Capture an immutable threadsafe snapshot of the current local properties
val capturedProperties = sparkContext
  .map(sc => CapturedSparkThreadLocals.toValuesArray(
    org.apache.spark.util.Utils.cloneProperties(sc.getLocalProperties)))
import Utils instead of using the qualified name here? You can rename it (e.g. to SparkUtils) if it clashes.
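For illustration, a rename on import in Scala looks like the sketch below. In the PR this would presumably take the form `import org.apache.spark.util.{Utils => SparkUtils}`; to keep the snippet runnable without Spark, it renames `java.util.Properties` instead. `ImportRenameExample` and its local `Properties` class are hypothetical names, not part of the PR.

```scala
// Rename on import to avoid a clash with a same-named local type.
// java.util.Properties is only a stand-in so the snippet runs without Spark.
import java.util.{Properties => JProperties}

object ImportRenameExample {
  // Hypothetical local class whose name would clash with an unrenamed import.
  final case class Properties(values: Map[String, String])

  // Both types can now be used side by side without qualified names.
  def toJava(p: Properties): JProperties = {
    val out = new JProperties()
    p.values.foreach { case (k, v) => out.setProperty(k, v) }
    out
  }
}
```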
capturedProperties.foreach { p =>
  sparkContext.foreach(_.setLocalProperties(CapturedSparkThreadLocals.toProperties(p)))
}
Suggested change:
- capturedProperties.foreach { p =>
-   sparkContext.foreach(_.setLocalProperties(CapturedSparkThreadLocals.toProperties(p)))
- }
+ for {
+   p <- capturedProperties
+   sc <- sparkContext
+ } sc.setLocalProperties(CapturedSparkThreadLocals.toProperties(p))
Isn't this much more readable? ;)
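A `for` without `yield` over two `Option`s desugars to nested `foreach` calls, so the two forms behave identically and the body runs only when both values are present. A minimal sketch with hypothetical stand-ins (`a` for `capturedProperties`, `b` for `sparkContext`):

```scala
import scala.collection.mutable.ListBuffer

object OptionForExample {
  // Nested foreach, as in the original diff.
  def nested(a: Option[Int], b: Option[String]): List[String] = {
    val buf = ListBuffer.empty[String]
    a.foreach { x => b.foreach { y => buf += s"$y=$x" } }
    buf.toList
  }

  // Equivalent for comprehension, as in the suggestion.
  def flat(a: Option[Int], b: Option[String]): List[String] = {
    val buf = ListBuffer.empty[String]
    for {
      x <- a
      y <- b
    } buf += s"$y=$x"
    buf.toList
  }
}
```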
  throw t
} finally {
  TaskContext.setTaskContext(previousTaskContext)
  previousProperties.foreach(p => sparkContext.foreach(_.setLocalProperties(p)))
same here
props.foreach { kvp =>
  resultProps.put(kvp._1, kvp._2)
}
Or, hear me out:
Suggested change:
- props.foreach { kvp =>
-   resultProps.put(kvp._1, kvp._2)
- }
+ for ((key, value) <- props) {
+   resultProps.put(key, value)
+ }
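As a self-contained sketch of the tuple-pattern form, assuming `props` is an array of string pairs (the element type is not visible in the diff; `TupleForExample` is a hypothetical name):

```scala
import java.util.Properties

object TupleForExample {
  // Destructure each (key, value) pair directly in the generator
  // instead of going through kvp._1 and kvp._2.
  def toProperties(props: Array[(String, String)]): Properties = {
    val resultProps = new Properties()
    for ((key, value) <- props) {
      resultProps.put(key, value)
    }
    resultProps
  }
}
```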
}

test("That CapturedSparkThreadLocals properly restores the existing spark properties." +
" Changes to local properties inside a task do not affect the original properties") {
indent
60faa14 to da5724d
Hi @larsk-db, thank you so much for reviewing! :) I addressed your comments, hope it looks good now. I also moved the …
LGTM thank you!
da5724d to 0d69c60
* The default thread pool executor in Apache Spark does not forward thread locals to threads spawned in a thread pool.
* This can cause issues if the threads depend on the thread locals.
* To fix this, we introduce a wrapper class around the thread pool executor that forwards thread locals.

Closes delta-io#2154

GitOrigin-RevId: 9e9423e4b041232457ffaab18f5f96490bb45b88
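The idea of a forwarding wrapper can be sketched on the plain JVM, without Spark. This is not the PR's `SparkThreadLocalForwardingExecutor`: `requestTag`, `ForwardingExecutor`, and `demo` are hypothetical names, and a single `ThreadLocal[String]` stands in for Spark's local properties and task context. The sketch captures the value on the submitting thread, installs it on the pool thread for the duration of the task, and restores the previous value in a `finally` block.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit}

object ThreadLocalForwardingSketch {
  // Hypothetical thread local standing in for Spark's local properties.
  val requestTag: ThreadLocal[String] = new ThreadLocal[String]()

  // Hypothetical wrapper around an existing executor.
  final class ForwardingExecutor(underlying: ExecutorService) {
    def submit[T](body: => T): Future[T] = {
      val captured = requestTag.get() // read on the submitting thread
      underlying.submit(new Callable[T] {
        override def call(): T = {
          val previous = requestTag.get() // value the pool thread had before
          requestTag.set(captured)
          try body
          finally requestTag.set(previous) // reset so the pool thread is not polluted
        }
      })
    }
  }

  // Returns (value seen inside a forwarded task, value seen by a later raw task).
  def demo(): (String, String) = {
    val raw = Executors.newFixedThreadPool(1)
    val pool = new ForwardingExecutor(raw)
    try {
      requestTag.set("job-42")
      val seenInTask = pool.submit(requestTag.get()).get(10, TimeUnit.SECONDS)
      // Submitted without the wrapper to the same single pool thread.
      val seenAfter = raw.submit(new Callable[String] {
        override def call(): String = Option(requestTag.get()).getOrElse("<unset>")
      }).get(10, TimeUnit.SECONDS)
      (seenInTask, seenAfter)
    } finally {
      requestTag.remove()
      raw.shutdown()
    }
  }
}
```

Capturing at submission time rather than at execution time is the important design choice here: by the time the task runs, the submitting thread may already have changed or cleared its own value.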
Which Delta project/connector is this regarding?
Description
How was this patch tested?
SparkThreadLocalForwardingExecutor to ensure thread locals are forwarded and reset after the future has finished.
Does this PR introduce any user-facing changes?
No