diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 5e02a5910b115..9c5adca7e2863 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -55,16 +55,28 @@ import org.apache.spark.util.SparkClassUtils * only `Column` but also other types such as a native string. The other variants currently exist * for historical reasons. * - * @groupname udf_funcs UDF functions + * @groupname udf_funcs UDF, UDAF and UDT * @groupname agg_funcs Aggregate functions - * @groupname datetime_funcs Date time functions - * @groupname sort_funcs Sorting functions - * @groupname normal_funcs Non-aggregate functions - * @groupname math_funcs Math functions + * @groupname datetime_funcs Date and Timestamp functions + * @groupname sort_funcs Sort functions + * @groupname normal_funcs Normal functions + * @groupname math_funcs Mathematical functions + * @groupname bitwise_funcs Bitwise functions + * @groupname predicate_funcs Predicate functions + * @groupname conditional_funcs Conditional functions + * @groupname hash_funcs Hash functions * @groupname misc_funcs Misc functions * @groupname window_funcs Window functions + * @groupname generator_funcs Generator functions * @groupname string_funcs String functions * @groupname collection_funcs Collection functions + * @groupname array_funcs Array functions + * @groupname map_funcs Map functions + * @groupname struct_funcs Struct functions + * @groupname csv_funcs CSV functions + * @groupname json_funcs JSON functions + * @groupname xml_funcs XML functions + * @groupname url_funcs URL functions * @groupname partition_transforms Partition transform functions * @groupname Ungrouped Support functions for DataFrames * @@ -101,6 +113,7 @@ object functions { * Scala Symbol, it is converted into a [[Column]] also. Otherwise, a new [[Column]] is created * to represent the literal value. * + * @group normal_funcs * @since 3.4.0 */ def lit(literal: Any): Column = { @@ -145,7 +158,7 @@ object functions { /** * Creates a struct with the given field names and values. * - * @group normal_funcs + * @group struct_funcs * @since 3.5.0 */ def named_struct(cols: Column*): Column = Column.fn("named_struct", cols: _*) @@ -1610,7 +1623,7 @@ object functions { /** * Creates a new array column. The input columns must all have the same data type. * - * @group normal_funcs + * @group array_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -1619,7 +1632,7 @@ object functions { /** * Creates a new array column. The input columns must all have the same data type. * - * @group normal_funcs + * @group array_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -1632,7 +1645,7 @@ object functions { * value1, key2, value2, ...). The key columns must all have the same data type, and can't be * null. The value columns must all have the same data type. * - * @group normal_funcs + * @group map_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -1642,7 +1655,7 @@ object functions { * Creates a new map column. The array in the first column is used for keys. The array in the * second column is used for values. All elements in the array for key should not be null. 
* - * @group normal_funcs + * @group map_funcs * @since 3.4.0 */ def map_from_arrays(keys: Column, values: Column): Column = @@ -1698,7 +1711,7 @@ object functions { * For example, `coalesce(a, b, c)` will return a if a is not null, or b if a is null and b is * not null, or c if both a and b are null but c is not null. * - * @group normal_funcs + * @group conditional_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -1707,7 +1720,7 @@ object functions { /** * Creates a string column for the file name of the current Spark task. * - * @group normal_funcs + * @group misc_funcs * @since 3.4.0 */ def input_file_name(): Column = Column.fn("input_file_name") @@ -1715,7 +1728,7 @@ object functions { /** * Return true iff the column is NaN. * - * @group normal_funcs + * @group predicate_funcs * @since 3.4.0 */ def isnan(e: Column): Column = e.isNaN @@ -1723,7 +1736,7 @@ object functions { /** * Return true iff the column is null. * - * @group normal_funcs + * @group predicate_funcs * @since 3.4.0 */ def isnull(e: Column): Column = e.isNull @@ -1743,7 +1756,7 @@ object functions { * 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594. * }}} * - * @group normal_funcs + * @group misc_funcs * @since 3.4.0 */ @deprecated("Use monotonically_increasing_id()", "2.0.0") @@ -1764,7 +1777,7 @@ object functions { * 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594. * }}} * - * @group normal_funcs + * @group misc_funcs * @since 3.4.0 */ def monotonically_increasing_id(): Column = Column.fn("monotonically_increasing_id") @@ -1774,7 +1787,7 @@ object functions { * * Both inputs should be floating point columns (DoubleType or FloatType). * - * @group normal_funcs + * @group conditional_funcs * @since 3.4.0 */ def nanvl(col1: Column, col2: Column): Column = Column.fn("nanvl", col1, col2) @@ -1790,7 +1803,7 @@ object functions { * df.select( negate(df.col("amount")) ); * }}} * - * @group normal_funcs + * @group math_funcs * @since 3.4.0 */ def negate(e: Column): Column = -e @@ -1805,7 +1818,7 @@ object functions { * df.filter( not(df.col("isActive")) ); * }}} * - * @group normal_funcs + * @group predicate_funcs * @since 3.4.0 */ def not(e: Column): Column = !e @@ -1817,7 +1830,7 @@ object functions { * @note * The function is non-deterministic in general case. * - * @group normal_funcs + * @group math_funcs * @since 3.4.0 */ def rand(seed: Long): Column = Column.fn("rand", lit(seed)) @@ -1829,7 +1842,7 @@ object functions { * @note * The function is non-deterministic in general case. * - * @group normal_funcs + * @group math_funcs * @since 3.4.0 */ def rand(): Column = Column.fn("rand", lit(SparkClassUtils.random.nextLong)) @@ -1841,7 +1854,7 @@ object functions { * @note * The function is non-deterministic in general case. * - * @group normal_funcs + * @group math_funcs * @since 3.4.0 */ def randn(seed: Long): Column = Column.fn("randn", lit(seed)) @@ -1853,7 +1866,7 @@ object functions { * @note * The function is non-deterministic in general case. * - * @group normal_funcs + * @group math_funcs * @since 3.4.0 */ def randn(): Column = Column.fn("randn", lit(SparkClassUtils.random.nextLong)) @@ -1864,7 +1877,7 @@ object functions { * @note * This is non-deterministic because it depends on data partitioning and task scheduling. 
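Editor's note: the @group retags above only change where these functions land in the generated Scaladoc; runtime behavior is untouched. A minimal illustrative sketch (not part of the patch; assumes an existing Spark Connect session `spark`) exercising a few of the regrouped functions:
{{{
  import org.apache.spark.sql.functions._

  val df = spark.range(5).toDF("id")
  df.select(
      coalesce(col("id"), lit(0L)),      // now documented under conditional_funcs
      isnull(col("id")),                 // predicate_funcs
      rand(42),                          // math_funcs
      monotonically_increasing_id())     // misc_funcs
    .show()
}}}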
* - * @group normal_funcs + * @group misc_funcs * @since 3.4.0 */ def spark_partition_id(): Column = Column.fn("spark_partition_id") @@ -1943,7 +1956,7 @@ object functions { * StructField's name, otherwise, the newly generated StructField's name would be auto generated * as `col` with a suffix `index + 1`, i.e. col1, col2, col3, ... * - * @group normal_funcs + * @group struct_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -1952,7 +1965,7 @@ object functions { /** * Creates a new struct column that composes multiple input columns. * - * @group normal_funcs + * @group struct_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -1978,7 +1991,7 @@ object functions { * .otherwise(2)) * }}} * - * @group normal_funcs + * @group conditional_funcs * @since 3.4.0 */ def when(condition: Column, value: Any): Column = Column { builder => @@ -1991,7 +2004,7 @@ object functions { /** * Computes bitwise NOT (~) of a number. * - * @group normal_funcs + * @group bitwise_funcs * @since 3.4.0 */ @deprecated("Use bitwise_not", "3.2.0") @@ -2000,7 +2013,7 @@ object functions { /** * Computes bitwise NOT (~) of a number. * - * @group normal_funcs + * @group bitwise_funcs * @since 3.4.0 */ def bitwise_not(e: Column): Column = Column.fn("~", e) @@ -2512,7 +2525,7 @@ object functions { * Returns the greatest value of the list of values, skipping null values. This function takes * at least 2 parameters. It will return null iff all parameters are null. * - * @group normal_funcs + * @group math_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -2522,7 +2535,7 @@ object functions { * Returns the greatest value of the list of column names, skipping null values. This function * takes at least 2 parameters. It will return null iff all parameters are null. * - * @group normal_funcs + * @group math_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -2615,7 +2628,7 @@ object functions { * Returns the least value of the list of values, skipping null values. This function takes at * least 2 parameters. It will return null iff all parameters are null. * - * @group normal_funcs + * @group math_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -2625,7 +2638,7 @@ object functions { * Returns the least value of the list of column names, skipping null values. This function * takes at least 2 parameters. It will return null iff all parameters are null. * - * @group normal_funcs + * @group math_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -2909,7 +2922,7 @@ object functions { * Shift the given value numBits left. If the given value is a long value, this function will * return a long value else it will return an integer value. * - * @group math_funcs + * @group bitwise_funcs * @since 3.4.0 */ @deprecated("Use shiftleft", "3.2.0") @@ -2919,7 +2932,7 @@ object functions { * Shift the given value numBits left. If the given value is a long value, this function will * return a long value else it will return an integer value. * - * @group math_funcs + * @group bitwise_funcs * @since 3.4.0 */ def shiftleft(e: Column, numBits: Int): Column = Column.fn("shiftleft", e, lit(numBits)) @@ -2928,7 +2941,7 @@ object functions { * (Signed) shift the given value numBits right. If the given value is a long value, it will * return a long value else it will return an integer value. * - * @group math_funcs + * @group bitwise_funcs * @since 3.4.0 */ @deprecated("Use shiftright", "3.2.0") @@ -2938,7 +2951,7 @@ object functions { * (Signed) shift the given value numBits right. 
If the given value is a long value, it will * return a long value else it will return an integer value. * - * @group math_funcs + * @group bitwise_funcs * @since 3.4.0 */ def shiftright(e: Column, numBits: Int): Column = Column.fn("shiftright", e, lit(numBits)) @@ -2947,7 +2960,7 @@ object functions { * Unsigned shift the given value numBits right. If the given value is a long value, it will * return a long value else it will return an integer value. * - * @group math_funcs + * @group bitwise_funcs * @since 3.4.0 */ @deprecated("Use shiftrightunsigned", "3.2.0") @@ -2957,7 +2970,7 @@ object functions { * Unsigned shift the given value numBits right. If the given value is a long value, it will * return a long value else it will return an integer value. * - * @group math_funcs + * @group bitwise_funcs * @since 3.4.0 */ def shiftrightunsigned(e: Column, numBits: Int): Column = @@ -3220,7 +3233,7 @@ object functions { * Calculates the MD5 digest of a binary column and returns the value as a 32 character hex * string. * - * @group misc_funcs + * @group hash_funcs * @since 3.4.0 */ def md5(e: Column): Column = Column.fn("md5", e) @@ -3229,7 +3242,7 @@ object functions { * Calculates the SHA-1 digest of a binary column and returns the value as a 40 character hex * string. * - * @group misc_funcs + * @group hash_funcs * @since 3.4.0 */ def sha1(e: Column): Column = Column.fn("sha1", e) @@ -3243,7 +3256,7 @@ object functions { * @param numBits * one of 224, 256, 384, or 512. * - * @group misc_funcs + * @group hash_funcs * @since 3.4.0 */ def sha2(e: Column, numBits: Int): Column = { @@ -3257,7 +3270,7 @@ object functions { * Calculates the cyclic redundancy check value (CRC32) of a binary column and returns the value * as a bigint. * - * @group misc_funcs + * @group hash_funcs * @since 3.4.0 */ def crc32(e: Column): Column = Column.fn("crc32", e) @@ -3265,7 +3278,7 @@ object functions { /** * Calculates the hash code of given columns, and returns the result as an int column. * - * @group misc_funcs + * @group hash_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -3275,7 +3288,7 @@ object functions { * Calculates the hash code of given columns using the 64-bit variant of the xxHash algorithm, * and returns the result as a long column. The hash computation uses an initial seed of 42. * - * @group misc_funcs + * @group hash_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -3632,7 +3645,7 @@ object functions { /** * Returns a sha1 hash value as a hex string of the `col`. * - * @group misc_funcs + * @group hash_funcs * @since 3.5.0 */ def sha(col: Column): Column = Column.fn("sha", col) @@ -3708,7 +3721,7 @@ object functions { * Returns a random value with independent and identically distributed (i.i.d.) uniformly * distributed values in [0, 1). * - * @group misc_funcs + * @group math_funcs * @since 3.5.0 */ def random(seed: Column): Column = Column.fn("random", seed) @@ -3717,7 +3730,7 @@ object functions { * Returns a random value with independent and identically distributed (i.i.d.) uniformly * distributed values in [0, 1). * - * @group misc_funcs + * @group math_funcs * @since 3.5.0 */ def random(): Column = Column.fn("random", lit(SparkClassUtils.random.nextLong)) @@ -3744,7 +3757,7 @@ object functions { * Returns a bitmap with the positions of the bits set from all the values from the input * column. The input column will most likely be bitmap_bit_position(). 
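Editor's note: moving the digest and shift helpers into the new hash_funcs and bitwise_funcs groups is likewise documentation-only. An illustrative sketch (assumes a DataFrame `df` with a string column "payload"):
{{{
  df.select(
      md5(col("payload")),          // hash_funcs
      sha2(col("payload"), 256),    // hash_funcs
      crc32(col("payload")),        // hash_funcs
      shiftleft(lit(1), 3))         // bitwise_funcs
}}}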
* - * @group misc_funcs + * @group agg_funcs * @since 3.5.0 */ def bitmap_construct_agg(col: Column): Column = @@ -3762,7 +3775,7 @@ object functions { * Returns a bitmap that is the bitwise OR of all of the bitmaps from the input column. The * input column should be bitmaps created from bitmap_construct_agg(). * - * @group misc_funcs + * @group agg_funcs * @since 3.5.0 */ def bitmap_or_agg(col: Column): Column = Column.fn("bitmap_or_agg", col) @@ -3993,7 +4006,7 @@ object functions { /** * Returns true if `str` matches `regexp`, or false otherwise. * - * @group string_funcs + * @group predicate_funcs * @since 3.5.0 */ def rlike(str: Column, regexp: Column): Column = Column.fn("rlike", str, regexp) @@ -4001,7 +4014,7 @@ object functions { /** * Returns true if `str` matches `regexp`, or false otherwise. * - * @group string_funcs + * @group predicate_funcs * @since 3.5.0 */ def regexp(str: Column, regexp: Column): Column = Column.fn("regexp", str, regexp) @@ -4009,7 +4022,7 @@ object functions { /** * Returns true if `str` matches `regexp`, or false otherwise. * - * @group string_funcs + * @group predicate_funcs * @since 3.5.0 */ def regexp_like(str: Column, regexp: Column): Column = Column.fn("regexp_like", str, regexp) @@ -4477,7 +4490,7 @@ object functions { /** * Extracts a part from a URL. * - * @group string_funcs + * @group url_funcs * @since 3.5.0 */ def parse_url(url: Column, partToExtract: Column, key: Column): Column = @@ -4486,7 +4499,7 @@ object functions { /** * Extracts a part from a URL. * - * @group string_funcs + * @group url_funcs * @since 3.5.0 */ def parse_url(url: Column, partToExtract: Column): Column = @@ -4505,7 +4518,7 @@ object functions { * Decodes a `str` in 'application/x-www-form-urlencoded' format using a specific encoding * scheme. * - * @group string_funcs + * @group url_funcs * @since 3.5.0 */ def url_decode(str: Column): Column = Column.fn("url_decode", str) @@ -4514,7 +4527,7 @@ object functions { * Translates a string into 'application/x-www-form-urlencoded' format using a specific encoding * scheme. * - * @group string_funcs + * @group url_funcs * @since 3.5.0 */ def url_encode(str: Column): Column = Column.fn("url_encode", str) @@ -4677,7 +4690,7 @@ object functions { * Returns true if str matches `pattern` with `escapeChar`, null if any arguments are null, * false otherwise. * - * @group string_funcs + * @group predicate_funcs * @since 3.5.0 */ def like(str: Column, pattern: Column, escapeChar: Column): Column = @@ -4687,7 +4700,7 @@ object functions { * Returns true if str matches `pattern` with `escapeChar`('\'), null if any arguments are null, * false otherwise. * - * @group string_funcs + * @group predicate_funcs * @since 3.5.0 */ def like(str: Column, pattern: Column): Column = Column.fn("like", str, pattern) @@ -4696,7 +4709,7 @@ object functions { * Returns true if str matches `pattern` with `escapeChar` case-insensitively, null if any * arguments are null, false otherwise. * - * @group string_funcs + * @group predicate_funcs * @since 3.5.0 */ def ilike(str: Column, pattern: Column, escapeChar: Column): Column = @@ -4706,7 +4719,7 @@ object functions { * Returns true if str matches `pattern` with `escapeChar`('\') case-insensitively, null if any * arguments are null, false otherwise. 
* - * @group string_funcs + * @group predicate_funcs * @since 3.5.0 */ def ilike(str: Column, pattern: Column): Column = Column.fn("ilike", str, pattern) @@ -5945,7 +5958,7 @@ object functions { /** * Returns null if the array is null, true if the array contains `value`, and false otherwise. - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_contains(column: Column, value: Any): Column = @@ -5955,7 +5968,7 @@ object functions { * Returns an ARRAY containing all elements from the source ARRAY as well as the new element. * The new element/column is located at end of the ARRAY. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_append(column: Column, element: Any): Column = @@ -5965,7 +5978,7 @@ object functions { * Returns `true` if `a1` and `a2` have at least one non-null element in common. If not and both * the arrays are non-empty and any of them contains a `null`, it returns `null`. It returns * `false` otherwise. - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def arrays_overlap(a1: Column, a2: Column): Column = Column.fn("arrays_overlap", a1, a2) @@ -5981,7 +5994,7 @@ object functions { * @param length * the length of the slice * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def slice(x: Column, start: Int, length: Int): Column = @@ -5998,7 +6011,7 @@ object functions { * @param length * the length of the slice * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def slice(x: Column, start: Column, length: Column): Column = @@ -6007,7 +6020,7 @@ object functions { /** * Concatenates the elements of `column` using the `delimiter`. Null values are replaced with * `nullReplacement`. - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_join(column: Column, delimiter: String, nullReplacement: String): Column = @@ -6015,7 +6028,7 @@ object functions { /** * Concatenates the elements of `column` using the `delimiter`. - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_join(column: Column, delimiter: String): Column = @@ -6039,7 +6052,7 @@ object functions { * The position is not zero based, but 1 based index. Returns 0 if value could not be found in * array. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_position(column: Column, value: Any): Column = @@ -6062,7 +6075,7 @@ object functions { * (map, key) - Returns value for given key. The function always returns NULL if the key is not * contained in the map. * - * @group map_funcs + * @group collection_funcs * @since 3.5.0 */ def try_element_at(column: Column, value: Column): Column = @@ -6072,7 +6085,7 @@ object functions { * Returns element of array at given (0-based) index. If the index points outside of the array * boundaries, then this function returns NULL. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def get(column: Column, index: Column): Column = Column.fn("get", column, index) @@ -6102,7 +6115,7 @@ object functions { /** * Remove all elements that equal to element from the given array. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_remove(column: Column, element: Any): Column = @@ -6111,7 +6124,7 @@ object functions { /** * Remove all null elements from the given array. 
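Editor's note: the pattern-matching helpers retagged as predicate_funcs above keep their semantics. A short usage sketch (assumes a DataFrame `people` with a string column "name"):
{{{
  people
    .where(ilike(col("name"), lit("a%")))          // case-insensitive LIKE
    .where(rlike(col("name"), lit("^[A-Za-z]+$"))) // regular-expression match
}}}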
* - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_compact(column: Column): Column = Column.fn("array_compact", column) @@ -6120,7 +6133,7 @@ object functions { * Returns an array containing value as well as all elements from array. The new element is * positioned at the beginning of the array. * - * @group collection_funcs + * @group array_funcs * @since 3.5.0 */ def array_prepend(column: Column, element: Any): Column = @@ -6128,7 +6141,7 @@ object functions { /** * Removes duplicate values from the array. - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_distinct(e: Column): Column = Column.fn("array_distinct", e) @@ -6137,7 +6150,7 @@ object functions { * Returns an array of the elements in the intersection of the given two arrays, without * duplicates. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_intersect(col1: Column, col2: Column): Column = @@ -6146,7 +6159,7 @@ object functions { /** * Adds an item into a given array at a specified position * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_insert(arr: Column, pos: Column, value: Column): Column = @@ -6155,7 +6168,7 @@ object functions { /** * Returns an array of the elements in the union of the given two arrays, without duplicates. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_union(col1: Column, col2: Column): Column = @@ -6165,7 +6178,7 @@ object functions { * Returns an array of the elements in the first array but not in the second array, without * duplicates. The order of elements in the result is not determined * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_except(col1: Column, col2: Column): Column = @@ -6174,7 +6187,7 @@ object functions { /** * Returns a string array of values within the nodes of xml that match the XPath expression. * - * @group "xml_funcs" + * @group xml_funcs * @since 3.5.0 */ def xpath(xml: Column, path: Column): Column = @@ -6183,7 +6196,7 @@ object functions { /** * Returns true if the XPath expression evaluates to true, or if a matching node is found. * - * @group "xml_funcs" + * @group xml_funcs * @since 3.5.0 */ def xpath_boolean(xml: Column, path: Column): Column = @@ -6193,7 +6206,7 @@ object functions { * Returns a double value, the value zero if no match is found, or NaN if a match is found but * the value is non-numeric. * - * @group "xml_funcs" + * @group xml_funcs * @since 3.5.0 */ def xpath_double(xml: Column, path: Column): Column = @@ -6203,7 +6216,7 @@ object functions { * Returns a double value, the value zero if no match is found, or NaN if a match is found but * the value is non-numeric. * - * @group "xml_funcs" + * @group xml_funcs * @since 3.5.0 */ def xpath_number(xml: Column, path: Column): Column = @@ -6213,7 +6226,7 @@ object functions { * Returns a float value, the value zero if no match is found, or NaN if a match is found but * the value is non-numeric. * - * @group "xml_funcs" + * @group xml_funcs * @since 3.5.0 */ def xpath_float(xml: Column, path: Column): Column = @@ -6223,7 +6236,7 @@ object functions { * Returns an integer value, or the value zero if no match is found, or a match is found but the * value is non-numeric. 
* - * @group "xml_funcs" + * @group xml_funcs * @since 3.5.0 */ def xpath_int(xml: Column, path: Column): Column = @@ -6233,7 +6246,7 @@ object functions { * Returns a long integer value, or the value zero if no match is found, or a match is found but * the value is non-numeric. * - * @group "xml_funcs" + * @group xml_funcs * @since 3.5.0 */ def xpath_long(xml: Column, path: Column): Column = @@ -6243,7 +6256,7 @@ object functions { * Returns a short integer value, or the value zero if no match is found, or a match is found * but the value is non-numeric. * - * @group "xml_funcs" + * @group xml_funcs * @since 3.5.0 */ def xpath_short(xml: Column, path: Column): Column = @@ -6252,7 +6265,7 @@ object functions { /** * Returns the text contents of the first xml node that matches the XPath expression. * - * @group "xml_funcs" + * @group xml_funcs * @since 3.5.0 */ def xpath_string(xml: Column, path: Column): Column = @@ -6603,7 +6616,7 @@ object functions { * name `col` for elements in the array and `key` and `value` for elements in the map unless * specified otherwise. * - * @group collection_funcs + * @group generator_funcs * @since 3.4.0 */ def explode(e: Column): Column = Column.fn("explode", e) @@ -6613,7 +6626,7 @@ object functions { * name `col` for elements in the array and `key` and `value` for elements in the map unless * specified otherwise. Unlike explode, if the array/map is null or empty then null is produced. * - * @group collection_funcs + * @group generator_funcs * @since 3.4.0 */ def explode_outer(e: Column): Column = Column.fn("explode_outer", e) @@ -6623,7 +6636,7 @@ object functions { * default column name `pos` for position, and `col` for elements in the array and `key` and * `value` for elements in the map unless specified otherwise. * - * @group collection_funcs + * @group generator_funcs * @since 3.4.0 */ def posexplode(e: Column): Column = Column.fn("posexplode", e) @@ -6634,7 +6647,7 @@ object functions { * `value` for elements in the map unless specified otherwise. Unlike posexplode, if the * array/map is null or empty then the row (null, null) is produced. * - * @group collection_funcs + * @group generator_funcs * @since 3.4.0 */ def posexplode_outer(e: Column): Column = Column.fn("posexplode_outer", e) @@ -6642,7 +6655,7 @@ object functions { /** * Creates a new row for each element in the given array of structs. * - * @group collection_funcs + * @group generator_funcs * @since 3.4.0 */ def inline(e: Column): Column = Column.fn("inline", e) @@ -6651,7 +6664,7 @@ object functions { * Creates a new row for each element in the given array of structs. Unlike inline, if the array * is null or empty then null is produced for each nested column. * - * @group collection_funcs + * @group generator_funcs * @since 3.4.0 */ def inline_outer(e: Column): Column = Column.fn("inline_outer", e) @@ -6660,7 +6673,7 @@ object functions { * Extracts json object from a json string based on json path specified, and returns json string * of the extracted json object. It will return null if the input json string is invalid. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ def get_json_object(e: Column, path: String): Column = @@ -6669,7 +6682,7 @@ object functions { /** * Creates a new row for a json column according to the given field names. 
* - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -6693,7 +6706,7 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data * Source Option in the version you use. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -6716,7 +6729,7 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data * Source Option in the version you use. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -6739,7 +6752,7 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data * Source Option in the version you use. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -6762,7 +6775,7 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data * Source Option in the version you use. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -6779,7 +6792,7 @@ object functions { * @param schema * the schema to use when parsing the json string * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ def from_json(e: Column, schema: StructType): Column = @@ -6795,7 +6808,7 @@ object functions { * @param schema * the schema to use when parsing the json string * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ def from_json(e: Column, schema: DataType): Column = @@ -6817,7 +6830,7 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data * Source Option in the version you use. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -6841,7 +6854,7 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data * Source Option in the version you use. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -6861,7 +6874,7 @@ object functions { * @param schema * the schema to use when parsing the json string * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ def from_json(e: Column, schema: Column): Column = { @@ -6884,7 +6897,7 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data * Source Option in the version you use. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -6924,7 +6937,7 @@ object functions { * @param json * a JSON string. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ def schema_of_json(json: String): Column = schema_of_json(lit(json)) @@ -6935,7 +6948,7 @@ object functions { * @param json * a foldable string column containing a JSON string. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ def schema_of_json(json: Column): Column = Column.fn("schema_of_json", json) @@ -6954,7 +6967,7 @@ object functions { * @return * a column with string literal containing schema in DDL format. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -6976,7 +6989,7 @@ object functions { * Source Option in the version you use. 
Additionally the function supports the `pretty` * option which enables pretty JSON generation. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -6998,7 +7011,7 @@ object functions { * Source Option in the version you use. Additionally the function supports the `pretty` * option which enables pretty JSON generation. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -7012,7 +7025,7 @@ object functions { * @param e * a column containing a struct, an array or a map. * - * @group collection_funcs + * @group json_funcs * @since 3.4.0 */ def to_json(e: Column): Column = @@ -7035,7 +7048,7 @@ object functions { * ordering of the array elements. Null elements will be placed at the beginning of the returned * array. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def sort_array(e: Column): Column = sort_array(e, asc = true) @@ -7046,7 +7059,7 @@ object functions { * double/float type. Null elements will be placed at the beginning of the returned array in * ascending order or at the end of the returned array in descending order. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def sort_array(e: Column, asc: Boolean): Column = Column.fn("sort_array", e, lit(asc)) @@ -7055,7 +7068,7 @@ object functions { * Returns the minimum value in the array. NaN is greater than any non-NaN elements for * double/float type. NULL elements are skipped. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_min(e: Column): Column = Column.fn("array_min", e) @@ -7064,7 +7077,7 @@ object functions { * Returns the maximum value in the array. NaN is greater than any non-NaN elements for * double/float type. NULL elements are skipped. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_max(e: Column): Column = Column.fn("array_max", e) @@ -7075,7 +7088,7 @@ object functions { * @note * The function is non-deterministic. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def shuffle(e: Column): Column = Column.fn("shuffle", e, lit(SparkClassUtils.random.nextLong)) @@ -7090,7 +7103,7 @@ object functions { /** * Creates a single array from an array of arrays. If a structure of nested arrays is deeper * than two levels, only one level of nesting is removed. - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def flatten(e: Column): Column = Column.fn("flatten", e) @@ -7098,7 +7111,7 @@ object functions { /** * Generate a sequence of integers from start to stop, incrementing by step. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def sequence(start: Column, stop: Column, step: Column): Column = @@ -7108,7 +7121,7 @@ object functions { * Generate a sequence of integers from start to stop, incrementing by 1 if start is less than * or equal to stop, otherwise -1. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def sequence(start: Column, stop: Column): Column = @@ -7118,7 +7131,7 @@ object functions { * Creates an array containing the left argument repeated the number of times given by the right * argument. * - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_repeat(left: Column, right: Column): Column = Column.fn("array_repeat", left, right) @@ -7127,14 +7140,14 @@ object functions { * Creates an array containing the left argument repeated the number of times given by the right * argument. 
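Editor's note: the from_json/schema_of_json/to_json overloads retagged as json_funcs above are used exactly as before. A minimal sketch (assumes a DataFrame `df` with a string column "js" holding JSON objects such as {"a": 1}):
{{{
  import org.apache.spark.sql.types._

  val schema = new StructType().add("a", IntegerType)
  df.select(from_json(col("js"), schema).as("parsed"))
    .select(to_json(col("parsed")))
}}}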
* - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ def array_repeat(e: Column, count: Int): Column = array_repeat(e, lit(count)) /** * Returns true if the map contains the key. - * @group collection_funcs + * @group map_funcs * @since 3.4.0 */ def map_contains_key(column: Column, key: Any): Column = @@ -7142,28 +7155,28 @@ object functions { /** * Returns an unordered array containing the keys of the map. - * @group collection_funcs + * @group map_funcs * @since 3.4.0 */ def map_keys(e: Column): Column = Column.fn("map_keys", e) /** * Returns an unordered array containing the values of the map. - * @group collection_funcs + * @group map_funcs * @since 3.4.0 */ def map_values(e: Column): Column = Column.fn("map_values", e) /** * Returns an unordered array of all entries in the given map. - * @group collection_funcs + * @group map_funcs * @since 3.4.0 */ def map_entries(e: Column): Column = Column.fn("map_entries", e) /** * Returns a map created from the given array of entries. - * @group collection_funcs + * @group map_funcs * @since 3.4.0 */ def map_from_entries(e: Column): Column = Column.fn("map_from_entries", e) @@ -7171,7 +7184,7 @@ object functions { /** * Returns a merged array of structs in which the N-th struct contains all N-th values of input * arrays. - * @group collection_funcs + * @group array_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -7179,7 +7192,7 @@ object functions { /** * Returns the union of all the given maps. - * @group collection_funcs + * @group map_funcs * @since 3.4.0 */ @scala.annotation.varargs @@ -7200,7 +7213,7 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option"> Data * Source Option in the version you use. * - * @group collection_funcs + * @group csv_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -7222,7 +7235,7 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option"> Data * Source Option in the version you use. * - * @group collection_funcs + * @group csv_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -7238,7 +7251,7 @@ object functions { * @param csv * a CSV string. * - * @group collection_funcs + * @group csv_funcs * @since 3.4.0 */ def schema_of_csv(csv: String): Column = schema_of_csv(lit(csv)) @@ -7249,7 +7262,7 @@ object functions { * @param csv * a foldable string column containing a CSV string. * - * @group collection_funcs + * @group csv_funcs * @since 3.4.0 */ def schema_of_csv(csv: Column): Column = schema_of_csv(csv, Collections.emptyMap()) @@ -7268,7 +7281,7 @@ object functions { * @return * a column with string literal containing schema in DDL format. * - * @group collection_funcs + * @group csv_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -7288,7 +7301,7 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option"> Data * Source Option in the version you use. * - * @group collection_funcs + * @group csv_funcs * @since 3.4.0 */ // scalastyle:on line.size.limit @@ -7302,7 +7315,7 @@ object functions { * @param e * a column containing a struct. * - * @group collection_funcs + * @group csv_funcs * @since 3.4.0 */ def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap()) @@ -7321,12 +7334,12 @@ object functions { * See Data * Source Option in the version you use. 
- * @group collection_funcs + * @group xml_funcs * @since 4.0.0 */ // scalastyle:on line.size.limit def from_xml(e: Column, schema: StructType, options: java.util.Map[String, String]): Column = - from_xml(e, lit(schema.json), options.asScala.toIterator) + from_xml(e, lit(schema.json), options.asScala.iterator) // scalastyle:off line.size.limit @@ -7343,7 +7356,7 @@ object functions { * See Data * Source Option in the version you use. - * @group collection_funcs + * @group xml_funcs * @since 4.0.0 */ // scalastyle:on line.size.limit @@ -7366,7 +7379,7 @@ object functions { * a string column containing XML data. * @param schema * the schema to use when parsing the XML string - * @group collection_funcs + * @group xml_funcs * @since 4.0.0 */ // scalastyle:on line.size.limit @@ -7388,7 +7401,7 @@ object functions { * See Data * Source Option in the version you use. - * @group collection_funcs + * @group xml_funcs * * @since 4.0.0 */ @@ -7404,7 +7417,7 @@ object functions { * a string column containing XML data. * @param schema * the schema to use when parsing the XML string - * @group collection_funcs + * @group xml_funcs * * @since 4.0.0 */ @@ -7430,7 +7443,7 @@ object functions { * * @param xml * a foldable string column containing a XML string. - * @group collection_funcs + * @group xml_funcs * @since 4.0.0 */ def schema_of_xml(xml: Column): Column = Column.fn("schema_of_xml", xml) @@ -7449,7 +7462,7 @@ object functions { * Source Option in the version you use. * @return * a column with string literal containing schema in DDL format. - * @group collection_funcs + * @group xml_funcs * @since 4.0.0 */ // scalastyle:on line.size.limit @@ -7460,7 +7473,7 @@ object functions { /** * Returns the total number of elements in the array. The function returns null for null input. * - * @group collection_funcs + * @group array_funcs * @since 3.5.0 */ def array_size(e: Column): Column = Column.fn("array_size", e) @@ -7481,7 +7494,7 @@ object functions { * Returns the number of elements in the outermost JSON array. `NULL` is returned in case of any * other valid JSON string, `NULL` or an invalid JSON. * - * @group collection_funcs + * @group json_funcs * @since 3.5.0 */ def json_array_length(e: Column): Column = Column.fn("json_array_length", e) @@ -7491,7 +7504,7 @@ object functions { * given, all the keys of the outermost object will be returned as an array. If it is any other * valid JSON string, an invalid JSON string or an empty string, the function returns null. * - * @group collection_funcs + * @group json_funcs * @since 3.5.0 */ def json_object_keys(e: Column): Column = Column.fn("json_object_keys", e) @@ -7836,7 +7849,7 @@ object functions { /** * Returns `col2` if `col1` is null, or `col1` otherwise. * - * @group predicates_funcs + * @group conditional_funcs * @since 3.5.0 */ def ifnull(col1: Column, col2: Column): Column = Column.fn("ifnull", col1, col2) @@ -7844,7 +7857,7 @@ object functions { /** * Returns true if `col` is not null, or false otherwise. * - * @group predicates_funcs + * @group predicate_funcs * @since 3.5.0 */ def isnotnull(col: Column): Column = Column.fn("isnotnull", col) @@ -7853,7 +7866,7 @@ object functions { * Returns same result as the EQUAL(=) operator for non-null operands, but returns true if both * are null, false if one of the them is null. 
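Editor's note: the ifnull/nullif/nvl/nvl2 family below was previously tagged with `predicates_funcs`, which is not among the declared @groupname entries; retagging them as conditional_funcs (and isnotnull/equal_null as predicate_funcs) keeps the generated docs consistent. Usage is unchanged, e.g. (a sketch assuming columns "a" and "b"):
{{{
  df.select(
      ifnull(col("a"), col("b")),                // conditional_funcs
      nvl2(col("a"), lit("set"), lit("unset")),  // conditional_funcs
      isnotnull(col("a")))                       // predicate_funcs
}}}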
* - * @group predicates_funcs + * @group predicate_funcs * @since 3.5.0 */ def equal_null(col1: Column, col2: Column): Column = Column.fn("equal_null", col1, col2) @@ -7861,7 +7874,7 @@ object functions { /** * Returns null if `col1` equals to `col2`, or `col1` otherwise. * - * @group predicates_funcs + * @group conditional_funcs * @since 3.5.0 */ def nullif(col1: Column, col2: Column): Column = Column.fn("nullif", col1, col2) @@ -7869,7 +7882,7 @@ object functions { /** * Returns `col2` if `col1` is null, or `col1` otherwise. * - * @group predicates_funcs + * @group conditional_funcs * @since 3.5.0 */ def nvl(col1: Column, col2: Column): Column = Column.fn("nvl", col1, col2) @@ -7877,7 +7890,7 @@ object functions { /** * Returns `col2` if `col1` is not null, or `col3` otherwise. * - * @group predicates_funcs + * @group conditional_funcs * @since 3.5.0 */ def nvl2(col1: Column, col2: Column, col3: Column): Column = Column.fn("nvl2", col1, col2, col3) @@ -8350,6 +8363,7 @@ object functions { * function name that follows the SQL identifier syntax (can be quoted, can be qualified) * @param cols * the expression parameters of function + * @group normal_funcs * @since 3.5.0 */ @scala.annotation.varargs diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index 6d825f22b35fa..6e0a04cf4eb4d 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -647,7 +647,7 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM test("Dataset result collection") { def checkResult(rows: IterableOnce[java.lang.Long], expectedValues: Long*): Unit = { - rows.toIterator.zipAll(expectedValues.iterator, null, null).foreach { + rows.iterator.zipAll(expectedValues.iterator, null, null).foreach { case (actual, expected) => assert(actual === expected) } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala index 08496c36b28a2..ba5ecc7a045ac 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala @@ -63,15 +63,15 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( /** * Interrupt this sender and make it exit. */ - def interrupt(): Unit = executionObserver.synchronized { + def interrupt(): Unit = { interrupted = true - executionObserver.notifyAll() + wakeUp() } // For testing - private[connect] def setDeadline(deadlineMs: Long) = executionObserver.synchronized { + private[connect] def setDeadline(deadlineMs: Long) = { deadlineTimeMillis = deadlineMs - executionObserver.notifyAll() + wakeUp() } def run(lastConsumedStreamIndex: Long): Unit = { @@ -152,9 +152,6 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( s"lastConsumedStreamIndex=$lastConsumedStreamIndex") val startTime = System.nanoTime() - // register to be notified about available responses. 
- executionObserver.attachConsumer(this) - var nextIndex = lastConsumedStreamIndex + 1 var finished = false @@ -191,7 +188,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( sentResponsesSize > maximumResponseSize || deadlineTimeMillis < System.currentTimeMillis() logTrace(s"Trying to get next response with index=$nextIndex.") - executionObserver.synchronized { + executionObserver.responseLock.synchronized { logTrace(s"Acquired executionObserver lock.") val sleepStart = System.nanoTime() var sleepEnd = 0L @@ -208,7 +205,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( if (response.isEmpty) { val timeout = Math.max(1, deadlineTimeMillis - System.currentTimeMillis()) logTrace(s"Wait for response to become available with timeout=$timeout ms.") - executionObserver.wait(timeout) + executionObserver.responseLock.wait(timeout) logTrace(s"Reacquired executionObserver lock after waiting.") sleepEnd = System.nanoTime() } @@ -339,4 +336,15 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( } } } + + private def wakeUp(): Unit = { + // Can be sleeping on either of these two locks, wake them up. + // (Neither of these locks is ever taken for extended period of time, so this won't block) + executionObserver.responseLock.synchronized { + executionObserver.responseLock.notifyAll() + } + grpcCallObserverReadySignal.synchronized { + grpcCallObserverReadySignal.notifyAll() + } + } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala index 859ec7e6b1983..e99e3a94f73a6 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala @@ -33,8 +33,7 @@ import org.apache.spark.sql.connect.service.ExecuteHolder /** * This StreamObserver is running on the execution thread. Execution pushes responses to it, it * caches them. ExecuteResponseGRPCSender is the consumer of the responses ExecuteResponseObserver - * "produces". It waits on the monitor of ExecuteResponseObserver. New produced responses notify - * the monitor. + * "produces". It waits on the responseLock. New produced responses notify the responseLock. * @see * getResponse. * @@ -85,10 +84,12 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: private[connect] var highestConsumedIndex: Long = 0 /** - * Consumer that waits for available responses. There can be only one at a time, @see - * attachConsumer. + * Lock used for synchronization between responseObserver and grpcResponseSenders. * + * grpcResponseSenders wait on it for a new response to be available. * grpcResponseSenders also + * notify it to wake up when interrupted * responseObserver notifies it when new responses are + * available. */ - private var responseSender: Option[ExecuteGrpcResponseSender[T]] = None + private[connect] val responseLock = new Object() // Statistics about cached responses. 
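Editor's note: the responseLock introduced here replaces synchronization on the observer itself with a dedicated monitor shared by the producer (ExecuteResponseObserver) and the consumers (ExecuteGrpcResponseSender, via wakeUp()). A simplified, self-contained sketch of that wait/notify handshake (generic names, not the actual Spark Connect classes):
{{{
  class ResponseBuffer[T] {
    private val responseLock = new Object()
    private var next: Option[T] = None

    // Producer side: publish a response and wake up any waiting consumer.
    def produce(r: T): Unit = responseLock.synchronized {
      next = Some(r)
      responseLock.notifyAll()
    }

    // Consumer side: wait (with timeout) until a response is available.
    def consume(timeoutMs: Long): Option[T] = responseLock.synchronized {
      if (next.isEmpty) responseLock.wait(timeoutMs)
      val r = next
      next = None
      r
    }
  }
}}}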
private val cachedSizeUntilHighestConsumed = CachedSize() @@ -106,7 +107,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: 0 } - def onNext(r: T): Unit = synchronized { + def onNext(r: T): Unit = responseLock.synchronized { if (finalProducedIndex.nonEmpty) { throw new IllegalStateException("Stream onNext can't be called after stream completed") } @@ -125,10 +126,10 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: logDebug( s"Execution opId=${executeHolder.operationId} produced response " + s"responseId=${responseId} idx=$lastProducedIndex") - notifyAll() + responseLock.notifyAll() } - def onError(t: Throwable): Unit = synchronized { + def onError(t: Throwable): Unit = responseLock.synchronized { if (finalProducedIndex.nonEmpty) { throw new IllegalStateException("Stream onError can't be called after stream completed") } @@ -137,10 +138,10 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: logDebug( s"Execution opId=${executeHolder.operationId} produced error. " + s"Last stream index is $lastProducedIndex.") - notifyAll() + responseLock.notifyAll() } - def onCompleted(): Unit = synchronized { + def onCompleted(): Unit = responseLock.synchronized { if (finalProducedIndex.nonEmpty) { throw new IllegalStateException("Stream onCompleted can't be called after stream completed") } @@ -148,14 +149,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: logDebug( s"Execution opId=${executeHolder.operationId} completed stream. " + s"Last stream index is $lastProducedIndex.") - notifyAll() - } - - /** Attach a new consumer (ExecuteResponseGRPCSender). */ - def attachConsumer(newSender: ExecuteGrpcResponseSender[T]): Unit = synchronized { - // interrupt the current sender before attaching new one - responseSender.foreach(_.interrupt()) - responseSender = Some(newSender) + responseLock.notifyAll() } /** @@ -163,7 +157,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: * this response observer assumes that the response is consumed, and the response and previous * response can be uncached, keeping retryBufferSize of responses for the case of retries. */ - def consumeResponse(index: Long): Option[CachedStreamResponse[T]] = synchronized { + def consumeResponse(index: Long): Option[CachedStreamResponse[T]] = responseLock.synchronized { // we index stream responses from 1, getting a lower index would be invalid. assert(index >= 1) // it would be invalid if consumer would skip a response @@ -198,17 +192,17 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: } /** Get the stream error if there is one, otherwise None. */ - def getError(): Option[Throwable] = synchronized { + def getError(): Option[Throwable] = responseLock.synchronized { error } /** If the stream is finished, the index of the last response, otherwise None. */ - def getLastResponseIndex(): Option[Long] = synchronized { + def getLastResponseIndex(): Option[Long] = responseLock.synchronized { finalProducedIndex } /** Get the index in the stream for given response id. 
*/ - def getResponseIndexById(responseId: String): Long = synchronized { + def getResponseIndexById(responseId: String): Long = responseLock.synchronized { responseIdToIndex.getOrElse( responseId, throw new SparkSQLException( @@ -217,7 +211,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: } /** Remove cached responses up to and including response with given id. */ - def removeResponsesUntilId(responseId: String): Unit = synchronized { + def removeResponsesUntilId(responseId: String): Unit = responseLock.synchronized { val index = getResponseIndexById(responseId) removeResponsesUntilIndex(index) logDebug( @@ -229,7 +223,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: } /** Remove all cached responses */ - def removeAll(): Unit = synchronized { + def removeAll(): Unit = responseLock.synchronized { removeResponsesUntilIndex(lastProducedIndex) logInfo( s"Release all for opId=${executeHolder.operationId}. Execution stats: " + @@ -242,7 +236,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: } /** Returns if the stream is finished. */ - def completed(): Boolean = synchronized { + def completed(): Boolean = responseLock.synchronized { finalProducedIndex.isDefined } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala index 96ed593e72ff2..ea2bbe0093fcd 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala @@ -195,11 +195,8 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends val responseObserver = executeHolder.responseObserver val command = request.getPlan.getCommand - val planner = new SparkConnectPlanner(executeHolder.sessionHolder) - planner.process( - command = command, - responseObserver = responseObserver, - executeHolder = executeHolder) + val planner = new SparkConnectPlanner(executeHolder) + planner.process(command = command, responseObserver = responseObserver) } private def requestString(request: Message) = { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala index 546e4446d195b..ddad7da447557 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala @@ -56,7 +56,7 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder) throw new IllegalStateException( s"Illegal operation type ${request.getPlan.getOpTypeCase} to be handled here.") } - val planner = new SparkConnectPlanner(sessionHolder) + val planner = new SparkConnectPlanner(executeHolder) val tracker = executeHolder.eventsManager.createQueryPlanningTracker val dataframe = Dataset.ofRows( diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index eead5cb38ad85..fa964c02a253e 100644 --- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -86,7 +86,18 @@ final case class InvalidCommandInput( private val cause: Throwable = null) extends Exception(message, cause) -class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { +class SparkConnectPlanner( + val sessionHolder: SessionHolder, + val executeHolderOpt: Option[ExecuteHolder] = None) + extends Logging { + + def this(executeHolder: ExecuteHolder) = { + this(executeHolder.sessionHolder, Some(executeHolder)) + } + + if (!executeHolderOpt.forall { e => e.sessionHolder == sessionHolder }) { + throw new IllegalArgumentException("executeHolder does not belong to sessionHolder") + } private[connect] def session: SparkSession = sessionHolder.session @@ -94,6 +105,10 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { private[connect] def sessionId: String = sessionHolder.sessionId + lazy val executeHolder = executeHolderOpt.getOrElse { + throw new IllegalArgumentException("executeHolder is not set") + } + private lazy val pythonExec = sys.env.getOrElse("PYSPARK_PYTHON", sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3")) @@ -2461,48 +2476,39 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { def process( command: proto.Command, - responseObserver: StreamObserver[ExecutePlanResponse], - executeHolder: ExecuteHolder): Unit = { + responseObserver: StreamObserver[ExecutePlanResponse]): Unit = { command.getCommandTypeCase match { case proto.Command.CommandTypeCase.REGISTER_FUNCTION => - handleRegisterUserDefinedFunction(command.getRegisterFunction, executeHolder) + handleRegisterUserDefinedFunction(command.getRegisterFunction) case proto.Command.CommandTypeCase.REGISTER_TABLE_FUNCTION => - handleRegisterUserDefinedTableFunction(command.getRegisterTableFunction, executeHolder) + handleRegisterUserDefinedTableFunction(command.getRegisterTableFunction) case proto.Command.CommandTypeCase.WRITE_OPERATION => - handleWriteOperation(command.getWriteOperation, executeHolder) + handleWriteOperation(command.getWriteOperation) case proto.Command.CommandTypeCase.CREATE_DATAFRAME_VIEW => - handleCreateViewCommand(command.getCreateDataframeView, executeHolder) + handleCreateViewCommand(command.getCreateDataframeView) case proto.Command.CommandTypeCase.WRITE_OPERATION_V2 => - handleWriteOperationV2(command.getWriteOperationV2, executeHolder) + handleWriteOperationV2(command.getWriteOperationV2) case proto.Command.CommandTypeCase.EXTENSION => - handleCommandPlugin(command.getExtension, executeHolder) + handleCommandPlugin(command.getExtension) case proto.Command.CommandTypeCase.SQL_COMMAND => - handleSqlCommand(command.getSqlCommand, responseObserver, executeHolder) + handleSqlCommand(command.getSqlCommand, responseObserver) case proto.Command.CommandTypeCase.WRITE_STREAM_OPERATION_START => - handleWriteStreamOperationStart( - command.getWriteStreamOperationStart, - responseObserver, - executeHolder) + handleWriteStreamOperationStart(command.getWriteStreamOperationStart, responseObserver) case proto.Command.CommandTypeCase.STREAMING_QUERY_COMMAND => - handleStreamingQueryCommand( - command.getStreamingQueryCommand, - responseObserver, - executeHolder) + handleStreamingQueryCommand(command.getStreamingQueryCommand, responseObserver) case proto.Command.CommandTypeCase.STREAMING_QUERY_MANAGER_COMMAND => 
handleStreamingQueryManagerCommand( command.getStreamingQueryManagerCommand, - responseObserver, - executeHolder) + responseObserver) case proto.Command.CommandTypeCase.GET_RESOURCES_COMMAND => - handleGetResourcesCommand(responseObserver, executeHolder) + handleGetResourcesCommand(responseObserver) case _ => throw new UnsupportedOperationException(s"$command not supported.") } } def handleSqlCommand( getSqlCommand: SqlCommand, - responseObserver: StreamObserver[ExecutePlanResponse], - executeHolder: ExecuteHolder): Unit = { + responseObserver: StreamObserver[ExecutePlanResponse]): Unit = { // Eagerly execute commands of the provided SQL string. val args = getSqlCommand.getArgsMap val namedArguments = getSqlCommand.getNamedArgumentsMap @@ -2600,8 +2606,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { } private def handleRegisterUserDefinedFunction( - fun: proto.CommonInlineUserDefinedFunction, - executeHolder: ExecuteHolder): Unit = { + fun: proto.CommonInlineUserDefinedFunction): Unit = { fun.getFunctionCase match { case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF => handleRegisterPythonUDF(fun) @@ -2617,8 +2622,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { } private def handleRegisterUserDefinedTableFunction( - fun: proto.CommonInlineUserDefinedTableFunction, - executeHolder: ExecuteHolder): Unit = { + fun: proto.CommonInlineUserDefinedTableFunction): Unit = { fun.getFunctionCase match { case proto.CommonInlineUserDefinedTableFunction.FunctionCase.PYTHON_UDTF => val function = createPythonUserDefinedTableFunction(fun) @@ -2685,7 +2689,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { session.udf.register(fun.getFunctionName, udf) } - private def handleCommandPlugin(extension: ProtoAny, executeHolder: ExecuteHolder): Unit = { + private def handleCommandPlugin(extension: ProtoAny): Unit = { SparkConnectPluginRegistry.commandRegistry // Lazily traverse the collection. .view @@ -2698,9 +2702,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { executeHolder.eventsManager.postFinished() } - private def handleCreateViewCommand( - createView: proto.CreateDataFrameViewCommand, - executeHolder: ExecuteHolder): Unit = { + private def handleCreateViewCommand(createView: proto.CreateDataFrameViewCommand): Unit = { val viewType = if (createView.getIsGlobal) GlobalTempView else LocalTempView val tableIdentifier = @@ -2736,9 +2738,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { * * @param writeOperation */ - private def handleWriteOperation( - writeOperation: proto.WriteOperation, - executeHolder: ExecuteHolder): Unit = { + private def handleWriteOperation(writeOperation: proto.WriteOperation): Unit = { // Transform the input plan into the logical plan. val plan = transformRelation(writeOperation.getInput) // And create a Dataset from the plan. @@ -2810,9 +2810,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { * * @param writeOperation */ - def handleWriteOperationV2( - writeOperation: proto.WriteOperationV2, - executeHolder: ExecuteHolder): Unit = { + def handleWriteOperationV2(writeOperation: proto.WriteOperationV2): Unit = { // Transform the input plan into the logical plan. val plan = transformRelation(writeOperation.getInput) // And create a Dataset from the plan. 
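The hunks above move the `ExecuteHolder` out of every handler's parameter list and into the planner's constructor: command handlers reach it through a lazy accessor, while plan-only callers construct the planner from a `SessionHolder` and never touch it. The following is a minimal, self-contained sketch of that shape using stand-in types (`SessionHolder`, `ExecuteHolder` and `Planner` here are simplified placeholders, not the Spark Connect classes):

{{{
// Stand-in types, for illustration only.
case class SessionHolder(sessionId: String)
case class ExecuteHolder(sessionHolder: SessionHolder, operationId: String)

class Planner(
    val sessionHolder: SessionHolder,
    val executeHolderOpt: Option[ExecuteHolder] = None) {

  // Auxiliary constructor, mirroring `new SparkConnectPlanner(executeHolder)`.
  def this(executeHolder: ExecuteHolder) =
    this(executeHolder.sessionHolder, Some(executeHolder))

  // Fail fast if the execution does not belong to this session.
  require(
    executeHolderOpt.forall(_.sessionHolder == sessionHolder),
    "executeHolder does not belong to sessionHolder")

  // Command handlers use this accessor; plan-only callers never evaluate it.
  lazy val executeHolder: ExecuteHolder = executeHolderOpt.getOrElse(
    throw new IllegalArgumentException("executeHolder is not set"))
}

object PlannerSketch extends App {
  val session = SessionHolder("session-1")
  val execution = ExecuteHolder(session, "operation-1")
  val commandPlanner = new Planner(execution) // may read commandPlanner.executeHolder
  val planOnlyPlanner = new Planner(session)  // evaluating executeHolder here would throw
}
}}}

One consequence worth noting: because the accessor is lazy, constructing a planner without an `ExecuteHolder` stays legal, and the failure only surfaces if a command handler actually needs the execution state.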
@@ -2873,8 +2871,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { def handleWriteStreamOperationStart( writeOp: WriteStreamOperationStart, - responseObserver: StreamObserver[ExecutePlanResponse], - executeHolder: ExecuteHolder): Unit = { + responseObserver: StreamObserver[ExecutePlanResponse]): Unit = { val plan = transformRelation(writeOp.getInput) val tracker = executeHolder.eventsManager.createQueryPlanningTracker val dataset = Dataset.ofRows(session, plan, tracker) @@ -2999,8 +2996,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { def handleStreamingQueryCommand( command: StreamingQueryCommand, - responseObserver: StreamObserver[ExecutePlanResponse], - executeHolder: ExecuteHolder): Unit = { + responseObserver: StreamObserver[ExecutePlanResponse]): Unit = { val id = command.getQueryId.getId val runId = command.getQueryId.getRunId @@ -3177,8 +3173,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { def handleStreamingQueryManagerCommand( command: StreamingQueryManagerCommand, - responseObserver: StreamObserver[ExecutePlanResponse], - executeHolder: ExecuteHolder): Unit = { + responseObserver: StreamObserver[ExecutePlanResponse]): Unit = { val respBuilder = StreamingQueryManagerCommandResult.newBuilder() command.getCommandCase match { @@ -3257,8 +3252,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { } def handleGetResourcesCommand( - responseObserver: StreamObserver[proto.ExecutePlanResponse], - executeHolder: ExecuteHolder): Unit = { + responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = { executeHolder.eventsManager.postFinished() responseObserver.onNext( proto.ExecutePlanResponse diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala index 0593edc2f6fda..eed8cc01f7c66 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala @@ -164,6 +164,10 @@ private[connect] class ExecuteHolder( private def addGrpcResponseSender( sender: ExecuteGrpcResponseSender[proto.ExecutePlanResponse]) = synchronized { if (closedTime.isEmpty) { + // Interrupt all other senders - there can be only one active sender. + // Interrupted senders will remove themselves with removeGrpcResponseSender when they exit. + grpcResponseSenders.foreach(_.interrupt()) + // And add this one. 
grpcResponseSenders += sender lastAttachedRpcTime = None } else { diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala index eb84dfc4e3df8..dfada825df47d 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala @@ -58,8 +58,8 @@ trait SparkConnectPlanTest extends SharedSparkSession { def transform(cmd: proto.Command): Unit = { val executeHolder = buildExecutePlanHolder(cmd) - new SparkConnectPlanner(executeHolder.sessionHolder) - .process(cmd, new MockObserver(), executeHolder) + new SparkConnectPlanner(executeHolder) + .process(cmd, new MockObserver()) } def readRel: proto.Relation = @@ -148,7 +148,7 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest { test("Simple Limit") { assertThrows[IndexOutOfBoundsException] { - new SparkConnectPlanner(None.orNull) + new SparkConnectPlanner(SessionHolder.forTesting(None.orNull)) .transformRelation( proto.Relation.newBuilder .setLimit(proto.Limit.newBuilder.setLimit(10)) @@ -159,10 +159,11 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest { test("InvalidInputs") { // No Relation Set intercept[IndexOutOfBoundsException]( - new SparkConnectPlanner(None.orNull).transformRelation(proto.Relation.newBuilder().build())) + new SparkConnectPlanner(SessionHolder.forTesting(None.orNull)) + .transformRelation(proto.Relation.newBuilder().build())) intercept[InvalidPlanInput]( - new SparkConnectPlanner(None.orNull) + new SparkConnectPlanner(SessionHolder.forTesting(None.orNull)) .transformRelation( proto.Relation.newBuilder.setUnknown(proto.Unknown.newBuilder().build()).build())) } diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala index fdb9032379419..ea9ae3ed9d9c1 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala @@ -196,8 +196,8 @@ class SparkConnectPluginRegistrySuite extends SharedSparkSession with SparkConne .build() val executeHolder = buildExecutePlanHolder(plan) - new SparkConnectPlanner(executeHolder.sessionHolder) - .process(plan, new MockObserver(), executeHolder) + new SparkConnectPlanner(executeHolder) + .process(plan, new MockObserver()) assert(spark.sparkContext.getLocalProperty("testingProperty").equals("Martin")) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index cc4370ad02e06..5c1887be5b8b3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -153,7 +153,11 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
             <li><strong>Drivers:</strong>
               {state.activeDrivers.length} Running
               ({state.activeDrivers.count(_.state == DriverState.SUBMITTED)} Waiting),
-              {state.completedDrivers.length} Completed</li>
+              {state.completedDrivers.length} Completed
+              ({state.completedDrivers.count(_.state == DriverState.KILLED)} Killed,
+              {state.completedDrivers.count(_.state == DriverState.FAILED)} Failed,
+              {state.completedDrivers.count(_.state == DriverState.ERROR)} Error)
+            </li>
             <li><strong>Status:</strong> {state.status}</li>
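The MasterPage hunk above derives each new figure with a separate `count` over `completedDrivers`, which is simple and cheap for the handful of terminal states involved. If more states were ever reported, the same breakdown could be computed in a single pass; the snippet below shows that alternative with stand-in types (`DriverState` and `DriverInfo` here are simplified placeholders, not the Spark classes):

{{{
// Stand-ins, for illustration only.
object DriverState extends Enumeration {
  val SUBMITTED, RUNNING, FINISHED, KILLED, FAILED, ERROR = Value
}
case class DriverInfo(id: String, state: DriverState.Value)

object DriverCountsSketch extends App {
  val completedDrivers = Seq(
    DriverInfo("driver-1", DriverState.FINISHED),
    DriverInfo("driver-2", DriverState.KILLED),
    DriverInfo("driver-3", DriverState.FAILED))

  // One traversal instead of one count(...) per state.
  val countsByState: Map[DriverState.Value, Int] =
    completedDrivers.groupBy(_.state).view.mapValues(_.size).toMap

  val killed = countsByState.getOrElse(DriverState.KILLED, 0)
  val failed = countsByState.getOrElse(DriverState.FAILED, 0)
  val errored = countsByState.getOrElse(DriverState.ERROR, 0)
  println(s"Killed=$killed, Failed=$failed, Error=$errored")
}
}}}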
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index 95d2bdc39e133..856e639fcd9ae 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -18,6 +18,7 @@ package org.apache.spark.serializer import java.io._ +import java.lang.reflect.{InvocationHandler, Method, Proxy} import java.nio.ByteBuffer import scala.reflect.ClassTag @@ -79,7 +80,7 @@ private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoa // scalastyle:off classforname val resolved = ifaces.map(iface => Class.forName(iface, false, loader)) // scalastyle:on classforname - java.lang.reflect.Proxy.getProxyClass(loader, resolved: _*) + Proxy.newProxyInstance(loader, resolved, DummyInvocationHandler).getClass } } @@ -88,6 +89,12 @@ private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoa def close(): Unit = { objIn.close() } } +private[spark] object DummyInvocationHandler extends InvocationHandler { + override def invoke(proxy: Any, method: Method, args: Array[AnyRef]): AnyRef = { + throw new UnsupportedOperationException("Not implemented") + } +} + private object JavaDeserializationStream { val primitiveMappings = Map[String, Class[_]]( diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala index 436899448d633..1695f06c35be9 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -56,7 +56,7 @@ private[spark] object Utils extends SparkCollectionUtils { override def compare(l: T, r: T): Int = ord.compare(l, r) } GuavaIterators.mergeSorted( - inputs.map(_.toIterator.asJava).asJava, ordering).asScala + inputs.map(_.iterator.asJava).asJava, ordering).asScala } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 44dc9a5f97dab..842a26148b4e6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -161,7 +161,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { def resultHandler(x: Int, y: Unit): Unit = {} val futureAction: SimpleFutureAction[Unit] = sc.submitJob[Int, Unit, Unit](rdd, OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully, - 0 until rdd.partitions.size, resultHandler, () => ()) + 0 until rdd.partitions.size, resultHandler, ()) // It's an error if the job completes successfully even though no committer was authorized, // so throw an exception if the job was allowed to complete.
intercept[TimeoutException] { diff --git a/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala index 29421f7aa9e36..297e4fd53ab4f 100644 --- a/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala @@ -57,13 +57,13 @@ class CompletionIteratorSuite extends SparkFunSuite { sub = null iter.toArray - for (_ <- 1 to 100 if !ref.isEnqueued) { + for (_ <- 1 to 100 if !ref.refersTo(null)) { System.gc() - if (!ref.isEnqueued) { + if (!ref.refersTo(null)) { Thread.sleep(10) } } - assert(ref.isEnqueued) + assert(ref.refersTo(null)) assert(refQueue.poll() === ref) } } diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 774422a9cd9d1..9fb823abaa3ab 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -2385,6 +2385,11 @@ Here are the configs regarding to RocksDB instance of the state store provider: Total memory to be occupied by blocks in high priority pool as a fraction of memory allocated across all RocksDB instances on a single node using maxMemoryUsageMB. 0.1 + + spark.sql.streaming.stateStore.rocksdb.allowFAllocate + Allow the rocksdb runtime to use fallocate to pre-allocate disk space for logs, etc... Disable for apps that have many smaller state stores to trade off disk space for write performance. + true + ##### RocksDB State Store Memory Management diff --git a/pom.xml b/pom.xml index 3e43bc047079d..e3a19257c8c12 100644 --- a/pom.xml +++ b/pom.xml @@ -2970,9 +2970,6 @@ -Wconf:cat=scaladoc:wv -Wconf:cat=lint-multiarg-infix:wv -Wconf:cat=other-nullary-override:wv - -Wconf:cat=other-match-analysis&site=org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction.catalogFunction:wv - -Wconf:cat=other-pure-statement&site=org.apache.spark.streaming.util.FileBasedWriteAheadLog.readAll.readFile:wv - -Wconf:cat=other-pure-statement&site=org.apache.spark.scheduler.OutputCommitCoordinatorSuite:wv
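Circling back to the `spark.sql.streaming.stateStore.rocksdb.allowFAllocate` entry added to the streaming guide above: the flag defaults to `true` and is only consulted when the RocksDB state store provider is enabled. Below is a minimal sketch of a session that opts out of fallocate, assuming the RocksDB provider class name as documented for Spark's RocksDB state store (shown purely as an illustration, not as a recommended default):

{{{
import org.apache.spark.sql.SparkSession

object RocksDBFAllocateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("rocksdb-fallocate-sketch")
      .master("local[2]")
      // Use the RocksDB-backed state store for streaming state.
      .config(
        "spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
      // Trade some write performance for less pre-allocated disk space.
      .config("spark.sql.streaming.stateStore.rocksdb.allowFAllocate", "false")
      .getOrCreate()

    // ... define and start the streaming query here ...
    spark.stop()
  }
}
}}}

Per the new table entry, disabling fallocate trades write performance for reduced disk usage, which matters mostly when a job maintains many small state stores.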