Skip to content

Commit

Permalink
Merge branch 'branch-25.02' into add_fabric_handles
Browse files Browse the repository at this point in the history
  • Loading branch information
abellina authored Dec 9, 2024
2 parents 8d27a92 + 0f5d4b9 commit 63340ef
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 93 deletions.
13 changes: 8 additions & 5 deletions cpp/src/strings/search/find.cu
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <cooperative_groups.h>
#include <cuda/atomic>
#include <thrust/binary_search.h>
#include <thrust/fill.h>
Expand Down Expand Up @@ -347,13 +348,15 @@ CUDF_KERNEL void contains_warp_parallel_fn(column_device_view const d_strings,
string_view const d_target,
bool* d_results)
{
auto const idx = cudf::detail::grid_1d::global_thread_id();
using warp_reduce = cub::WarpReduce<bool>;
__shared__ typename warp_reduce::TempStorage temp_storage;
auto const idx = cudf::detail::grid_1d::global_thread_id();

auto const str_idx = idx / cudf::detail::warp_size;
if (str_idx >= d_strings.size()) { return; }
auto const lane_idx = idx % cudf::detail::warp_size;

namespace cg = cooperative_groups;
auto const warp = cg::tiled_partition<cudf::detail::warp_size>(cg::this_thread_block());
auto const lane_idx = warp.thread_rank();

if (d_strings.is_null(str_idx)) { return; }
// get the string for this warp
auto const d_str = d_strings.element<string_view>(str_idx);
Expand All @@ -373,7 +376,7 @@ CUDF_KERNEL void contains_warp_parallel_fn(column_device_view const d_strings,
}
}

auto const result = warp_reduce(temp_storage).Reduce(found, cub::Max());
auto const result = warp.any(found);
if (lane_idx == 0) { d_results[str_idx] = result; }
}

Expand Down
6 changes: 2 additions & 4 deletions python/cudf/cudf/_lib/io/utils.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ from pylibcudf.libcudf.io.types cimport (
from cudf._lib.column cimport Column


cdef sink_info make_sinks_info(
list src, vector[unique_ptr[data_sink]] & data) except*
cdef sink_info make_sink_info(src, unique_ptr[data_sink] & data) except*
cdef add_df_col_struct_names(
df,
child_names_dict
Expand All @@ -26,7 +23,8 @@ cdef update_col_struct_field_names(
)
cdef update_struct_field_names(
table,
vector[column_name_info]& schema_info)
vector[column_name_info]& schema_info
)
cdef Column update_column_struct_field_names(
Column col,
column_name_info& info
Expand Down
87 changes: 3 additions & 84 deletions python/cudf/cudf/_lib/io/utils.pyx
Original file line number Diff line number Diff line change
@@ -1,97 +1,16 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from cpython.buffer cimport PyBUF_READ
from cpython.memoryview cimport PyMemoryView_FromMemory
from libcpp.memory cimport unique_ptr

from libcpp.string cimport string
from libcpp.utility cimport move

from libcpp.vector cimport vector

from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.types cimport (
column_name_info,
sink_info,
)
from pylibcudf.libcudf.io.types cimport column_name_info

from cudf._lib.column cimport Column

import codecs
import io
import os

from cudf.core.dtypes import StructDtype

# Converts the Python sink input to libcudf IO sink_info.
cdef sink_info make_sinks_info(
list src, vector[unique_ptr[data_sink]] & sink
) except*:
cdef vector[data_sink *] data_sinks
cdef vector[string] paths
if isinstance(src[0], io.StringIO):
data_sinks.reserve(len(src))
for s in src:
sink.push_back(unique_ptr[data_sink](new iobase_data_sink(s)))
data_sinks.push_back(sink.back().get())
return sink_info(data_sinks)
elif isinstance(src[0], io.TextIOBase):
data_sinks.reserve(len(src))
for s in src:
# Files opened in text mode expect writes to be str rather than
# bytes, which requires conversion from utf-8. If the underlying
# buffer is utf-8, we can bypass this conversion by writing
# directly to it.
if codecs.lookup(s.encoding).name not in {"utf-8", "ascii"}:
raise NotImplementedError(f"Unsupported encoding {s.encoding}")
sink.push_back(
unique_ptr[data_sink](new iobase_data_sink(s.buffer))
)
data_sinks.push_back(sink.back().get())
return sink_info(data_sinks)
elif isinstance(src[0], io.IOBase):
data_sinks.reserve(len(src))
for s in src:
sink.push_back(unique_ptr[data_sink](new iobase_data_sink(s)))
data_sinks.push_back(sink.back().get())
return sink_info(data_sinks)
elif isinstance(src[0], (basestring, os.PathLike)):
paths.reserve(len(src))
for s in src:
paths.push_back(<string> os.path.expanduser(s).encode())
return sink_info(move(paths))
else:
raise TypeError("Unrecognized input type: {}".format(type(src)))


cdef sink_info make_sink_info(src, unique_ptr[data_sink] & sink) except*:
cdef vector[unique_ptr[data_sink]] datasinks
cdef sink_info info = make_sinks_info([src], datasinks)
if not datasinks.empty():
sink.swap(datasinks[0])
return info


# Adapts a python io.IOBase object as a libcudf IO data_sink. This lets you
# write from cudf to any python file-like object (File/BytesIO/SocketIO etc)
cdef cppclass iobase_data_sink(data_sink):
object buf

iobase_data_sink(object buf_):
this.buf = buf_

void host_write(const void * data, size_t size) with gil:
if isinstance(buf, io.StringIO):
buf.write(PyMemoryView_FromMemory(<char*>data, size, PyBUF_READ)
.tobytes().decode())
else:
buf.write(PyMemoryView_FromMemory(<char*>data, size, PyBUF_READ))

void flush() with gil:
buf.flush()

size_t bytes_written() with gil:
return buf.tell()


cdef add_df_col_struct_names(df, child_names_dict):
for name, child_names in child_names_dict.items():
col = df._data[name]
Expand Down

0 comments on commit 63340ef

Please sign in to comment.