diff --git a/BrightData.UnitTests/BufferTests.cs b/BrightData.UnitTests/BufferTests.cs index ecec062f..f298c4ba 100644 --- a/BrightData.UnitTests/BufferTests.cs +++ b/BrightData.UnitTests/BufferTests.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.IO; +using System.Linq; using System.Threading.Tasks; using BrightData.Types; using BrightData.UnitTests.Helper; @@ -47,7 +48,7 @@ public async Task ReadAsync(Memory data1, uint offset) return (uint)ret; } } - public class InMemoryStreamProvider : IProvideDataBlocks + public class InMemoryStreamProvider : IProvideByteBlocks { readonly Dictionary _data = []; @@ -89,20 +90,47 @@ public override string ToString() } } - struct TestStruct + [Fact] + public void TestIncrementingBlockSizes() { - public int Property { get; set; } + var uintBuffer = _streamProvider.CreateCompositeBuffer(blockSize: 2, maxBlockSize: 32, maxInMemoryBlocks:128); + for (var i = 0U; i < 1024; i++) + uintBuffer.Append(i); + var blockSizes = uintBuffer.BlockSizes; + blockSizes.Sum(x => x).Should().Be(1024); + blockSizes[0].Should().Be(2); + blockSizes[1].Should().Be(4); + blockSizes[2].Should().Be(8); + blockSizes[3].Should().Be(16); + blockSizes[4].Should().Be(32); + blockSizes[5].Should().Be(32); + } + + [Fact] + public void TestDistinctCount() + { + var uintBuffer = _streamProvider.CreateCompositeBuffer(32, 32, 32, 32); + for (var i = 0U; i < 8; i++) + uintBuffer.Append(i); + uintBuffer.DistinctItems.Should().Be(8); + + for (var i = 8U; i < 32; i++) + uintBuffer.Append(i); + uintBuffer.DistinctItems.Should().Be(32); + + uintBuffer.Append(32U); + uintBuffer.DistinctItems.Should().BeNull(); } [Fact] public void VectorBuffer() { var data = new[] { - new TestClass(new byte[] { 1, 2, 3 }), - new TestClass(new byte[] { 4, 5, 6 }), - new TestClass(new byte[] { 7, 8, 9 }) + new TestClass([1, 2, 3]), + new TestClass([4, 5, 6]), + new TestClass([7, 8, 9]) }; - var vectorBuffer = _streamProvider.CreateCompositeBuffer(x => new(x), 2, 0); + var vectorBuffer = _streamProvider.CreateCompositeBuffer(x => new(x), blockSize: 2, maxBlockSize: 2, maxInMemoryBlocks:0); vectorBuffer.Append(data); var index = 0; vectorBuffer.ForEachBlock(x => { @@ -121,7 +149,7 @@ public async Task StringBuffer() "this is a final test", "this is a test", }; - var stringBuffer = _streamProvider.CreateCompositeBuffer(2, 0, 128); + var stringBuffer = _streamProvider.CreateCompositeBuffer(blockSize: 2, maxBlockSize: 2, maxInMemoryBlocks:0, 128); stringBuffer.Append(data); var index = 0; for (uint i = 0, len = (uint)stringBuffer.BlockSizes.Length; i < len; i++) { @@ -142,7 +170,7 @@ public async Task StringBuffer() [Fact] public async Task IntBuffer() { - var intBuffer = _streamProvider.CreateCompositeBuffer(2, 0); + var intBuffer = _streamProvider.CreateCompositeBuffer(blockSize: 2, maxBlockSize: 2, maxInMemoryBlocks:0); intBuffer.Append(1); intBuffer.Append(new ReadOnlySpan([2, 3])); var index = 0; @@ -175,10 +203,10 @@ await intBuffer.ForEachBlock(block => { /// /// Buffer size configurations to test /// - public static readonly (int numItems, int bufferSize, int inMemoryReadSize, int numDistinct)[] Configurations = [ - (32768, 1024, 256, 4), - (32768, 32768, 1024, 1024), - (32768, 128, 32768, 32768) + public static readonly (int numItems, int bufferSize, int maxBufferSize, int inMemoryReadSize, int numDistinct)[] Configurations = [ + (32768, 1024, 32768, 256, 4), + (32768, 32768, 32768, 1024, 1024), + (32768, 128, 1024, 32768, 32768) ]; [Fact] @@ -208,25 +236,25 @@ public Task WeightedIndexListBuffer2() [Fact] public async Task StringBuffer2() { - foreach (var (numItems, bufferSize, inMemoryReadSize, numDistinct) in Configurations) - await StringBufferReadWriteTest((uint)numItems, bufferSize, (uint)inMemoryReadSize, (ushort)numDistinct, i => i.ToString()); + foreach (var (numItems, bufferSize, maxBufferSize, inMemoryReadSize, numDistinct) in Configurations) + await StringBufferReadWriteTest((uint)numItems, bufferSize, maxBufferSize, (uint)inMemoryReadSize, (ushort)numDistinct, i => i.ToString()); } async Task ObjectTests(Func indexTranslator, CreateFromReadOnlyByteSpan createItem) where T : IHaveDataAsReadOnlyByteSpan { - foreach (var (numItems, bufferSize, inMemoryReadSize, numDistinct) in Configurations) - await ObjectBufferReadWriteTest((uint)numItems, bufferSize, (uint)inMemoryReadSize, (ushort)numDistinct, indexTranslator, createItem); + foreach (var (numItems, bufferSize, maxBufferSize, inMemoryReadSize, numDistinct) in Configurations) + await ObjectBufferReadWriteTest((uint)numItems, bufferSize, maxBufferSize, (uint)inMemoryReadSize, (ushort)numDistinct, indexTranslator, createItem); } async Task StructTests(Func indexTranslator) where T : unmanaged { - foreach (var (numItems, bufferSize, inMemoryReadSize, numDistinct) in Configurations) - await StructBufferReadWriteTest((uint)numItems, bufferSize, (uint)inMemoryReadSize, (ushort)numDistinct, indexTranslator); + foreach (var (numItems, bufferSize, maxBufferSize, inMemoryReadSize, numDistinct) in Configurations) + await StructBufferReadWriteTest((uint)numItems, bufferSize, maxBufferSize, (uint)inMemoryReadSize, (ushort)numDistinct, indexTranslator); } - async Task StringBufferReadWriteTest(uint numItems, int bufferSize, uint inMemorySize, ushort numDistinct, Func indexTranslator) + async Task StringBufferReadWriteTest(uint numItems, int bufferSize, int maxBufferSize, uint inMemorySize, ushort numDistinct, Func indexTranslator) { - var buffer = _streamProvider.CreateCompositeBuffer(bufferSize, inMemorySize, numDistinct); + var buffer = _streamProvider.CreateCompositeBuffer(bufferSize, maxBufferSize, inMemorySize, numDistinct); for (uint i = 0; i < numItems; i++) buffer.Append(indexTranslator(i)); @@ -240,10 +268,10 @@ async Task StringBufferReadWriteTest(uint numItems, int bufferSize, uint inMemor item.Should().Be(indexTranslator(index++)); } - async Task ObjectBufferReadWriteTest(uint numItems, int bufferSize, uint inMemorySize, ushort numDistinct, Func indexTranslator, CreateFromReadOnlyByteSpan createItem) + async Task ObjectBufferReadWriteTest(uint numItems, int bufferSize, int maxBufferSize, uint inMemorySize, ushort numDistinct, Func indexTranslator, CreateFromReadOnlyByteSpan createItem) where T : IHaveDataAsReadOnlyByteSpan { - var buffer = _streamProvider.CreateCompositeBuffer(createItem, bufferSize, inMemorySize, numDistinct); + var buffer = _streamProvider.CreateCompositeBuffer(createItem, bufferSize, maxBufferSize, inMemorySize, numDistinct); for (uint i = 0; i < numItems; i++) buffer.Append(indexTranslator(i)); @@ -259,9 +287,9 @@ async Task ObjectBufferReadWriteTest(uint numItems, int bufferSize, uint inMe } } - async Task StructBufferReadWriteTest(uint numItems, int bufferSize, uint inMemorySize, ushort numDistinct, Func indexTranslator) where T : unmanaged + async Task StructBufferReadWriteTest(uint numItems, int bufferSize, int maxBufferSize, uint inMemorySize, ushort numDistinct, Func indexTranslator) where T : unmanaged { - var buffer = _streamProvider.CreateCompositeBuffer(bufferSize, inMemorySize, numDistinct); + var buffer = _streamProvider.CreateCompositeBuffer(bufferSize, maxBufferSize, inMemorySize, numDistinct); for (uint i = 0; i < numItems; i++) buffer.Append(indexTranslator(i)); diff --git a/BrightData/BrightData.xml b/BrightData/BrightData.xml index 4cf5e2b2..0f8b6f53 100644 --- a/BrightData/BrightData.xml +++ b/BrightData/BrightData.xml @@ -12669,13 +12669,12 @@ Distance metric - + Creates a vector set that uses a KNN search provider - diff --git a/BrightData/BrightDataContext.cs b/BrightData/BrightDataContext.cs index 181f539f..72de5afe 100644 --- a/BrightData/BrightDataContext.cs +++ b/BrightData/BrightDataContext.cs @@ -71,7 +71,7 @@ public LinearAlgebraProvider LinearAlgebraProvider /// Creates a new temp stream provider /// /// - public IProvideDataBlocks CreateTempDataBlockProvider() => new TempFileProvider(Get(Consts.BaseTempPath)); + public IProvideByteBlocks CreateTempDataBlockProvider() => new TempFileProvider(Get(Consts.BaseTempPath)); /// /// Returns a typed property from the context diff --git a/BrightData/Buffer/Composite/CompositeBufferBase.cs b/BrightData/Buffer/Composite/CompositeBufferBase.cs index fd9bf0df..03fe9fe3 100644 --- a/BrightData/Buffer/Composite/CompositeBufferBase.cs +++ b/BrightData/Buffer/Composite/CompositeBufferBase.cs @@ -19,28 +19,40 @@ internal abstract class CompositeBufferBase : TypedBufferBase, ICompos where T : notnull where BT : IMutableBufferBlock { - protected readonly int _blockSize; + protected delegate BT NewBlockFactory(Memory block); + protected delegate BT ExistingBlockFactory(ReadOnlyMemory block); + + int _blockSize; + readonly int _maxBlockSize; protected readonly uint? _maxInMemoryBlocks, _maxDistinctItems; - readonly Func _blockFactory; - IProvideDataBlocks? _dataBlockProvider; + readonly NewBlockFactory _newBlockFactory; + readonly ExistingBlockFactory _existingBlockFactory; + protected List? _fileBlockSizes = null; + IProvideByteBlocks? _dataBlockProvider; protected IByteBlockSource? _currentDataBlock; protected List? _inMemoryBlocks; protected BT? _currBlock; protected HashSet? _distinct; - protected uint _blocksInFile; protected CompositeBufferBase( - Func blockFactory, - IProvideDataBlocks? dataBlockProvider = null, - int blockSize = Consts.DefaultBlockSize, + NewBlockFactory newBlockFactory, + ExistingBlockFactory existingBlockFactory, + IProvideByteBlocks? dataBlockProvider = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { - _blockFactory = blockFactory; - _dataBlockProvider = dataBlockProvider; - _blockSize = blockSize; - _maxInMemoryBlocks = maxInMemoryBlocks; + if (maxBlockSize < blockSize) + throw new ArgumentException($"Expected max block size to be greater or equal to block size: block-size:{blockSize}, max-block-size:{maxBlockSize}"); + + _newBlockFactory = newBlockFactory; + _existingBlockFactory = existingBlockFactory; + _dataBlockProvider = dataBlockProvider; + _blockSize = blockSize; + _maxInMemoryBlocks = maxInMemoryBlocks; + _maxBlockSize = maxBlockSize; if ((_maxDistinctItems = maxDistinctItems) > 0) _distinct = new HashSet((int)maxDistinctItems!.Value / 32); @@ -50,7 +62,6 @@ protected CompositeBufferBase( public MetaData MetaData { get; set; } = new(); public uint Size { get; protected set; } public uint BlockCount { get; private set; } - public uint BlockSize => (uint)_blockSize; public uint? DistinctItems => (uint?)_distinct?.Count; public Type DataType => typeof(T); @@ -62,11 +73,16 @@ public uint[] BlockSizes var index = 0; if (_inMemoryBlocks is not null) { foreach (var block in _inMemoryBlocks) { - ret[index++] = BlockSize; + ret[index++] = block.Size; } } - for(uint i = 0; i < _blocksInFile; i++) - ret[index++] = BlockSize; + + if (_fileBlockSizes is not null) { + foreach (var blockSize in _fileBlockSizes) + ret[index++] = blockSize; + } + + if (_currBlock is not null) ret[index] = _currBlock.Size; return ret; @@ -80,8 +96,7 @@ public async Task ForEachBlock(BlockCallback callback, INotifyOperationProgre var count = 0; // read from in memory blocks - if (_inMemoryBlocks is not null) - { + if (_inMemoryBlocks is not null) { foreach (var block in _inMemoryBlocks) { if (ct.IsCancellationRequested) break; @@ -91,12 +106,13 @@ public async Task ForEachBlock(BlockCallback callback, INotifyOperationProgre } // read from the file - if (_currentDataBlock != null) - { - uint fileLength = _currentDataBlock.Size, offset = 0; - while (offset < fileLength && !ct.IsCancellationRequested) - { - offset += await GetBlockFromFile(_currentDataBlock, offset, callback); + if (_currentDataBlock != null && _fileBlockSizes != null) { + var fileBlockIndex = 0; + uint fileLength = _currentDataBlock.Size, byteOffset = 0; + while (byteOffset < fileLength && !ct.IsCancellationRequested) { + var (size, block) = await GetBlockFromFile(_currentDataBlock, byteOffset, _fileBlockSizes[fileBlockIndex++]); + callback(block.Span); + byteOffset += size; notify?.OnOperationProgress(guid, (float)++count / BlockCount); } } @@ -112,10 +128,8 @@ public async Task ForEachBlock(BlockCallback callback, INotifyOperationProgre public override async IAsyncEnumerable EnumerateAllTyped() { // read from in memory blocks - if (_inMemoryBlocks is not null) - { - foreach (var block in _inMemoryBlocks) - { + if (_inMemoryBlocks is not null) { + foreach (var block in _inMemoryBlocks) { var data = block.WrittenMemory; for (var i = 0; i < data.Length; i++) { yield return data.Span[i]; @@ -124,16 +138,15 @@ public override async IAsyncEnumerable EnumerateAllTyped() } // read from the file - if (_currentDataBlock != null) - { - uint fileLength = _currentDataBlock.Size, offset = 0; - while (offset < fileLength) - { - var (size, block) = await GetBlockFromFile(_currentDataBlock, offset); + if (_currentDataBlock != null && _fileBlockSizes != null) { + var fileBlockIndex = 0; + uint fileLength = _currentDataBlock.Size, byteOffset = 0; + while (byteOffset < fileLength) { + var (size, block) = await GetBlockFromFile(_currentDataBlock, byteOffset, _fileBlockSizes[fileBlockIndex++]); for (var i = 0; i < block.Length; i++) { yield return block.Span[i]; } - offset += size; + byteOffset += size; } } @@ -147,39 +160,34 @@ public override async IAsyncEnumerable EnumerateAllTyped() public override async Task> GetTypedBlock(uint blockIndex) { uint currentIndex = 0; + var blocksInFile = (uint)(_fileBlockSizes?.Count ?? 0); // read from in memory blocks - if (_inMemoryBlocks is not null) - { - if (blockIndex < _blocksInFile + (uint)_inMemoryBlocks.Count) - { - foreach (var block in _inMemoryBlocks) - { - if (currentIndex++ == blockIndex) - { + if (_inMemoryBlocks is not null) { + if (blockIndex < blocksInFile + (uint)_inMemoryBlocks.Count) { + foreach (var block in _inMemoryBlocks) { + if (currentIndex++ == blockIndex) { return block.WrittenMemory; } } } else - currentIndex = _blocksInFile + (uint)_inMemoryBlocks.Count; + currentIndex = blocksInFile + (uint)_inMemoryBlocks.Count; } // read from the file - if (_currentDataBlock != null) - { - if (blockIndex < _blocksInFile) - { - uint fileLength = _currentDataBlock.Size, offset = 0; - while (offset < fileLength) - { + if (_currentDataBlock != null && _fileBlockSizes != null) { + var fileBlockIndex = 0; + if (blockIndex < blocksInFile) { + uint fileLength = _currentDataBlock.Size, byteOffset = 0; + while (byteOffset < fileLength) { if (currentIndex++ == blockIndex) - return (await GetBlockFromFile(_currentDataBlock, offset)).Block; - offset += await SkipFileBlock(_currentDataBlock, offset); + return (await GetBlockFromFile(_currentDataBlock, byteOffset, _fileBlockSizes[fileBlockIndex])).Block; + byteOffset += await SkipFileBlock(_currentDataBlock, byteOffset, _fileBlockSizes[fileBlockIndex++]); } } else - currentIndex = _blocksInFile; + currentIndex = blocksInFile; } // then from the current block @@ -195,7 +203,7 @@ public virtual void Append(in T item) var block = EnsureCurrentBlock().GetAwaiter().GetResult(); block.GetNext() = item; - if (_distinct?.Add(item) == true && _distinct.Count >= _maxDistinctItems) + if (_distinct?.Add(item) == true && _distinct.Count > _maxDistinctItems) _distinct = null; ++Size; } @@ -211,21 +219,24 @@ public virtual void Append(ReadOnlySpan inputBlock) protected async Task EnsureCurrentBlock() { - if (_currBlock?.HasFreeCapacity != true) - { - if (_currBlock is not null) - { - if (_maxInMemoryBlocks.HasValue && (_inMemoryBlocks?.Count ?? 0) >= _maxInMemoryBlocks.Value) - { + if (_currBlock?.HasFreeCapacity != true) { + if (_currBlock is not null) { + if (_maxInMemoryBlocks.HasValue && (_inMemoryBlocks?.Count ?? 0) >= _maxInMemoryBlocks.Value) { _currentDataBlock ??= (_dataBlockProvider ??= new TempFileProvider()).Get(Id); + (_fileBlockSizes ??= new()).Add(_currBlock.Size); await _currBlock.WriteTo(_currentDataBlock); - ++_blocksInFile; } else (_inMemoryBlocks ??= []).Add(_currBlock); } - _currBlock = _blockFactory(new T[_blockSize], false); + _currBlock = _newBlockFactory(new T[_blockSize]); ++BlockCount; + + // increase the size of the next block + var nextBlockSize = _blockSize * 2; + if (nextBlockSize > _maxBlockSize) + nextBlockSize = _maxBlockSize; + _blockSize = nextBlockSize; } return _currBlock!; } @@ -237,7 +248,6 @@ public async Task WriteTo(Stream stream) // write the header await using var writer = new BinaryWriter(stream, Encoding.UTF8, true); writer.Write(Size); - writer.Write(BlockSize); writer.Write(BlockCount); writer.Flush(); @@ -252,8 +262,7 @@ public async Task WriteTo(Stream stream) var pos = (uint)stream.Position; // write from in memory blocks - if (_inMemoryBlocks is not null) - { + if (_inMemoryBlocks is not null) { foreach (var block in _inMemoryBlocks) { var blockSize = await block.WriteTo(dataBlock); blockPositions[index++] = (pos, blockSize, block.Size); @@ -262,17 +271,17 @@ public async Task WriteTo(Stream stream) } // write from the file - if (_currentDataBlock != null) - { - uint fileLength = _currentDataBlock.Size, offset = 0; - while (offset < fileLength) - { - var (size, blockData) = await GetBlockFromFile(_currentDataBlock, offset); - var block = _blockFactory(blockData.ToArray(), true); - var blockSize = await block.WriteTo(dataBlock); - offset += size; - blockPositions[index++] = (pos, blockSize, block.Size); - pos += blockSize; + if (_currentDataBlock != null && _fileBlockSizes != null) { + var fileBlockIndex = 0; + uint fileLength = _currentDataBlock.Size, byteOffset = 0; + while (byteOffset < fileLength) { + var (size, blockData) = await GetBlockFromFile(_currentDataBlock, byteOffset, _fileBlockSizes[fileBlockIndex++]); + byteOffset += size; + + var block = _existingBlockFactory(blockData); + var writeSize = await block.WriteTo(dataBlock); + blockPositions[index++] = (pos, writeSize, block.Size); + pos += writeSize; } } @@ -282,6 +291,7 @@ public async Task WriteTo(Stream stream) blockPositions[index] = (pos, blockSize, _currBlock.Size); } + // write the block header stream.Seek(headerPosition, SeekOrigin.Begin); foreach (var (startPos, byteSize, count) in blockPositions) { writer.Write(startPos); @@ -293,9 +303,8 @@ public async Task WriteTo(Stream stream) stream.Seek(0, SeekOrigin.End); } - protected abstract Task SkipFileBlock(IByteBlockSource file, uint offset); - protected abstract Task<(uint Offset, ReadOnlyMemory Block)> GetBlockFromFile(IByteBlockSource file, uint offset); - protected abstract Task GetBlockFromFile(IByteBlockSource file, uint offset, BlockCallback callback); + protected abstract Task SkipFileBlock(IByteBlockSource file, uint byteOffset, uint numItemsInBlock); + protected abstract Task<(uint BlockSizeBytes, ReadOnlyMemory Block)> GetBlockFromFile(IByteBlockSource file, uint byteOffset, uint numItemsInBlock); public override string ToString() => $"Composite buffer ({typeof(T).Name})|{MetaData.GetName(Id.ToString("n"))}|count={Size:N0}"; } } diff --git a/BrightData/Buffer/Composite/ManagedCompositeBuffer.cs b/BrightData/Buffer/Composite/ManagedCompositeBuffer.cs index b959b928..74d5c80f 100644 --- a/BrightData/Buffer/Composite/ManagedCompositeBuffer.cs +++ b/BrightData/Buffer/Composite/ManagedCompositeBuffer.cs @@ -18,15 +18,16 @@ namespace BrightData.Buffer.Composite /// internal class ManagedCompositeBuffer( CreateFromReadOnlyByteSpan createItem, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) - : CompositeBufferBase>((x, existing) => new(x, existing), tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems) + : CompositeBufferBase>(x => new(x), x => new(x), tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems) where T : IHaveDataAsReadOnlyByteSpan { - protected override async Task SkipFileBlock(IByteBlockSource file, uint offset) + protected override async Task SkipFileBlock(IByteBlockSource file, uint offset, uint numItemsInBlock) { var lengthBytes = new byte[MutableManagedBufferBlock.HeaderSize]; await file.ReadAsync(lengthBytes, offset); @@ -34,14 +35,14 @@ protected override async Task SkipFileBlock(IByteBlockSource file, uint of return blockSize + MutableManagedBufferBlock.HeaderSize; } - protected override async Task<(uint, ReadOnlyMemory)> GetBlockFromFile(IByteBlockSource file, uint offset) + protected override async Task<(uint, ReadOnlyMemory)> GetBlockFromFile(IByteBlockSource file, uint offset, uint numItemsInBlock) { - var (blockSize, buffer) = await ReadBuffer(file, offset); + var (blockByteSize, buffer) = await ReadBuffer(file, offset); try { - var buffer2 = new Memory(new T[_blockSize]); - Copy(buffer.Span, buffer2.Span); - return (blockSize + MutableManagedBufferBlock.HeaderSize, buffer2); + var buffer2 = new Memory(new T[numItemsInBlock]); + Copy(buffer.Span, buffer2.Span, numItemsInBlock); + return (blockByteSize + MutableManagedBufferBlock.HeaderSize, buffer2); } finally { @@ -49,25 +50,9 @@ protected override async Task SkipFileBlock(IByteBlockSource file, uint of } } - protected override async Task GetBlockFromFile(IByteBlockSource file, uint offset, BlockCallback callback) + void Copy(ReadOnlySpan inputSpan, Span outputSpan, uint numItemsInBlock) { - var (blockSize, buffer) = await ReadBuffer(file, offset); - try - { - using var buffer2 = MemoryOwner.Allocate(_blockSize); - Copy(buffer.Span, buffer2.Span); - callback(buffer2.Span); - return blockSize + MutableManagedBufferBlock.HeaderSize; - } - finally - { - buffer.Dispose(); - } - } - - void Copy(ReadOnlySpan inputSpan, Span outputSpan) - { - for (var i = 0; i < _blockSize; i++) + for (var i = 0; i < numItemsInBlock; i++) { var itemSize = BinaryPrimitives.ReadUInt32LittleEndian(inputSpan); var itemData = inputSpan[4..(int)(itemSize + 4)]; @@ -76,7 +61,7 @@ void Copy(ReadOnlySpan inputSpan, Span outputSpan) } } - static async Task<(uint BlockSize, MemoryOwner Buffer)> ReadBuffer(IByteBlockSource file, uint offset) + static async Task<(uint BlockByteSize, MemoryOwner Buffer)> ReadBuffer(IByteBlockSource file, uint offset) { var lengthBytes = new byte[MutableManagedBufferBlock.HeaderSize]; await file.ReadAsync(lengthBytes, offset); diff --git a/BrightData/Buffer/Composite/StringCompositeBuffer.cs b/BrightData/Buffer/Composite/StringCompositeBuffer.cs index 3ba4f9b7..c8e74877 100644 --- a/BrightData/Buffer/Composite/StringCompositeBuffer.cs +++ b/BrightData/Buffer/Composite/StringCompositeBuffer.cs @@ -15,12 +15,13 @@ namespace BrightData.Buffer.Composite /// /// internal class StringCompositeBuffer( - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) - : CompositeBufferBase((x, existing) => new(x, existing), tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems) + : CompositeBufferBase(x => new(x), x => new(x), tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems) { public override Task> GetTypedBlock(uint blockIndex) @@ -37,16 +38,16 @@ public override void Append(in string item) base.Append(item); } - protected override async Task SkipFileBlock(IByteBlockSource file, uint offset) + protected override async Task SkipFileBlock(IByteBlockSource file, uint byteOffset, uint numItemsInBlock) { var lengthBytes = new byte[4]; - await file.ReadAsync(lengthBytes, offset); + await file.ReadAsync(lengthBytes, byteOffset); return MutableStringBufferBlock.HeaderSize + BinaryPrimitives.ReadUInt32LittleEndian(lengthBytes); } - protected override async Task<(uint, ReadOnlyMemory)> GetBlockFromFile(IByteBlockSource file, uint offset) + protected override async Task<(uint BlockSizeBytes, ReadOnlyMemory Block)> GetBlockFromFile(IByteBlockSource file, uint byteOffset, uint numItemsInBlock) { - var (numStrings, block) = await ReadBlock(file, offset); + var (numStrings, block) = await ReadBlock(file, byteOffset); try { var index = 0; @@ -64,27 +65,6 @@ protected override async Task SkipFileBlock(IByteBlockSource file, uint of } } - protected override async Task GetBlockFromFile(IByteBlockSource file, uint offset, BlockCallback callback) - { - var (numStrings, block) = await ReadBlock(file, offset); - try - { - var index = 0; - using var buffer = MemoryOwner.Allocate((int)numStrings); - MutableStringBufferBlock.Decode(block.Span, chars => - { - // ReSharper disable once AccessToDisposedClosure - buffer.Span[index++] = new string(chars); - }); - callback(buffer.Span); - return (uint)block.Length + MutableStringBufferBlock.HeaderSize; - } - finally - { - block.Dispose(); - } - } - static async Task<(uint NumStrings, MemoryOwner Block)> ReadBlock(IByteBlockSource file, uint offset) { var lengthBytes = new byte[MutableStringBufferBlock.HeaderSize]; diff --git a/BrightData/Buffer/Composite/UnmanagedCompositeBuffer.cs b/BrightData/Buffer/Composite/UnmanagedCompositeBuffer.cs index 311ffc16..08273803 100644 --- a/BrightData/Buffer/Composite/UnmanagedCompositeBuffer.cs +++ b/BrightData/Buffer/Composite/UnmanagedCompositeBuffer.cs @@ -12,11 +12,12 @@ namespace BrightData.Buffer.Composite /// /// internal class UnmanagedCompositeBuffer( - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null) - : CompositeBufferBase>((x, existing) => new(x, existing), tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems) + : CompositeBufferBase>(x => new(x), x => new(x), tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems) where T : unmanaged { readonly int _sizeOfT = Unsafe.SizeOf(); @@ -26,11 +27,12 @@ public override async Task> GetTypedBlock(uint blockIndex) if (blockIndex >= BlockCount) throw new ArgumentOutOfRangeException(nameof(blockIndex), $"Must be less than {BlockCount}"); uint currentIndex = 0; + var blocksInFile = (uint)(_fileBlockSizes?.Count ?? 0); // read from the in memory blocks if (_inMemoryBlocks is not null) { - if (blockIndex < _blocksInFile + (uint)_inMemoryBlocks.Count) + if (blockIndex < blocksInFile + (uint)_inMemoryBlocks.Count) { foreach (var block in _inMemoryBlocks) { @@ -39,24 +41,24 @@ public override async Task> GetTypedBlock(uint blockIndex) } } else - currentIndex = _blocksInFile + (uint)_inMemoryBlocks.Count; + currentIndex = blocksInFile + (uint)_inMemoryBlocks.Count; } // read from the file - if (_currentDataBlock != null) - { - if (blockIndex < _blocksInFile) + if (_currentDataBlock != null && _fileBlockSizes != null) { + var fileBlockIndex = 0; + if (blockIndex < blocksInFile) { uint fileLength = _currentDataBlock.Size, offset = 0; - while (offset < fileLength) - { + while (offset < fileLength) { + var blockSize = _fileBlockSizes[fileBlockIndex++]; if (currentIndex++ == blockIndex) - return (await GetBlockFromFile(_currentDataBlock, offset)).Item2; - offset += (uint)(_blockSize * _sizeOfT); + return (await GetBlockFromFile(_currentDataBlock, offset, blockSize)).Item2; + offset += (uint)(blockSize * _sizeOfT); } } else - currentIndex = _blocksInFile; + currentIndex = blocksInFile; } // then from the current block @@ -89,33 +91,21 @@ public override void Append(ReadOnlySpan inputBlock) } } - protected override Task SkipFileBlock(IByteBlockSource file, uint offset) + protected override Task SkipFileBlock(IByteBlockSource file, uint offset, uint numItemsInBlock) { - return Task.FromResult((uint)_blockSize * (uint)_sizeOfT); + return Task.FromResult(numItemsInBlock * (uint)_sizeOfT); } - protected override async Task<(uint, ReadOnlyMemory)> GetBlockFromFile(IByteBlockSource file, uint offset) + protected override async Task<(uint, ReadOnlyMemory)> GetBlockFromFile(IByteBlockSource file, uint offset, uint numItemsInBlock) { - var ret = new Memory(new T[_blockSize]); + var ret = new Memory(new T[numItemsInBlock]); var buffer = ret.Cast(); uint readCount = 0; do { readCount += await file.ReadAsync(buffer[(int)readCount..], offset + readCount); - } while (readCount < _blockSize * _sizeOfT); - return ((uint)_blockSize * (uint)_sizeOfT, ret); - } - - protected override async Task GetBlockFromFile(IByteBlockSource file, uint offset, BlockCallback callback) - { - using var buffer = MemoryOwner.Allocate(_blockSize * _sizeOfT); - uint readCount = 0; - do - { - readCount += await file.ReadAsync(buffer.Memory[(int)readCount..], offset + readCount); - } while (readCount < _blockSize * _sizeOfT); - callback(buffer.Span.Cast()); - return (uint)_blockSize * (uint)_sizeOfT; + } while (readCount < numItemsInBlock * _sizeOfT); + return (numItemsInBlock * (uint)_sizeOfT, ret); } } } diff --git a/BrightData/Buffer/InMemoryBuffer.cs b/BrightData/Buffer/InMemoryBuffer.cs index 143207ac..59369a3f 100644 --- a/BrightData/Buffer/InMemoryBuffer.cs +++ b/BrightData/Buffer/InMemoryBuffer.cs @@ -1,21 +1,29 @@ using BrightData.Buffer.MutableBlocks; -using BrightData.Helper; using System; using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; namespace BrightData.Buffer { - internal class InMemoryBuffer(uint blockSize = Consts.DefaultBlockSize) + internal class InMemoryBuffer : TypedBufferBase, IAppendableBuffer where T : notnull { + uint _blockSize; + readonly uint _maxBlockSize; protected List>? _inMemoryBlocks; protected MutableInMemoryBufferBlock? _currBlock; + public InMemoryBuffer(uint blockSize = Consts.DefaultInitialBlockSize, uint maxBlockSize = Consts.DefaultMaxBlockSize) + { + if (maxBlockSize < blockSize) + throw new ArgumentException($"Expected max block size to be greater or equal to block size: block-size:{blockSize}, max-block-size:{maxBlockSize}"); + + _blockSize = blockSize; + _maxBlockSize = maxBlockSize; + } + public Task ForEachBlock(BlockCallback callback, INotifyOperationProgress? notify = null, string? message = null, CancellationToken ct = default) { var guid = Guid.NewGuid(); @@ -86,6 +94,7 @@ public override async IAsyncEnumerable EnumerateAllTyped() } } + // then from current block if (_currBlock is not null) { for (var i = 0; i < _currBlock.WrittenMemory.Length; i++) @@ -107,9 +116,7 @@ public uint[] BlockSizes if (_inMemoryBlocks is not null) { foreach (var block in _inMemoryBlocks) - { - ret[index++] = blockSize; - } + ret[index++] = block.Size; } if (_currBlock is not null) ret[index] = _currBlock.Size; @@ -144,8 +151,14 @@ protected MutableInMemoryBufferBlock EnsureCurrentBlock() { (_inMemoryBlocks ??= []).Add(_currBlock); } - _currBlock = new(new T[blockSize], false); + _currBlock = new(new T[_blockSize]); ++BlockCount; + + // increase the size of the next block + var nextBlockSize = _blockSize * 2; + if (nextBlockSize > _maxBlockSize) + nextBlockSize = _maxBlockSize; + _blockSize = nextBlockSize; } return _currBlock; } diff --git a/BrightData/Buffer/MutableBlocks/MutableInMemoryBufferBlock.cs b/BrightData/Buffer/MutableBlocks/MutableInMemoryBufferBlock.cs index 5d2a140e..7187b2f8 100644 --- a/BrightData/Buffer/MutableBlocks/MutableInMemoryBufferBlock.cs +++ b/BrightData/Buffer/MutableBlocks/MutableInMemoryBufferBlock.cs @@ -1,17 +1,14 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System.Runtime.CompilerServices; using System.Threading.Tasks; namespace BrightData.Buffer.MutableBlocks { - internal class MutableInMemoryBufferBlock(T[] Data) : IMutableBufferBlock + internal class MutableInMemoryBufferBlock(Memory Data) : IMutableBufferBlock { - public MutableInMemoryBufferBlock(T[] data, bool existing) : this(data) + public MutableInMemoryBufferBlock(ReadOnlyMemory data) : this(Unsafe.As, Memory>(ref data)) { - if (existing) - Size = (uint)data.Length; + Size = (uint)data.Length; } public uint Size { get; private set; } @@ -21,8 +18,8 @@ public Task WriteTo(IByteBlockSource file) } public bool HasFreeCapacity => Size < Data.Length; - public ReadOnlySpan WrittenSpan => new(Data, 0, (int)Size); - public ReadOnlyMemory WrittenMemory => new(Data, 0, (int)Size); - public ref T GetNext() => ref Data[Size++]; + public ReadOnlySpan WrittenSpan => Data.Span[..(int)Size]; + public ReadOnlyMemory WrittenMemory => Data[..(int)Size]; + public ref T GetNext() => ref Data.Span[(int)Size++]; } } diff --git a/BrightData/Buffer/MutableBlocks/MutableManagedBufferBlock.cs b/BrightData/Buffer/MutableBlocks/MutableManagedBufferBlock.cs index c0631ed2..bf5733b4 100644 --- a/BrightData/Buffer/MutableBlocks/MutableManagedBufferBlock.cs +++ b/BrightData/Buffer/MutableBlocks/MutableManagedBufferBlock.cs @@ -1,26 +1,26 @@ using System; using System.Buffers; using System.Buffers.Binary; +using System.Runtime.CompilerServices; using System.Threading.Tasks; using CommunityToolkit.HighPerformance.Buffers; namespace BrightData.Buffer.MutableBlocks { - internal class MutableManagedBufferBlock(T[] Data) : IMutableBufferBlock + internal class MutableManagedBufferBlock(Memory Data) : IMutableBufferBlock where T : IHaveDataAsReadOnlyByteSpan { - public MutableManagedBufferBlock(T[] data, bool existing) : this(data) + public MutableManagedBufferBlock(ReadOnlyMemory data) : this(Unsafe.As, Memory>(ref data)) { - if (existing) - Size = (uint)data.Length; + Size = (uint)data.Length; } public const int HeaderSize = 8; public uint Size { get; private set; } - public ref T GetNext() => ref Data[Size++]; + public ref T GetNext() => ref Data.Span[(int)Size++]; public bool HasFreeCapacity => Size < Data.Length; - public ReadOnlySpan WrittenSpan => new(Data, 0, (int)Size); - public ReadOnlyMemory WrittenMemory => new(Data, 0, (int)Size); + public ReadOnlySpan WrittenSpan => Data.Span[..(int)Size]; + public ReadOnlyMemory WrittenMemory => Data[..(int)Size]; public async Task WriteTo(IByteBlockSource file) { @@ -29,8 +29,8 @@ public async Task WriteTo(IByteBlockSource file) var memoryOwner = (IMemoryOwner)writer; writer.Advance(HeaderSize); uint size = 0; - for (uint i = 0; i < Size; i++) - size += WriteBlock(Data[i].DataAsBytes, writer); + for (var i = 0; i < Size; i++) + size += WriteBlock(Data.Span[i].DataAsBytes, writer); BinaryPrimitives.WriteUInt32LittleEndian(memoryOwner.Memory.Span, Size); BinaryPrimitives.WriteUInt32LittleEndian(memoryOwner.Memory.Span[4..], size); await file.WriteAsync(writer.WrittenMemory, offset); diff --git a/BrightData/Buffer/MutableBlocks/MutableStringBufferBlock.cs b/BrightData/Buffer/MutableBlocks/MutableStringBufferBlock.cs index 33017f9f..8bb437de 100644 --- a/BrightData/Buffer/MutableBlocks/MutableStringBufferBlock.cs +++ b/BrightData/Buffer/MutableBlocks/MutableStringBufferBlock.cs @@ -1,33 +1,33 @@ using System; using System.Buffers.Binary; +using System.Runtime.CompilerServices; using System.Text; using System.Threading.Tasks; using CommunityToolkit.HighPerformance.Buffers; namespace BrightData.Buffer.MutableBlocks { - internal class MutableStringBufferBlock(string[] Data) : IMutableBufferBlock + internal class MutableStringBufferBlock(Memory Data) : IMutableBufferBlock { - public MutableStringBufferBlock(string[] data, bool existing) : this(data) + public MutableStringBufferBlock(ReadOnlyMemory data) : this(Unsafe.As, Memory>(ref data)) { - if (existing) - Size = (uint)data.Length; + Size = (uint)data.Length; } public uint Size { get; private set; } - public ref string GetNext() => ref Data[Size++]; + public ref string GetNext() => ref Data.Span[(int)Size++]; public bool HasFreeCapacity => Size < Data.Length; - public ReadOnlySpan WrittenSpan => new(Data, 0, (int)Size); - public ReadOnlyMemory WrittenMemory => new(Data, 0, (int)Size); + public ReadOnlySpan WrittenSpan => Data.Span[..(int)Size]; + public ReadOnlyMemory WrittenMemory => Data[..(int)Size]; public const int HeaderSize = 8; public async Task WriteTo(IByteBlockSource file) { var offset = file.Size; var startOffset = offset += HeaderSize; - for (uint i = 0; i < Size; i++) + for (var i = 0; i < Size; i++) { - Encode(Data[i], bytes => + Encode(Data.Span[i], bytes => { file.Write(bytes, offset); offset += (uint)bytes.Length; diff --git a/BrightData/Buffer/MutableBlocks/MutableUnmanagedBufferBlock.cs b/BrightData/Buffer/MutableBlocks/MutableUnmanagedBufferBlock.cs index a3271ce3..585b321e 100644 --- a/BrightData/Buffer/MutableBlocks/MutableUnmanagedBufferBlock.cs +++ b/BrightData/Buffer/MutableBlocks/MutableUnmanagedBufferBlock.cs @@ -1,27 +1,28 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; +using System.Runtime.Intrinsics.Arm; using System.Text; using System.Threading.Tasks; using CommunityToolkit.HighPerformance; namespace BrightData.Buffer.MutableBlocks { - internal class MutableUnmanagedBufferBlock(T[] Data) : IMutableBufferBlock + internal class MutableUnmanagedBufferBlock(Memory Data) : IMutableBufferBlock where T : unmanaged { - public MutableUnmanagedBufferBlock(T[] data, bool existing) : this(data) + public MutableUnmanagedBufferBlock(ReadOnlyMemory data) : this(Unsafe.As, Memory>(ref data)) { - if (existing) - Size = (uint)data.Length; + Size = (uint)data.Length; } public uint Size { get; private set; } - public ref T GetNext() => ref Data[Size++]; + public ref T GetNext() => ref Data.Span[(int)Size++]; public bool HasFreeCapacity => Size < Data.Length; public uint AvailableCapacity => (uint)Data.Length - Size; - public ReadOnlySpan WrittenSpan => new(Data, 0, (int)Size); - public ReadOnlyMemory WrittenMemory => new(Data, 0, (int)Size); + public ReadOnlySpan WrittenSpan => Data.Span[..(int)Size]; + public ReadOnlyMemory WrittenMemory => Data[..(int)Size]; public async Task WriteTo(IByteBlockSource file) { @@ -32,7 +33,7 @@ public async Task WriteTo(IByteBlockSource file) public void Write(ReadOnlySpan data) { - data.CopyTo(Data.AsSpan((int)Size, (int)AvailableCapacity)); + data.CopyTo(Data.Span.Slice((int)Size, (int)AvailableCapacity)); Size += (uint)data.Length; } } diff --git a/BrightData/Buffer/ReadOnly/BlockReaderReadOnlyBuffer.cs b/BrightData/Buffer/ReadOnly/BlockReaderReadOnlyBuffer.cs index 3d59ce2b..baf1ff08 100644 --- a/BrightData/Buffer/ReadOnly/BlockReaderReadOnlyBuffer.cs +++ b/BrightData/Buffer/ReadOnly/BlockReaderReadOnlyBuffer.cs @@ -19,7 +19,7 @@ internal class BlockReaderReadOnlyBuffer : TypedBufferBase, IReadOnlyBuffe ReadOnlyMemory? _lastBlock = null; uint _lastBlockIndex = uint.MaxValue; - public BlockReaderReadOnlyBuffer(IByteBlockReader reader, MetaData metadata, uint offset, uint byteSize, uint blockSize = Consts.DefaultBlockSize) + public BlockReaderReadOnlyBuffer(IByteBlockReader reader, MetaData metadata, uint offset, uint byteSize, uint blockSize) { MetaData = metadata; _reader = reader; diff --git a/BrightData/Buffer/ReadOnly/ReadOnlyCompositeBufferBase.cs b/BrightData/Buffer/ReadOnly/ReadOnlyCompositeBufferBase.cs index 90c74e5f..a4007278 100644 --- a/BrightData/Buffer/ReadOnly/ReadOnlyCompositeBufferBase.cs +++ b/BrightData/Buffer/ReadOnly/ReadOnlyCompositeBufferBase.cs @@ -24,7 +24,6 @@ protected ReadOnlyCompositeBufferBase(Stream stream) _stream = stream; using var reader = new BinaryReader(stream, Encoding.UTF8, true); Size = reader.ReadUInt32(); - BlockSize = reader.ReadUInt32(); BlockCount = reader.ReadUInt32(); BlockSizes = new uint[BlockCount]; @@ -36,7 +35,6 @@ protected ReadOnlyCompositeBufferBase(Stream stream) } public uint Size { get; } - public uint BlockSize { get; } public uint BlockCount { get; } public Type DataType => typeof(T); public uint[] BlockSizes { get; } diff --git a/BrightData/Consts.cs b/BrightData/Consts.cs index d64cbc08..c8ffe6c0 100644 --- a/BrightData/Consts.cs +++ b/BrightData/Consts.cs @@ -31,30 +31,20 @@ public class Consts public const int MinimumSizeForVectorised = 64; /// - /// Default in memory buffer size + /// Default initial size of a block buffer /// - public const int DefaultInMemoryBufferSize = 32768 * 4; + public const int DefaultInitialBlockSize = 1_024; /// - /// Default number of table rows to cache in memory + /// Default max size of a block buffer /// - public const int DefaultTableRowCacheSize = 32768; - - /// - /// Default size of a small buffer - /// - public const int DefaultBlockSize = 1024; + public const int DefaultMaxBlockSize = 32_768; /// /// Default maximum number of blocks to keep in memory /// public const int DefaultMaxBlocksInMemory = 4096; - /// - /// Default max number of distinct items - /// - public const ushort DefaultMaxDistinctCount = 32768; - /// /// Default max write count /// diff --git a/BrightData/DataTable/ColumnOrientedDataTable.cs b/BrightData/DataTable/ColumnOrientedDataTable.cs index 750bc35a..9f65fed7 100644 --- a/BrightData/DataTable/ColumnOrientedDataTable.cs +++ b/BrightData/DataTable/ColumnOrientedDataTable.cs @@ -1,11 +1,9 @@ using System; using System.Collections.Generic; using System.IO; -using System.Linq; using System.Runtime.CompilerServices; using System.Text; using System.Threading.Tasks; -using BrightData.Buffer.Composite; using BrightData.Buffer.MutableBlocks; using BrightData.Buffer.ReadOnly; using BrightData.Buffer.ReadOnly.Helper; diff --git a/BrightData/DataTable/ColumnOrientedDataTableBuilder.cs b/BrightData/DataTable/ColumnOrientedDataTableBuilder.cs index 35d18d62..928f56df 100644 --- a/BrightData/DataTable/ColumnOrientedDataTableBuilder.cs +++ b/BrightData/DataTable/ColumnOrientedDataTableBuilder.cs @@ -16,8 +16,9 @@ namespace BrightData.DataTable /// internal class ColumnOrientedDataTableBuilder( BrightDataContext context, - IProvideDataBlocks? tempData = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempData = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = Consts.DefaultMaxBlocksInMemory) : IBuildDataTables { @@ -48,7 +49,7 @@ public ICompositeBuffer CreateColumn(BrightDataType type, string? name = null) public ICompositeBuffer CreateColumn(BrightDataType type, MetaData metaData) { - var buffer = type.CreateCompositeBuffer(tempData, blockSize, maxInMemoryBlocks); + var buffer = type.CreateCompositeBuffer(tempData, blockSize, maxBlockSize, maxInMemoryBlocks); metaData.CopyTo(buffer.MetaData); buffer.MetaData.Set(Consts.ColumnIndex, (uint)_columns.Count); _columns.Add(buffer); @@ -117,7 +118,7 @@ public Task AddRows(IReadOnlyList buffers, Cancella public Task WriteTo(Stream stream) { - var writer = new ColumnOrientedDataTableWriter(tempData, blockSize, maxInMemoryBlocks); + var writer = new ColumnOrientedDataTableWriter(tempData, blockSize, maxBlockSize, maxInMemoryBlocks); return writer.Write( TableMetaData, _columns.Cast().ToArray(), diff --git a/BrightData/DataTable/ColumnOrientedDataTableWriter.cs b/BrightData/DataTable/ColumnOrientedDataTableWriter.cs index b55b14c2..61a3fbc3 100644 --- a/BrightData/DataTable/ColumnOrientedDataTableWriter.cs +++ b/BrightData/DataTable/ColumnOrientedDataTableWriter.cs @@ -6,7 +6,6 @@ using System.Runtime.InteropServices; using System.Text; using System.Threading.Tasks; -using BrightData.Buffer.Composite; using BrightData.Buffer.MutableBlocks; using BrightData.DataTable.Columns; using BrightData.LinearAlgebra.ReadOnly; @@ -20,8 +19,9 @@ namespace BrightData.DataTable /// Writes buffers to a data table /// internal class ColumnOrientedDataTableWriter( - IProvideDataBlocks? tempData = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempData = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = Consts.DefaultMaxBlocksInMemory) { public async Task Write(MetaData tableMetaData, IReadOnlyBufferWithMetaData[] buffers, Stream output) @@ -55,11 +55,11 @@ public async Task Write(MetaData tableMetaData, IReadOnlyBufferWithMetaData[] bu header.InfoSizeBytes = (uint)(output.Position - header.InfoOffset); header.DataOffset = (uint)output.Position; - var indexWriter = new Lazy>(() => tempData.CreateCompositeBuffer(blockSize, maxInMemoryBlocks)); - var weightedIndexWriter = new Lazy>(() => tempData.CreateCompositeBuffer(blockSize, maxInMemoryBlocks)); - var byteWriter = new Lazy>(() => tempData.CreateCompositeBuffer(blockSize, maxInMemoryBlocks)); - var stringWriter = new Lazy>(() => tempData.CreateCompositeBuffer(blockSize, maxInMemoryBlocks)); - var floatWriter = new Lazy>(() => tempData.CreateCompositeBuffer(blockSize, maxInMemoryBlocks)); + var indexWriter = new Lazy>(() => tempData.CreateCompositeBuffer(blockSize, maxBlockSize, maxInMemoryBlocks)); + var weightedIndexWriter = new Lazy>(() => tempData.CreateCompositeBuffer(blockSize, maxBlockSize, maxInMemoryBlocks)); + var byteWriter = new Lazy>(() => tempData.CreateCompositeBuffer(blockSize, maxBlockSize, maxInMemoryBlocks)); + var stringWriter = new Lazy>(() => tempData.CreateCompositeBuffer(blockSize, maxBlockSize, maxInMemoryBlocks)); + var floatWriter = new Lazy>(() => tempData.CreateCompositeBuffer(blockSize, maxBlockSize, maxInMemoryBlocks)); // write the data (column oriented) foreach (var columnSegment in buffers) { diff --git a/BrightData/ExtensionMethods.Buffers.cs b/BrightData/ExtensionMethods.Buffers.cs index ca2c618d..3f1a259b 100644 --- a/BrightData/ExtensionMethods.Buffers.cs +++ b/BrightData/ExtensionMethods.Buffers.cs @@ -289,15 +289,17 @@ await buffer.ForEachBlock(x => { /// /// /// + /// /// /// /// public static ICompositeBuffer CreateCompositeBuffer( - this IProvideDataBlocks? tempStreams, - int blockSize = Consts.DefaultBlockSize, + this IProvideByteBlocks? tempStreams, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null - ) => new StringCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems); + ) => new StringCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); /// /// Creates a composite buffer for types that can be created from a block of byte data @@ -306,15 +308,17 @@ public static ICompositeBuffer CreateCompositeBuffer( /// /// /// + /// /// /// /// public static ICompositeBuffer CreateCompositeBuffer( - this IProvideDataBlocks? tempStreams, + this IProvideByteBlocks? tempStreams, CreateFromReadOnlyByteSpan createItem, - int blockSize = Consts.DefaultBlockSize, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, - uint? maxDistinctItems = null) where T: IHaveDataAsReadOnlyByteSpan => new ManagedCompositeBuffer(createItem, tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems); + uint? maxDistinctItems = null) where T: IHaveDataAsReadOnlyByteSpan => new ManagedCompositeBuffer(createItem, tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); /// /// Creates a composite buffer for unmanaged types @@ -322,15 +326,17 @@ public static ICompositeBuffer CreateCompositeBuffer( /// /// /// + /// /// /// /// public static ICompositeBuffer CreateCompositeBuffer( - this IProvideDataBlocks? tempStreams, - int blockSize = Consts.DefaultBlockSize, + this IProvideByteBlocks? tempStreams, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null - ) where T: unmanaged => new UnmanagedCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems); + ) where T: unmanaged => new UnmanagedCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); /// /// Creates a composite buffer @@ -338,37 +344,39 @@ public static ICompositeBuffer CreateCompositeBuffer( /// /// /// + /// /// /// /// /// public static ICompositeBuffer CreateCompositeBuffer( this BrightDataType dataType, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null) { return dataType switch { - BrightDataType.BinaryData => CreateCompositeBuffer(tempStreams, x => new(x), blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.Boolean => CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.Date => CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.DateOnly => CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.Decimal => CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.SByte => CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.Short => CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.Int => CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.Long => CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.Float => CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.Double => CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.String => CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.IndexList => CreateCompositeBuffer(tempStreams, x => new(x), blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.WeightedIndexList => CreateCompositeBuffer(tempStreams, x => new(x), blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.Vector => CreateCompositeBuffer>(tempStreams, x => new(x), blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.Matrix => CreateCompositeBuffer>(tempStreams, x => new(x), blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.Tensor3D => CreateCompositeBuffer>(tempStreams, x => new(x), blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.Tensor4D => CreateCompositeBuffer>(tempStreams, x => new(x), blockSize, maxInMemoryBlocks, maxDistinctItems), - BrightDataType.TimeOnly => CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.Boolean => CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.Date => CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.DateOnly => CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.TimeOnly => CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.Decimal => CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.SByte => CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.Short => CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.Int => CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.Long => CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.Float => CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.Double => CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.String => CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.BinaryData => CreateCompositeBuffer(tempStreams, x => new(x), blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.IndexList => CreateCompositeBuffer(tempStreams, x => new(x), blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.WeightedIndexList => CreateCompositeBuffer(tempStreams, x => new(x), blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.Vector => CreateCompositeBuffer>(tempStreams, x => new(x), blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.Matrix => CreateCompositeBuffer>(tempStreams, x => new(x), blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.Tensor3D => CreateCompositeBuffer>(tempStreams, x => new(x), blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + BrightDataType.Tensor4D => CreateCompositeBuffer>(tempStreams, x => new(x), blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), _ => throw new ArgumentOutOfRangeException(nameof(dataType), dataType, $"Not able to create a composite buffer for type: {dataType}") }; } @@ -397,19 +405,21 @@ public static ICompositeBuffer CreateCompositeBuffer( /// /// /// + /// /// /// /// public static (T[] Table, ICompositeBuffer Data) Encode( this ICompositeBuffer buffer, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null ) where T : notnull { if(!buffer.DistinctItems.HasValue) throw new ArgumentException("Buffer cannot be encoded as the number of distinct items is not known - create the composite buffer with a high max distinct items", nameof(buffer)); var table = new Dictionary((int)buffer.DistinctItems.Value); - var data = new UnmanagedCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks); + var data = new UnmanagedCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks); buffer.ForEachBlock(block => { var len = block.Length; @@ -454,15 +464,17 @@ public static (T[] Table, ICompositeBuffer Data) Encode( /// /// /// + /// /// /// /// public static ICompositeBuffer CreateCompositeBuffer(this Type type, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null - ) => CreateCompositeBuffer(GetBrightDataType(type), tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems); + ) => CreateCompositeBuffer(GetBrightDataType(type), tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); /// /// Creates a column analyser @@ -530,20 +542,22 @@ static BufferCopyOperation CastBuffer(IReadOnlyBuffer buffer, IApp /// /// public static IOperation Analyse(this IReadOnlyBufferWithMetaData buffer, bool force) => Analyse(buffer.MetaData, force, buffer); - + /// /// Creates a numeric composite buffer from an existing buffer /// /// /// /// + /// /// /// /// /// public static async Task ToNumeric(this IReadOnlyBuffer buffer, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { @@ -572,7 +586,7 @@ public static async Task ToNumeric(this IReadOnlyBuffer buffer : BrightDataType.Double; } - var output = CreateCompositeBuffer(toType, tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems); + var output = CreateCompositeBuffer(toType, tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); var converter = buffer.ConvertUnmanagedTo(toType.GetDataType()); var conversion = GenericTypeMapping.BufferCopyOperation(converter, output); await conversion.Execute(); @@ -587,16 +601,18 @@ public static async Task ToNumeric(this IReadOnlyBuffer buffer /// /// /// + /// /// /// /// public static async Task> ToBoolean(this IReadOnlyBuffer buffer, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { - var output = CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems); + var output = CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); IOperation conversion; if (buffer.DataType == typeof(bool)) conversion = buffer.CreateBufferCopyOperation(output); @@ -618,16 +634,18 @@ public static async Task> ToBoolean(this IReadOnlyBuffer /// /// /// + /// /// /// /// public static async Task> ToString(this IReadOnlyBuffer buffer, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { - var output = CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems); + var output = CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); var conversion = buffer.DataType == typeof(string) ? buffer.CreateBufferCopyOperation(output) : new BufferCopyOperation(GenericTypeMapping.ToStringConverter(buffer), output, null); @@ -641,16 +659,18 @@ public static async Task> ToString(this IReadOnlyBuffer /// /// /// + /// /// /// /// public static async Task> ToDateTime(this IReadOnlyBuffer buffer, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { - var output = CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems); + var output = CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); IOperation conversion; if (buffer.DataType == typeof(DateTime)) conversion = buffer.CreateBufferCopyOperation(output); @@ -682,16 +702,18 @@ static DateTime StringToDate(string str) /// /// /// + /// /// /// /// public static async Task> ToDate(this IReadOnlyBuffer buffer, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { - var output = CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems); + var output = CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); IOperation conversion; if (buffer.DataType == typeof(DateOnly)) conversion = buffer.CreateBufferCopyOperation(output); @@ -723,16 +745,18 @@ static DateOnly StringToDate(string str) /// /// /// + /// /// /// /// public static async Task> ToTime(this IReadOnlyBuffer buffer, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { - var output = CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems); + var output = CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); IOperation conversion; if (buffer.DataType == typeof(TimeOnly)) conversion = buffer.CreateBufferCopyOperation(output); @@ -764,17 +788,19 @@ static TimeOnly StringToTime(string str) /// /// /// + /// /// /// /// public static async Task> ToCategoricalIndex(this IReadOnlyBuffer buffer, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { - var output = CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems); + var output = CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); var index = GenericTypeMapping.TypedIndexer(buffer); await index.Execute(); var indexer = (ICanIndex)index; @@ -797,17 +823,19 @@ public static async Task> ToCategoricalIndex(this IReadOnl /// /// /// + /// /// /// /// /// public static async Task> ToIndexList(this IReadOnlyBuffer buffer, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { - var output = CreateCompositeBuffer(tempStreams, x => new(x), blockSize, maxInMemoryBlocks, maxDistinctItems); + var output = CreateCompositeBuffer(tempStreams, x => new(x), blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); IOperation conversion; if (buffer.DataType == typeof(IndexList)) conversion = new NopConversion((IReadOnlyBuffer)buffer, output); @@ -830,16 +858,18 @@ public static async Task> ToIndexList(this IReadOnly /// /// /// + /// /// /// /// public static async Task>> ToVector(this IReadOnlyBuffer buffer, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { - var output = CreateCompositeBuffer>(tempStreams, x => new(x), blockSize, maxInMemoryBlocks, maxDistinctItems); + var output = CreateCompositeBuffer>(tempStreams, x => new(x), blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); IOperation conversion; if (buffer.DataType == typeof(ReadOnlyVector)) conversion = new NopConversion>((IReadOnlyBuffer>)buffer, output); @@ -873,17 +903,19 @@ public static async Task>> ToVector(this /// /// /// + /// /// /// /// /// public static async Task> ToWeightedIndexList(this IReadOnlyBuffer buffer, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { - var output = CreateCompositeBuffer(tempStreams, x => new(x), blockSize, maxInMemoryBlocks, maxDistinctItems); + var output = CreateCompositeBuffer(tempStreams, x => new(x), blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); IOperation conversion; if (buffer.DataType == typeof(WeightedIndexList)) conversion = new NopConversion((IReadOnlyBuffer)buffer, output); @@ -907,16 +939,18 @@ public static async Task> ToWeightedIndexLis /// /// /// + /// /// /// /// public static async Task> To(this IReadOnlyBuffer buffer, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null) where T: unmanaged { - var output = CreateCompositeBuffer(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems); + var output = CreateCompositeBuffer(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); // convert from strings if (buffer.DataType == typeof(string)) @@ -934,16 +968,18 @@ public static async Task> To(this IReadOnlyBuffer buffer, /// /// /// + /// /// /// /// public static async Task>> Vectorise(this IReadOnlyBuffer[] buffers, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { - var output = CreateCompositeBuffer>(tempStreams, x => new(x), blockSize, maxInMemoryBlocks, maxDistinctItems); + var output = CreateCompositeBuffer>(tempStreams, x => new(x), blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems); var floatBuffers = buffers.Select(x => x.ConvertTo()); var conversion = new ManyToOneMutation>(floatBuffers, output, x => new(x)); await conversion.Execute(); diff --git a/BrightData/ExtensionMethods.DataTable.cs b/BrightData/ExtensionMethods.DataTable.cs index 320956e7..56c761c5 100644 --- a/BrightData/ExtensionMethods.DataTable.cs +++ b/BrightData/ExtensionMethods.DataTable.cs @@ -982,31 +982,32 @@ public static async Task Normalize(this IDataTable dataTable, string public static async Task Convert( this IReadOnlyBufferWithMetaData buffer, ColumnConversion conversion, - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { return conversion switch { ColumnConversion.Unchanged => buffer, - ColumnConversion.ToBoolean => await buffer.ToBoolean(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToByte => await buffer.To(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToCategoricalIndex => await buffer.ToCategoricalIndex(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToDateTime => await buffer.ToDateTime(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToDate => await buffer.ToDate(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToTime => await buffer.ToTime(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToDecimal => await buffer.To(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToDouble => await buffer.To(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToFloat => await buffer.To(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToIndexList => await buffer.ToIndexList(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToInt => await buffer.To(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToLong => await buffer.To(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToNumeric => await buffer.ToNumeric(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToString => await buffer.ToString(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToWeightedIndexList => await buffer.ToWeightedIndexList(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToVector => await buffer.ToVector(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), - ColumnConversion.ToShort => await buffer.To(tempStreams, blockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToBoolean => await buffer.ToBoolean(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToByte => await buffer.To(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToCategoricalIndex => await buffer.ToCategoricalIndex(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToDateTime => await buffer.ToDateTime(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToDate => await buffer.ToDate(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToTime => await buffer.ToTime(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToDecimal => await buffer.To(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToDouble => await buffer.To(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToFloat => await buffer.To(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToIndexList => await buffer.ToIndexList(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToInt => await buffer.To(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToLong => await buffer.To(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToNumeric => await buffer.ToNumeric(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToString => await buffer.ToString(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToWeightedIndexList => await buffer.ToWeightedIndexList(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToVector => await buffer.ToVector(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), + ColumnConversion.ToShort => await buffer.To(tempStreams, blockSize, maxBlockSize, maxInMemoryBlocks, maxDistinctItems), _ => throw new ArgumentOutOfRangeException(nameof(conversion), conversion, null) }; } diff --git a/BrightData/ExtensionMethods.Span.cs b/BrightData/ExtensionMethods.Span.cs index b2b72035..146c49d0 100644 --- a/BrightData/ExtensionMethods.Span.cs +++ b/BrightData/ExtensionMethods.Span.cs @@ -1684,13 +1684,13 @@ public static MemoryOwner ManhattanNormalize(this ReadOnlySpan span) /// /// /// - public static IReadOnlyVector ToReadOnlyVector(this ReadOnlySpan span) where T: unmanaged, IBinaryFloatingPointIeee754, IMinMaxValue => new ReadOnlyVector(span.ToArray()); + public static ReadOnlyVector ToReadOnlyVector(this ReadOnlySpan span) where T: unmanaged, IBinaryFloatingPointIeee754, IMinMaxValue => new(span.ToArray()); /// /// Creates a read only vector from the span /// /// /// - public static IReadOnlyVector ToReadOnlyVector(this Span span) where T: unmanaged, IBinaryFloatingPointIeee754, IMinMaxValue => new ReadOnlyVector(span.ToArray()); + public static ReadOnlyVector ToReadOnlyVector(this Span span) where T: unmanaged, IBinaryFloatingPointIeee754, IMinMaxValue => new(span.ToArray()); } } diff --git a/BrightData/ExtensionMethods.cs b/BrightData/ExtensionMethods.cs index 5fa7118e..7a862400 100644 --- a/BrightData/ExtensionMethods.cs +++ b/BrightData/ExtensionMethods.cs @@ -17,6 +17,7 @@ using BrightData.Converter; using BrightData.DataTable; using BrightData.Helper; +using BrightData.Helper.Vectors; using BrightData.LinearAlgebra.Clustering; using BrightData.LinearAlgebra.ReadOnly; using CommunityToolkit.HighPerformance; @@ -973,5 +974,17 @@ public static IEnumerable EnumerateValues(this AT array) for (uint i = 0, len = array.Size; i < len; i++) yield return array[i].Value; } + + public static ISupportKnnSearch KDTreeSearch(this IReadOnlyVectorStore vectors) + where T : unmanaged, IBinaryFloatingPointIeee754, IMinMaxValue + { + return new VectorKDTree(vectors); + } + + public static ISupportKnnSearch BallTreeSearch(this IReadOnlyVectorStore vectors, DistanceMetric distanceMetric = DistanceMetric.Cosine) + where T : unmanaged, IBinaryFloatingPointIeee754, IMinMaxValue + { + return new VectorBallTree(vectors, distanceMetric); + } } } diff --git a/BrightData/Helper/CsvParser.cs b/BrightData/Helper/CsvParser.cs index 117916d4..5c381614 100644 --- a/BrightData/Helper/CsvParser.cs +++ b/BrightData/Helper/CsvParser.cs @@ -115,7 +115,7 @@ public void FinishLine(ReadOnlySpan data, bool force) text = text[1..^1]; while ((Columns ??= []).Count <= j) - Columns.Add(new StringCompositeBuffer(parser._tempStreams, parser._blockSize, parser._maxInMemoryBlocks, parser._maxDistinctItems)); + Columns.Add(new StringCompositeBuffer(parser._tempStreams, parser._blockSize, parser._maxBlockSize, parser._maxInMemoryBlocks, parser._maxDistinctItems)); // set the column name if needed if (_isFirstRow && parser._firstRowIsHeader) @@ -145,8 +145,8 @@ public void FinishLine(ReadOnlySpan data, bool force) readonly bool _firstRowIsHeader; readonly char _delimiter, _quote; - readonly IProvideDataBlocks? _tempStreams; - readonly int _blockSize; + readonly IProvideByteBlocks? _tempStreams; + readonly int _blockSize, _maxBlockSize; readonly uint? _maxInMemoryBlocks, _maxDistinctItems; /// @@ -163,19 +163,21 @@ public CsvParser( bool firstRowIsHeader, char delimiter, char quote = '"', - IProvideDataBlocks? tempStreams = null, - int blockSize = Consts.DefaultBlockSize, + IProvideByteBlocks? tempStreams = null, + int blockSize = Consts.DefaultInitialBlockSize, + int maxBlockSize = Consts.DefaultMaxBlockSize, uint? maxInMemoryBlocks = null, uint? maxDistinctItems = null ) { - _firstRowIsHeader = firstRowIsHeader; - _delimiter = delimiter; - _quote = quote; - _tempStreams = tempStreams; - _blockSize = blockSize; + _firstRowIsHeader = firstRowIsHeader; + _delimiter = delimiter; + _quote = quote; + _tempStreams = tempStreams; + _blockSize = blockSize; + _maxBlockSize = maxBlockSize; _maxInMemoryBlocks = maxInMemoryBlocks; - _maxDistinctItems = maxDistinctItems; + _maxDistinctItems = maxDistinctItems; } /// diff --git a/BrightData/Helper/TempFileProvider.cs b/BrightData/Helper/TempFileProvider.cs index 50e32428..d4ea491a 100644 --- a/BrightData/Helper/TempFileProvider.cs +++ b/BrightData/Helper/TempFileProvider.cs @@ -10,7 +10,7 @@ namespace BrightData.Helper /// Temp file provider /// /// - internal class TempFileProvider(string? basePath = null) : IProvideDataBlocks + internal class TempFileProvider(string? basePath = null) : IProvideByteBlocks { class TempData(Guid id, string path) : IByteBlockSource { diff --git a/BrightData/Interfaces.DataTable.cs b/BrightData/Interfaces.DataTable.cs index e0c3750b..62cc640b 100644 --- a/BrightData/Interfaces.DataTable.cs +++ b/BrightData/Interfaces.DataTable.cs @@ -486,19 +486,19 @@ public interface IByteBlockSource : IDisposable, IHaveSize } /// - /// Provides data blocks + /// Provides byte blocks /// - public interface IProvideDataBlocks : IDisposable + public interface IProvideByteBlocks : IDisposable { /// - /// Returns a data block associated with the id + /// Returns a new or existing byte block associated with the id /// /// /// IByteBlockSource Get(Guid id); /// - /// Clears all data blocks + /// Clears all byte blocks /// void Clear(); } diff --git a/BrightData/LinearAlgebra/VectorIndexing/IndexStrategy/KnnSearchVectorIndex.cs b/BrightData/LinearAlgebra/VectorIndexing/IndexStrategy/KnnSearchVectorIndex.cs index 01a74c88..501ab335 100644 --- a/BrightData/LinearAlgebra/VectorIndexing/IndexStrategy/KnnSearchVectorIndex.cs +++ b/BrightData/LinearAlgebra/VectorIndexing/IndexStrategy/KnnSearchVectorIndex.cs @@ -18,7 +18,11 @@ public void Dispose() public IStoreVectors Storage => vectors; - public uint Add(ReadOnlySpan vector) => Storage.Add(vector); + public uint Add(ReadOnlySpan vector) + { + _search = null; + return Storage.Add(vector); + } public IEnumerable Rank(ReadOnlySpan vector) { diff --git a/BrightData/LinearAlgebra/VectorIndexing/IndexStrategy/PreCalculatedEmbeddingVectorIndex.cs b/BrightData/LinearAlgebra/VectorIndexing/IndexStrategy/PreCalculatedEmbeddingVectorIndex.cs new file mode 100644 index 00000000..6013a83f --- /dev/null +++ b/BrightData/LinearAlgebra/VectorIndexing/IndexStrategy/PreCalculatedEmbeddingVectorIndex.cs @@ -0,0 +1,60 @@ +using BrightData.LinearAlgebra.VectorIndexing.Storage; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Numerics; +using System.Text; +using System.Threading.Tasks; + +namespace BrightData.LinearAlgebra.VectorIndexing.IndexStrategy +{ + internal class PreCalculatedEmbeddingVectorIndex : IVectorIndex + where T: unmanaged, IBinaryFloatingPointIeee754, IMinMaxValue + { + readonly LinearAlgebraProvider _lap; + readonly IMatrix _weights; + readonly IVector _bias; + readonly FlatVectorIndex _embeddingIndex; + + public PreCalculatedEmbeddingVectorIndex(IStoreVectors storage, IMatrix weights, IVector bias, DistanceMetric distanceMetric, uint? capacity) + { + _lap = weights.LinearAlgebraProvider; + _weights = weights; + _bias = bias; + Storage = storage; + _embeddingIndex = new(new InMemoryVectorStorage(weights.ColumnCount, capacity), distanceMetric); + } + + public void Dispose() + { + _weights.Dispose(); + _bias.Dispose(); + } + + public IStoreVectors Storage { get; } + + public uint Add(ReadOnlySpan vector) + { + using var vector2 = _weights.LinearAlgebraProvider.CreateVector(vector); + using var matrix = vector2.Reshape(_weights.RowCount, 1); + using var projection = _weights.TransposeThisAndMultiply(matrix); + projection.AddInPlace(_bias); + _embeddingIndex.Add(projection.ReadOnlySegment.Contiguous!.ReadOnlySpan); + return Storage.Add(vector); + } + + public IEnumerable Rank(ReadOnlySpan vector) + { + using var vector2 = _weights.LinearAlgebraProvider.CreateVector(vector); + using var matrix = vector2.Reshape(_weights.RowCount, 1); + using var projection = _weights.TransposeThisAndMultiply(matrix); + projection.AddInPlace(_bias); + return _embeddingIndex.Rank(projection.ReadOnlySegment.Contiguous!.ReadOnlySpan); + } + + public uint[] Closest(ReadOnlyMemory[] vector) + { + throw new NotImplementedException(); + } + } +} diff --git a/BrightData/LinearAlgebra/VectorIndexing/VectorSet.cs b/BrightData/LinearAlgebra/VectorIndexing/VectorSet.cs index b68f4cdc..2909fe00 100644 --- a/BrightData/LinearAlgebra/VectorIndexing/VectorSet.cs +++ b/BrightData/LinearAlgebra/VectorIndexing/VectorSet.cs @@ -71,16 +71,21 @@ public static VectorSet CreateHNSW(BrightDataContext context, uint vectorSize /// /// /// - /// /// /// /// - public static VectorSet Create(Func, ISupportKnnSearch> creator, uint vectorSize, DistanceMetric distanceMetric = DistanceMetric.Cosine, VectorStorageType storageType = VectorStorageType.InMemory, uint? capacity = null) + public static VectorSet CreateKnnSearch(uint vectorSize, Func, ISupportKnnSearch> creator, VectorStorageType storageType = VectorStorageType.InMemory, uint? capacity = null) { var storage = GetStorage(storageType, vectorSize, capacity); return new(new KnnSearchVectorIndex(storage, creator)); } + public static VectorSet CreateFromPreCalculatedEmbedding(IMatrix weight, IVector bias, DistanceMetric distanceMetric = DistanceMetric.Cosine, VectorStorageType storageType = VectorStorageType.InMemory, uint? capacity = null) + { + var storage = GetStorage(storageType, weight.RowCount, capacity); + return new(new PreCalculatedEmbeddingVectorIndex(storage, weight, bias, distanceMetric, capacity)); + } + /// /// Creates vector storage ///