From 528758059e674333ac4ca9b783d5adce7d61248d Mon Sep 17 00:00:00 2001 From: DanialJavady96 <154250392+DanialJavady96@users.noreply.github.com> Date: Tue, 30 Apr 2024 10:35:53 -0400 Subject: [PATCH] Refactor joins for conditional semis and antis (#14646) Add a new kernel to be used for both semi and anti joins. Add some new device functions for adding only one array of shared_memory for caching. Tests pass on my 3080. Authors: - https://github.com/DanialJavady96 - Danial Javady (https://github.com/ZelboK) - Yunsong Wang (https://github.com/PointKernel) - Bradley Dice (https://github.com/bdice) Approvers: - Yunsong Wang (https://github.com/PointKernel) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/14646 --- cpp/src/join/conditional_join.cu | 149 +++++++++++++++++----- cpp/src/join/conditional_join_kernels.cuh | 94 ++++++++++++++ cpp/src/join/join_common_utils.cuh | 39 +++++- 3 files changed, 249 insertions(+), 33 deletions(-) diff --git a/cpp/src/join/conditional_join.cu b/cpp/src/join/conditional_join.cu index 095093d08e5..f02dee5f7f5 100644 --- a/cpp/src/join/conditional_join.cu +++ b/cpp/src/join/conditional_join.cu @@ -37,6 +37,99 @@ namespace cudf { namespace detail { +std::unique_ptr> conditional_join_anti_semi( + table_view const& left, + table_view const& right, + ast::expression const& binary_predicate, + join_kind join_type, + std::optional output_size, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + if (right.num_rows() == 0) { + switch (join_type) { + case join_kind::LEFT_ANTI_JOIN: + return std::make_unique>(left.num_rows(), stream, mr); + case join_kind::LEFT_SEMI_JOIN: + return std::make_unique>(0, stream, mr); + default: CUDF_FAIL("Invalid join kind."); break; + } + } else if (left.num_rows() == 0) { + switch (join_type) { + case join_kind::LEFT_ANTI_JOIN: [[fallthrough]]; + case join_kind::LEFT_SEMI_JOIN: + return std::make_unique>(0, stream, mr); + default: CUDF_FAIL("Invalid join kind."); break; + } + } + + auto const has_nulls = binary_predicate.may_evaluate_null(left, right, stream); + + auto const parser = + ast::detail::expression_parser{binary_predicate, left, right, has_nulls, stream, mr}; + CUDF_EXPECTS(parser.output_type().id() == type_id::BOOL8, + "The expression must produce a Boolean output."); + + auto left_table = table_device_view::create(left, stream); + auto right_table = table_device_view::create(right, stream); + + detail::grid_1d const config(left.num_rows(), DEFAULT_JOIN_BLOCK_SIZE); + auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block; + + // TODO: Remove the output_size parameter. It is not needed because the + // output size is bounded by the size of the left table. + std::size_t join_size; + if (output_size.has_value()) { + join_size = *output_size; + } else { + // Allocate storage for the counter used to get the size of the join output + rmm::device_scalar size(0, stream, mr); + if (has_nulls) { + compute_conditional_join_output_size + <<>>( + *left_table, *right_table, join_type, parser.device_expression_data, false, size.data()); + } else { + compute_conditional_join_output_size + <<>>( + *left_table, *right_table, join_type, parser.device_expression_data, false, size.data()); + } + join_size = size.value(stream); + } + + if (left.num_rows() == 0) { + return std::make_unique>(0, stream, mr); + } + + rmm::device_scalar write_index(0, stream); + + auto left_indices = std::make_unique>(join_size, stream, mr); + + auto const& join_output_l = left_indices->data(); + + if (has_nulls) { + conditional_join_anti_semi + <<>>( + *left_table, + *right_table, + join_type, + join_output_l, + write_index.data(), + parser.device_expression_data, + join_size); + } else { + conditional_join_anti_semi + <<>>( + *left_table, + *right_table, + join_type, + join_output_l, + write_index.data(), + parser.device_expression_data, + join_size); + } + return left_indices; +} + std::pair>, std::unique_ptr>> conditional_join(table_view const& left, @@ -50,9 +143,7 @@ conditional_join(table_view const& left, // We can immediately filter out cases where the right table is empty. In // some cases, we return all the rows of the left table with a corresponding // null index for the right table; in others, we return an empty output. - auto right_num_rows{right.num_rows()}; - auto left_num_rows{left.num_rows()}; - if (right_num_rows == 0) { + if (right.num_rows() == 0) { switch (join_type) { // Left, left anti, and full all return all the row indices from left // with a corresponding NULL from the right. @@ -67,7 +158,7 @@ conditional_join(table_view const& left, std::make_unique>(0, stream, mr)); default: CUDF_FAIL("Invalid join kind."); break; } - } else if (left_num_rows == 0) { + } else if (left.num_rows() == 0) { switch (join_type) { // Left, left anti, left semi, and inner joins all return empty sets. case join_kind::LEFT_JOIN: @@ -101,8 +192,8 @@ conditional_join(table_view const& left, // For inner joins we support optimizing the join by launching one thread for // whichever table is larger rather than always using the left table. - auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right_num_rows > left_num_rows); - detail::grid_1d const config(swap_tables ? right_num_rows : left_num_rows, + auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right.num_rows() > left.num_rows()); + detail::grid_1d const config(swap_tables ? right.num_rows() : left.num_rows(), DEFAULT_JOIN_BLOCK_SIZE); auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block; join_kind const kernel_join_type = @@ -187,7 +278,7 @@ conditional_join(table_view const& left, // by any row in the left table. if (join_type == join_kind::FULL_JOIN) { auto complement_indices = detail::get_left_join_indices_complement( - join_indices.second, left_num_rows, right_num_rows, stream, mr); + join_indices.second, left.num_rows(), right.num_rows(), stream, mr); join_indices = detail::concatenate_vector_pairs(join_indices, complement_indices, stream); } return join_indices; @@ -210,21 +301,19 @@ std::size_t compute_conditional_join_output_size(table_view const& left, // We can immediately filter out cases where one table is empty. In // some cases, we return all the rows of the other table with a corresponding // null index for the empty table; in others, we return an empty output. - auto right_num_rows{right.num_rows()}; - auto left_num_rows{left.num_rows()}; - if (right_num_rows == 0) { + if (right.num_rows() == 0) { switch (join_type) { // Left, left anti, and full all return all the row indices from left // with a corresponding NULL from the right. case join_kind::LEFT_JOIN: case join_kind::LEFT_ANTI_JOIN: - case join_kind::FULL_JOIN: return left_num_rows; + case join_kind::FULL_JOIN: return left.num_rows(); // Inner and left semi joins return empty output because no matches can exist. case join_kind::INNER_JOIN: case join_kind::LEFT_SEMI_JOIN: return 0; default: CUDF_FAIL("Invalid join kind."); break; } - } else if (left_num_rows == 0) { + } else if (left.num_rows() == 0) { switch (join_type) { // Left, left anti, left semi, and inner joins all return empty sets. case join_kind::LEFT_JOIN: @@ -232,7 +321,7 @@ std::size_t compute_conditional_join_output_size(table_view const& left, case join_kind::INNER_JOIN: case join_kind::LEFT_SEMI_JOIN: return 0; // Full joins need to return the trivial complement. - case join_kind::FULL_JOIN: return right_num_rows; + case join_kind::FULL_JOIN: return right.num_rows(); default: CUDF_FAIL("Invalid join kind."); break; } } @@ -254,8 +343,8 @@ std::size_t compute_conditional_join_output_size(table_view const& left, // For inner joins we support optimizing the join by launching one thread for // whichever table is larger rather than always using the left table. - auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right_num_rows > left_num_rows); - detail::grid_1d const config(swap_tables ? right_num_rows : left_num_rows, + auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right.num_rows() > left.num_rows()); + detail::grid_1d const config(swap_tables ? right.num_rows() : left.num_rows(), DEFAULT_JOIN_BLOCK_SIZE); auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block; @@ -349,14 +438,13 @@ std::unique_ptr> conditional_left_semi_join( rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return std::move(detail::conditional_join(left, - right, - binary_predicate, - detail::join_kind::LEFT_SEMI_JOIN, - output_size, - cudf::get_default_stream(), - mr) - .first); + return std::move(detail::conditional_join_anti_semi(left, + right, + binary_predicate, + detail::join_kind::LEFT_SEMI_JOIN, + output_size, + cudf::get_default_stream(), + mr)); } std::unique_ptr> conditional_left_anti_join( @@ -367,14 +455,13 @@ std::unique_ptr> conditional_left_anti_join( rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return std::move(detail::conditional_join(left, - right, - binary_predicate, - detail::join_kind::LEFT_ANTI_JOIN, - output_size, - cudf::get_default_stream(), - mr) - .first); + return std::move(detail::conditional_join_anti_semi(left, + right, + binary_predicate, + detail::join_kind::LEFT_ANTI_JOIN, + output_size, + cudf::get_default_stream(), + mr)); } std::size_t conditional_inner_join_size(table_view const& left, diff --git a/cpp/src/join/conditional_join_kernels.cuh b/cpp/src/join/conditional_join_kernels.cuh index cc57fa7b03b..5e190eb2b27 100644 --- a/cpp/src/join/conditional_join_kernels.cuh +++ b/cpp/src/join/conditional_join_kernels.cuh @@ -271,6 +271,100 @@ CUDF_KERNEL void conditional_join(table_device_view left_table, } } +template +CUDF_KERNEL void conditional_join_anti_semi( + table_device_view left_table, + table_device_view right_table, + join_kind join_type, + cudf::size_type* join_output_l, + cudf::size_type* current_idx, + cudf::ast::detail::expression_device_view device_expression_data, + cudf::size_type const max_size) +{ + constexpr int num_warps = block_size / detail::warp_size; + __shared__ cudf::size_type current_idx_shared[num_warps]; + __shared__ cudf::size_type join_shared_l[num_warps][output_cache_size]; + + extern __shared__ char raw_intermediate_storage[]; + cudf::ast::detail::IntermediateDataType* intermediate_storage = + reinterpret_cast*>(raw_intermediate_storage); + auto thread_intermediate_storage = + &intermediate_storage[threadIdx.x * device_expression_data.num_intermediates]; + + int const warp_id = threadIdx.x / detail::warp_size; + int const lane_id = threadIdx.x % detail::warp_size; + cudf::thread_index_type const outer_num_rows = left_table.num_rows(); + cudf::thread_index_type const inner_num_rows = right_table.num_rows(); + auto const stride = cudf::detail::grid_1d::grid_stride(); + auto const start_idx = cudf::detail::grid_1d::global_thread_id(); + + if (0 == lane_id) { current_idx_shared[warp_id] = 0; } + + __syncwarp(); + + unsigned int const activemask = __ballot_sync(0xffff'ffffu, start_idx < outer_num_rows); + + auto evaluator = cudf::ast::detail::expression_evaluator( + left_table, right_table, device_expression_data); + + for (cudf::thread_index_type outer_row_index = start_idx; outer_row_index < outer_num_rows; + outer_row_index += stride) { + bool found_match = false; + for (thread_index_type inner_row_index(0); inner_row_index < inner_num_rows; + ++inner_row_index) { + auto output_dest = cudf::ast::detail::value_expression_result(); + + evaluator.evaluate( + output_dest, outer_row_index, inner_row_index, 0, thread_intermediate_storage); + + if (output_dest.is_valid() && output_dest.value()) { + if (join_type == join_kind::LEFT_SEMI_JOIN && !found_match) { + add_left_to_cache(outer_row_index, current_idx_shared, warp_id, join_shared_l[warp_id]); + } + found_match = true; + } + + __syncwarp(activemask); + + auto const do_flush = current_idx_shared[warp_id] + detail::warp_size >= output_cache_size; + auto const flush_mask = __ballot_sync(activemask, do_flush); + if (do_flush) { + flush_output_cache(flush_mask, + max_size, + warp_id, + lane_id, + current_idx, + current_idx_shared, + join_shared_l, + join_output_l); + __syncwarp(flush_mask); + if (0 == lane_id) { current_idx_shared[warp_id] = 0; } + } + __syncwarp(activemask); + } + + if ((join_type == join_kind::LEFT_ANTI_JOIN) && (!found_match)) { + add_left_to_cache(outer_row_index, current_idx_shared, warp_id, join_shared_l[warp_id]); + } + + __syncwarp(activemask); + + auto const do_flush = current_idx_shared[warp_id] > 0; + auto const flush_mask = __ballot_sync(activemask, do_flush); + if (do_flush) { + flush_output_cache(flush_mask, + max_size, + warp_id, + lane_id, + current_idx, + current_idx_shared, + join_shared_l, + join_output_l); + } + if (found_match) break; + } +} + } // namespace detail } // namespace cudf diff --git a/cpp/src/join/join_common_utils.cuh b/cpp/src/join/join_common_utils.cuh index 9758919c5b4..31f267d5cfb 100644 --- a/cpp/src/join/join_common_utils.cuh +++ b/cpp/src/join/join_common_utils.cuh @@ -281,12 +281,21 @@ __inline__ __device__ void add_pair_to_cache(size_type const first, size_type* joined_shared_r) { size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))}; - // its guaranteed to fit into the shared cache joined_shared_l[my_current_idx] = first; joined_shared_r[my_current_idx] = second; } +__inline__ __device__ void add_left_to_cache(size_type const first, + size_type* current_idx_shared, + int const warp_id, + size_type* joined_shared_l) +{ + size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))}; + + joined_shared_l[my_current_idx] = first; +} + template __device__ void flush_output_cache(unsigned int const activemask, cudf::size_type const max_size, @@ -300,7 +309,7 @@ __device__ void flush_output_cache(unsigned int const activemask, size_type* join_output_r) { // count how many active threads participating here which could be less than warp_size - int num_threads = __popc(activemask); + int const num_threads = __popc(activemask); cudf::size_type output_offset = 0; if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); } @@ -322,6 +331,32 @@ __device__ void flush_output_cache(unsigned int const activemask, } } +template +__device__ void flush_output_cache(unsigned int const activemask, + cudf::size_type const max_size, + int const warp_id, + int const lane_id, + cudf::size_type* current_idx, + cudf::size_type current_idx_shared[num_warps], + size_type join_shared_l[num_warps][output_cache_size], + size_type* join_output_l) +{ + int const num_threads = __popc(activemask); + cudf::size_type output_offset = 0; + + if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); } + + output_offset = cub::ShuffleIndex(output_offset, 0, activemask); + + for (int shared_out_idx = lane_id; shared_out_idx < current_idx_shared[warp_id]; + shared_out_idx += num_threads) { + cudf::size_type thread_offset = output_offset + shared_out_idx; + if (thread_offset < max_size) { + join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx]; + } + } +} + } // namespace detail } // namespace cudf