sql: fix usage of streamer with multiple column families
Previously, we had a bug in how the streamer was used when all of the
following conditions were met:
- we're looking up into a table with at least 3 column families
- not all column families are needed, so we end up creating
"family-specific" spans (either Gets or Scans - the latter is possible
when multiple contiguous families are needed)
- the streamer is used with OutOfOrder mode (which is the case for index
joins when `MaintainOrdering` is `false` and for lookup joins when
`MaintainLookupOrdering` is `false`)
- the index we're looking up into is split across multiple ranges.

In such a scenario we could end up with KVs from different SQL rows
being interleaved: the streamer could execute a separate BatchRequest
for those rows, and if the `TargetBytes` limit estimate was
insufficient, we'd end up creating "resume" batches, at which point the
"results" stream would be incorrect. Later, at the SQL fetcher level,
this could manifest either as an internal "non-nullable column with no
value" error or as silently incorrect output (we'd create multiple
output SQL rows from a single true one).
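
To illustrate the failure mode, here is a minimal sketch in Go. It is not
the actual fetcher code: the `kv` type, the `assembleRows` helper, and the
family IDs are hypothetical stand-ins, and the only property modeled is that
the consumer treats a change of row key as the end of the current row.

```go
package main

import (
	"errors"
	"fmt"
)

// kv is a simplified, hypothetical stand-in for a KV pair: rowKey identifies
// the SQL row and family is the column family the KV belongs to.
type kv struct {
	rowKey int
	family int
}

// assembleRows mimics a consumer that relies on the invariant that all KVs
// of one SQL row arrive contiguously: a change in rowKey ends the current
// row. requiredFamilies lists the families that hold NOT NULL columns.
func assembleRows(stream []kv, requiredFamilies []int) (int, error) {
	rows := 0
	seen := map[int]bool{}
	finish := func() error {
		for _, f := range requiredFamilies {
			if !seen[f] {
				return errors.New("non-nullable column with no value")
			}
		}
		rows++
		seen = map[int]bool{}
		return nil
	}
	for i, cur := range stream {
		if i > 0 && stream[i-1].rowKey != cur.rowKey {
			if err := finish(); err != nil {
				return rows, err
			}
		}
		seen[cur.family] = true
	}
	if len(stream) > 0 {
		if err := finish(); err != nil {
			return rows, err
		}
	}
	return rows, nil
}

func main() {
	inOrder := []kv{{1, 0}, {1, 2}, {2, 0}, {2, 2}}
	// The family-2 KV of row 1 arrives late, as if from a "resume" batch.
	interleaved := []kv{{1, 0}, {2, 0}, {2, 2}, {1, 2}}

	n, err := assembleRows(inOrder, []int{0, 2})
	fmt.Printf("in order: rows=%d err=%v\n", n, err)
	n, err = assembleRows(interleaved, []int{0, 2})
	fmt.Printf("interleaved, NOT NULL columns: rows=%d err=%v\n", n, err)
	n, err = assembleRows(interleaved, nil)
	fmt.Printf("interleaved, nullable columns: rows=%d err=%v\n", n, err)
}
```

In this sketch the interleaved stream either trips the NOT NULL check or,
when no family is required, yields three rows from two true ones, which
mirrors the two symptoms described above.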

This problem is now fixed by asking the streamer to maintain the
ordering of the requests whenever we have `SplitFamilyIDs` with more
than one family, meaning that we might end up creating family-specific
spans.
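
The resulting rule can be sketched as a small standalone Go function. The
`joinSpec` and `readerType` types below are hypothetical, trimmed-down
stand-ins for the real spec and reader type, and model only the three
fields discussed here.

```go
package main

import "fmt"

type readerType int

const (
	indexJoin readerType = iota
	lookupJoin
)

// joinSpec is a hypothetical stand-in for the join reader spec.
type joinSpec struct {
	MaintainOrdering       bool
	MaintainLookupOrdering bool
	SplitFamilyIDs         []uint32
}

// streamerMustMaintainOrdering reports whether the streamer should be asked
// to preserve the ordering of the requests.
func streamerMustMaintainOrdering(rt readerType, spec joinSpec) bool {
	// More than one family means a lookup span may be split into
	// family-specific spans, so the KVs of a row must stay contiguous.
	if len(spec.SplitFamilyIDs) > 1 {
		return true
	}
	if rt == indexJoin {
		// The index join relies on the streamer for the input ordering.
		return spec.MaintainOrdering
	}
	// The lookup join restores the input ordering itself and only needs the
	// streamer when each lookup's results must come back in index order.
	return spec.MaintainLookupOrdering
}

func main() {
	spec := joinSpec{SplitFamilyIDs: []uint32{0, 2}}
	fmt.Println(streamerMustMaintainOrdering(lookupJoin, spec))
	fmt.Println(streamerMustMaintainOrdering(lookupJoin, joinSpec{MaintainOrdering: true}))
}
```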

Note that the non-streamer code path doesn't suffer from this problem
because there we only parallelize BatchRequests when neither the
`TargetBytes` nor the `MaxSpanRequestKeys` limit is set, and "resume"
batches can only occur when one of those limits is set.
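
As a rough sketch of that condition, assuming that a zero value means "limit
not set" (the `batchLimits` type and both helpers are hypothetical and are
not taken from the actual code):

```go
package main

import "fmt"

// batchLimits is a hypothetical stand-in for the limits on a BatchRequest.
// Treating zero as "not set" is an assumption of this sketch.
type batchLimits struct {
	TargetBytes        int64
	MaxSpanRequestKeys int64
}

// mayProduceResumeBatches reports whether a request with these limits could
// be cut short and need a "resume" batch.
func mayProduceResumeBatches(l batchLimits) bool {
	return l.TargetBytes > 0 || l.MaxSpanRequestKeys > 0
}

// canParallelize mirrors the non-streamer rule described above: requests are
// only parallelized when no limit is set, so the parallel path never sees
// "resume" batches.
func canParallelize(l batchLimits) bool {
	return !mayProduceResumeBatches(l)
}

func main() {
	fmt.Println(canParallelize(batchLimits{}))
	fmt.Println(canParallelize(batchLimits{TargetBytes: 1 << 20}))
}
```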

Release note (bug fix): CockroachDB previously could incorrectly
evaluate lookup and index joins into tables with at least 3 column
families in some cases (either a "non-nullable column with no value"
internal error would occur or the query would return incorrect results).
This is now fixed. The bug was introduced in 22.2.
yuzefovich committed Oct 25, 2023
1 parent 68ad71d commit a1d0f31
Showing 3 changed files with 62 additions and 21 deletions.
18 changes: 12 additions & 6 deletions pkg/sql/colfetcher/index_join.go
@@ -499,8 +499,8 @@ func getIndexJoinBatchSize(

// NewColIndexJoin creates a new ColIndexJoin operator.
//
-// If spec.MaintainOrdering is true, then the diskMonitor argument must be
-// non-nil.
+// If spec.MaintainOrdering is true, or spec.SplitFamilyIDs has more than one
+// family, then the diskMonitor argument must be non-nil.
func NewColIndexJoin(
ctx context.Context,
allocator *colmem.Allocator,
@@ -547,15 +547,21 @@ func NewColIndexJoin(
if streamerBudgetAcc == nil {
return nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired")
}
-if spec.MaintainOrdering && diskMonitor == nil {
-return nil, errors.AssertionFailedf("diskMonitor is nil when ordering needs to be maintained")
-}
// Keep 1/16th of the memory limit for the output batch of the cFetcher,
// another 1/16th of the limit for the input tuples buffered by the index
// joiner, and we'll give the remaining memory to the streamer budget
// below.
cFetcherMemoryLimit = int64(math.Ceil(float64(totalMemoryLimit) / 16.0))
streamerBudgetLimit := 14 * cFetcherMemoryLimit
+// When we have SplitFamilyIDs with more than one family ID, then it's
+// possible for a single lookup span to be split into multiple "family"
+// spans, and in order to preserve the invariant that all KVs for a
+// single SQL row are contiguous we must ask the streamer to preserve
+// the ordering. See #113013 for an example.
+maintainOrdering := spec.MaintainOrdering || len(spec.SplitFamilyIDs) > 1
+if maintainOrdering && diskMonitor == nil {
+return nil, errors.AssertionFailedf("diskMonitor is nil when ordering needs to be maintained")
+}
kvFetcher = row.NewStreamingKVFetcher(
flowCtx.Cfg.DistSender,
flowCtx.Stopper(),
@@ -566,7 +572,7 @@ func NewColIndexJoin(
spec.LockingDurability,
streamerBudgetLimit,
streamerBudgetAcc,
-spec.MaintainOrdering,
+maintainOrdering,
true, /* singleRowLookup */
int(spec.FetchSpec.MaxKeysPerRow),
rowcontainer.NewKVStreamerResultDiskBuffer(
16 changes: 16 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/lookup_join
@@ -1546,3 +1546,19 @@ query III
SELECT k3, v3, w3 FROM t108489_1 INNER LOOKUP JOIN t108489_3 ON i3 = k1 AND u3 = 1 WHERE k1 = 1;
----
1 1 1
+
+# Regression test for incorrectly using OutOfOrder mode of the streamer with
+# multiple column families (#113013).
+
+statement ok
+CREATE TABLE l_113013 (r_id INT, l_id INT, PRIMARY KEY (r_id, l_id), INDEX l_id_idx(l_id));
+CREATE TABLE r_113013 (id INT PRIMARY KEY, c1 STRING NOT NULL, c2 STRING NOT NULL, c3 STRING NOT NULL, FAMILY f1 (id, c1), FAMILY f2 (c2), FAMILY f3 (c3));
+ALTER TABLE r_113013 SPLIT AT VALUES (2);
+INSERT INTO l_113013 VALUES (1, 1), (2, 1);
+INSERT INTO r_113013 VALUES (1, 'c1', 'c2', repeat('c3', 2000)), (2, 'c1', 'c2', 'c3');
+
+query II rowsort
+SELECT length(c1), length(c3) FROM l_113013 l INNER JOIN r_113013 r ON l.r_id = r.id WHERE l.l_id = 1;
+----
+2 4000
+2 2
49 changes: 34 additions & 15 deletions pkg/sql/rowexec/joinreader.go
@@ -134,15 +134,20 @@ type joinReader struct {
unlimitedMemMonitor *mon.BytesMonitor
budgetAcc mon.BoundAccount
// maintainOrdering indicates whether the ordering of the input stream
-// needs to be maintained AND that we rely on the streamer for that.
-// We currently only rely on the streamer in two cases:
-// 1. We are performing an index join and joinReader.maintainOrdering is
+// needs to be maintained AND that we rely on the streamer for that. We
+// currently rely on the streamer in the following cases:
+// 1. When spec.SplitFamilyIDs has more than one family, for both
+// index and lookup joins (this is needed to ensure that all KVs
+// for a single row are returned contiguously).
+// 2. We are performing an index join and spec.MaintainOrdering is
// true.
-// 2. We are performing a lookup join and maintainLookupOrdering is true.
-// Except for case (2), we don't rely on the streamer for maintaining
-// the ordering for lookup joins due to implementation details (since we
-// still buffer all looked up rows and restore the ordering explicitly via
-// the joinReaderOrderingStrategy).
+// 3. We are performing a lookup join and spec.MaintainLookupOrdering
+// is true.
+// Note that in case (3), we don't rely on the streamer for maintaining
+// the ordering for lookup joins when spec.MaintainOrdering is true due
+// to implementation details (since we still buffer all looked up rows
+// and restore the ordering explicitly via the
+// joinReaderOrderingStrategy).
maintainOrdering bool
diskMonitor *mon.BytesMonitor
txnKVStreamerMemAcc mon.BoundAccount
@@ -511,13 +516,27 @@ func newJoinReader(
jr.streamerInfo.unlimitedMemMonitor.StartNoReserved(ctx, flowCtx.Mon)
jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
jr.streamerInfo.txnKVStreamerMemAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
-// The index joiner can rely on the streamer to maintain the input ordering,
-// but the lookup joiner currently handles this logic itself, so the
-// streamer can operate in OutOfOrder mode. The exception is when the
-// results of each lookup need to be returned in index order - in this case,
-// InOrder mode must be used for the streamer.
-jr.streamerInfo.maintainOrdering = (jr.maintainOrdering && readerType == indexJoinReaderType) ||
-spec.MaintainLookupOrdering
+// When we have SplitFamilyIDs with more than one family ID, then it's
+// possible for a single lookup span to be split into multiple "family"
+// spans, and in order to preserve the invariant that all KVs for a
+// single SQL row are contiguous we must ask the streamer to preserve
+// the ordering. See #113013 for an example.
+jr.streamerInfo.maintainOrdering = len(spec.SplitFamilyIDs) > 1
+if readerType == indexJoinReaderType {
+if spec.MaintainOrdering {
+// The index join can rely on the streamer to maintain the input
+// ordering.
+jr.streamerInfo.maintainOrdering = true
+}
+} else {
+// Due to implementation details (the join reader strategy restores
+// the desired order when spec.MaintainOrdering is set) we only need
+// to ask the streamer to maintain ordering if the results of each
+// lookup need to be returned in index order.
+if spec.MaintainLookupOrdering {
+jr.streamerInfo.maintainOrdering = true
+}
+}

var diskBuffer kvstreamer.ResultDiskBuffer
if jr.streamerInfo.maintainOrdering {