Spark provides the SQL function row_number, which assigns each row a consecutive number, starting from 1. This function operates on a window. Assigning a row number over the entire Dataset requires a window without a partition specification, which moves all rows into a single partition / executor. This does not scale.
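For comparison, the non-scaling approach looks roughly like this in plain Spark (a sketch only, applied to some DataFrame df; the ordering column is just an example):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// a window with no partition spec collects all rows into one partition before numbering them
val naive = df.withColumn("row_number", row_number().over(Window.orderBy($"id")))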
Spark extensions provide the Dataset transformation withRowNumbers, which assigns a global row number while scaling:
val df = Seq((1, "one"), (2, "TWO"), (2, "two"), (3, "three")).toDF("id", "value")
df.show()
// +---+-----+
// | id|value|
// +---+-----+
// | 1| one|
// | 2| TWO|
// | 2| two|
// | 3|three|
// +---+-----+
import uk.co.gresearch.spark._
df.withRowNumbers().show()
// +---+-----+----------+
// | id|value|row_number|
// +---+-----+----------+
// | 1| one| 1|
// | 2| two| 2|
// | 2| TWO| 3|
// | 3|three| 4|
// +---+-----+----------+
In Java:
import uk.co.gresearch.spark.RowNumbers;
RowNumbers.of(df).show();
// +---+-----+----------+
// | id|value|row_number|
// +---+-----+----------+
// | 1| one| 1|
// | 2| two| 2|
// | 2| TWO| 3|
// | 3|three| 4|
// +---+-----+----------+
In Python:
import gresearch.spark
df.with_row_numbers().show()
# +---+-----+----------+
# | id|value|row_number|
# +---+-----+----------+
# | 1| one| 1|
# | 2| two| 2|
# | 2| TWO| 3|
# | 3|three| 4|
# +---+-----+----------+
Row numbers are assigned in the current order of the Dataset. If you want a specific order, provide columns as follows:
df.withRowNumbers($"id".desc, $"value").show()
// +---+-----+----------+
// | id|value|row_number|
// +---+-----+----------+
// | 3|three| 1|
// | 2| TWO| 2|
// | 2| two| 3|
// | 1| one| 4|
// +---+-----+----------+
In Java:
RowNumbers.withOrderColumns(df.col("id").desc(), df.col("value")).of(df).show();
// +---+-----+----------+
// | id|value|row_number|
// +---+-----+----------+
// | 3|three| 1|
// | 2| TWO| 2|
// | 2| two| 3|
// | 1| one| 4|
// +---+-----+----------+
In Python:
df.with_row_numbers(order=[df.id.desc(), df.value]).show()
# +---+-----+----------+
# | id|value|row_number|
# +---+-----+----------+
# | 3|three| 1|
# | 2| TWO| 2|
# | 2| two| 3|
# | 1| one| 4|
# +---+-----+----------+
The name of the column that contains the row number can be changed by providing the rowNumberColumnName argument:
df.withRowNumbers(rowNumberColumnName="row").show()
// +---+-----+---+
// | id|value|row|
// +---+-----+---+
// | 1| one| 1|
// | 2| TWO| 2|
// | 2| two| 3|
// | 3|three| 4|
// +---+-----+---+
In Java:
RowNumbers.withRowNumberColumnName("row").of(df).show();
// +---+-----+---+
// | id|value|row|
// +---+-----+---+
// | 1| one| 1|
// | 2| TWO| 2|
// | 2| two| 3|
// | 3|three| 4|
// +---+-----+---+
In Python:
df.with_row_numbers(row_number_column_name='row').show()
# +---+-----+---+
# | id|value|row|
# +---+-----+---+
# | 1| one| 1|
# | 2| TWO| 2|
# | 2| two| 3|
# | 3|three| 4|
# +---+-----+---+
The withRowNumbers transformation needs to cache / persist the input Dataset after adding an intermediate column. The level of persistence can be specified through the storageLevel parameter:
import org.apache.spark.storage.StorageLevel
val dfWithRowNumbers = df.withRowNumbers(storageLevel=StorageLevel.DISK_ONLY)
In Java:
import org.apache.spark.storage.StorageLevel;
Dataset<Row> dfWithRowNumbers = RowNumbers.withStorageLevel(StorageLevel.DISK_ONLY()).of(df);
In Python:
from pyspark.storagelevel import StorageLevel
df_with_row_numbers = df.with_row_numbers(storage_level=StorageLevel.DISK_ONLY)
If you want control over when to un-persist this intermediate Dataset, you can provide an UnpersistHandle and call it when you are done with the result Dataset:
import uk.co.gresearch.spark.UnpersistHandle
val unpersist = UnpersistHandle()
val dfWithRowNumbers = df.withRowNumbers(unpersistHandle=unpersist)
// after you are done with dfWithRowNumbers you may want to call unpersist()
unpersist(blocking=false)
In Java:
import uk.co.gresearch.spark.UnpersistHandle;
UnpersistHandle unpersist = new UnpersistHandle();
Dataset<Row> dfWithRowNumbers = RowNumbers.withUnpersistHandle(unpersist).of(df);
// after you are done with dfWithRowNumbers you may want to call unpersist()
unpersist.apply(true);
In Python:
unpersist = spark.unpersist_handle()
df_with_row_numbers = df.with_row_numbers(unpersist_handle=unpersist)
# after you are done with df_with_row_numbers you may want to call unpersist()
unpersist(blocking=True)
You will notice that Spark logs the following warning:
WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
This warning is unavoidable, because withRowNumbers has to pull information about the initial partitions into a single partition.
Fortunately, only 12 bytes per input partition are required, so this amount of data easily fits into a single partition and the warning can safely be ignored.
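If you want to silence the warning entirely, one option is to raise the log level for the WindowExec class in your logging configuration, for example in conf/log4j2.properties (a sketch, assuming the log4j2 setup shipped with recent Spark versions):

logger.windowexec.name = org.apache.spark.sql.execution.window.WindowExec
logger.windowexec.level = error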
Note that this feature is not supported in Python when connected to a Spark Connect server.