From a1d0f3169d5122ca7687a79eddf93cf1349a7e8e Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Tue, 24 Oct 2023 18:05:58 -0700
Subject: [PATCH] sql: fix usage of streamer with multiple column families

Previously, the streamer was used incorrectly when all of the following
conditions were met:
- we're looking up into a table with at least 3 column families
- not all column families are needed, so we end up creating
  "family-specific" spans (either Gets or Scans - the latter is
  possible when multiple contiguous families are needed)
- the streamer is used in OutOfOrder mode (which is the case for index
  joins when `MaintainOrdering` is `false` and for lookup joins when
  `MaintainLookupOrdering` is `false`)
- the index we're looking up into is split across multiple ranges.

In such a scenario the KVs from different SQL rows could become
intertwined: the streamer could execute a separate BatchRequest for
those rows, and if the `TargetBytes` limit estimate was insufficient,
it would create "resume" batches, at which point the "results" stream
would be incorrect. Later, at the SQL fetcher level, this could
manifest either as an internal "non-nullable column with no value"
error or as silently incorrect output (we'd create multiple output SQL
rows out of what was actually a single row).

This problem is now fixed by asking the streamer to maintain the
ordering of the requests whenever `SplitFamilyIDs` has more than one
family, i.e. whenever we might end up creating family-specific spans.

Note that the non-streamer code path doesn't suffer from this problem
because there we only parallelize BatchRequests when neither
`TargetBytes` nor `MaxSpanRequestKeys` limits are set, and such limits
are a prerequisite for creating "resume" batches.

Release note (bug fix): CockroachDB previously could incorrectly
evaluate lookup and index joins into tables with at least 3 column
families in some cases (either a "non-nullable column with no value"
internal error would occur or the query would return incorrect
results), and this is now fixed. The bug was introduced in 22.2.
---
 pkg/sql/colfetcher/index_join.go              | 18 ++++---
 .../logictest/testdata/logic_test/lookup_join | 16 ++++++
 pkg/sql/rowexec/joinreader.go                 | 49 +++++++++++++------
 3 files changed, 62 insertions(+), 21 deletions(-)
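
The invariant at stake is that all KVs belonging to a single SQL row
appear contiguously in the stream of results, because the SQL fetcher
reassembles each row from its family-specific KVs under that
assumption. The following self-contained Go sketch is illustrative only
(the kv type and the rowsAreContiguous helper are hypothetical and not
part of this patch); it shows the kind of interleaving that an
OutOfOrder streamer with a "resume" batch can produce:

    package main

    import "fmt"

    // kv models a single result KV as a (SQL row key, column family ID) pair.
    type kv struct {
        row    int
        family int
    }

    // rowsAreContiguous reports whether all KVs that belong to the same SQL
    // row are adjacent in the results stream, which is the property the SQL
    // fetcher relies on when reassembling rows from several column families.
    func rowsAreContiguous(results []kv) bool {
        finished := make(map[int]bool)
        for i, r := range results {
            if i > 0 && results[i-1].row != r.row {
                if finished[r.row] {
                    // This row was already emitted earlier and now resumes
                    // after another row intervened.
                    return false
                }
                finished[results[i-1].row] = true
            }
        }
        return true
    }

    func main() {
        // InOrder-style stream: both families of row 1, then both of row 2.
        fmt.Println(rowsAreContiguous([]kv{{1, 0}, {1, 2}, {2, 0}, {2, 2}})) // true
        // OutOfOrder stream with a "resume" batch: row 1's second family
        // arrives only after row 2 has been emitted.
        fmt.Println(rowsAreContiguous([]kv{{1, 0}, {2, 0}, {2, 2}, {1, 2}})) // false
    }

The regression test added to lookup_join below exercises the real
scenario end to end: two lookup rows living in different ranges, with a
large value in the third column family, presumably so that the
TargetBytes estimate is exceeded and a "resume" batch is created.
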
diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go
index eda146cc4cc0..32d5540ecf66 100644
--- a/pkg/sql/colfetcher/index_join.go
+++ b/pkg/sql/colfetcher/index_join.go
@@ -499,8 +499,8 @@ func getIndexJoinBatchSize(
 
 // NewColIndexJoin creates a new ColIndexJoin operator.
 //
-// If spec.MaintainOrdering is true, then the diskMonitor argument must be
-// non-nil.
+// If spec.MaintainOrdering is true, or spec.SplitFamilyIDs has more than one
+// family, then the diskMonitor argument must be non-nil.
 func NewColIndexJoin(
 	ctx context.Context,
 	allocator *colmem.Allocator,
@@ -547,15 +547,21 @@ func NewColIndexJoin(
 		if streamerBudgetAcc == nil {
 			return nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired")
 		}
-		if spec.MaintainOrdering && diskMonitor == nil {
-			return nil, errors.AssertionFailedf("diskMonitor is nil when ordering needs to be maintained")
-		}
 		// Keep 1/16th of the memory limit for the output batch of the cFetcher,
 		// another 1/16th of the limit for the input tuples buffered by the index
 		// joiner, and we'll give the remaining memory to the streamer budget
 		// below.
 		cFetcherMemoryLimit = int64(math.Ceil(float64(totalMemoryLimit) / 16.0))
 		streamerBudgetLimit := 14 * cFetcherMemoryLimit
+		// When we have SplitFamilyIDs with more than one family ID, then it's
+		// possible for a single lookup span to be split into multiple "family"
+		// spans, and in order to preserve the invariant that all KVs for a
+		// single SQL row are contiguous we must ask the streamer to preserve
+		// the ordering. See #113013 for an example.
+		maintainOrdering := spec.MaintainOrdering || len(spec.SplitFamilyIDs) > 1
+		if maintainOrdering && diskMonitor == nil {
+			return nil, errors.AssertionFailedf("diskMonitor is nil when ordering needs to be maintained")
+		}
 		kvFetcher = row.NewStreamingKVFetcher(
 			flowCtx.Cfg.DistSender,
 			flowCtx.Stopper(),
@@ -566,7 +572,7 @@ func NewColIndexJoin(
 			spec.LockingDurability,
 			streamerBudgetLimit,
 			streamerBudgetAcc,
-			spec.MaintainOrdering,
+			maintainOrdering,
 			true, /* singleRowLookup */
 			int(spec.FetchSpec.MaxKeysPerRow),
 			rowcontainer.NewKVStreamerResultDiskBuffer(
diff --git a/pkg/sql/logictest/testdata/logic_test/lookup_join b/pkg/sql/logictest/testdata/logic_test/lookup_join
index c729aa317870..d1e342b475f8 100644
--- a/pkg/sql/logictest/testdata/logic_test/lookup_join
+++ b/pkg/sql/logictest/testdata/logic_test/lookup_join
@@ -1546,3 +1546,19 @@ query III
 SELECT k3, v3, w3 FROM t108489_1 INNER LOOKUP JOIN t108489_3 ON i3 = k1 AND u3 = 1 WHERE k1 = 1;
 ----
 1 1 1
+
+# Regression test for incorrectly using OutOfOrder mode of the streamer with
+# multiple column families (#113013).
+
+statement ok
+CREATE TABLE l_113013 (r_id INT, l_id INT, PRIMARY KEY (r_id, l_id), INDEX l_id_idx(l_id));
+CREATE TABLE r_113013 (id INT PRIMARY KEY, c1 STRING NOT NULL, c2 STRING NOT NULL, c3 STRING NOT NULL, FAMILY f1 (id, c1), FAMILY f2 (c2), FAMILY f3 (c3));
+ALTER TABLE r_113013 SPLIT AT VALUES (2);
+INSERT INTO l_113013 VALUES (1, 1), (2, 1);
+INSERT INTO r_113013 VALUES (1, 'c1', 'c2', repeat('c3', 2000)), (2, 'c1', 'c2', 'c3');
+
+query II rowsort
+SELECT length(c1), length(c3) FROM l_113013 l INNER JOIN r_113013 r ON l.r_id = r.id WHERE l.l_id = 1;
+----
+2 4000
+2 2
diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go
index 2a79282e6393..2821acb66a50 100644
--- a/pkg/sql/rowexec/joinreader.go
+++ b/pkg/sql/rowexec/joinreader.go
@@ -134,15 +134,20 @@ type joinReader struct {
 		unlimitedMemMonitor *mon.BytesMonitor
 		budgetAcc           mon.BoundAccount
 		// maintainOrdering indicates whether the ordering of the input stream
-		// needs to be maintained AND that we rely on the streamer for that.
-		// We currently only rely on the streamer in two cases:
-		// 1. We are performing an index join and joinReader.maintainOrdering is
+		// needs to be maintained AND that we rely on the streamer for that. We
+		// currently rely on the streamer in the following cases:
+		// 1. When spec.SplitFamilyIDs has more than one family, for both
+		//    index and lookup joins (this is needed to ensure that all KVs
+		//    for a single row are returned contiguously).
+		// 2. We are performing an index join and spec.MaintainOrdering is
 		//    true.
-		// 2. We are performing a lookup join and maintainLookupOrdering is true.
-		// Except for case (2), we don't rely on the streamer for maintaining
-		// the ordering for lookup joins due to implementation details (since we
-		// still buffer all looked up rows and restore the ordering explicitly via
-		// the joinReaderOrderingStrategy).
+		// 3. We are performing a lookup join and spec.MaintainLookupOrdering
+		//    is true.
+		// Note that, except in case (3), we don't rely on the streamer for
+		// maintaining the ordering for lookup joins even when
+		// spec.MaintainOrdering is true, due to implementation details (since
+		// we still buffer all looked up rows and restore the ordering
+		// explicitly via the joinReaderOrderingStrategy).
 		maintainOrdering    bool
 		diskMonitor         *mon.BytesMonitor
 		txnKVStreamerMemAcc mon.BoundAccount
@@ -511,13 +516,27 @@ func newJoinReader(
 		jr.streamerInfo.unlimitedMemMonitor.StartNoReserved(ctx, flowCtx.Mon)
 		jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
 		jr.streamerInfo.txnKVStreamerMemAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
-		// The index joiner can rely on the streamer to maintain the input ordering,
-		// but the lookup joiner currently handles this logic itself, so the
-		// streamer can operate in OutOfOrder mode. The exception is when the
-		// results of each lookup need to be returned in index order - in this case,
-		// InOrder mode must be used for the streamer.
-		jr.streamerInfo.maintainOrdering = (jr.maintainOrdering && readerType == indexJoinReaderType) ||
-			spec.MaintainLookupOrdering
+		// When we have SplitFamilyIDs with more than one family ID, then it's
+		// possible for a single lookup span to be split into multiple "family"
+		// spans, and in order to preserve the invariant that all KVs for a
+		// single SQL row are contiguous we must ask the streamer to preserve
+		// the ordering. See #113013 for an example.
+		jr.streamerInfo.maintainOrdering = len(spec.SplitFamilyIDs) > 1
+		if readerType == indexJoinReaderType {
+			if spec.MaintainOrdering {
+				// The index join can rely on the streamer to maintain the input
+				// ordering.
+				jr.streamerInfo.maintainOrdering = true
+			}
+		} else {
+			// Due to implementation details (the join reader strategy restores
+			// the desired order when spec.MaintainOrdering is set) we only need
+			// to ask the streamer to maintain ordering if the results of each
+			// lookup need to be returned in index order.
+			if spec.MaintainLookupOrdering {
+				jr.streamerInfo.maintainOrdering = true
+			}
+		}
 		var diskBuffer kvstreamer.ResultDiskBuffer
 		if jr.streamerInfo.maintainOrdering {