Skip to content

Commit

Permalink
Merge branch 'branch-24.06' into deprecate-to-from-dask-dataframe
Browse files Browse the repository at this point in the history
  • Loading branch information
rjzamora authored Apr 30, 2024
2 parents d0c2b12 + 5287580 commit dd346dd
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 33 deletions.
149 changes: 118 additions & 31 deletions cpp/src/join/conditional_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,99 @@
namespace cudf {
namespace detail {

std::unique_ptr<rmm::device_uvector<size_type>> conditional_join_anti_semi(
table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
join_kind join_type,
std::optional<std::size_t> output_size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (right.num_rows() == 0) {
switch (join_type) {
case join_kind::LEFT_ANTI_JOIN:
return std::make_unique<rmm::device_uvector<size_type>>(left.num_rows(), stream, mr);
case join_kind::LEFT_SEMI_JOIN:
return std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
default: CUDF_FAIL("Invalid join kind."); break;
}
} else if (left.num_rows() == 0) {
switch (join_type) {
case join_kind::LEFT_ANTI_JOIN: [[fallthrough]];
case join_kind::LEFT_SEMI_JOIN:
return std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
default: CUDF_FAIL("Invalid join kind."); break;
}
}

auto const has_nulls = binary_predicate.may_evaluate_null(left, right, stream);

auto const parser =
ast::detail::expression_parser{binary_predicate, left, right, has_nulls, stream, mr};
CUDF_EXPECTS(parser.output_type().id() == type_id::BOOL8,
"The expression must produce a Boolean output.");

auto left_table = table_device_view::create(left, stream);
auto right_table = table_device_view::create(right, stream);

detail::grid_1d const config(left.num_rows(), DEFAULT_JOIN_BLOCK_SIZE);
auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block;

// TODO: Remove the output_size parameter. It is not needed because the
// output size is bounded by the size of the left table.
std::size_t join_size;
if (output_size.has_value()) {
join_size = *output_size;
} else {
// Allocate storage for the counter used to get the size of the join output
rmm::device_scalar<std::size_t> size(0, stream, mr);
if (has_nulls) {
compute_conditional_join_output_size<DEFAULT_JOIN_BLOCK_SIZE, true>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_table, *right_table, join_type, parser.device_expression_data, false, size.data());
} else {
compute_conditional_join_output_size<DEFAULT_JOIN_BLOCK_SIZE, false>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_table, *right_table, join_type, parser.device_expression_data, false, size.data());
}
join_size = size.value(stream);
}

if (left.num_rows() == 0) {
return std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
}

rmm::device_scalar<size_type> write_index(0, stream);

auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);

auto const& join_output_l = left_indices->data();

if (has_nulls) {
conditional_join_anti_semi<DEFAULT_JOIN_BLOCK_SIZE, DEFAULT_JOIN_CACHE_SIZE, true>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_table,
*right_table,
join_type,
join_output_l,
write_index.data(),
parser.device_expression_data,
join_size);
} else {
conditional_join_anti_semi<DEFAULT_JOIN_BLOCK_SIZE, DEFAULT_JOIN_CACHE_SIZE, false>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_table,
*right_table,
join_type,
join_output_l,
write_index.data(),
parser.device_expression_data,
join_size);
}
return left_indices;
}

std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
conditional_join(table_view const& left,
Expand All @@ -50,9 +143,7 @@ conditional_join(table_view const& left,
// We can immediately filter out cases where the right table is empty. In
// some cases, we return all the rows of the left table with a corresponding
// null index for the right table; in others, we return an empty output.
auto right_num_rows{right.num_rows()};
auto left_num_rows{left.num_rows()};
if (right_num_rows == 0) {
if (right.num_rows() == 0) {
switch (join_type) {
// Left, left anti, and full all return all the row indices from left
// with a corresponding NULL from the right.
Expand All @@ -67,7 +158,7 @@ conditional_join(table_view const& left,
std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr));
default: CUDF_FAIL("Invalid join kind."); break;
}
} else if (left_num_rows == 0) {
} else if (left.num_rows() == 0) {
switch (join_type) {
// Left, left anti, left semi, and inner joins all return empty sets.
case join_kind::LEFT_JOIN:
Expand Down Expand Up @@ -101,8 +192,8 @@ conditional_join(table_view const& left,

// For inner joins we support optimizing the join by launching one thread for
// whichever table is larger rather than always using the left table.
auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right_num_rows > left_num_rows);
detail::grid_1d const config(swap_tables ? right_num_rows : left_num_rows,
auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right.num_rows() > left.num_rows());
detail::grid_1d const config(swap_tables ? right.num_rows() : left.num_rows(),
DEFAULT_JOIN_BLOCK_SIZE);
auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block;
join_kind const kernel_join_type =
Expand Down Expand Up @@ -187,7 +278,7 @@ conditional_join(table_view const& left,
// by any row in the left table.
if (join_type == join_kind::FULL_JOIN) {
auto complement_indices = detail::get_left_join_indices_complement(
join_indices.second, left_num_rows, right_num_rows, stream, mr);
join_indices.second, left.num_rows(), right.num_rows(), stream, mr);
join_indices = detail::concatenate_vector_pairs(join_indices, complement_indices, stream);
}
return join_indices;
Expand All @@ -210,29 +301,27 @@ std::size_t compute_conditional_join_output_size(table_view const& left,
// We can immediately filter out cases where one table is empty. In
// some cases, we return all the rows of the other table with a corresponding
// null index for the empty table; in others, we return an empty output.
auto right_num_rows{right.num_rows()};
auto left_num_rows{left.num_rows()};
if (right_num_rows == 0) {
if (right.num_rows() == 0) {
switch (join_type) {
// Left, left anti, and full all return all the row indices from left
// with a corresponding NULL from the right.
case join_kind::LEFT_JOIN:
case join_kind::LEFT_ANTI_JOIN:
case join_kind::FULL_JOIN: return left_num_rows;
case join_kind::FULL_JOIN: return left.num_rows();
// Inner and left semi joins return empty output because no matches can exist.
case join_kind::INNER_JOIN:
case join_kind::LEFT_SEMI_JOIN: return 0;
default: CUDF_FAIL("Invalid join kind."); break;
}
} else if (left_num_rows == 0) {
} else if (left.num_rows() == 0) {
switch (join_type) {
// Left, left anti, left semi, and inner joins all return empty sets.
case join_kind::LEFT_JOIN:
case join_kind::LEFT_ANTI_JOIN:
case join_kind::INNER_JOIN:
case join_kind::LEFT_SEMI_JOIN: return 0;
// Full joins need to return the trivial complement.
case join_kind::FULL_JOIN: return right_num_rows;
case join_kind::FULL_JOIN: return right.num_rows();
default: CUDF_FAIL("Invalid join kind."); break;
}
}
Expand All @@ -254,8 +343,8 @@ std::size_t compute_conditional_join_output_size(table_view const& left,

// For inner joins we support optimizing the join by launching one thread for
// whichever table is larger rather than always using the left table.
auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right_num_rows > left_num_rows);
detail::grid_1d const config(swap_tables ? right_num_rows : left_num_rows,
auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right.num_rows() > left.num_rows());
detail::grid_1d const config(swap_tables ? right.num_rows() : left.num_rows(),
DEFAULT_JOIN_BLOCK_SIZE);
auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block;

Expand Down Expand Up @@ -349,14 +438,13 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_left_semi_join(
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return std::move(detail::conditional_join(left,
right,
binary_predicate,
detail::join_kind::LEFT_SEMI_JOIN,
output_size,
cudf::get_default_stream(),
mr)
.first);
return std::move(detail::conditional_join_anti_semi(left,
right,
binary_predicate,
detail::join_kind::LEFT_SEMI_JOIN,
output_size,
cudf::get_default_stream(),
mr));
}

std::unique_ptr<rmm::device_uvector<size_type>> conditional_left_anti_join(
Expand All @@ -367,14 +455,13 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_left_anti_join(
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return std::move(detail::conditional_join(left,
right,
binary_predicate,
detail::join_kind::LEFT_ANTI_JOIN,
output_size,
cudf::get_default_stream(),
mr)
.first);
return std::move(detail::conditional_join_anti_semi(left,
right,
binary_predicate,
detail::join_kind::LEFT_ANTI_JOIN,
output_size,
cudf::get_default_stream(),
mr));
}

std::size_t conditional_inner_join_size(table_view const& left,
Expand Down
94 changes: 94 additions & 0 deletions cpp/src/join/conditional_join_kernels.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,100 @@ CUDF_KERNEL void conditional_join(table_device_view left_table,
}
}

template <cudf::size_type block_size, cudf::size_type output_cache_size, bool has_nulls>
CUDF_KERNEL void conditional_join_anti_semi(
table_device_view left_table,
table_device_view right_table,
join_kind join_type,
cudf::size_type* join_output_l,
cudf::size_type* current_idx,
cudf::ast::detail::expression_device_view device_expression_data,
cudf::size_type const max_size)
{
constexpr int num_warps = block_size / detail::warp_size;
__shared__ cudf::size_type current_idx_shared[num_warps];
__shared__ cudf::size_type join_shared_l[num_warps][output_cache_size];

extern __shared__ char raw_intermediate_storage[];
cudf::ast::detail::IntermediateDataType<has_nulls>* intermediate_storage =
reinterpret_cast<cudf::ast::detail::IntermediateDataType<has_nulls>*>(raw_intermediate_storage);
auto thread_intermediate_storage =
&intermediate_storage[threadIdx.x * device_expression_data.num_intermediates];

int const warp_id = threadIdx.x / detail::warp_size;
int const lane_id = threadIdx.x % detail::warp_size;
cudf::thread_index_type const outer_num_rows = left_table.num_rows();
cudf::thread_index_type const inner_num_rows = right_table.num_rows();
auto const stride = cudf::detail::grid_1d::grid_stride();
auto const start_idx = cudf::detail::grid_1d::global_thread_id();

if (0 == lane_id) { current_idx_shared[warp_id] = 0; }

__syncwarp();

unsigned int const activemask = __ballot_sync(0xffff'ffffu, start_idx < outer_num_rows);

auto evaluator = cudf::ast::detail::expression_evaluator<has_nulls>(
left_table, right_table, device_expression_data);

for (cudf::thread_index_type outer_row_index = start_idx; outer_row_index < outer_num_rows;
outer_row_index += stride) {
bool found_match = false;
for (thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
++inner_row_index) {
auto output_dest = cudf::ast::detail::value_expression_result<bool, has_nulls>();

evaluator.evaluate(
output_dest, outer_row_index, inner_row_index, 0, thread_intermediate_storage);

if (output_dest.is_valid() && output_dest.value()) {
if (join_type == join_kind::LEFT_SEMI_JOIN && !found_match) {
add_left_to_cache(outer_row_index, current_idx_shared, warp_id, join_shared_l[warp_id]);
}
found_match = true;
}

__syncwarp(activemask);

auto const do_flush = current_idx_shared[warp_id] + detail::warp_size >= output_cache_size;
auto const flush_mask = __ballot_sync(activemask, do_flush);
if (do_flush) {
flush_output_cache<num_warps, output_cache_size>(flush_mask,
max_size,
warp_id,
lane_id,
current_idx,
current_idx_shared,
join_shared_l,
join_output_l);
__syncwarp(flush_mask);
if (0 == lane_id) { current_idx_shared[warp_id] = 0; }
}
__syncwarp(activemask);
}

if ((join_type == join_kind::LEFT_ANTI_JOIN) && (!found_match)) {
add_left_to_cache(outer_row_index, current_idx_shared, warp_id, join_shared_l[warp_id]);
}

__syncwarp(activemask);

auto const do_flush = current_idx_shared[warp_id] > 0;
auto const flush_mask = __ballot_sync(activemask, do_flush);
if (do_flush) {
flush_output_cache<num_warps, output_cache_size>(flush_mask,
max_size,
warp_id,
lane_id,
current_idx,
current_idx_shared,
join_shared_l,
join_output_l);
}
if (found_match) break;
}
}

} // namespace detail

} // namespace cudf
39 changes: 37 additions & 2 deletions cpp/src/join/join_common_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,21 @@ __inline__ __device__ void add_pair_to_cache(size_type const first,
size_type* joined_shared_r)
{
size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))};

// its guaranteed to fit into the shared cache
joined_shared_l[my_current_idx] = first;
joined_shared_r[my_current_idx] = second;
}

__inline__ __device__ void add_left_to_cache(size_type const first,
size_type* current_idx_shared,
int const warp_id,
size_type* joined_shared_l)
{
size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))};

joined_shared_l[my_current_idx] = first;
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
cudf::size_type const max_size,
Expand All @@ -300,7 +309,7 @@ __device__ void flush_output_cache(unsigned int const activemask,
size_type* join_output_r)
{
// count how many active threads participating here which could be less than warp_size
int num_threads = __popc(activemask);
int const num_threads = __popc(activemask);
cudf::size_type output_offset = 0;

if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); }
Expand All @@ -322,6 +331,32 @@ __device__ void flush_output_cache(unsigned int const activemask,
}
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
cudf::size_type const max_size,
int const warp_id,
int const lane_id,
cudf::size_type* current_idx,
cudf::size_type current_idx_shared[num_warps],
size_type join_shared_l[num_warps][output_cache_size],
size_type* join_output_l)
{
int const num_threads = __popc(activemask);
cudf::size_type output_offset = 0;

if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); }

output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

for (int shared_out_idx = lane_id; shared_out_idx < current_idx_shared[warp_id];
shared_out_idx += num_threads) {
cudf::size_type thread_offset = output_offset + shared_out_idx;
if (thread_offset < max_size) {
join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
}
}
}

} // namespace detail

} // namespace cudf

0 comments on commit dd346dd

Please sign in to comment.