Skip to content

Commit

Permalink
buffers refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Jack Dermody committed Jul 9, 2024
1 parent 48eec47 commit eaf9c66
Show file tree
Hide file tree
Showing 38 changed files with 887 additions and 374 deletions.
2 changes: 1 addition & 1 deletion BrightData.Cuda/CudaTensorSegment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public unsafe void CopyTo(float* destination, int offset, int stride, int count)
throw new NotImplementedException();
}

public IHaveReadOnlyContiguousSpan<float>? Contiguous => null;
public IHaveReadOnlyContiguousMemory<float>? Contiguous => null;

public void Clear()
{
Expand Down
390 changes: 277 additions & 113 deletions BrightData/BrightData.xml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
using System;
using System.Buffers;

namespace BrightData.Buffer.Composite
namespace BrightData.Buffer
{
/// <summary>
/// Adapts composite buffers to buffer writers
/// Adapts appendable buffers to buffer writers
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="buffer"></param>
/// <param name="defaultBufferSize"></param>
internal class CompositeBufferWriter<T>(IAppendBlocks<T> buffer, int defaultBufferSize = 256) : IBufferWriter<T>
internal class BlockBufferWriter<T>(IAppendBlocks<T> buffer, int defaultBufferSize = 256) : IBufferWriter<T>
where T : notnull
{
int _pos = 0;
Expand Down
2 changes: 1 addition & 1 deletion BrightData/Buffer/Composite/CompositeBufferBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace BrightData.Buffer.Composite
/// <typeparam name="BT"></typeparam>
internal abstract class CompositeBufferBase<T, BT> : TypedBufferBase<T>, ICompositeBuffer<T>
where T : notnull
where BT : ICompositeBufferBlock<T>
where BT : IMutableBufferBlock<T>
{
protected readonly int _blockSize;
protected readonly uint? _maxInMemoryBlocks, _maxDistinctItems;
Expand Down
55 changes: 8 additions & 47 deletions BrightData/Buffer/Composite/ManagedCompositeBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Buffers;
using System.Buffers.Binary;
using System.Threading.Tasks;
using BrightData.Buffer.MutableBlocks;
using CommunityToolkit.HighPerformance.Buffers;

namespace BrightData.Buffer.Composite
Expand All @@ -22,55 +23,15 @@ internal class ManagedCompositeBuffer<T>(
uint? maxInMemoryBlocks = null,
uint? maxDistinctItems = null
)
: CompositeBufferBase<T, ManagedCompositeBuffer<T>.Block>((x, existing) => new(x, existing), tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems)
: CompositeBufferBase<T, MutableManagedBufferBlock<T>>((x, existing) => new(x, existing), tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems)
where T : IHaveDataAsReadOnlyByteSpan
{
public const int HeaderSize = 8;
internal record Block(T[] Data) : ICompositeBufferBlock<T>
{
public Block(T[] data, bool existing) : this(data)
{
if (existing)
Size = (uint)data.Length;
}

public uint Size { get; private set; }
public ref T GetNext() => ref Data[Size++];
public bool HasFreeCapacity => Size < Data.Length;
public ReadOnlySpan<T> WrittenSpan => new(Data, 0, (int)Size);
public ReadOnlyMemory<T> WrittenMemory => new(Data, 0, (int)Size);

public async Task<uint> WriteTo(IByteBlockSource file)
{
var offset = file.Size;
using var writer = new ArrayPoolBufferWriter<byte>();
var memoryOwner = (IMemoryOwner<byte>)writer;
writer.Advance(HeaderSize);
uint size = 0;
for (uint i = 0; i < Size; i++)
size += WriteBlock(Data[i].DataAsBytes, writer);
BinaryPrimitives.WriteUInt32LittleEndian(memoryOwner.Memory.Span, Size);
BinaryPrimitives.WriteUInt32LittleEndian(memoryOwner.Memory.Span[4..], size);
await file.WriteAsync(writer.WrittenMemory, offset);
return size + HeaderSize;
}

static uint WriteBlock(ReadOnlySpan<byte> itemData, ArrayPoolBufferWriter<byte> writer)
{
var span = writer.GetSpan(itemData.Length + 4);
BinaryPrimitives.WriteUInt32LittleEndian(span, (uint)itemData.Length);
itemData.CopyTo(span[4..]);
writer.Advance(itemData.Length + 4);
return (uint)itemData.Length + 4;
}
}

protected override async Task<uint> SkipFileBlock(IByteBlockSource file, uint offset)
{
var lengthBytes = new byte[HeaderSize];
var lengthBytes = new byte[MutableManagedBufferBlock<T>.HeaderSize];
await file.ReadAsync(lengthBytes, offset);
var blockSize = BinaryPrimitives.ReadUInt32LittleEndian(lengthBytes);
return blockSize + HeaderSize;
return blockSize + MutableManagedBufferBlock<T>.HeaderSize;
}

protected override async Task<(uint, ReadOnlyMemory<T>)> GetBlockFromFile(IByteBlockSource file, uint offset)
Expand All @@ -80,7 +41,7 @@ protected override async Task<uint> SkipFileBlock(IByteBlockSource file, uint of
{
var buffer2 = new Memory<T>(new T[_blockSize]);
Copy(buffer.Span, buffer2.Span);
return (blockSize + HeaderSize, buffer2);
return (blockSize + MutableManagedBufferBlock<T>.HeaderSize, buffer2);
}
finally
{
Expand All @@ -96,7 +57,7 @@ protected override async Task<uint> GetBlockFromFile(IByteBlockSource file, uint
using var buffer2 = MemoryOwner<T>.Allocate(_blockSize);
Copy(buffer.Span, buffer2.Span);
callback(buffer2.Span);
return blockSize + HeaderSize;
return blockSize + MutableManagedBufferBlock<T>.HeaderSize;
}
finally
{
Expand All @@ -117,10 +78,10 @@ void Copy(ReadOnlySpan<byte> inputSpan, Span<T> outputSpan)

static async Task<(uint BlockSize, MemoryOwner<byte> Buffer)> ReadBuffer(IByteBlockSource file, uint offset)
{
var lengthBytes = new byte[HeaderSize];
var lengthBytes = new byte[MutableManagedBufferBlock<T>.HeaderSize];
await file.ReadAsync(lengthBytes, offset);
var blockSize = BinaryPrimitives.ReadUInt32LittleEndian(lengthBytes);
offset += HeaderSize;
offset += MutableManagedBufferBlock<T>.HeaderSize;
var buffer = MemoryOwner<byte>.Allocate((int)blockSize);
await file.ReadAsync(buffer.Memory, offset);
return (blockSize, buffer);
Expand Down
95 changes: 9 additions & 86 deletions BrightData/Buffer/Composite/StringCompositeBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Buffers.Binary;
using System.Text;
using System.Threading.Tasks;
using BrightData.Buffer.MutableBlocks;
using CommunityToolkit.HighPerformance.Buffers;

namespace BrightData.Buffer.Composite
Expand All @@ -19,43 +20,8 @@ internal class StringCompositeBuffer(
uint? maxInMemoryBlocks = null,
uint? maxDistinctItems = null
)
: CompositeBufferBase<string, StringCompositeBuffer.Block>((x, existing) => new(x, existing), tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems)
: CompositeBufferBase<string, MutableStringBufferBlock>((x, existing) => new(x, existing), tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems)
{
internal const int HeaderSize = 8;
internal record Block(string[] Data) : ICompositeBufferBlock<string>
{
public Block(string[] data, bool existing) : this(data)
{
if (existing)
Size = (uint)data.Length;
}

public uint Size { get; private set; }
public ref string GetNext() => ref Data[Size++];
public bool HasFreeCapacity => Size < Data.Length;
public ReadOnlySpan<string> WrittenSpan => new(Data, 0, (int)Size);
public ReadOnlyMemory<string> WrittenMemory => new(Data, 0, (int)Size);

public async Task<uint> WriteTo(IByteBlockSource file)
{
var offset = file.Size;
var startOffset = offset += HeaderSize;
for (uint i = 0; i < Size; i++)
{
Encode(Data[i], bytes =>
{
file.Write(bytes, offset);
offset += (uint)bytes.Length;
});
}
var blockSize = offset - startOffset;
Memory<byte> lengthBytes = new byte[8];
BinaryPrimitives.WriteUInt32LittleEndian(lengthBytes.Span, blockSize);
BinaryPrimitives.WriteUInt32LittleEndian(lengthBytes.Span[4..], Size);
await file.WriteAsync(lengthBytes, startOffset - HeaderSize);
return blockSize + HeaderSize;
}
}

public override Task<ReadOnlyMemory<string>> GetTypedBlock(uint blockIndex)
{
Expand All @@ -75,7 +41,7 @@ protected override async Task<uint> SkipFileBlock(IByteBlockSource file, uint of
{
var lengthBytes = new byte[4];
await file.ReadAsync(lengthBytes, offset);
return HeaderSize + BinaryPrimitives.ReadUInt32LittleEndian(lengthBytes);
return MutableStringBufferBlock.HeaderSize + BinaryPrimitives.ReadUInt32LittleEndian(lengthBytes);
}

protected override async Task<(uint, ReadOnlyMemory<string>)> GetBlockFromFile(IByteBlockSource file, uint offset)
Expand All @@ -85,12 +51,12 @@ protected override async Task<uint> SkipFileBlock(IByteBlockSource file, uint of
{
var index = 0;
var buffer = new Memory<string>(new string[(int)numStrings]);
Decode(block.Span, chars =>
MutableStringBufferBlock.Decode(block.Span, chars =>
{
// ReSharper disable once AccessToDisposedClosure
buffer.Span[index++] = new string(chars);
});
return ((uint)block.Length + HeaderSize, buffer);
return ((uint)block.Length + MutableStringBufferBlock.HeaderSize, buffer);
}
finally
{
Expand All @@ -105,13 +71,13 @@ protected override async Task<uint> GetBlockFromFile(IByteBlockSource file, uint
{
var index = 0;
using var buffer = MemoryOwner<string>.Allocate((int)numStrings);
Decode(block.Span, chars =>
MutableStringBufferBlock.Decode(block.Span, chars =>
{
// ReSharper disable once AccessToDisposedClosure
buffer.Span[index++] = new string(chars);
});
callback(buffer.Span);
return (uint)block.Length + HeaderSize;
return (uint)block.Length + MutableStringBufferBlock.HeaderSize;
}
finally
{
Expand All @@ -121,9 +87,9 @@ protected override async Task<uint> GetBlockFromFile(IByteBlockSource file, uint

static async Task<(uint NumStrings, MemoryOwner<byte> Block)> ReadBlock(IByteBlockSource file, uint offset)
{
var lengthBytes = new byte[HeaderSize];
var lengthBytes = new byte[MutableStringBufferBlock.HeaderSize];
await file.ReadAsync(lengthBytes, offset);
offset += HeaderSize;
offset += MutableStringBufferBlock.HeaderSize;
var blockSize = BinaryPrimitives.ReadUInt32LittleEndian(lengthBytes);
var numStrings = BinaryPrimitives.ReadUInt32LittleEndian(lengthBytes.AsSpan(4));
var block = MemoryOwner<byte>.Allocate((int)blockSize);
Expand All @@ -135,48 +101,5 @@ protected override async Task<uint> GetBlockFromFile(IByteBlockSource file, uint

return (numStrings, block);
}

public static void Encode(string str, BlockCallback<byte> callback)
{
if (str.Length <= 124 / 3)
{
Span<byte> buffer = stackalloc byte[128];
var actualByteCount = Encoding.UTF8.GetBytes(str, buffer[4..]);
BinaryPrimitives.WriteUInt16LittleEndian(buffer, (ushort)str.Length);
BinaryPrimitives.WriteUInt16LittleEndian(buffer[2..], (ushort)actualByteCount);
callback(buffer[..(actualByteCount + 4)]);
}
else
{
using var buffer = SpanOwner<byte>.Allocate(str.Length * 3 + 2);
var actualByteCount = Encoding.UTF8.GetBytes(str, buffer.Span[4..]);
BinaryPrimitives.WriteUInt16LittleEndian(buffer.Span, (ushort)str.Length);
BinaryPrimitives.WriteUInt16LittleEndian(buffer.Span[2..], (ushort)actualByteCount);
callback(buffer.Span[..(actualByteCount + 4)]);
}
}

public static void Decode(ReadOnlySpan<byte> data, BlockCallback<char> callback)
{
Span<char> localBuffer = stackalloc char[128];
do
{
var charSize = BinaryPrimitives.ReadUInt16LittleEndian(data);
var byteSize = BinaryPrimitives.ReadUInt16LittleEndian(data[2..]);
data = data[4..];
if (charSize <= 128)
{
Encoding.UTF8.GetChars(data[..byteSize], localBuffer[..charSize]);
callback(localBuffer[..charSize]);
}
else
{
using var buffer = SpanOwner<char>.Allocate(charSize);
Encoding.UTF8.GetChars(data[..byteSize], buffer.Span);
callback(buffer.Span);
}
data = data[byteSize..];
} while (data.Length > 0);
}
}
}
31 changes: 2 additions & 29 deletions BrightData/Buffer/Composite/UnmanagedCompositeBuffer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using BrightData.Buffer.MutableBlocks;
using CommunityToolkit.HighPerformance;
using CommunityToolkit.HighPerformance.Buffers;

Expand All @@ -15,37 +16,9 @@ internal class UnmanagedCompositeBuffer<T>(
int blockSize = Consts.DefaultBlockSize,
uint? maxInMemoryBlocks = null,
uint? maxDistinctItems = null)
: CompositeBufferBase<T, UnmanagedCompositeBuffer<T>.Block>((x, existing) => new(x, existing), tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems)
: CompositeBufferBase<T, MutableUnmanagedBufferBlock<T>>((x, existing) => new(x, existing), tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems)
where T : unmanaged
{
internal record Block(T[] Data) : ICompositeBufferBlock<T>
{
public Block(T[] data, bool existing) : this(data)
{
if (existing)
Size = (uint)data.Length;
}

public uint Size { get; private set; }
public ref T GetNext() => ref Data[Size++];
public bool HasFreeCapacity => Size < Data.Length;
public uint AvailableCapacity => (uint)Data.Length - Size;
public ReadOnlySpan<T> WrittenSpan => new(Data, 0, (int)Size);
public ReadOnlyMemory<T> WrittenMemory => new(Data, 0, (int)Size);

public async Task<uint> WriteTo(IByteBlockSource file)
{
var bytes = WrittenMemory.Cast<T, byte>();
await file.WriteAsync(bytes, file.Size);
return (uint)bytes.Length;
}

public void Write(ReadOnlySpan<T> data)
{
data.CopyTo(Data.AsSpan((int)Size, (int)AvailableCapacity));
Size += (uint)data.Length;
}
}
readonly int _sizeOfT = Unsafe.SizeOf<T>();

public override async Task<ReadOnlyMemory<T>> GetTypedBlock(uint blockIndex)
Expand Down
Loading

0 comments on commit eaf9c66

Please sign in to comment.