Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Share PyArrow table data with the server through ByteBuffer #4936

Merged
merged 7 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ public class ArrowToTableConverter {

private volatile boolean completed = false;

private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ipcMessage) throws IOException {
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final ByteBuffer bb) throws IOException {
final BarrageProtoUtil.MessageInfo mi = new BarrageProtoUtil.MessageInfo();

final ByteBuffer bb = ByteBuffer.wrap(ipcMessage);
bb.order(ByteOrder.LITTLE_ENDIAN);
final int continuation = bb.getInt();
final int metadata_size = bb.getInt();
Expand All @@ -70,7 +69,12 @@ private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ip
}

@ScriptApi
public synchronized void setSchema(final byte[] ipcMessage) {
public synchronized void setSchema(final ByteBuffer ipcMessage) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I think we need to add a big warning here and link to jpy-consortium/jpy#126; no strong references to this buffer can exist by the end of this method. I think we need to extend this warning into the implementation of parseSchema as well - it can't hold any references to Schema header (since that has references to the byte buffer).

// The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the
// return
// of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy the data
// out of
// the input ByteBuffer to use after the return of this method.
if (completed) {
throw new IllegalStateException("Conversion is complete; cannot process additional messages");
}
Expand All @@ -82,7 +86,24 @@ public synchronized void setSchema(final byte[] ipcMessage) {
}

@ScriptApi
public synchronized void addRecordBatch(final byte[] ipcMessage) {
public synchronized void addRecordBatches(final ByteBuffer... ipcMessages) {
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
// The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the
// return
// of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy the data
// out of
// the input ByteBuffer to use after the return of this method.
for (final ByteBuffer ipcMessage : ipcMessages) {
addRecordBatch(ipcMessage);
}
}

@ScriptApi
public synchronized void addRecordBatch(final ByteBuffer ipcMessage) {
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
// The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the
// return
// of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy the data
// out of
// the input ByteBuffer to use after the return of this method.
if (completed) {
throw new IllegalStateException("Conversion is complete; cannot process additional messages");
}
Expand Down Expand Up @@ -121,6 +142,9 @@ public synchronized void onCompleted() throws InterruptedException {
}

protected void parseSchema(final Schema header) {
// The Schema instance (especially originated from Python) can't be assumed to be valid after the return
// of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to make a copy of
// the header to use after the return of this method.
if (resultTable != null) {
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Schema evolution not supported");
}
Expand All @@ -139,6 +163,9 @@ protected void parseSchema(final Schema header) {
}

protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, int numColumns) {
// The BarrageProtoUtil.MessageInfo instance (especially originated from Python) can't be assumed to be valid
// after the return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need
// to make a copy of it to use after the return of this method.
final BarrageMessage msg = new BarrageMessage();
final RecordBatch batch = (RecordBatch) mi.header.header(new RecordBatch());

Expand Down Expand Up @@ -192,7 +219,7 @@ protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, i
return msg;
}

private BarrageProtoUtil.MessageInfo getMessageInfo(byte[] ipcMessage) {
private BarrageProtoUtil.MessageInfo getMessageInfo(ByteBuffer ipcMessage) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flatbuffer almost certainly retains references to ByteBuffer. We need to be very careful that nothing retains this flatbuffer object. I think we need to add a big warning in io.deephaven.extensions.barrage.util.ArrowToTableConverter#createBarrageMessage that nothing can retain reference to BarrageProtoUtil.MessageInfo mi

final BarrageProtoUtil.MessageInfo mi;
try {
mi = parseArrowIpcMessage(ipcMessage);
Expand All @@ -201,4 +228,6 @@ private BarrageProtoUtil.MessageInfo getMessageInfo(byte[] ipcMessage) {
}
return mi;
}


}
7 changes: 2 additions & 5 deletions py/server/deephaven/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,10 @@ def to_table(pa_table: pa.Table, cols: List[str] = None) -> Table:
dh_schema = pa.schema(dh_fields)

try:
pa_buffer = dh_schema.serialize()
j_barrage_table_builder.setSchema(dtypes.array(dtypes.byte, pa_buffer))
j_barrage_table_builder.setSchema(jpy.byte_buffer(dh_schema.serialize()))

record_batches = pa_table.to_batches()
for rb in record_batches:
pa_buffer = rb.serialize()
j_barrage_table_builder.addRecordBatch(dtypes.array(dtypes.byte, pa_buffer))
j_barrage_table_builder.addRecordBatches([jpy.byte_buffer(rb.serialize()) for rb in record_batches])
j_barrage_table_builder.onCompleted()

return Table(j_table=j_barrage_table_builder.getResultTable())
Expand Down
1 change: 1 addition & 0 deletions py/server/tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from deephaven.table import Table
from tests.testbase import BaseTestCase


class ArrowTestCase(BaseTestCase):
test_table: Table

Expand Down
Loading