Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zero-copy serialization APIs. #357

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
52 changes: 51 additions & 1 deletion buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import "C"
import (
"bytes"
"fmt"
"io"
"math"
"runtime"
"unsafe"
Expand Down Expand Up @@ -94,6 +95,8 @@ func (b *Buffer) Type() (Datatype, error) {
}

// Serialize returns a copy of the bytes in the buffer.
//
// Deprecated: Use WriteTo instead for increased performance.
func (b *Buffer) Serialize(serializationType SerializationType) ([]byte, error) {
bs, err := b.dataCopy()
if err != nil {
Expand All @@ -103,14 +106,61 @@ func (b *Buffer) Serialize(serializationType SerializationType) ([]byte, error)
case TILEDB_CAPNP:
// The entire byte array contains Cap'nP data. Don't bother it.
case TILEDB_JSON:
// The data is a null-terminated string. Strip off the terminator.
// The data might be a null-terminated string. Strip off the terminator.
bs = bytes.TrimSuffix(bs, []byte{0})
default:
return nil, fmt.Errorf("unsupported serialization type: %v", serializationType)
}
return bs, nil
}

// WriteTo writes the contents of a Buffer to an io.Writer.
func (b *Buffer) WriteTo(w io.Writer) (int64, error) {
var cbuffer unsafe.Pointer
var csize C.uint64_t

ret := C.tiledb_buffer_get_data(b.context.tiledbContext, b.tiledbBuffer, &cbuffer, &csize)
if ret != C.TILEDB_OK {
return 0, fmt.Errorf("error getting tiledb buffer data: %w", b.context.LastError())
}

if cbuffer == nil || csize == 0 {
return 0, nil
}

remaining := int64(csize)

// Because io.Writer supports writing up to 2GB of data at a time, we have to use a loop
// for the bigger buffers.
for remaining > 0 {
// TODO: Use min on Go 1.21+
var writeSize int32
if remaining > math.MaxInt32 {
writeSize = math.MaxInt32
} else {
writeSize = int32(remaining)
}

// Construct a slice from the buffer's data without copying it.
// Keep the buffer alive during the write, to prevent the GC from
// collecting the memory while it's being used.
n, err := w.Write(unsafe.Slice((*byte)(cbuffer), writeSize))
runtime.KeepAlive(b)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think since b is the object that implements this method the keepalive isn't needed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed. I was being extra cautious, but if we call b.WriteTo(w), it becomes clear that b is being kept somewhere reachable.


cbuffer = unsafe.Pointer(uintptr(cbuffer) + uintptr(n))
remaining -= int64(n)

if err != nil {
return int64(csize) - remaining, fmt.Errorf("error writing buffer to writer: %w", err)
}
}

return int64(csize), nil
}

// Static assert that Buffer implements io.WriterTo. The declaration assigns a
// nil *Buffer to the blank identifier, so the build fails if the method set
// of *Buffer stops satisfying the interface.
var _ io.WriterTo = (*Buffer)(nil)

// SetBuffer sets the buffer to point at the given Go slice. The memory is now
// Go-managed.
func (b *Buffer) SetBuffer(buffer []byte) error {
Expand Down
33 changes: 33 additions & 0 deletions buffer_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package tiledb
import "C"
import (
"fmt"
"io"
)

// BufferList A list of TileDB BufferList objects
Expand Down Expand Up @@ -44,6 +45,36 @@ func (b *BufferList) Context() *Context {
return b.context
}

// WriteTo writes the contents of a BufferList to an io.Writer.
func (b *BufferList) WriteTo(w io.Writer) (n int64, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were the named return values intentional? Feel free to disregard, just pointing it out since I don't see these names being directly used in the function body.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must have been from a previous iteration of the code. Removed.

nbuffs, err := b.NumBuffers()
if err != nil {
return 0, err
}

written := int64(0)

for i := uint(0); i < uint(nbuffs); i++ {
buff, err := b.GetBuffer(i)
if err != nil {
return 0, err
}
n, err := buff.WriteTo(w)
written += n

buff.Free()

if err != nil {
return written, err
}
}

return written, nil
}

// Static assert that BufferList implements io.WriterTo. The declaration
// assigns a nil *BufferList to the blank identifier, so the build fails if
// the method set of *BufferList stops satisfying the interface.
var _ io.WriterTo = (*BufferList)(nil)

// NumBuffers returns number of buffers in the list.
func (b *BufferList) NumBuffers() (uint64, error) {
var numBuffers C.uint64_t
Expand Down Expand Up @@ -82,6 +113,8 @@ func (b *BufferList) TotalSize() (uint64, error) {
}

// Flatten copies and concatenates all buffers in the list into a new buffer.
//
// Deprecated: Use WriteTo instead for increased performance.
func (b *BufferList) Flatten() (*Buffer, error) {
buffer := Buffer{context: b.context}
freeOnGC(&buffer)
Expand Down
27 changes: 27 additions & 0 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,30 @@ func TestBufferSafety(t *testing.T) {
t.Log("post gc 2")
verify()
}

// ByteCounter is a write sink that discards the data it is given and only
// keeps a running total of how many bytes it has seen.
type ByteCounter struct {
	// BytesWritten is the sum of the lengths of all slices passed to Write.
	BytesWritten int64
}

// Write adds the length of x to BytesWritten and reports the whole slice as
// written. It never returns an error.
func (b *ByteCounter) Write(x []byte) (int, error) {
	size := len(x)
	b.BytesWritten += int64(size)
	return size, nil
}

// TestWriteTo checks that Buffer.WriteTo reports, and actually delivers to
// the writer, exactly as many bytes as the buffer was given, across a range
// of sizes that includes the empty buffer.
func TestWriteTo(t *testing.T) {
	context, err := NewContext(nil)
	require.NoError(t, err)
	buffer, err := NewBuffer(context)
	require.NoError(t, err)

	testSizes := []int{0, 16, 256, 65536, 268435456}
	for _, size := range testSizes {
		err := buffer.SetBuffer(make([]byte, size))
		require.NoError(t, err)

		counter := &ByteCounter{}
		n, err := buffer.WriteTo(counter)
		require.NoError(t, err)
		// The returned count and the bytes the writer really received must
		// both match the buffer size; checking only the return value would
		// miss a WriteTo that reports success without writing.
		assert.Equal(t, int64(size), n)
		assert.Equal(t, int64(size), counter.BytesWritten)
	}
}
Loading