Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
126306: colexec: use streaming mem account in columnarizer and materializer r=yuzefovich a=yuzefovich

Previously, in order to account for the memory usage of the metadata we required an allocator that wasn't shared with any other user. This was an overkill on two fronts:
- we can easily track precisely how much memory was reserved for the metadata (meaning that we can use the "streaming" allocator)
- we don't actually need the allocator object in many cases since we don't allocate any vectors or batches.

This commit takes advantage of these observations and refactors the relevant code to work on the streaming memory account instead. In particular, columnarizers, materializers, and parallel unordered syncs will now use the streaming memory account, which reduces the number of allocations we make.

Epic: None

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jun 27, 2024
2 parents 9da1ebf + d2614c9 commit d2a5af4
Show file tree
Hide file tree
Showing 19 changed files with 101 additions and 87 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ go_library(
"//pkg/util/json", # keep
"//pkg/util/log",
"//pkg/util/metamorphic",
"//pkg/util/mon",
"//pkg/util/stringarena",
"//pkg/util/tracing",
"@com_github_cockroachdb_apd_v3//:apd", # keep
Expand Down
15 changes: 6 additions & 9 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func checkNumIn(inputs []colexecargs.OpWithMetaInfo, numIn int) error {
func wrapRowSources(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
args *colexecargs.NewColOperatorArgs,
inputs []colexecargs.OpWithMetaInfo,
inputTypes [][]*types.T,
monitorRegistry *colexecargs.MonitorRegistry,
processorID int32,
newToWrap func([]execinfra.RowSource) (execinfra.RowSource, error),
factory coldata.ColumnFactory,
Expand All @@ -88,10 +88,8 @@ func wrapRowSources(
c.MarkAsRemovedFromFlow()
toWrapInputs = append(toWrapInputs, c.Input())
} else {
// We need to create a separate memory account for the materializer.
materializerMemAcc := monitorRegistry.NewStreamingMemAccount(flowCtx)
toWrapInput := colexec.NewMaterializer(
colmem.NewAllocator(ctx, materializerMemAcc, factory),
getStreamingMemAccount(args, flowCtx),
flowCtx,
processorID,
inputs[i],
Expand All @@ -117,13 +115,12 @@ func wrapRowSources(
if !isProcessor {
return nil, errors.AssertionFailedf("unexpectedly %T is not an execinfra.Processor", toWrap)
}
batchAllocator := colmem.NewAllocator(ctx, monitorRegistry.NewStreamingMemAccount(flowCtx), factory)
metadataAllocator := colmem.NewAllocator(ctx, monitorRegistry.NewStreamingMemAccount(flowCtx), factory)
batchAllocator := colmem.NewAllocator(ctx, args.MonitorRegistry.NewStreamingMemAccount(flowCtx), factory)
var c *colexec.Columnarizer
if proc.MustBeStreaming() {
c = colexec.NewStreamingColumnarizer(batchAllocator, metadataAllocator, flowCtx, processorID, toWrap)
c = colexec.NewStreamingColumnarizer(batchAllocator, getStreamingMemAccount(args, flowCtx), flowCtx, processorID, toWrap)
} else {
c = colexec.NewBufferingColumnarizer(batchAllocator, metadataAllocator, flowCtx, processorID, toWrap)
c = colexec.NewBufferingColumnarizer(batchAllocator, getStreamingMemAccount(args, flowCtx), flowCtx, processorID, toWrap)
}
return c, nil
}
Expand Down Expand Up @@ -530,9 +527,9 @@ func (r opResult) createAndWrapRowSource(
c, err := wrapRowSources(
ctx,
flowCtx,
args,
inputs,
inputTypes,
args.MonitorRegistry,
processorID,
func(inputs []execinfra.RowSource) (execinfra.RowSource, error) {
// We provide a slice with a single nil as 'outputs' parameter
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colbuilder/execplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
defer r.TestCleanupNoError(t)

m := colexec.NewMaterializer(
nil, /* allocator */
nil, /* streamingMemAcc */
flowCtx,
0, /* processorID */
r.OpWithMetaInfo,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecbase/simple_project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestSimpleProjectOpWithUnorderedSynchronizer(t *testing.T) {
for i := range parallelUnorderedSynchronizerInputs {
parallelUnorderedSynchronizerInputs[i].Root = inputs[i]
}
input = colexec.NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, parallelUnorderedSynchronizerInputs, &wg)
input = colexec.NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testMemAcc, parallelUnorderedSynchronizerInputs, &wg)
input = colexecbase.NewSimpleProjectOp(input, len(inputTypes), []uint32{0})
return colexecbase.NewConstOp(testAllocator, input, types.Int, constVal, 1)
})
Expand Down
17 changes: 14 additions & 3 deletions pkg/sql/colexec/colexecutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
package colexecutils

import (
"context"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -321,14 +324,22 @@ func init() {
}
}

// AccountForMetadata registers the memory footprint of meta with the allocator.
func AccountForMetadata(allocator *colmem.Allocator, meta []execinfrapb.ProducerMetadata) {
// AccountForMetadata registers the memory footprint of meta with the memory
// account and returns the total new memory usage.
func AccountForMetadata(
ctx context.Context, memAcc *mon.BoundAccount, meta []execinfrapb.ProducerMetadata,
) int64 {
var newMemUsage int64
for i := range meta {
// Perform the memory accounting for the LeafTxnFinalState metadata
// since it might be of non-trivial size.
if ltfs := meta[i].LeafTxnFinalState; ltfs != nil {
memUsage := roachpb.Spans(ltfs.RefreshSpans).MemUsageUpToLen()
allocator.AdjustMemoryUsageAfterAllocation(memUsage)
if err := memAcc.Grow(ctx, memUsage); err != nil {
colexecerror.InternalError(err)
}
newMemUsage += memUsage
}
}
return newMemUsage
}
47 changes: 26 additions & 21 deletions pkg/sql/colexec/columnarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -57,12 +58,15 @@ type Columnarizer struct {
execinfra.ProcessorBaseNoHelper
colexecop.NonExplainable

mode columnarizerMode
initialized bool
helper colmem.SetAccountingHelper
metadataAllocator *colmem.Allocator
input execinfra.RowSource
da tree.DatumAlloc
mode columnarizerMode
initialized bool
helper colmem.SetAccountingHelper
streamingMemAcc *mon.BoundAccount
// metadataAccountedFor tracks how much memory has been reserved in the
// streamingMemAcc for the metadata.
metadataAccountedFor int64
input execinfra.RowSource
da tree.DatumAlloc
// getWrappedExecStats, if non-nil, is the function to get the execution
// statistics of the wrapped row-by-row processor. We store it separately
// from execinfra.ProcessorBaseNoHelper.ExecStatsForTrace so that the
Expand All @@ -87,16 +91,16 @@ var _ execreleasable.Releasable = &Columnarizer{}

// NewBufferingColumnarizer returns a new Columnarizer that will be buffering up
// rows before emitting them as output batches.
// - batchAllocator and metadataAllocator must use memory accounts that are not
// shared with any other user.
// - batchAllocator must use the memory account that is not shared with any
// other user.
func NewBufferingColumnarizer(
batchAllocator *colmem.Allocator,
metadataAllocator *colmem.Allocator,
streamingMemAcc *mon.BoundAccount,
flowCtx *execinfra.FlowCtx,
processorID int32,
input execinfra.RowSource,
) *Columnarizer {
return newColumnarizer(batchAllocator, metadataAllocator, flowCtx, processorID, input, columnarizerBufferingMode)
return newColumnarizer(batchAllocator, streamingMemAcc, flowCtx, processorID, input, columnarizerBufferingMode)
}

// NewBufferingColumnarizerForTests is a convenience wrapper around
Expand All @@ -108,21 +112,21 @@ func NewBufferingColumnarizerForTests(
processorID int32,
input execinfra.RowSource,
) *Columnarizer {
return NewBufferingColumnarizer(allocator, allocator, flowCtx, processorID, input)
return NewBufferingColumnarizer(allocator, allocator.Acc(), flowCtx, processorID, input)
}

// NewStreamingColumnarizer returns a new Columnarizer that emits every input
// row as a separate batch.
// - batchAllocator and metadataAllocator must use memory accounts that are not
// shared with any other user.
// - batchAllocator must use the memory account that is not shared with any
// other user.
func NewStreamingColumnarizer(
batchAllocator *colmem.Allocator,
metadataAllocator *colmem.Allocator,
streamingMemAcc *mon.BoundAccount,
flowCtx *execinfra.FlowCtx,
processorID int32,
input execinfra.RowSource,
) *Columnarizer {
return newColumnarizer(batchAllocator, metadataAllocator, flowCtx, processorID, input, columnarizerStreamingMode)
return newColumnarizer(batchAllocator, streamingMemAcc, flowCtx, processorID, input, columnarizerStreamingMode)
}

var columnarizerPool = sync.Pool{
Expand All @@ -134,7 +138,7 @@ var columnarizerPool = sync.Pool{
// newColumnarizer returns a new Columnarizer.
func newColumnarizer(
batchAllocator *colmem.Allocator,
metadataAllocator *colmem.Allocator,
streamingMemAcc *mon.BoundAccount,
flowCtx *execinfra.FlowCtx,
processorID int32,
input execinfra.RowSource,
Expand All @@ -148,7 +152,7 @@ func newColumnarizer(
c := columnarizerPool.Get().(*Columnarizer)
*c = Columnarizer{
ProcessorBaseNoHelper: c.ProcessorBaseNoHelper,
metadataAllocator: metadataAllocator,
streamingMemAcc: streamingMemAcc,
input: input,
mode: mode,
}
Expand Down Expand Up @@ -239,7 +243,7 @@ func (c *Columnarizer) Next() coldata.Batch {
colexecerror.ExpectedError(meta.Err)
}
c.accumulatedMeta = append(c.accumulatedMeta, *meta)
colexecutils.AccountForMetadata(c.metadataAllocator, c.accumulatedMeta[len(c.accumulatedMeta)-1:])
c.metadataAccountedFor += colexecutils.AccountForMetadata(c.Ctx(), c.streamingMemAcc, c.accumulatedMeta[len(c.accumulatedMeta)-1:])
continue
}
if row == nil {
Expand All @@ -266,9 +270,10 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata {
// Eagerly lose the reference to the metadata since it might be of
// non-trivial footprint.
c.accumulatedMeta = nil
// When this method returns, we no longer will have the reference to the
// metadata, so we can release all memory from the metadata allocator.
defer c.metadataAllocator.ReleaseAll()
defer func() {
c.streamingMemAcc.Shrink(c.Ctx(), c.metadataAccountedFor)
c.metadataAccountedFor = 0
}()
if !c.initialized {
// The columnarizer wasn't initialized, so the wrapped processors might
// not have been started leaving them in an unsafe to drain state, so
Expand Down
27 changes: 14 additions & 13 deletions pkg/sql/colexec/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -71,8 +71,9 @@ type drainHelper struct {
// are noops.
ctx context.Context

// allocator can be nil in tests.
allocator *colmem.Allocator
// streamingMemAcc can be nil in tests.
streamingMemAcc *mon.BoundAccount
metadataAccountedFor int64

statsCollectors []colexecop.VectorizedStatsCollector
sources colexecop.MetadataSources
Expand Down Expand Up @@ -118,17 +119,18 @@ func (d *drainHelper) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata)
if !d.drained {
d.meta = d.sources.DrainMeta()
d.drained = true
if d.allocator != nil {
colexecutils.AccountForMetadata(d.allocator, d.meta)
if d.streamingMemAcc != nil {
d.metadataAccountedFor = colexecutils.AccountForMetadata(d.ctx, d.streamingMemAcc, d.meta)
}
}
if len(d.meta) == 0 {
// Eagerly lose the reference to the slice.
d.meta = nil
if d.allocator != nil {
// At this point, the caller took over all of the metadata, so we
// can release all of the allocations.
d.allocator.ReleaseAll()
if d.streamingMemAcc != nil {
// At this point, the caller took over the metadata, so we can
// release the allocations.
d.streamingMemAcc.Shrink(d.ctx, d.metadataAccountedFor)
d.metadataAccountedFor = 0
}
return nil, nil
}
Expand All @@ -152,13 +154,12 @@ var materializerPool = sync.Pool{
// NewMaterializer creates a new Materializer processor which processes the
// columnar data coming from input to return it as rows.
// Arguments:
// - allocator must use a memory account that is not shared with any other user,
// can be nil in tests.
// - streamingMemAcc can be nil in tests.
// - typs is the output types schema. Typs are assumed to have been hydrated.
// NOTE: the constructor does *not* take in an execinfrapb.PostProcessSpec
// because we expect input to handle that for us.
func NewMaterializer(
allocator *colmem.Allocator,
streamingMemAcc *mon.BoundAccount,
flowCtx *execinfra.FlowCtx,
processorID int32,
input colexecargs.OpWithMetaInfo,
Expand All @@ -179,7 +180,7 @@ func NewMaterializer(
} else {
m.row = make(rowenc.EncDatumRow, len(typs))
}
m.drainHelper.allocator = allocator
m.drainHelper.streamingMemAcc = streamingMemAcc
m.drainHelper.statsCollectors = input.StatsCollectors
m.drainHelper.sources = input.MetadataSources

Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestColumnarizeMaterialize(t *testing.T) {
c := NewBufferingColumnarizerForTests(testAllocator, flowCtx, 0, input)

m := NewMaterializer(
nil, /* allocator */
nil, /* streamingMemAcc */
flowCtx,
1, /* processorID */
colexecargs.OpWithMetaInfo{Root: c},
Expand Down Expand Up @@ -174,7 +174,7 @@ func BenchmarkMaterializer(b *testing.B) {
b.SetBytes(int64(nRows * nCols * int(memsize.Int64)))
for i := 0; i < b.N; i++ {
m := NewMaterializer(
nil, /* allocator */
nil, /* streamingMemAcc */
flowCtx,
0, /* processorID */
colexecargs.OpWithMetaInfo{Root: input},
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestMaterializerNextErrorAfterConsumerDone(t *testing.T) {
}

m := NewMaterializer(
nil, /* allocator */
nil, /* streamingMemAcc */
flowCtx,
0, /* processorID */
colexecargs.OpWithMetaInfo{
Expand Down Expand Up @@ -269,7 +269,7 @@ func BenchmarkColumnarizeMaterialize(b *testing.B) {
b.SetBytes(int64(nRows * nCols * int(memsize.Int64)))
for i := 0; i < b.N; i++ {
m := NewMaterializer(
nil, /* allocator */
nil, /* streamingMemAcc */
flowCtx,
1, /* processorID */
colexecargs.OpWithMetaInfo{Root: c},
Expand Down
Loading

0 comments on commit d2a5af4

Please sign in to comment.