From 4989dbf37540a1185e09dd56d83ee1a627f440e6 Mon Sep 17 00:00:00 2001 From: Oleksii Zuiev Date: Fri, 22 Jan 2021 15:09:00 +0200 Subject: [PATCH] Separate queries of stream row and events in read operation, as combined query may result in slow paging. --- Source/Streamstone/Stream.Operations.cs | 43 +++++++++++++++---------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/Source/Streamstone/Stream.Operations.cs b/Source/Streamstone/Stream.Operations.cs index 150b32c..8e8e146 100644 --- a/Source/Streamstone/Stream.Operations.cs +++ b/Source/Streamstone/Stream.Operations.cs @@ -383,21 +383,23 @@ public ReadOperation(Partition partition, int startVersion, int sliceSize) table = partition.Table; } - public async Task> ExecuteAsync(Func transform) => - Result(await ExecuteQueryAsync(PrepareQuery()), transform); - - StreamSlice Result(ICollection entities, Func transform) + public async Task> ExecuteAsync(Func transform) { - var streamEntity = FindStreamEntity(entities); - entities.Remove(streamEntity); + var eventsQuery = ExecuteQueryAsync(EventsQuery()); + var streamRowQuery = ExecuteQueryAsync(StreamRowQuery()); + await Task.WhenAll(eventsQuery, streamRowQuery); + return Result(await eventsQuery, FindStreamEntity(await streamRowQuery), transform); + } + StreamSlice Result(ICollection entities, DynamicTableEntity streamEntity, Func transform) + { var stream = BuildStream(streamEntity); var events = BuildEvents(entities, transform); return new StreamSlice(stream, events, startVersion, sliceSize); } - TableQuery PrepareQuery() + TableQuery EventsQuery() { var rowKeyStart = partition.EventVersionRowKey(startVersion); var rowKeyEnd = partition.EventVersionRowKey(startVersion + sliceSize - 1); @@ -405,16 +407,23 @@ TableQuery PrepareQuery() var filter = TableQuery.CombineFilters( TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.PartitionKey), QueryComparisons.Equal, partition.PartitionKey), TableOperators.And, - TableQuery.CombineFilters( - TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.RowKey), QueryComparisons.Equal, partition.StreamRowKey()), - TableOperators.Or, - TableQuery.CombineFilters( - TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.RowKey), QueryComparisons.GreaterThanOrEqual, rowKeyStart), - TableOperators.And, - TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.RowKey), QueryComparisons.LessThanOrEqual, rowKeyEnd) - ) - ) - ); + TableQuery.CombineFilters( + TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.RowKey), QueryComparisons.GreaterThanOrEqual, rowKeyStart), + TableOperators.And, + TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.RowKey), QueryComparisons.LessThanOrEqual, rowKeyEnd) + ) + ); + + return new TableQuery().Where(filter); + } + + TableQuery StreamRowQuery() + { + var filter = TableQuery.CombineFilters( + TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.PartitionKey), QueryComparisons.Equal, partition.PartitionKey), + TableOperators.And, + TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.RowKey), QueryComparisons.Equal, partition.StreamRowKey()) + ); return new TableQuery().Where(filter); }