Skip to content

Commit

Permalink
add support for TBB input_node
Browse files Browse the repository at this point in the history
  • Loading branch information
traversc committed Sep 18, 2024
1 parent 177844e commit df9e1cd
Showing 1 changed file with 59 additions and 29 deletions.
88 changes: 59 additions & 29 deletions src/io/multithreaded_block_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
#include <tbb/enumerable_thread_specific.h>
#include <atomic>

// check if TBB_INTERFACE_VERSION variable exists and is >= 11102
// source_node is deprecated in later versions of TBB, but does not exist in the default RcppParallel build
#if defined(TBB_INTERFACE_VERSION) && TBB_INTERFACE_VERSION >= 11102
#define USE_INPUT_NODE 1
#else
#define USE_INPUT_NODE 0 // source_node
#endif

// single argument macro
#define _SA_(...) __VA_ARGS__

Expand Down Expand Up @@ -277,7 +285,11 @@ struct BlockCompressReaderMT {
// flow graph
tbb::task_group_context tgc;
tbb::flow::graph myGraph;
tbb::flow::source_node<OrderedBlock> reader_node;
#if USE_INPUT_NODE == 1
tbb::flow::input_node<OrderedBlock> reader_node;
#else // source_node
tbb::flow::source_node<OrderedBlock> reader_node;
#endif
tbb::flow::function_node<OrderedBlock, OrderedBlock> decompressor_node;
tbb::flow::sequencer_node<OrderedBlock> sequencer_node;

Expand All @@ -303,34 +315,52 @@ struct BlockCompressReaderMT {
// flow graph
tgc(),
myGraph(this->tgc),
reader_node(this->myGraph,
[this](OrderedBlock & zblock) {
// read size of next zblock. if insufficient bytes read into uint32_t, end of file
uint32_t zsize;
bool ok = this->myFile.readInteger(zsize);
if(!ok) {
end_of_file.store(true);
return false;
}

// get zblock from available_zblocks or make new
if(!available_zblocks.try_pop(zblock.block)) {
zblock.block = MAKE_SHARED_BLOCK_ASSIGNMENT(MAX_ZBLOCKSIZE);
}

// read zblock. if bytes_read is less than zsize, end of file
uint32_t bytes_read = this->myFile.read(zblock.block.get(), zsize & (~BLOCK_METADATA));
if(bytes_read != (zsize & (~BLOCK_METADATA))) {
end_of_file.store(true);
return false;
}

// set zblock size and block number and push to ordered_zblocks queue
// Also incremenet blocks_to_process BEFORE zblock is added to ordered_zblocks
zblock.blocksize = zsize;
zblock.blocknumber = blocks_to_process.fetch_add(1);
return true;
}, false),
#if USE_INPUT_NODE == 1
reader_node(this->myGraph,
[this](tbb::flow_control & fc) {
OrderedBlock zblock;
uint32_t zsize;
bool ok = this->myFile.readInteger(zsize);
if(!ok) {
end_of_file.store(true);
fc.stop();
return zblock;
}
if(!available_zblocks.try_pop(zblock.block)) {
zblock.block = MAKE_SHARED_BLOCK_ASSIGNMENT(MAX_ZBLOCKSIZE);
}
uint32_t bytes_read = this->myFile.read(zblock.block.get(), zsize & (~BLOCK_METADATA));
if(bytes_read != (zsize & (~BLOCK_METADATA))) {
end_of_file.store(true);
fc.stop();
return zblock;
}
zblock.blocksize = zsize;
zblock.blocknumber = blocks_to_process.fetch_add(1);
return zblock;
}),
#else
reader_node(this->myGraph,
[this](OrderedBlock & zblock) {
uint32_t zsize;
bool ok = this->myFile.readInteger(zsize);
if(!ok) {
end_of_file.store(true);
return false;
}
if(!available_zblocks.try_pop(zblock.block)) {
zblock.block = MAKE_SHARED_BLOCK_ASSIGNMENT(MAX_ZBLOCKSIZE);
}
uint32_t bytes_read = this->myFile.read(zblock.block.get(), zsize & (~BLOCK_METADATA));
if(bytes_read != (zsize & (~BLOCK_METADATA))) {
end_of_file.store(true);
return false;
}
zblock.blocksize = zsize;
zblock.blocknumber = blocks_to_process.fetch_add(1);
return true;
}, false),
#endif
decompressor_node(this->myGraph, tbb::flow::unlimited,
[this](OrderedBlock zblock) {
typename tbb::enumerable_thread_specific<decompressor>::reference dp_local = dp.local();
Expand Down

0 comments on commit df9e1cd

Please sign in to comment.