
[Spark] Parallel stats collection within each partition #2203

Closed

Conversation

@fred-db (Contributor) commented Oct 18, 2023

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

  • Collect stats in parallel within each partition instead of sequentially, to reduce the time spent idle waiting on network requests while fetching file statuses or parquet footers from cloud storage (see the sketch after this list).
  • Use a global thread pool on each executor to perform the parallel stats collection.
  • Add code to partition the dataset before collecting stats, to increase the achievable throughput.
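As a rough illustration of the per-partition parallelism described above, here is a minimal sketch, assuming a shared executor-side thread pool and a placeholder per-file function (StatsPoolSketch and computeStatsForFile are hypothetical names, not the PR's API):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Minimal sketch, not the PR's implementation: fan out per-file work (file status
// and parquet footer fetches) inside a partition instead of running it sequentially.
object StatsPoolSketch {
  // One pool shared within the executor JVM; the size here is a placeholder.
  private lazy val pool = Executors.newFixedThreadPool(32)
  private implicit lazy val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  def collectStats(filePaths: Seq[String])(computeStatsForFile: String => String): Seq[String] = {
    // Submit one future per file so the task is not blocked on each round trip in turn.
    val futures = filePaths.map(path => Future(computeStatsForFile(path)))
    Await.result(Future.sequence(futures), Duration.Inf)
  }
}
```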

How was this patch tested?

  • Existing UTs should cover the correct collection of statistics.

Does this PR introduce any user-facing changes?

No

@felipepessoto (Contributor):

Is this only improving the scenario where a user recomputes stats for existing files?

I'm asking because generating stats takes a considerable amount of time when inserting data; in my experiments, around 20% overhead compared to when stats.collect is disabled.

@vkorukanti (Collaborator):

> Is this only improving the scenario where a user recomputes stats for existing files?
>
> I'm asking because generating stats takes a considerable amount of time when inserting data; in my experiments, around 20% overhead compared to when stats.collect is disabled.

AFAIK the method computeStats is used only when updating/deleting a table with deletion vectors (DVs). It is a must to recompute stats for files that don't have stats when the AddFile contains a DV.
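To make that condition concrete, a hedged sketch (field names follow the AddFile action imported in the diff below; the helper itself is hypothetical, not the PR's code):

```scala
import org.apache.spark.sql.delta.actions.AddFile

// Hypothetical helper: pick out AddFile actions that carry a deletion vector but
// have no persisted stats, i.e. the files whose stats must be recomputed as
// described above.
def filesNeedingStatsRecompute(addFiles: Seq[AddFile]): Seq[AddFile] =
  addFiles.filter(f => f.deletionVector != null && f.stats == null)
```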

@@ -27,6 +28,7 @@ import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DeltaStatistics._
import org.apache.spark.sql.delta.util.{DeltaFileOperations, JsonUtils}
import org.apache.spark.sql.delta.util.threads.DeltaThreadPool
vkorukanti (Collaborator):

The package name seems to be wrong. Are you missing a change?

fred-db (Contributor, Author):

Ah yes, I am depending on PR #2154 being merged first. I didn't want to include its changes in this PR, as that would make the code review harder. Once the other PR is merged, this should match up.

@@ -137,6 +189,21 @@ object StatsCollectionUtils
}
}

object ParallelFetchPool {
val NUM_THREADS_PER_CORE = 10
val MAX_THREADS = 1024
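A hedged sketch of how these two constants would typically be combined to size a per-executor pool (the PR's actual DeltaThreadPool construction is not shown in this excerpt, so the sizing rule below is an assumption):

```scala
object ParallelFetchPoolSizingSketch {
  // Constants mirrored from the diff above.
  val NUM_THREADS_PER_CORE = 10
  val MAX_THREADS = 1024

  // Assumed sizing rule: scale with the executor's available cores, capped at MAX_THREADS.
  def poolSize: Int =
    math.min(Runtime.getRuntime.availableProcessors() * NUM_THREADS_PER_CORE, MAX_THREADS)
}
```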
vkorukanti (Collaborator):

The thread count is a bit high. Have you seen any issues with your testing?

fred-db (Contributor, Author):

I didn't see any issues when trying it out on a 4-worker cluster. I think you could run into throttling issues if the cluster you use is very large and you have to collect stats for DVs for many files. But then you can always increase the number of files per partition for stats collection, which should reduce the throttling.
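A minimal sketch of the knob being described, assuming stats collection runs over a DataFrame of file entries and that filesPerPartition is a hypothetical parameter (the PR's actual config name is not shown here):

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical sketch, not the PR's code: bound how many files land in each partition
// so the per-partition parallel fetches do not overwhelm the cloud store.
def repartitionForStatsCollection(files: DataFrame, filesPerPartition: Int): DataFrame = {
  val numPartitions = math.max(1, math.ceil(files.count().toDouble / filesPerPartition).toInt)
  files.repartition(numPartitions)
}
```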

@vkorukanti changed the title from "Parallel stats collection within each partition" to "[Spark] Parallel stats collection within each partition" on Oct 20, 2023
@vkorukanti (Collaborator) left a comment:

lgtm.

@fred-db force-pushed the multi-threaded-stats-collection branch from f96a232 to 1bb6352 on October 27, 2023 at 08:45