diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index eda146cc4cc0..4e601ed90e9d 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -498,9 +498,6 @@ func getIndexJoinBatchSize( } // NewColIndexJoin creates a new ColIndexJoin operator. -// -// If spec.MaintainOrdering is true, then the diskMonitor argument must be -// non-nil. func NewColIndexJoin( ctx context.Context, allocator *colmem.Allocator, @@ -547,15 +544,24 @@ func NewColIndexJoin( if streamerBudgetAcc == nil { return nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired") } - if spec.MaintainOrdering && diskMonitor == nil { - return nil, errors.AssertionFailedf("diskMonitor is nil when ordering needs to be maintained") - } // Keep 1/16th of the memory limit for the output batch of the cFetcher, // another 1/16th of the limit for the input tuples buffered by the index // joiner, and we'll give the remaining memory to the streamer budget // below. cFetcherMemoryLimit = int64(math.Ceil(float64(totalMemoryLimit) / 16.0)) streamerBudgetLimit := 14 * cFetcherMemoryLimit + // When we have SplitFamilyIDs with more than one family ID, then it's + // possible for a single lookup span to be split into multiple "family" + // spans, and in order to preserve the invariant that all KVs for a + // single SQL row are contiguous we must ask the streamer to preserve + // the ordering. See #113013 for an example. 
+ maintainOrdering := spec.MaintainOrdering || len(spec.SplitFamilyIDs) > 1 + if flowCtx.EvalCtx.SessionData().StreamerAlwaysMaintainOrdering { + maintainOrdering = true + } + if maintainOrdering && diskMonitor == nil { + return nil, errors.AssertionFailedf("diskMonitor is nil when ordering needs to be maintained") + } kvFetcher = row.NewStreamingKVFetcher( flowCtx.Cfg.DistSender, flowCtx.Stopper(), @@ -566,7 +572,7 @@ func NewColIndexJoin( spec.LockingDurability, streamerBudgetLimit, streamerBudgetAcc, - spec.MaintainOrdering, + maintainOrdering, true, /* singleRowLookup */ int(spec.FetchSpec.MaxKeysPerRow), rowcontainer.NewKVStreamerResultDiskBuffer( diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 1871fb45c726..a9b5aa6d6cc8 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -3636,6 +3636,10 @@ func (m *sessionDataMutator) SetStreamerEnabled(val bool) { m.data.StreamerEnabled = val } +func (m *sessionDataMutator) SetStreamerAlwaysMaintainOrdering(val bool) { + m.data.StreamerAlwaysMaintainOrdering = val +} + func (m *sessionDataMutator) SetMultipleActivePortalsEnabled(val bool) { m.data.MultipleActivePortalsEnabled = val } diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index b20942a5f973..4f9d44400c8b 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -5593,6 +5593,7 @@ sql_safe_updates off ssl on standard_conforming_strings on statement_timeout 0 +streamer_always_maintain_ordering off streamer_enabled on strict_ddl_atomicity off stub_catalog_tables on diff --git a/pkg/sql/logictest/testdata/logic_test/lookup_join b/pkg/sql/logictest/testdata/logic_test/lookup_join index c729aa317870..d1e342b475f8 100644 --- a/pkg/sql/logictest/testdata/logic_test/lookup_join +++ b/pkg/sql/logictest/testdata/logic_test/lookup_join @@ -1546,3 +1546,19 @@ query III SELECT k3, v3, w3 FROM 
t108489_1 INNER LOOKUP JOIN t108489_3 ON i3 = k1 AND u3 = 1 WHERE k1 = 1; ---- 1 1 1 + +# Regression test for incorrectly using OutOfOrder mode of the streamer with +# multiple column families (#113013). + +statement ok +CREATE TABLE l_113013 (r_id INT, l_id INT, PRIMARY KEY (r_id, l_id), INDEX l_id_idx(l_id)); +CREATE TABLE r_113013 (id INT PRIMARY KEY, c1 STRING NOT NULL, c2 STRING NOT NULL, c3 STRING NOT NULL, FAMILY f1 (id, c1), FAMILY f2 (c2), FAMILY f3 (c3)); +ALTER TABLE r_113013 SPLIT AT VALUES (2); +INSERT INTO l_113013 VALUES (1, 1), (2, 1); +INSERT INTO r_113013 VALUES (1, 'c1', 'c2', repeat('c3', 2000)), (2, 'c1', 'c2', 'c3'); + +query II rowsort +SELECT length(c1), length(c3) FROM l_113013 l INNER JOIN r_113013 r ON l.r_id = r.id WHERE l.l_id = 1; +---- +2 4000 +2 2 diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index 0db766da7213..3b37e09e8b0b 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -2911,6 +2911,7 @@ sql_safe_updates off N ssl on NULL NULL NULL string standard_conforming_strings on NULL NULL NULL string statement_timeout 0 NULL NULL NULL string +streamer_always_maintain_ordering off NULL NULL NULL string streamer_enabled on NULL NULL NULL string strict_ddl_atomicity off NULL NULL NULL string stub_catalog_tables on NULL NULL NULL string @@ -3074,6 +3075,7 @@ sql_safe_updates off N ssl on NULL user NULL on on standard_conforming_strings on NULL user NULL on on statement_timeout 0 NULL user NULL 0s 0s +streamer_always_maintain_ordering off NULL user NULL off off streamer_enabled on NULL user NULL on on strict_ddl_atomicity off NULL user NULL off off stub_catalog_tables on NULL user NULL on on @@ -3237,6 +3239,7 @@ sql_safe_updates NULL NULL NULL ssl NULL NULL NULL NULL NULL standard_conforming_strings NULL NULL NULL NULL NULL statement_timeout NULL NULL NULL NULL NULL +streamer_always_maintain_ordering NULL 
NULL NULL NULL NULL streamer_enabled NULL NULL NULL NULL NULL strict_ddl_atomicity NULL NULL NULL NULL NULL stub_catalog_tables NULL NULL NULL NULL NULL diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source index 71c772796b1e..38480c7ac58c 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ b/pkg/sql/logictest/testdata/logic_test/show_source @@ -149,6 +149,7 @@ sql_safe_updates off ssl on standard_conforming_strings on statement_timeout 0 +streamer_always_maintain_ordering off streamer_enabled on strict_ddl_atomicity off stub_catalog_tables on diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 2a79282e6393..3d8a05e0ab84 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -134,15 +134,20 @@ type joinReader struct { unlimitedMemMonitor *mon.BytesMonitor budgetAcc mon.BoundAccount // maintainOrdering indicates whether the ordering of the input stream - // needs to be maintained AND that we rely on the streamer for that. - // We currently only rely on the streamer in two cases: - // 1. We are performing an index join and joinReader.maintainOrdering is + // needs to be maintained AND that we rely on the streamer for that. We + // currently rely on the streamer in the following cases: + // 1. When spec.SplitFamilyIDs has more than one family, for both + // index and lookup joins (this is needed to ensure that all KVs + // for a single row are returned contiguously). + // 2. We are performing an index join and spec.MaintainOrdering is // true. - // 2. We are performing a lookup join and maintainLookupOrdering is true. - // Except for case (2), we don't rely on the streamer for maintaining - // the ordering for lookup joins due to implementation details (since we - // still buffer all looked up rows and restore the ordering explicitly via - // the joinReaderOrderingStrategy). + // 3. 
We are performing a lookup join and spec.MaintainLookupOrdering + // is true. + // Note that for lookup joins, spec.MaintainOrdering alone doesn't make + // us rely on the streamer for maintaining the ordering due to + // implementation details (since we still buffer all looked up rows + // and restore the ordering explicitly via the + // joinReaderOrderingStrategy). maintainOrdering bool diskMonitor *mon.BytesMonitor txnKVStreamerMemAcc mon.BoundAccount @@ -511,13 +516,30 @@ func newJoinReader( jr.streamerInfo.unlimitedMemMonitor.StartNoReserved(ctx, flowCtx.Mon) jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount() jr.streamerInfo.txnKVStreamerMemAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount() - // The index joiner can rely on the streamer to maintain the input ordering, - // but the lookup joiner currently handles this logic itself, so the - // streamer can operate in OutOfOrder mode. The exception is when the - // results of each lookup need to be returned in index order - in this case, - // InOrder mode must be used for the streamer. - jr.streamerInfo.maintainOrdering = (jr.maintainOrdering && readerType == indexJoinReaderType) || - spec.MaintainLookupOrdering + // When we have SplitFamilyIDs with more than one family ID, then it's + // possible for a single lookup span to be split into multiple "family" + // spans, and in order to preserve the invariant that all KVs for a + // single SQL row are contiguous we must ask the streamer to preserve + // the ordering. See #113013 for an example. + jr.streamerInfo.maintainOrdering = len(spec.SplitFamilyIDs) > 1 + if readerType == indexJoinReaderType { + if spec.MaintainOrdering { + // The index join can rely on the streamer to maintain the input + // ordering. 
+ jr.streamerInfo.maintainOrdering = true + } + } else { + // Due to implementation details (the join reader strategy restores + // the desired order when spec.MaintainOrdering is set) we only need + // to ask the streamer to maintain ordering if the results of each + // lookup need to be returned in index order. + if spec.MaintainLookupOrdering { + jr.streamerInfo.maintainOrdering = true + } + } + if jr.FlowCtx.EvalCtx.SessionData().StreamerAlwaysMaintainOrdering { + jr.streamerInfo.maintainOrdering = true + } var diskBuffer kvstreamer.ResultDiskBuffer if jr.streamerInfo.maintainOrdering { diff --git a/pkg/sql/sessiondatapb/session_data.proto b/pkg/sql/sessiondatapb/session_data.proto index 47656cb7f36b..328581799a48 100644 --- a/pkg/sql/sessiondatapb/session_data.proto +++ b/pkg/sql/sessiondatapb/session_data.proto @@ -112,6 +112,13 @@ message SessionData { // that is used for builtins like to_tsvector and to_tsquery if no // text search configuration is explicitly passed in. string default_text_search_config = 26; + // StreamerAlwaysMaintainOrdering indicates that the SQL users of the Streamer + // should always ask it to maintain the ordering, even when it might not be + // strictly necessary for the query. + // + // This session variable is introduced as a possible workaround in case we + // have more bugs like #113013. + bool streamer_always_maintain_ordering = 27; } // DataConversionConfig contains the parameters that influence the output diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index 10df42358ecb..02622515e655 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -2796,6 +2796,23 @@ var varGen = map[string]sessionVar{ }, }, + // CockroachDB extension. 
+ `streamer_always_maintain_ordering`: { + GetStringVal: makePostgresBoolGetStringValFn(`streamer_always_maintain_ordering`), + Set: func(_ context.Context, m sessionDataMutator, s string) error { + b, err := paramparse.ParseBoolVar("streamer_always_maintain_ordering", s) + if err != nil { + return err + } + m.SetStreamerAlwaysMaintainOrdering(b) + return nil + }, + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return formatBoolAsPostgresSetting(evalCtx.SessionData().StreamerAlwaysMaintainOrdering), nil + }, + GlobalDefault: globalFalse, + }, + // CockroachDB extension. `multiple_active_portals_enabled`: { GetStringVal: makePostgresBoolGetStringValFn(`multiple_active_portals_enabled`),