Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
113019: sql: fix usage of streamer with multiple column families r=yuzefovich a=yuzefovich

Previously, there was a bug in how the streamer was used when all of the following conditions held:
- we're looking up into a table with at least 3 column families
- not all column families are needed, so we end up creating "family-specific" spans (either Gets or Scans - the latter is possible when multiple contiguous families are needed)
- the streamer is used with OutOfOrder mode (which is the case for index joins when `MaintainOrdering` is `false` and for lookup joins when `MaintainLookupOrdering` is `false`)
- the index we're looking up into is split across multiple ranges.

In such a scenario we could end up with KVs from different SQL rows being intertwined, because the streamer could execute a separate BatchRequest for those rows, and if the `TargetBytes` limit estimate was insufficient, we'd end up creating "resume" batches, at which point the "results" stream would be incorrect. Later, at the SQL fetcher level, this could manifest either as an internal "non-nullable column with no value" error or as silently incorrect output (we'd create multiple output SQL rows from what was truly a single row).

This problem is now fixed by asking the streamer to maintain the ordering of the requests whenever we have `SplitFamilyIDs` with more than one family, meaning that we might end up creating family-specific spans.

Note that the non-streamer code path doesn't suffer from this problem because there we only parallelize BatchRequests when neither `TargetBytes` nor `MaxSpanRequestKeys` limits are set, which is the requirement for having "resume" batches.

Addresses: cockroachdb#113013.
Epic: None

Release note (bug fix): CockroachDB previously could incorrectly evaluate lookup and index joins into tables with at least 3 column families in some cases (either "non-nullable column with no value" internal error would occur or the query would return incorrect results), and this is now fixed. The bug was introduced in 22.2.

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Oct 25, 2023
2 parents 5b91179 + 05dbd92 commit c2ffc96
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 22 deletions.
20 changes: 13 additions & 7 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,6 @@ func getIndexJoinBatchSize(
}

// NewColIndexJoin creates a new ColIndexJoin operator.
//
// If spec.MaintainOrdering is true, then the diskMonitor argument must be
// non-nil.
func NewColIndexJoin(
ctx context.Context,
allocator *colmem.Allocator,
Expand Down Expand Up @@ -547,15 +544,24 @@ func NewColIndexJoin(
if streamerBudgetAcc == nil {
return nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired")
}
if spec.MaintainOrdering && diskMonitor == nil {
return nil, errors.AssertionFailedf("diskMonitor is nil when ordering needs to be maintained")
}
// Keep 1/16th of the memory limit for the output batch of the cFetcher,
// another 1/16th of the limit for the input tuples buffered by the index
// joiner, and we'll give the remaining memory to the streamer budget
// below.
cFetcherMemoryLimit = int64(math.Ceil(float64(totalMemoryLimit) / 16.0))
streamerBudgetLimit := 14 * cFetcherMemoryLimit
// When we have SplitFamilyIDs with more than one family ID, then it's
// possible for a single lookup span to be split into multiple "family"
// spans, and in order to preserve the invariant that all KVs for a
// single SQL row are contiguous we must ask the streamer to preserve
// the ordering. See #113013 for an example.
maintainOrdering := spec.MaintainOrdering || len(spec.SplitFamilyIDs) > 1
if flowCtx.EvalCtx.SessionData().StreamerAlwaysMaintainOrdering {
maintainOrdering = true
}
if maintainOrdering && diskMonitor == nil {
return nil, errors.AssertionFailedf("diskMonitor is nil when ordering needs to be maintained")
}
kvFetcher = row.NewStreamingKVFetcher(
flowCtx.Cfg.DistSender,
flowCtx.Stopper(),
Expand All @@ -566,7 +572,7 @@ func NewColIndexJoin(
spec.LockingDurability,
streamerBudgetLimit,
streamerBudgetAcc,
spec.MaintainOrdering,
maintainOrdering,
true, /* singleRowLookup */
int(spec.FetchSpec.MaxKeysPerRow),
rowcontainer.NewKVStreamerResultDiskBuffer(
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3636,6 +3636,10 @@ func (m *sessionDataMutator) SetStreamerEnabled(val bool) {
m.data.StreamerEnabled = val
}

// SetStreamerAlwaysMaintainOrdering sets the StreamerAlwaysMaintainOrdering
// session variable, which makes the SQL users of the streamer always ask it
// to maintain the ordering, even when it might not be strictly necessary for
// the query.
func (m *sessionDataMutator) SetStreamerAlwaysMaintainOrdering(val bool) {
	m.data.StreamerAlwaysMaintainOrdering = val
}

// SetMultipleActivePortalsEnabled sets the MultipleActivePortalsEnabled
// session variable.
func (m *sessionDataMutator) SetMultipleActivePortalsEnabled(val bool) {
	m.data.MultipleActivePortalsEnabled = val
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -5593,6 +5593,7 @@ sql_safe_updates off
ssl on
standard_conforming_strings on
statement_timeout 0
streamer_always_maintain_ordering off
streamer_enabled on
strict_ddl_atomicity off
stub_catalog_tables on
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/lookup_join
Original file line number Diff line number Diff line change
Expand Up @@ -1546,3 +1546,19 @@ query III
SELECT k3, v3, w3 FROM t108489_1 INNER LOOKUP JOIN t108489_3 ON i3 = k1 AND u3 = 1 WHERE k1 = 1;
----
1 1 1

# Regression test for incorrectly using OutOfOrder mode of the streamer with
# multiple column families (#113013).

statement ok
CREATE TABLE l_113013 (r_id INT, l_id INT, PRIMARY KEY (r_id, l_id), INDEX l_id_idx(l_id));
CREATE TABLE r_113013 (id INT PRIMARY KEY, c1 STRING NOT NULL, c2 STRING NOT NULL, c3 STRING NOT NULL, FAMILY f1 (id, c1), FAMILY f2 (c2), FAMILY f3 (c3));
ALTER TABLE r_113013 SPLIT AT VALUES (2);
INSERT INTO l_113013 VALUES (1, 1), (2, 1);
INSERT INTO r_113013 VALUES (1, 'c1', 'c2', repeat('c3', 2000)), (2, 'c1', 'c2', 'c3');

query II rowsort
SELECT length(c1), length(c3) FROM l_113013 l INNER JOIN r_113013 r ON l.r_id = r.id WHERE l.l_id = 1;
----
2 4000
2 2
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -2911,6 +2911,7 @@ sql_safe_updates off N
ssl on NULL NULL NULL string
standard_conforming_strings on NULL NULL NULL string
statement_timeout 0 NULL NULL NULL string
streamer_always_maintain_ordering off NULL NULL NULL string
streamer_enabled on NULL NULL NULL string
strict_ddl_atomicity off NULL NULL NULL string
stub_catalog_tables on NULL NULL NULL string
Expand Down Expand Up @@ -3074,6 +3075,7 @@ sql_safe_updates off N
ssl on NULL user NULL on on
standard_conforming_strings on NULL user NULL on on
statement_timeout 0 NULL user NULL 0s 0s
streamer_always_maintain_ordering off NULL user NULL off off
streamer_enabled on NULL user NULL on on
strict_ddl_atomicity off NULL user NULL off off
stub_catalog_tables on NULL user NULL on on
Expand Down Expand Up @@ -3237,6 +3239,7 @@ sql_safe_updates NULL NULL NULL
ssl NULL NULL NULL NULL NULL
standard_conforming_strings NULL NULL NULL NULL NULL
statement_timeout NULL NULL NULL NULL NULL
streamer_always_maintain_ordering NULL NULL NULL NULL NULL
streamer_enabled NULL NULL NULL NULL NULL
strict_ddl_atomicity NULL NULL NULL NULL NULL
stub_catalog_tables NULL NULL NULL NULL NULL
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ sql_safe_updates off
ssl on
standard_conforming_strings on
statement_timeout 0
streamer_always_maintain_ordering off
streamer_enabled on
strict_ddl_atomicity off
stub_catalog_tables on
Expand Down
52 changes: 37 additions & 15 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,20 @@ type joinReader struct {
unlimitedMemMonitor *mon.BytesMonitor
budgetAcc mon.BoundAccount
// maintainOrdering indicates whether the ordering of the input stream
// needs to be maintained AND that we rely on the streamer for that.
// We currently only rely on the streamer in two cases:
// 1. We are performing an index join and joinReader.maintainOrdering is
// needs to be maintained AND that we rely on the streamer for that. We
// currently rely on the streamer in the following cases:
// 1. When spec.SplitFamilyIDs has more than one family, for both
// index and lookup joins (this is needed to ensure that all KVs
// for a single row are returned contiguously).
// 2. We are performing an index join and spec.MaintainOrdering is
// true.
// 2. We are performing a lookup join and maintainLookupOrdering is true.
// Except for case (2), we don't rely on the streamer for maintaining
// the ordering for lookup joins due to implementation details (since we
// still buffer all looked up rows and restore the ordering explicitly via
// the joinReaderOrderingStrategy).
// 3. We are performing a lookup join and spec.MaintainLookupOrdering
// is true.
// Note that in case (3), we don't rely on the streamer for maintaining
// the ordering for lookup joins when spec.MaintainOrdering is true due
// to implementation details (since we still buffer all looked up rows
// and restore the ordering explicitly via the
// joinReaderOrderingStrategy).
maintainOrdering bool
diskMonitor *mon.BytesMonitor
txnKVStreamerMemAcc mon.BoundAccount
Expand Down Expand Up @@ -511,13 +516,30 @@ func newJoinReader(
jr.streamerInfo.unlimitedMemMonitor.StartNoReserved(ctx, flowCtx.Mon)
jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
jr.streamerInfo.txnKVStreamerMemAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
// The index joiner can rely on the streamer to maintain the input ordering,
// but the lookup joiner currently handles this logic itself, so the
// streamer can operate in OutOfOrder mode. The exception is when the
// results of each lookup need to be returned in index order - in this case,
// InOrder mode must be used for the streamer.
jr.streamerInfo.maintainOrdering = (jr.maintainOrdering && readerType == indexJoinReaderType) ||
spec.MaintainLookupOrdering
// When we have SplitFamilyIDs with more than one family ID, then it's
// possible for a single lookup span to be split into multiple "family"
// spans, and in order to preserve the invariant that all KVs for a
// single SQL row are contiguous we must ask the streamer to preserve
// the ordering. See #113013 for an example.
jr.streamerInfo.maintainOrdering = len(spec.SplitFamilyIDs) > 1
if readerType == indexJoinReaderType {
if spec.MaintainOrdering {
// The index join can rely on the streamer to maintain the input
// ordering.
jr.streamerInfo.maintainOrdering = true
}
} else {
// Due to implementation details (the join reader strategy restores
// the desired order when spec.MaintainOrdering is set) we only need
// to ask the streamer to maintain ordering if the results of each
// lookup need to be returned in index order.
if spec.MaintainLookupOrdering {
jr.streamerInfo.maintainOrdering = true
}
}
if jr.FlowCtx.EvalCtx.SessionData().StreamerAlwaysMaintainOrdering {
jr.streamerInfo.maintainOrdering = true
}

var diskBuffer kvstreamer.ResultDiskBuffer
if jr.streamerInfo.maintainOrdering {
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/sessiondatapb/session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ message SessionData {
// that is used for builtins like to_tsvector and to_tsquery if no
// text search configuration is explicitly passed in.
string default_text_search_config = 26;
// StreamerAlwaysMaintainOrdering indicates that the SQL users of the Streamer
// should always ask it to maintain the ordering, even when it might not be
// strictly necessary for the query.
//
// This session variable is introduced as a possible workaround in case we
// have more bugs like #113013.
bool streamer_always_maintain_ordering = 27;
}

// DataConversionConfig contains the parameters that influence the output
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -2796,6 +2796,23 @@ var varGen = map[string]sessionVar{
},
},

// CockroachDB extension.
`streamer_always_maintain_ordering`: {
GetStringVal: makePostgresBoolGetStringValFn(`streamer_always_maintain_ordering`),
Set: func(_ context.Context, m sessionDataMutator, s string) error {
b, err := paramparse.ParseBoolVar("streamer_always_maintain_ordering", s)
if err != nil {
return err
}
m.SetStreamerAlwaysMaintainOrdering(b)
return nil
},
Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
return formatBoolAsPostgresSetting(evalCtx.SessionData().StreamerAlwaysMaintainOrdering), nil
},
GlobalDefault: globalFalse,
},

// CockroachDB extension.
`multiple_active_portals_enabled`: {
GetStringVal: makePostgresBoolGetStringValFn(`multiple_active_portals_enabled`),
Expand Down

0 comments on commit c2ffc96

Please sign in to comment.