From 0d01f6a5fe0b127eca9565af0dcbbb324494e3a3 Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Wed, 21 Feb 2024 11:46:23 -0500 Subject: [PATCH 1/3] opt: fix formatting of cascadeBuilder doc comment Release note: None --- pkg/sql/opt/exec/execbuilder/cascades.go | 56 ++++++++++++------------ 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/pkg/sql/opt/exec/execbuilder/cascades.go b/pkg/sql/opt/exec/execbuilder/cascades.go index b9b74a03e572..d172dcc8169b 100644 --- a/pkg/sql/opt/exec/execbuilder/cascades.go +++ b/pkg/sql/opt/exec/execbuilder/cascades.go @@ -71,39 +71,39 @@ import ( // processed (in a queue). At this time the PlanFn method is called and the // following happens: // -// 1. We set up a new empty memo and add metadata for the columns of the -// buffer node (binding &1). +// 1: We set up a new empty memo and add metadata for the columns of the +// buffer node (binding &1). // -// 2. We invoke the memo.CascadeBuilder to optbuild the cascading query. At this -// point, the new memo will contain the following expression: +// 2: We invoke the memo.CascadeBuilder to optbuild the cascading query. At +// this point, the new memo will contain the following expression: // -// delete child -// ├── columns: -// ├── fetch columns: c:4 child.p:5 -// └── semi-join (hash) -// ├── columns: c:4!null child.p:5!null -// ├── scan child -// │ └── columns: c:4!null child.p:5!null -// ├── with-scan &1 -// │ ├── columns: p:6!null -// │ └── mapping: -// │ └── parent.p:1 => p:6 -// └── filters -// └── child.p:5 = p:6 +// delete child +// ├── columns: +// ├── fetch columns: c:4 child.p:5 +// └── semi-join (hash) +// ├── columns: c:4!null child.p:5!null +// ├── scan child +// │ └── columns: c:4!null child.p:5!null +// ├── with-scan &1 +// │ ├── columns: p:6!null +// │ └── mapping: +// │ └── parent.p:1 => p:6 +// └── filters +// └── child.p:5 = p:6 // -// Notes: -// - normally, a WithScan can only refer to an ancestor mutation or With -// operator. In this case we are creating a reference "out of the void". -// This works just fine; we can consider adding a special dummy root -// operator but so far it hasn't been necessary; -// - the binding &1 column ID has changed: it used to be 2, it is now 1. -// This is because we are starting with a fresh memo. We need to take into -// account this remapping when referring to the foreign key columns. +// Notes: +// - normally, a WithScan can only refer to an ancestor mutation or With +// operator. In this case we are creating a reference "out of the void". +// This works just fine; we can consider adding a special dummy root +// operator but so far it hasn't been necessary; +// - the binding &1 column ID has changed: it used to be 2, it is now 1. +// This is because we are starting with a fresh memo. We need to take into +// account this remapping when referring to the foreign key columns. // -// 3. We optimize the newly built expression. +// 3: We optimize the newly built expression. // -// 4. We execbuild the optimizer expression. We have to be careful to set up -// the "With" reference before starting. +// 4: We execbuild the optimizer expression. We have to be careful to set up +// the "With" reference before starting. // // After PlanFn is called, the resulting plan is executed. Note that this plan // could itself have more exec.Cascades; these are queued and handled in the From 78c81b2d02247ad32891b6e58e05cc653b5c3af3 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 4 Mar 2024 10:56:07 -0500 Subject: [PATCH 2/3] rangefeedbuffer: parameterize by generic Event type This commit replaces the rangefeedbuffer package's use of Event interface objects with a generic type parameter. In doing so, it avoids having users of the package (e.g. spanconfigkvsubscriber) need to type assert the Event interface objects back to the specific event type they are interested in and know that their TranslateEventFunc has decoded events into. Epic: None Release note: None --- .../span_config_event_stream.go | 17 +++---- .../rangefeed/rangefeedbuffer/buffer.go | 26 +++++----- .../rangefeed/rangefeedbuffer/buffer_test.go | 18 +++---- .../kvclient/rangefeed/rangefeedbuffer/kvs.go | 5 +- .../rangefeed/rangefeedbuffer/kvs_test.go | 4 +- .../rangefeedcache/cache_impl_test.go | 10 ++-- .../rangefeed/rangefeedcache/watcher.go | 48 ++++++++++--------- .../rangefeed/rangefeedcache/watcher_test.go | 7 ++- .../tenantcapabilitieswatcher/watcher.go | 14 +++--- .../settingswatcher/settings_watcher.go | 20 ++++---- pkg/server/systemconfigwatcher/cache.go | 12 +++-- pkg/server/tenantsettingswatcher/watcher.go | 12 ++--- .../spanconfigkvsubscriber/kvsubscriber.go | 16 +++---- .../spanconfigdecoder.go | 7 ++- pkg/spanconfig/spanconfigsqlwatcher/buffer.go | 8 ++-- 15 files changed, 116 insertions(+), 108 deletions(-) diff --git a/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go b/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go index 3d8cf1c5864a..71b2a5049a52 100644 --- a/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go @@ -40,10 +40,10 @@ type spanConfigEventStream struct { data tree.Datums // Data to send to the consumer // Fields below initialized when Start called. - rfc *rangefeedcache.Watcher + rfc *rangefeedcache.Watcher[*spanconfigkvsubscriber.BufferEvent] streamGroup ctxgroup.Group // Context group controlling stream execution. doneChan chan struct{} // Channel signaled to close the stream loop. - updateCh chan rangefeedcache.Update + updateCh chan rangefeedcache.Update[*spanconfigkvsubscriber.BufferEvent] errCh chan error // Signaled when error occurs in rangefeed. streamCh chan tree.Datums // Channel signaled to forward datums to consumer. sp *tracing.Span // Span representing the lifetime of the eventStream. @@ -74,7 +74,7 @@ func (s *spanConfigEventStream) Start(ctx context.Context, txn *kv.Txn) error { s.errCh = make(chan error, 1) // updateCh gets buffered RangeFeedEvents and is consumed by the ValueGenerator. - s.updateCh = make(chan rangefeedcache.Update) + s.updateCh = make(chan rangefeedcache.Update[*spanconfigkvsubscriber.BufferEvent]) // Stream channel receives datums to be sent to the consumer. s.streamCh = make(chan tree.Datums) @@ -178,7 +178,9 @@ func (s *spanConfigEventStream) Close(ctx context.Context) { s.sp.Finish() } -func (s *spanConfigEventStream) handleUpdate(ctx context.Context, update rangefeedcache.Update) { +func (s *spanConfigEventStream) handleUpdate( + ctx context.Context, update rangefeedcache.Update[*spanconfigkvsubscriber.BufferEvent], +) { select { case <-ctx.Done(): log.Warningf(ctx, "rangefeedcache context cancelled with error %s", ctx.Err()) @@ -233,8 +235,7 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error { batcher.spanConfigFrontier = hlc.MinTimestamp } for _, ev := range update.Events { - spcfgEvent := ev.(*spanconfigkvsubscriber.BufferEvent) - target := spcfgEvent.Update.GetTarget() + target := ev.Update.GetTarget() if target.IsSystemTarget() { // We skip replicating SystemTarget Span configs as they are not // necessary to replicate. System target span configurations are @@ -259,9 +260,9 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error { streamedSpanCfgEntry := streampb.StreamedSpanConfigEntry{ SpanConfig: roachpb.SpanConfigEntry{ Target: target.ToProto(), - Config: spcfgEvent.Update.GetConfig(), + Config: ev.Update.GetConfig(), }, - Timestamp: spcfgEvent.Timestamp(), + Timestamp: ev.Timestamp(), FromFullScan: fromFullScan, } bufferedEvents = append(bufferedEvents, streamedSpanCfgEntry) diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go index cdac92e28a34..73fb3f8814a7 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go @@ -24,7 +24,7 @@ import ( // events than the limit the buffer is configured with. var ErrBufferLimitExceeded = errors.New("rangefeed buffer limit exceeded") -// Event is the unit of what can be added to the buffer. +// Event is a contract for the unit of what can be added to the buffer. type Event interface { Timestamp() hlc.Timestamp } @@ -33,25 +33,25 @@ type Event interface { // accumulates raw events which can then be flushed out in timestamp sorted // order en-masse whenever the rangefeed frontier is bumped. If we accumulate // more events than the limit allows for, we error out to the caller. -type Buffer struct { +type Buffer[E Event] struct { mu struct { syncutil.Mutex - events + events[E] frontier hlc.Timestamp limit int } } // New constructs a Buffer with the provided limit. -func New(limit int) *Buffer { - b := &Buffer{} +func New[E Event](limit int) *Buffer[E] { + b := &Buffer[E]{} b.mu.limit = limit return b } // Add adds the given entry to the buffer. -func (b *Buffer) Add(ev Event) error { +func (b *Buffer[E]) Add(ev E) error { b.mu.Lock() defer b.mu.Unlock() @@ -73,7 +73,7 @@ func (b *Buffer) Add(ev Event) error { // less than or equal to the provided frontier timestamp. The timestamp is // recorded (expected to monotonically increase), and future events with // timestamps less than or equal to it are discarded. -func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Event) { +func (b *Buffer[E]) Flush(ctx context.Context, frontier hlc.Timestamp) (events []E) { b.mu.Lock() defer b.mu.Unlock() @@ -97,17 +97,17 @@ func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Ev // SetLimit is used to limit the number of events the buffer internally tracks. // If already in excess of the limit, future additions will error out (until the // buffer is Flush()-ed at least). -func (b *Buffer) SetLimit(limit int) { +func (b *Buffer[E]) SetLimit(limit int) { b.mu.Lock() defer b.mu.Unlock() b.mu.limit = limit } -type events []Event +type events[E Event] []E -var _ sort.Interface = (*events)(nil) +var _ sort.Interface = (*events[Event])(nil) -func (es *events) Len() int { return len(*es) } -func (es *events) Less(i, j int) bool { return (*es)[i].Timestamp().Less((*es)[j].Timestamp()) } -func (es *events) Swap(i, j int) { (*es)[i], (*es)[j] = (*es)[j], (*es)[i] } +func (es *events[E]) Len() int { return len(*es) } +func (es *events[E]) Less(i, j int) bool { return (*es)[i].Timestamp().Less((*es)[j].Timestamp()) } +func (es *events[E]) Swap(i, j int) { (*es)[i], (*es)[j] = (*es)[j], (*es)[i] } diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go index ae125c442cc3..2b088ac6dd7a 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go @@ -30,7 +30,7 @@ func TestBuffer(t *testing.T) { ctx := context.Background() const limit = 25 - buffer := rangefeedbuffer.New(limit) + buffer := rangefeedbuffer.New[*testEvent](limit) { // Sanity check the newly initialized rangefeed buffer. events := buffer.Flush(ctx, ts(0)) @@ -62,16 +62,16 @@ func TestBuffer(t *testing.T) { events := buffer.Flush(ctx, ts(14)) require.True(t, len(events) == 3) - require.Equal(t, events[0].(*testEvent).data, "b") // b@11 - require.Equal(t, events[1].(*testEvent).data, "d") // d@12 - require.Equal(t, events[2].(*testEvent).data, "a") // a@13 + require.Equal(t, events[0].data, "b") // b@11 + require.Equal(t, events[1].data, "d") // d@12 + require.Equal(t, events[2].data, "a") // a@13 } { // Incremental advances should only surface the events until the given timestamp. events := buffer.Flush(ctx, ts(15)) require.True(t, len(events) == 1) - require.Equal(t, events[0].(*testEvent).data, "c") // c@15 + require.Equal(t, events[0].data, "c") // c@15 } { // Adding events with timestamps <= the last flush are discarded. @@ -90,14 +90,14 @@ func TestBuffer(t *testing.T) { events := buffer.Flush(ctx, ts(20)) require.True(t, len(events) == 2) - require.Equal(t, events[0].(*testEvent).data, "e") // e@18 - require.Equal(t, events[1].(*testEvent).data, "f") // f@19 + require.Equal(t, events[0].data, "e") // e@18 + require.Equal(t, events[1].data, "f") // f@19 } { // Ensure that a timestamp greater than any previous event flushes everything. events := buffer.Flush(ctx, ts(100)) require.True(t, len(events) == 1) - require.Equal(t, events[0].(*testEvent).data, "g") // g@21 + require.Equal(t, events[0].data, "g") // g@21 } { // Sanity check that there are no events left over. @@ -138,6 +138,6 @@ func (t *testEvent) Timestamp() hlc.Timestamp { var _ rangefeedbuffer.Event = &testEvent{} -func makeEvent(data string, ts hlc.Timestamp) rangefeedbuffer.Event { +func makeEvent(data string, ts hlc.Timestamp) *testEvent { return &testEvent{data: data, ts: ts} } diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs.go index b18ac18e270f..247ebd187d68 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs.go @@ -19,13 +19,12 @@ import ( // RangeFeedValueEventToKV is a function to type assert an Event into a // *kvpb.RangeFeedValue and then convert it to a roachpb.KeyValue. -func RangeFeedValueEventToKV(event Event) roachpb.KeyValue { - rfv := event.(*kvpb.RangeFeedValue) +func RangeFeedValueEventToKV(rfv *kvpb.RangeFeedValue) roachpb.KeyValue { return roachpb.KeyValue{Key: rfv.Key, Value: rfv.Value} } // EventsToKVs converts a slice of Events to a slice of KeyValue pairs. -func EventsToKVs(events []Event, f func(ev Event) roachpb.KeyValue) []roachpb.KeyValue { +func EventsToKVs[E Event](events []E, f func(ev E) roachpb.KeyValue) []roachpb.KeyValue { kvs := make([]roachpb.KeyValue, 0, len(events)) for _, ev := range events { kvs = append(kvs, f(ev)) diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs_test.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs_test.go index db1af09e4ed9..2d59b1d4c869 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs_test.go @@ -58,8 +58,8 @@ func TestMergeKVs(t *testing.T) { } return kvs } - toBuffer := func(t *testing.T, rows []row) *Buffer { - buf := New(len(rows)) + toBuffer := func(t *testing.T, rows []row) *Buffer[*kvpb.RangeFeedValue] { + buf := New[*kvpb.RangeFeedValue](len(rows)) for _, r := range rows { require.NoError(t, buf.Add(toRangeFeedEvent(r))) } diff --git a/pkg/kv/kvclient/rangefeed/rangefeedcache/cache_impl_test.go b/pkg/kv/kvclient/rangefeed/rangefeedcache/cache_impl_test.go index b4338c0331cb..95a977dfd839 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedcache/cache_impl_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedcache/cache_impl_test.go @@ -27,7 +27,7 @@ import ( // cache provides a consistent snapshot when available, but the snapshot // may be stale. type Cache struct { - w *Watcher + w *Watcher[*kvpb.RangeFeedValue] mu struct { syncutil.RWMutex @@ -75,7 +75,7 @@ func (c *Cache) GetSnapshot() ([]roachpb.KeyValue, hlc.Timestamp, bool) { return c.mu.data, c.mu.timestamp, true } -func (c *Cache) handleUpdate(ctx context.Context, update Update) { +func (c *Cache) handleUpdate(ctx context.Context, update Update[*kvpb.RangeFeedValue]) { updateKVs := rangefeedbuffer.EventsToKVs(update.Events, rangefeedbuffer.RangeFeedValueEventToKV) var updatedData []roachpb.KeyValue @@ -96,6 +96,8 @@ func (c *Cache) handleUpdate(ctx context.Context, update Update) { c.mu.timestamp = update.Timestamp } -func passThroughTranslation(ctx context.Context, value *kvpb.RangeFeedValue) rangefeedbuffer.Event { - return value +func passThroughTranslation( + ctx context.Context, value *kvpb.RangeFeedValue, +) (*kvpb.RangeFeedValue, bool) { + return value, value != nil } diff --git a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go index 17c41112d2a7..983162dc1b7a 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go @@ -63,7 +63,7 @@ import ( // Users seeking to leverage the Updates which arrive with that delay but also // react to the row-level events as they arrive can hijack the translateEvent // function to trigger some non-blocking action. -type Watcher struct { +type Watcher[E rangefeedbuffer.Event] struct { name redact.SafeString clock *hlc.Clock rangefeedFactory *rangefeed.Factory @@ -74,8 +74,8 @@ type Watcher struct { started int32 // accessed atomically - translateEvent TranslateEventFunc - onUpdate OnUpdateFunc + translateEvent TranslateEventFunc[E] + onUpdate OnUpdateFunc[E] lastFrontierTS hlc.Timestamp // used to assert monotonicity across rangefeed attempts @@ -107,20 +107,20 @@ func (u UpdateType) String() string { } // TranslateEventFunc is used by the client to translate a low-level event -// into an event for buffering. If nil is returned, the event is skipped. -type TranslateEventFunc func( +// into an event for buffering. If false is returned, the event is skipped. +type TranslateEventFunc[E rangefeedbuffer.Event] func( context.Context, *kvpb.RangeFeedValue, -) rangefeedbuffer.Event +) (_ E, ok bool) // OnUpdateFunc is used by the client to receive an Update, which is a batch // of events which represent either a CompleteUpdate or an IncrementalUpdate. -type OnUpdateFunc func(context.Context, Update) +type OnUpdateFunc[E rangefeedbuffer.Event] func(context.Context, Update[E]) // Update corresponds to a set of events derived from the underlying RangeFeed. -type Update struct { +type Update[E rangefeedbuffer.Event] struct { Type UpdateType Timestamp hlc.Timestamp - Events []rangefeedbuffer.Event + Events []E } // TestingKnobs allows tests to inject behavior into the Watcher. @@ -162,7 +162,7 @@ var _ base.ModuleTestingKnobs = (*TestingKnobs)(nil) // Callers can control whether the values passed to translateEvent carry a // populated PrevValue using the withPrevValue parameter. See // rangefeed.WithDiff for more details. -func NewWatcher( +func NewWatcher[E rangefeedbuffer.Event]( name redact.SafeString, clock *hlc.Clock, rangeFeedFactory *rangefeed.Factory, @@ -170,11 +170,11 @@ func NewWatcher( spans []roachpb.Span, withPrevValue bool, withRowTSInInitialScan bool, - translateEvent TranslateEventFunc, - onUpdate OnUpdateFunc, + translateEvent TranslateEventFunc[E], + onUpdate OnUpdateFunc[E], knobs *TestingKnobs, -) *Watcher { - w := &Watcher{ +) *Watcher[E] { + w := &Watcher[E]{ name: name, clock: clock, rangefeedFactory: rangeFeedFactory, @@ -194,7 +194,9 @@ func NewWatcher( // Start calls Run on the Watcher as an async task with exponential backoff. // If the Watcher ran for what is deemed long enough, the backoff is reset. -func Start(ctx context.Context, stopper *stop.Stopper, c *Watcher, onError func(error)) error { +func Start[E rangefeedbuffer.Event]( + ctx context.Context, stopper *stop.Stopper, c *Watcher[E], onError func(error), +) error { return stopper.RunAsyncTask(ctx, string(c.name), func(ctx context.Context) { ctx, cancel := stopper.WithCancelOnQuiesce(ctx) defer cancel() @@ -231,7 +233,7 @@ func Start(ctx context.Context, stopper *stop.Stopper, c *Watcher, onError func( // This is a blocking operation, returning only when the context is canceled, // or when an error occurs. For the latter, it's expected that callers will // re-run the watcher. -func (s *Watcher) Run(ctx context.Context) error { +func (s *Watcher[E]) Run(ctx context.Context) error { if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { log.Fatal(ctx, "currently started: only allowed once at any point in time") } @@ -252,7 +254,7 @@ func (s *Watcher) Run(ctx context.Context) error { // transparently query the backing table if the record requested is not found. // We could also have the initial scan operate in chunks, handing off results // to the caller incrementally, all within the "initial scan" phase. - buffer := rangefeedbuffer.New(math.MaxInt) + buffer := rangefeedbuffer.New[E](math.MaxInt) frontierBumpedCh, initialScanDoneCh, errCh := make(chan struct{}), make(chan struct{}), make(chan error) mu := struct { // serializes access between the rangefeed and the main thread here syncutil.Mutex @@ -266,8 +268,8 @@ func (s *Watcher) Run(ctx context.Context) error { }() onValue := func(ctx context.Context, ev *kvpb.RangeFeedValue) { - bEv := s.translateEvent(ctx, ev) - if bEv == nil { + bEv, ok := s.translateEvent(ctx, ev) + if !ok { return } @@ -375,14 +377,14 @@ var restartErr = errors.New("testing restart requested") // it to restart. This is separate from the testing knob so that we // can force a restart from test infrastructure without overriding the // user-provided testing knobs. -func (s *Watcher) TestingRestart() { +func (s *Watcher[E]) TestingRestart() { s.restartErrCh <- restartErr } -func (s *Watcher) handleUpdate( - ctx context.Context, buffer *rangefeedbuffer.Buffer, ts hlc.Timestamp, updateType UpdateType, +func (s *Watcher[E]) handleUpdate( + ctx context.Context, buffer *rangefeedbuffer.Buffer[E], ts hlc.Timestamp, updateType UpdateType, ) { - s.onUpdate(ctx, Update{ + s.onUpdate(ctx, Update[E]{ Type: updateType, Timestamp: ts, Events: buffer.Flush(ctx, ts), diff --git a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go index c1f6906e5300..29ce7ccbcf62 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -56,11 +55,11 @@ func TestWatchAuthErr(t *testing.T) { []roachpb.Span{hostScratchSpan}, false, /* withPrevValue */ true, /* withRowTSInInitialScan */ - func(ctx context.Context, kv *kvpb.RangeFeedValue) rangefeedbuffer.Event { + func(ctx context.Context, kv *kvpb.RangeFeedValue) (*kvpb.RangeFeedValue, bool) { t.Fatalf("rangefeed should fail before producing results") - return nil + return nil, false }, - func(ctx context.Context, update rangefeedcache.Update) { + func(ctx context.Context, update rangefeedcache.Update[*kvpb.RangeFeedValue]) { t.Fatalf("rangefeed should fail before producing results") }, &rangefeedcache.TestingKnobs{}) diff --git a/pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher/watcher.go b/pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher/watcher.go index 2342f92a32a0..ce254c17fdcb 100644 --- a/pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher/watcher.go +++ b/pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher/watcher.go @@ -65,7 +65,7 @@ type Watcher struct { // rfc provides access to the underlying // rangefeedcache.Watcher for testing. - rfc *rangefeedcache.Watcher + rfc *rangefeedcache.Watcher[rangefeedbuffer.Event] // initialScan is used to synchronize the Start() method with the // reception of the initial batch of values from the rangefeed @@ -282,7 +282,9 @@ func (w *Watcher) onError(err error) { } } -func (w *Watcher) handleRangefeedCacheEvent(ctx context.Context, u rangefeedcache.Update) { +func (w *Watcher) handleRangefeedCacheEvent( + ctx context.Context, u rangefeedcache.Update[rangefeedbuffer.Event], +) { switch u.Type { case rangefeedcache.CompleteUpdate: log.Info(ctx, "received results of a full table scan for tenant capabilities") @@ -335,10 +337,10 @@ func (w *Watcher) onAnyChangeLocked() { // callback. func (w *Watcher) handleIncrementalUpdate( ctx context.Context, ev *kvpb.RangeFeedValue, -) rangefeedbuffer.Event { +) (rangefeedbuffer.Event, bool) { hasEvent, update := w.decoder.translateEvent(ctx, ev) if !hasEvent { - return nil + return nil, false } if fn := w.knobs.WatcherUpdatesInterceptor; fn != nil { @@ -352,7 +354,7 @@ func (w *Watcher) handleIncrementalUpdate( if prevTs, ok := w.mu.lastUpdate[tid]; ok && ts.Less(prevTs) { // Skip updates which have an earlier timestamp to avoid // regressing on the contents of an entry. - return nil + return nil, false } w.mu.lastUpdate[tid] = ts @@ -376,7 +378,7 @@ func (w *Watcher) handleIncrementalUpdate( } } w.onAnyChangeLocked() - return nil + return nil, false } func (w *Watcher) removeEntriesBeforeTimestamp(ctx context.Context, ts hlc.Timestamp) { diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index db2b1a16d34b..fa428ed31f1e 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -93,7 +93,7 @@ type SettingsWatcher struct { // rfc provides access to the underlying rangefeedcache.Watcher for // testing. - rfc *rangefeedcache.Watcher + rfc *rangefeedcache.Watcher[*kvpb.RangeFeedValue] } // Storage is used to write a snapshot of KVs out to disk for use upon restart. @@ -182,7 +182,7 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { }{ ch: make(chan struct{}), } - noteUpdate := func(update rangefeedcache.Update) { + noteUpdate := func(update rangefeedcache.Update[*kvpb.RangeFeedValue]) { if update.Type != rangefeedcache.CompleteUpdate { return } @@ -250,10 +250,10 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { []roachpb.Span{settingsTableSpan}, false, // withPrevValue true, // withRowTSInInitialScan - func(ctx context.Context, kv *kvpb.RangeFeedValue) rangefeedbuffer.Event { + func(ctx context.Context, kv *kvpb.RangeFeedValue) (*kvpb.RangeFeedValue, bool) { return s.handleKV(ctx, kv) }, - func(ctx context.Context, update rangefeedcache.Update) { + func(ctx context.Context, update rangefeedcache.Update[*kvpb.RangeFeedValue]) { noteUpdate(update) }, s.testingWatcherKnobs, @@ -340,7 +340,7 @@ func (s *SettingsWatcher) TestingRestart() { func (s *SettingsWatcher) handleKV( ctx context.Context, kv *kvpb.RangeFeedValue, -) rangefeedbuffer.Event { +) (*kvpb.RangeFeedValue, bool) { rkv := roachpb.KeyValue{ Key: kv.Key, Value: kv.Value, @@ -352,19 +352,19 @@ func (s *SettingsWatcher) handleKV( // This should never happen: the rangefeed should only ever deliver valid SQL rows. err = errors.NewAssertionErrorWithWrappedErrf(err, "failed to decode settings row %v", kv.Key) logcrash.ReportOrPanic(ctx, &s.settings.SV, "%w", err) - return nil + return nil, false } settingKey := settings.InternalKey(settingKeyS) setting, ok := settings.LookupForLocalAccessByKey(settingKey, s.codec.ForSystemTenant()) if !ok { log.Warningf(ctx, "unknown setting %s, skipping update", settingKey) - return nil + return nil, false } if !s.codec.ForSystemTenant() { if setting.Class() != settings.ApplicationLevel { log.Warningf(ctx, "ignoring read-only setting %s", settingKey) - return nil + return nil, false } } @@ -389,9 +389,9 @@ func (s *SettingsWatcher) handleKV( tombstone: tombstone, }, setting.Class()) if s.storage != nil { - return kv + return kv, true } - return nil + return nil, false } // maybeSet will update the stored value and the corresponding setting diff --git a/pkg/server/systemconfigwatcher/cache.go b/pkg/server/systemconfigwatcher/cache.go index 7f01e78306f0..71aca8494494 100644 --- a/pkg/server/systemconfigwatcher/cache.go +++ b/pkg/server/systemconfigwatcher/cache.go @@ -31,7 +31,7 @@ import ( // cache provides a consistent snapshot when available, but the snapshot // may be stale. type Cache struct { - w *rangefeedcache.Watcher + w *rangefeedcache.Watcher[*kvpb.RangeFeedValue] defaultZoneConfig *zonepb.ZoneConfig additionalKVsSource config.SystemConfigProvider mu struct { @@ -222,7 +222,9 @@ func (k keyValues) Less(i, j int) bool { return k[i].Key.Compare(k[j].Key) < 0 } var _ sort.Interface = (keyValues)(nil) -func (c *Cache) handleUpdate(_ context.Context, update rangefeedcache.Update) { +func (c *Cache) handleUpdate( + _ context.Context, update rangefeedcache.Update[*kvpb.RangeFeedValue], +) { updateKVs := rangefeedbuffer.EventsToKVs(update.Events, rangefeedbuffer.RangeFeedValueEventToKV) c.mu.Lock() @@ -256,8 +258,10 @@ func (c *Cache) setUpdatedConfigLocked(updated *config.SystemConfig) { } } -func passThroughTranslation(ctx context.Context, value *kvpb.RangeFeedValue) rangefeedbuffer.Event { - return value +func passThroughTranslation( + ctx context.Context, value *kvpb.RangeFeedValue, +) (*kvpb.RangeFeedValue, bool) { + return value, value != nil } var _ config.SystemConfigProvider = (*Cache)(nil) diff --git a/pkg/server/tenantsettingswatcher/watcher.go b/pkg/server/tenantsettingswatcher/watcher.go index d5ee4643b3fe..a44ccda8a0a0 100644 --- a/pkg/server/tenantsettingswatcher/watcher.go +++ b/pkg/server/tenantsettingswatcher/watcher.go @@ -64,7 +64,7 @@ type Watcher struct { // rfc provides access to the underlying rangefeedcache.Watcher for // testing. - rfc *rangefeedcache.Watcher + rfc *rangefeedcache.Watcher[rangefeedbuffer.Event] mu struct { syncutil.Mutex // Used by TestingRestart. @@ -134,30 +134,30 @@ func (w *Watcher) startRangeFeed( allOverrides := make(map[roachpb.TenantID][]kvpb.TenantSetting) - translateEvent := func(ctx context.Context, kv *kvpb.RangeFeedValue) rangefeedbuffer.Event { + translateEvent := func(ctx context.Context, kv *kvpb.RangeFeedValue) (rangefeedbuffer.Event, bool) { tenantID, setting, tombstone, err := w.dec.DecodeRow(roachpb.KeyValue{ Key: kv.Key, Value: kv.Value, }) if err != nil { log.Warningf(ctx, "failed to decode settings row %v: %v", kv.Key, err) - return nil + return nil, false } if allOverrides != nil { // We are in the process of doing a full table scan if tombstone { log.Warning(ctx, "unexpected empty value during rangefeed scan") - return nil + return nil, false } allOverrides[tenantID] = append(allOverrides[tenantID], setting) } else { // We are processing incremental changes. w.store.setTenantOverride(ctx, tenantID, setting) } - return nil + return nil, false } - onUpdate := func(ctx context.Context, update rangefeedcache.Update) { + onUpdate := func(ctx context.Context, update rangefeedcache.Update[rangefeedbuffer.Event]) { if update.Type == rangefeedcache.CompleteUpdate { // The CompleteUpdate indicates that the table scan is complete. // Henceforth, all calls to translateEvent will be incremental changes, diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go index 7f458cf8aff9..c2a82146d2b0 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go +++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go @@ -123,7 +123,7 @@ type KVSubscriber struct { knobs *spanconfig.TestingKnobs settings *cluster.Settings - rfc *rangefeedcache.Watcher + rfc *rangefeedcache.Watcher[*BufferEvent] mu struct { // serializes between Start and external threads syncutil.RWMutex @@ -401,7 +401,7 @@ func (s *KVSubscriber) GetProtectionTimestamps( return protectionTimestamps, s.mu.lastUpdated, nil } -func (s *KVSubscriber) handleUpdate(ctx context.Context, u rangefeedcache.Update) { +func (s *KVSubscriber) handleUpdate(ctx context.Context, u rangefeedcache.Update[*BufferEvent]) { switch u.Type { case rangefeedcache.CompleteUpdate: s.handleCompleteUpdate(ctx, u.Timestamp, u.Events) @@ -411,11 +411,11 @@ func (s *KVSubscriber) handleUpdate(ctx context.Context, u rangefeedcache.Update } func (s *KVSubscriber) handleCompleteUpdate( - ctx context.Context, ts hlc.Timestamp, events []rangefeedbuffer.Event, + ctx context.Context, ts hlc.Timestamp, events []*BufferEvent, ) { freshStore := spanconfigstore.New(s.fallback, s.settings, s.boundsReader, s.knobs) for _, ev := range events { - freshStore.Apply(ctx, false /* dryrun */, ev.(*BufferEvent).Update) + freshStore.Apply(ctx, false /* dryrun */, ev.Update) } handlers := func() []handler { s.mu.Lock() @@ -438,7 +438,7 @@ func (s *KVSubscriber) setLastUpdatedLocked(ts hlc.Timestamp) { } func (s *KVSubscriber) handlePartialUpdate( - ctx context.Context, ts hlc.Timestamp, events []rangefeedbuffer.Event, + ctx context.Context, ts hlc.Timestamp, events []*BufferEvent, ) { // The events we've received from the rangefeed buffer are sorted in // increasing timestamp order. However, any updates with the same timestamp @@ -456,7 +456,7 @@ func (s *KVSubscriber) handlePartialUpdate( case 1: // ts(i) > ts(j) return false case 0: // ts(i) == ts(j); deletions sort before additions - return events[i].(*BufferEvent).Deletion() // no need to worry about the sort being stable + return events[i].Deletion() // no need to worry about the sort being stable default: panic("unexpected") } @@ -469,7 +469,7 @@ func (s *KVSubscriber) handlePartialUpdate( // atomically, the updates need to be non-overlapping. That's not the case // here because we can have deletion events followed by additions for // overlapping spans. - s.mu.internal.Apply(ctx, false /* dryrun */, ev.(*BufferEvent).Update) + s.mu.internal.Apply(ctx, false /* dryrun */, ev.Update) } s.setLastUpdatedLocked(ts) return s.mu.handlers @@ -478,7 +478,7 @@ func (s *KVSubscriber) handlePartialUpdate( for i := range handlers { handler := &handlers[i] // mutated by invoke for _, ev := range events { - target := ev.(*BufferEvent).Update.GetTarget() + target := ev.Update.GetTarget() handler.invoke(ctx, target.KeyspaceTargeted()) } } diff --git a/pkg/spanconfig/spanconfigkvsubscriber/spanconfigdecoder.go b/pkg/spanconfig/spanconfigkvsubscriber/spanconfigdecoder.go index 23160c024449..eb9f68360f5e 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/spanconfigdecoder.go +++ b/pkg/spanconfig/spanconfigkvsubscriber/spanconfigdecoder.go @@ -14,7 +14,6 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -92,7 +91,7 @@ func (sd *SpanConfigDecoder) decode(kv roachpb.KeyValue) (spanconfig.Record, err func (sd *SpanConfigDecoder) TranslateEvent( ctx context.Context, ev *kvpb.RangeFeedValue, -) rangefeedbuffer.Event { +) (*BufferEvent, bool) { deleted := !ev.Value.IsPresent() var value roachpb.Value if deleted { @@ -100,7 +99,7 @@ func (sd *SpanConfigDecoder) TranslateEvent( // It's possible to write a KV tombstone on top of another KV // tombstone -- both the new and old value will be empty. We simply // ignore these events. - return nil + return nil, false } // Since the end key is not part of the primary key, we need to @@ -132,7 +131,7 @@ func (sd *SpanConfigDecoder) TranslateEvent( update = spanconfig.Update(record) } - return &BufferEvent{update, ev.Value.Timestamp} + return &BufferEvent{update, ev.Value.Timestamp}, true } // TestingDecoderFn exports the decoding routine for testing purposes. diff --git a/pkg/spanconfig/spanconfigsqlwatcher/buffer.go b/pkg/spanconfig/spanconfigsqlwatcher/buffer.go index 92faf7eb3a14..05edf20a3eb3 100644 --- a/pkg/spanconfig/spanconfigsqlwatcher/buffer.go +++ b/pkg/spanconfig/spanconfigsqlwatcher/buffer.go @@ -34,7 +34,7 @@ type buffer struct { syncutil.Mutex // rangefeed.Buffer stores spanconfigsqlwatcher.Events. - buffer *rangefeedbuffer.Buffer + buffer *rangefeedbuffer.Buffer[event] // rangefeedFrontiers tracks the frontier timestamps of individual // rangefeeds established by the SQLWatcher. @@ -75,7 +75,7 @@ const ( // newBuffer constructs a new buffer initialized with a starting frontier // timestamp. func newBuffer(limit int, initialFrontierTS hlc.Timestamp) *buffer { - rangefeedBuffer := rangefeedbuffer.New(limit) + rangefeedBuffer := rangefeedbuffer.New[event](limit) eventBuffer := &buffer{} eventBuffer.mu.buffer = rangefeedBuffer for i := range eventBuffer.mu.rangefeedFrontiers { @@ -167,7 +167,7 @@ var _ sort.Interface = &events{} // returns a list of relevant events which were buffered up to that timestamp. func (b *buffer) flushEvents( ctx context.Context, -) (updates []rangefeedbuffer.Event, combinedFrontierTS hlc.Timestamp) { +) (updates []event, combinedFrontierTS hlc.Timestamp) { b.mu.Lock() defer b.mu.Unlock() // First we determine the checkpoint timestamp, which is the minimum @@ -196,7 +196,7 @@ func (b *buffer) flush( ev, prevEv) } } - evs = append(evs, ev.(event)) + evs = append(evs, ev) } // Nil out the underlying slice since we have copied over the events. bufferedEvents = nil From 20ac603eea20ea2646b822f7c878998afd2aef11 Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Mon, 4 Mar 2024 12:04:45 -0600 Subject: [PATCH 3/3] kvserver: unskip `TestBackpressureNotApplied...` under remote exec There should be enough RAM now. Epic: CRDB-8308 Release note: None --- pkg/kv/kvserver/client_replica_backpressure_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/kv/kvserver/client_replica_backpressure_test.go b/pkg/kv/kvserver/client_replica_backpressure_test.go index 2f99f2d9373f..699998de26db 100644 --- a/pkg/kv/kvserver/client_replica_backpressure_test.go +++ b/pkg/kv/kvserver/client_replica_backpressure_test.go @@ -42,7 +42,6 @@ func TestBackpressureNotAppliedWhenReducingRangeSize(t *testing.T) { defer log.Scope(t).Close(t) skip.UnderRace(t, "takes >1m under race") - skip.UnderRemoteExecutionWithIssue(t, 113032, "probable OOM") rRand, _ := randutil.NewTestRand() ctx := context.Background()