Commit

[NativeIO] Use rust block api in file read (#377)
* use rust block api in file read

Signed-off-by: chenxu <[email protected]>

* fix clippy issue

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
xuchen-plus and dmetasoul01 authored Dec 21, 2023
1 parent 3fbf17d commit a0d93af
Showing 15 changed files with 139 additions and 134 deletions.
@@ -98,9 +98,6 @@ private ArrowReader nextBatch() throws IOException {
if (lakesoulArrowReader == null) return null;

currentVSR = lakesoulArrowReader.nextResultVectorSchemaRoot();
if (this.currentVSR == null) {
throw new IOException("nextVectorSchemaRoot not ready");
}
curRecordId = 0;
return ArrowUtils.createArrowReader(currentVSR, this.schema);
}
@@ -109,7 +109,6 @@ public LakeSoulRecordCursor(LakeSoulRecordSet recordSet) throws IOException {
curRecordIdx = -1;
} else {
close();
return;
}
}

@@ -334,19 +334,15 @@ public boolean nextBatch() throws IOException {
closeCurrentBatch();
if (nativeReader.hasNext()) {
VectorSchemaRoot nextVectorSchemaRoot = nativeReader.nextResultVectorSchemaRoot();
if (nextVectorSchemaRoot == null) {
throw new IOException("nextVectorSchemaRoot not ready");
} else {
int rowCount = nextVectorSchemaRoot.getRowCount();
if (nextVectorSchemaRoot.getSchema().getFields().isEmpty()) {
if (partitionColumnVectors == null) {
throw new IOException("NativeVectorizedReader has not been initialized");
}
columnarBatch = new ColumnarBatch(partitionColumnVectors, rowCount);
} else {
nativeColumnVector = NativeIOUtils.asArrayColumnVector(nextVectorSchemaRoot);
columnarBatch = new ColumnarBatch(nativeColumnVector, rowCount);
int rowCount = nextVectorSchemaRoot.getRowCount();
if (nextVectorSchemaRoot.getSchema().getFields().isEmpty()) {
if (partitionColumnVectors == null) {
throw new IOException("NativeVectorizedReader has not been initialized");
}
columnarBatch = new ColumnarBatch(partitionColumnVectors, rowCount);
} else {
nativeColumnVector = NativeIOUtils.asArrayColumnVector(nextVectorSchemaRoot);
columnarBatch = new ColumnarBatch(nativeColumnVector, rowCount);
}
return true;
} else {
@@ -355,7 +351,6 @@ }
}

private void initializeInternal() throws IOException, UnsupportedOperationException {
recreateNativeReader();
initBatch();
}

@@ -121,7 +121,7 @@ object LakeSoulSQLConf {
|If NATIVE_IO_ENABLE=true, NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE of rows will be used to write a new row group
""".stripMargin)
.intConf
.createWithDefault(250000)
.createWithDefault(1000000)

val NATIVE_IO_THREAD_NUM: ConfigEntry[Int] =
buildConf("native.io.thread.num")
@@ -37,7 +37,7 @@ object GlutenUtils {

def setArrowAllocator(io: NativeIOBase): Unit = {
if (isGlutenEnabled) {
io.setExternalAllocator(getGlutenAllocator)
io.setExternalAllocator(getGlutenAllocator.newChildAllocator("gluten", 32 * 1024 * 1024, Long.MaxValue))
}
}

@@ -38,15 +38,12 @@ public class NativeIOBase implements AutoCloseable {

protected CDataDictionaryProvider provider;

protected boolean hasExternalAllocator;

public static boolean isNativeIOLibExist() {
return JnrLoader.get() != null;
}

public NativeIOBase(String allocatorName) {
this.allocator = ArrowMemoryUtils.rootAllocator.newChildAllocator(allocatorName, 0, Long.MAX_VALUE);
this.hasExternalAllocator = false;
this.allocator = ArrowMemoryUtils.rootAllocator.newChildAllocator(allocatorName, 32 * 1024 * 1024, Long.MAX_VALUE);
this.provider = new CDataDictionaryProvider();

libLakeSoulIO = JnrLoader.get();
@@ -55,13 +52,12 @@ public NativeIOBase(String allocatorName) {
intReferenceManager = Runtime.getRuntime(libLakeSoulIO).newObjectReferenceManager();
ioConfigBuilder = libLakeSoulIO.new_lakesoul_io_config_builder();
tokioRuntimeBuilder = libLakeSoulIO.new_tokio_runtime_builder();
setBatchSize(8192);
setBatchSize(10240);
setThreadNum(2);
}

public void setExternalAllocator(BufferAllocator allocator) {
this.allocator = allocator;
this.hasExternalAllocator = true;
}

public void addFile(String file) {
@@ -139,8 +135,7 @@ public void close() throws Exception {
provider = null;
}
if (allocator != null) {
if (!hasExternalAllocator)
allocator.close();
allocator.close();
allocator = null;
}
}
@@ -6,6 +6,7 @@

import com.dmetasoul.lakesoul.lakesoul.io.jnr.LibLakeSoulIO;
import jnr.ffi.Pointer;
import jnr.ffi.byref.IntByReference;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -18,7 +19,6 @@
public class NativeIOReader extends NativeIOBase implements AutoCloseable {
private Pointer reader = null;


private Schema readerSchema = null;

public NativeIOReader() {
@@ -123,4 +123,13 @@ public void nextBatch(BiConsumer<Integer, String> callback, long schemaAddr, long arrayAddr)
}
libLakeSoulIO.next_record_batch(reader, schemaAddr, arrayAddr, nativeIntegerCallback);
}

public int nextBatchBlocked(long arrayAddr) throws IOException {
IntByReference count = new IntByReference();
String err = libLakeSoulIO.next_record_batch_blocked(reader, arrayAddr, count);
if (err != null) {
throw new IOException(err);
}
return count.getValue();
}
}
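
For orientation, a minimal, hypothetical sketch of how the new blocking call can be driven from Java. Only nextBatchBlocked, addFile, initializeReader, getSchema, getAllocator and the Arrow C Data Interface calls are taken from code in this commit (they mirror the LakeSoulArrowReader rewrite further down); the class name, the file path, and the end-of-stream handling are assumptions, not part of the change.

import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader;
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.CDataDictionaryProvider;
import org.apache.arrow.c.Data;
import org.apache.arrow.vector.VectorSchemaRoot;

public class BlockingReadExample {
    public static void main(String[] args) throws Exception {
        NativeIOReader reader = new NativeIOReader();
        reader.addFile("/path/to/some.parquet"); // hypothetical input file
        reader.initializeReader();

        CDataDictionaryProvider provider = new CDataDictionaryProvider();
        // One root is created up front and refilled for every batch, as the Scala reader below does.
        VectorSchemaRoot root = VectorSchemaRoot.create(reader.getSchema(), reader.getAllocator());
        try {
            while (true) {
                ArrowArray array = ArrowArray.allocateNew(reader.getAllocator());
                try {
                    // Blocks until the native reader produces a batch; returns its row count,
                    // a non-positive count at end of stream, and throws IOException on error.
                    int rowCount = reader.nextBatchBlocked(array.memoryAddress());
                    if (rowCount <= 0) break;
                    Data.importIntoVectorSchemaRoot(reader.getAllocator(), array, root, provider);
                    root.setRowCount(rowCount);
                    System.out.println("read a batch of " + rowCount + " rows");
                } finally {
                    array.close();
                }
            }
        } finally {
            root.close();
            provider.close();
            reader.close();
        }
    }
}

Compared with the previous nextBatch(callback, schemaAddr, arrayAddr) path, there is no callback or Promise/Await round trip: the native call returns only once a batch or an error is available.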
@@ -69,6 +69,7 @@ public synchronized static void tryLoad() {
// so disable them
System.setProperty("arrow.enable_unsafe_memory_access", "true");
System.setProperty("arrow.enable_null_check_for_get", "false");
System.setProperty("arrow.allocation.manager.type", "Netty");
}
}

@@ -9,6 +9,8 @@
import jnr.ffi.Runtime;
import jnr.ffi.annotations.Delegate;
import jnr.ffi.annotations.LongLong;
import jnr.ffi.annotations.Out;
import jnr.ffi.byref.IntByReference;

public interface LibLakeSoulIO {

@@ -79,6 +81,8 @@ interface IntegerCallback { // type representing callback

void next_record_batch(Pointer reader, @LongLong long schemaAddr, @LongLong long arrayAddr, IntegerCallback callback);

String next_record_batch_blocked(Pointer reader, @LongLong long arrayAddr, @Out IntByReference count);

void write_record_batch(Pointer writer, @LongLong long schemaAddr, @LongLong long arrayAddr, BooleanCallback callback);

void free_lakesoul_reader(Pointer reader);
@@ -9,5 +9,4 @@

public class ArrowMemoryUtils {
public final static BufferAllocator rootAllocator = new RootAllocator();

}
@@ -18,108 +18,57 @@ case class LakeSoulArrowReader(reader: NativeIOReader,

var ex: Option[Throwable] = None

def next(): Option[VectorSchemaRoot] = iterator.next()
def next(): VectorSchemaRoot = iterator.next()

def hasNext: Boolean = {
val result = iterator.hasNext
result
iterator.hasNext
}

def nextResultVectorSchemaRoot(): VectorSchemaRoot = {
val result = next()
result match {
case Some(vsr) =>
vsr
case _ =>
null
}
next()
}

val iterator = new BatchIterator

class BatchIterator extends Iterator[Option[VectorSchemaRoot]] {
private var vsr: Option[VectorSchemaRoot] = _
class BatchIterator extends Iterator[VectorSchemaRoot] {
var finished = false
val provider = new CDataDictionaryProvider
val root: VectorSchemaRoot = VectorSchemaRoot.create(reader.getSchema, reader.getAllocator)

override def hasNext: Boolean = {
if (!finished) {
clean()
val p = Promise[Option[Int]]()
val consumerSchema = ArrowSchema.allocateNew(reader.getAllocator)
val consumerArray = ArrowArray.allocateNew(reader.getAllocator)
val provider = new CDataDictionaryProvider
reader.nextBatch((rowCount, err) => {
val rowCount = reader.nextBatchBlocked(consumerArray.memoryAddress());
try {
if (rowCount > 0) {
p.success(Some(rowCount))
Data.importIntoVectorSchemaRoot(reader.getAllocator, consumerArray, root, provider)
root.setRowCount(rowCount)
true
} else {
if (err == null) {
p.success(None)
finish()
} else {
p.failure(new IOException(err))
}
false
}
}, consumerSchema.memoryAddress, consumerArray.memoryAddress)
try {
Await.result(p.future, timeout milli) match {
case Some(rowCount) =>
val root: VectorSchemaRoot = {
Data.importVectorSchemaRoot(reader.getAllocator, consumerArray, consumerSchema, provider)
}
root.setRowCount(rowCount)
vsr = Some(root)
true
case _ =>
vsr = None
false
}
} catch {
case e: java.util.concurrent.TimeoutException =>
ex = Some(e)
println("[ERROR][org.apache.arrow.lakesoul.io.read.LakeSoulArrowReader] native reader fetching timeout," +
"please try a larger number with LakeSoulSQLConf.NATIVE_IO_READER_AWAIT_TIMEOUT")
throw e
case e: Throwable =>
ex = Some(e)
throw e
} finally {
provider.close()
consumerArray.close()
consumerSchema.close()
}
} else {
clean()
false
}
}

override def next(): Option[VectorSchemaRoot] = {
if (ex.isDefined) {
throw ex.get
}
vsr
override def next(): VectorSchemaRoot = {
root
}

private def finish(): Unit = {
if (!finished) {
finished = true
}
}

def clean(): Unit = {
if (vsr != null) {
vsr match {
case Some(root) =>
root.close()
vsr = None
case _ =>
}
}
}
}

override def close(): Unit = {
iterator.clean()
iterator.root.close()
iterator.provider.close()
reader.close()
}
}
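
A side note on the BatchIterator rewrite above: the iterator now imports every native batch into a single root created from the reader's schema instead of allocating a fresh VectorSchemaRoot per batch, so nextResultVectorSchemaRoot() returns the same root on every call and its vectors are overwritten by the next hasNext(). A minimal consumer sketch under that assumption; the class name is mine and the import path is inferred from the test file below, so treat both as assumptions.

import com.dmetasoul.lakesoul.LakeSoulArrowReader;
import org.apache.arrow.vector.VectorSchemaRoot;

public class ReusedRootConsumer {
    // Aggregates per-batch values only; anything that must outlive a batch has to be
    // copied out inside the loop, before the next hasNext() call refills the shared root.
    static long countRows(LakeSoulArrowReader arrowReader) {
        long rows = 0;
        while (arrowReader.hasNext()) {
            VectorSchemaRoot batch = arrowReader.nextResultVectorSchemaRoot();
            rows += batch.getRowCount();
        }
        arrowReader.close();
        return rows;
    }
}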
@@ -5,39 +5,71 @@
package com.dmetasoul.lakesoul

import com.dmetasoul.lakesoul.lakesoul.io.{NativeIOReader, NativeIOWriter}
import org.apache.arrow.vector.types.FloatingPointPrecision
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}

import scala.collection.JavaConverters.asJavaIterableConverter

case class TestLakeSoulNativeReaderWriter() extends org.scalatest.funsuite.AnyFunSuite with org.scalatest.BeforeAndAfterAll with org.scalatest.BeforeAndAfterEach {
val projectDir: String = System.getProperty("user.dir")

test("test native reader writer with single file") {

val reader = new NativeIOReader()
val filePath = projectDir + "/native-io/lakesoul-io-java/src/test/resources/sample-parquet-files/part-00000-a9e77425-5fb4-456f-ba52-f821123bd193-c000.snappy.parquet"
reader.addFile(filePath)
reader.setThreadNum(2)
reader.setBatchSize(512)
reader.initializeReader()
for (i <- 1 to 10) {
val reader = new NativeIOReader()
// val filePath = projectDir + "/native-io/lakesoul-io-java/src/test/resources/sample-parquet-files/part-00000-a9e77425-5fb4-456f-ba52-f821123bd193-c000.snappy.parquet"
// val filePath = "/data/presto-parquet/parquet/hive_data/tpch/orders/20231207_090334_00001_d3cs8_4bc28274-73fb-49eb-a3cc-389a3a4aed94.parquet"
val filePath = "/home/chenxu/program/data/lakesoul-test-orders/part-00000-00e25436-9e3a-483e-b0d6-74bc2f60a1bd-c000.parquet"
reader.addFile(filePath)
reader.setThreadNum(2)
reader.setBatchSize(20480)
reader.setBufferSize(4)
reader.addFilter("eq(orderpriority,String('2-HIGH'))")
val schema = new Schema(Seq(
Field.nullable("orderkey", new ArrowType.Int(64, true)),
Field.nullable("custkey", new ArrowType.Int(64, true)),
Field.nullable("orderstatus", ArrowType.Utf8.INSTANCE),
Field.nullable("totalprice", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)),
Field.nullable("orderdate", ArrowType.Utf8.INSTANCE),
Field.nullable("orderpriority", ArrowType.Utf8.INSTANCE),
Field.nullable("clerk", ArrowType.Utf8.INSTANCE),
Field.nullable("shippriority", new ArrowType.Int(32, true)),
Field.nullable("comment", ArrowType.Utf8.INSTANCE),
).asJava)
reader.setSchema(schema)
reader.initializeReader()

val schema = reader.getSchema
// val schema = reader.getSchema

val lakesoulReader = LakeSoulArrowReader(reader)
val lakesoulReader = LakeSoulArrowReader(reader)

val writer = new NativeIOWriter(schema)
writer.addFile(System.getProperty("java.io.tmpdir") + "/" + "temp.parquet")
writer.setPrimaryKeys(java.util.Arrays.asList("email", "first_name", "last_name"))
writer.setAuxSortColumns(java.util.Arrays.asList("country"))
writer.initializeWriter()
// val writer = new NativeIOWriter(schema)
// writer.addFile(System.getProperty("java.io.tmpdir") + "/" + "temp.parquet")
// writer.setPrimaryKeys(java.util.Arrays.asList("email", "first_name", "last_name"))
// writer.setAuxSortColumns(java.util.Arrays.asList("country"))
// writer.initializeWriter()

while (lakesoulReader.hasNext) {
val batch = lakesoulReader.next()
println(batch.get.contentToTSVString())
writer.write(batch.get)
}
val t0 = System.nanoTime()
var rows: Long = 0
var batches: Long = 0
var cols: Long = 0
while (lakesoulReader.hasNext) {
val batch = lakesoulReader.next()
rows += batch.getRowCount
batches += 1
cols += batch.getFieldVectors.size()
// println(batch.get.contentToTSVString())
// writer.write(batch.get)
}
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")
println(rows, batches, cols)

writer.flush()
writer.close()
// writer.flush()
// writer.close()

lakesoulReader.close()
lakesoulReader.close()
}
}

}