Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
traversc committed Aug 30, 2024
1 parent db50a1f commit 728e681
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
42 changes: 39 additions & 3 deletions src/io/multithreaded_block_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,22 @@
#include <tbb/enumerable_thread_specific.h>
#include <atomic>

// single argument macro
// _SA_ wraps a multi-token argument (one that contains commas or semicolons)
// so it can be passed as a single parameter to another macro, e.g.
// _SA_( f(a, b); g(); ).
// NOTE(review): an identifier starting with underscore + uppercase (`_SA_`)
// is reserved to the implementation ([lex.name]); renaming would touch every
// call site, so flagging rather than changing it here.
#define _SA_(...) __VA_ARGS__

// DIRECT_MEM_SWITCH(if_true, if_false) expands to a compile-time branch on
// `direct_mem` (a bool template parameter of the enclosing class); with
// C++17 `if constexpr` the untaken branch is discarded rather than
// instantiated, so each branch may use facilities valid only in its own
// configuration.
#if __cplusplus >= 201703L
#define SUPPORTS_IF_CONSTEXPR 1
#define DIRECT_MEM_SWITCH(if_true, if_false) \
if constexpr (direct_mem) { \
if_true; \
} else { \
if_false; \
}
#else
#define SUPPORTS_IF_CONSTEXPR 0
// Pre-C++17 fallback: ALWAYS expands `if_true` and silently drops
// `if_false`, regardless of direct_mem. Presumably non-direct_mem
// instantiations are never built without `if constexpr` support —
// TODO(review): confirm, otherwise this is a correctness hole on old
// compilers.
#define DIRECT_MEM_SWITCH(if_true, if_false) if_true;
#endif

struct OrderedBlock {
std::shared_ptr<char[]> block;
uint32_t blocksize;
Expand All @@ -29,7 +45,7 @@ struct OrderedPtr {
// sequencer requires copy constructor for message, which means using shared_ptr
// would be better if we could use unique_ptr
// nthreads must be >= 2
template <class stream_writer, class compressor, class hasher, ErrorType E>
template <class stream_writer, class compressor, class hasher, ErrorType E, bool direct_mem>
struct BlockCompressWriterMT {
// template class objects
stream_writer & myFile;
Expand Down Expand Up @@ -123,7 +139,14 @@ struct BlockCompressWriterMT {
{
// connect computation graph
tbb::flow::make_edge(compressor_node, sequencer_node);
tbb::flow::make_edge(compressor_node_direct, sequencer_node);
DIRECT_MEM_SWITCH(
_SA_(
tbb::flow::make_edge(compressor_node_direct, sequencer_node);
),
_SA_(
// compressor_node_direct not connected
)
)
tbb::flow::make_edge(sequencer_node, writer_node);
}
private:
Expand Down Expand Up @@ -182,7 +205,20 @@ struct BlockCompressWriterMT {
// True bc either inbuffer was fully consumed already (no more data to push) or block was filled and flushed (sets current_blocksize to zero)
if(current_blocksize >= MAX_BLOCKSIZE) { flush(); }
while(len - current_pointer_consumed >= MAX_BLOCKSIZE) {
compressor_node_direct.try_put(OrderedPtr(inbuffer + current_pointer_consumed, current_blocknumber));
DIRECT_MEM_SWITCH(
_SA_(
compressor_node_direct.try_put(OrderedPtr(inbuffer + current_pointer_consumed, current_blocknumber));
),
_SA_(
// Different from ST, memcpy segment of inbuffer to block and then send to compress_node
std::shared_ptr<char[]> full_block;
if(!available_blocks.try_pop(full_block)) {
full_block = MAKE_SHARED_BLOCK_ASSIGNMENT(MAX_BLOCKSIZE);
}
std::memcpy(full_block.get(), inbuffer + current_pointer_consumed, MAX_BLOCKSIZE);
compressor_node.try_put(OrderedBlock(full_block, MAX_BLOCKSIZE, current_blocknumber));
)
)
current_blocknumber++;
// current_blocksize = 0; // If we are in this loop, current_blocksize is already zero
current_pointer_consumed += MAX_BLOCKSIZE;
Expand Down
3 changes: 1 addition & 2 deletions vignettes/vignette.rmd
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ cat('
`qs2` is the successor to the `qs` package. The goal is to have reliable and fast performance for saving and loading objects in R.

The `qs2` format directly uses R serialization (via the `R_Serialize`/`R_Unserialize` C API) while improving compression and writing/reading to disk.
If you are familiar with the `qs` package, the benefits and usage are the same. Compared to `saveRDS` it can be an order of magnitude faster
while having similar levels of compression.
If you are familiar with the `qs` package, the benefits and usage are the same.

```{r eval=FALSE}
qs_save(data, "myfile.qs2")
Expand Down

0 comments on commit 728e681

Please sign in to comment.