Skip to content

Commit

Permalink
sql: add session variable to force streamer to maintain ordering
Browse files Browse the repository at this point in the history
This commit adds `streamer_always_maintain_ordering` session variable
that - when set to `true` - forces all current usages of the streamer in
SQL layer (lookup and index joins) to ask it to maintain the ordering,
even if this is not stricly necessary by the query. This variable is
introduced as a possible workaround in case we find more scenarios where
we currently are incorrectly using the OutOfOrder mode of the streamer.

Release note: None
  • Loading branch information
yuzefovich committed Oct 25, 2023
1 parent a1d0f31 commit 05dbd92
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 3 deletions.
6 changes: 3 additions & 3 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,6 @@ func getIndexJoinBatchSize(
}

// NewColIndexJoin creates a new ColIndexJoin operator.
//
// If spec.MaintainOrdering is true, or spec.SplitFamilyIDs has more than one
// family, then the diskMonitor argument must be non-nil.
func NewColIndexJoin(
ctx context.Context,
allocator *colmem.Allocator,
Expand Down Expand Up @@ -559,6 +556,9 @@ func NewColIndexJoin(
// single SQL row are contiguous we must ask the streamer to preserve
// the ordering. See #113013 for an example.
maintainOrdering := spec.MaintainOrdering || len(spec.SplitFamilyIDs) > 1
if flowCtx.EvalCtx.SessionData().StreamerAlwaysMaintainOrdering {
maintainOrdering = true
}
if maintainOrdering && diskMonitor == nil {
return nil, errors.AssertionFailedf("diskMonitor is nil when ordering needs to be maintained")
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3636,6 +3636,10 @@ func (m *sessionDataMutator) SetStreamerEnabled(val bool) {
m.data.StreamerEnabled = val
}

func (m *sessionDataMutator) SetStreamerAlwaysMaintainOrdering(val bool) {
m.data.StreamerAlwaysMaintainOrdering = val
}

func (m *sessionDataMutator) SetMultipleActivePortalsEnabled(val bool) {
m.data.MultipleActivePortalsEnabled = val
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -5593,6 +5593,7 @@ sql_safe_updates off
ssl on
standard_conforming_strings on
statement_timeout 0
streamer_always_maintain_ordering off
streamer_enabled on
strict_ddl_atomicity off
stub_catalog_tables on
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -2911,6 +2911,7 @@ sql_safe_updates off N
ssl on NULL NULL NULL string
standard_conforming_strings on NULL NULL NULL string
statement_timeout 0 NULL NULL NULL string
streamer_always_maintain_ordering off NULL NULL NULL string
streamer_enabled on NULL NULL NULL string
strict_ddl_atomicity off NULL NULL NULL string
stub_catalog_tables on NULL NULL NULL string
Expand Down Expand Up @@ -3074,6 +3075,7 @@ sql_safe_updates off N
ssl on NULL user NULL on on
standard_conforming_strings on NULL user NULL on on
statement_timeout 0 NULL user NULL 0s 0s
streamer_always_maintain_ordering off NULL user NULL off off
streamer_enabled on NULL user NULL on on
strict_ddl_atomicity off NULL user NULL off off
stub_catalog_tables on NULL user NULL on on
Expand Down Expand Up @@ -3237,6 +3239,7 @@ sql_safe_updates NULL NULL NULL
ssl NULL NULL NULL NULL NULL
standard_conforming_strings NULL NULL NULL NULL NULL
statement_timeout NULL NULL NULL NULL NULL
streamer_always_maintain_ordering NULL NULL NULL NULL NULL
streamer_enabled NULL NULL NULL NULL NULL
strict_ddl_atomicity NULL NULL NULL NULL NULL
stub_catalog_tables NULL NULL NULL NULL NULL
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ sql_safe_updates off
ssl on
standard_conforming_strings on
statement_timeout 0
streamer_always_maintain_ordering off
streamer_enabled on
strict_ddl_atomicity off
stub_catalog_tables on
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,9 @@ func newJoinReader(
jr.streamerInfo.maintainOrdering = true
}
}
if jr.FlowCtx.EvalCtx.SessionData().StreamerAlwaysMaintainOrdering {
jr.streamerInfo.maintainOrdering = true
}

var diskBuffer kvstreamer.ResultDiskBuffer
if jr.streamerInfo.maintainOrdering {
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/sessiondatapb/session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ message SessionData {
// that is used for builtins like to_tsvector and to_tsquery if no
// text search configuration is explicitly passed in.
string default_text_search_config = 26;
// StreamerAlwaysMaintainOrdering indicates that the SQL users of the Streamer
// should always ask it to maintain the ordering, even when it might not be
// strictly necessary for the query.
//
// This session variable is introduced as a possible workaround in case we
// have more bugs like #113013.
bool streamer_always_maintain_ordering = 27;
}

// DataConversionConfig contains the parameters that influence the output
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -2796,6 +2796,23 @@ var varGen = map[string]sessionVar{
},
},

// CockroachDB extension.
`streamer_always_maintain_ordering`: {
GetStringVal: makePostgresBoolGetStringValFn(`streamer_always_maintain_ordering`),
Set: func(_ context.Context, m sessionDataMutator, s string) error {
b, err := paramparse.ParseBoolVar("streamer_always_maintain_ordering", s)
if err != nil {
return err
}
m.SetStreamerAlwaysMaintainOrdering(b)
return nil
},
Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
return formatBoolAsPostgresSetting(evalCtx.SessionData().StreamerAlwaysMaintainOrdering), nil
},
GlobalDefault: globalFalse,
},

// CockroachDB extension.
`multiple_active_portals_enabled`: {
GetStringVal: makePostgresBoolGetStringValFn(`multiple_active_portals_enabled`),
Expand Down

0 comments on commit 05dbd92

Please sign in to comment.