diff --git a/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go b/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go index 3d8cf1c5864a..71b2a5049a52 100644 --- a/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go @@ -40,10 +40,10 @@ type spanConfigEventStream struct { data tree.Datums // Data to send to the consumer // Fields below initialized when Start called. - rfc *rangefeedcache.Watcher + rfc *rangefeedcache.Watcher[*spanconfigkvsubscriber.BufferEvent] streamGroup ctxgroup.Group // Context group controlling stream execution. doneChan chan struct{} // Channel signaled to close the stream loop. - updateCh chan rangefeedcache.Update + updateCh chan rangefeedcache.Update[*spanconfigkvsubscriber.BufferEvent] errCh chan error // Signaled when error occurs in rangefeed. streamCh chan tree.Datums // Channel signaled to forward datums to consumer. sp *tracing.Span // Span representing the lifetime of the eventStream. @@ -74,7 +74,7 @@ func (s *spanConfigEventStream) Start(ctx context.Context, txn *kv.Txn) error { s.errCh = make(chan error, 1) // updateCh gets buffered RangeFeedEvents and is consumed by the ValueGenerator. - s.updateCh = make(chan rangefeedcache.Update) + s.updateCh = make(chan rangefeedcache.Update[*spanconfigkvsubscriber.BufferEvent]) // Stream channel receives datums to be sent to the consumer. s.streamCh = make(chan tree.Datums) @@ -178,7 +178,9 @@ func (s *spanConfigEventStream) Close(ctx context.Context) { s.sp.Finish() } -func (s *spanConfigEventStream) handleUpdate(ctx context.Context, update rangefeedcache.Update) { +func (s *spanConfigEventStream) handleUpdate( + ctx context.Context, update rangefeedcache.Update[*spanconfigkvsubscriber.BufferEvent], +) { select { case <-ctx.Done(): log.Warningf(ctx, "rangefeedcache context cancelled with error %s", ctx.Err()) @@ -233,8 +235,7 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error { batcher.spanConfigFrontier = hlc.MinTimestamp } for _, ev := range update.Events { - spcfgEvent := ev.(*spanconfigkvsubscriber.BufferEvent) - target := spcfgEvent.Update.GetTarget() + target := ev.Update.GetTarget() if target.IsSystemTarget() { // We skip replicating SystemTarget Span configs as they are not // necessary to replicate. System target span configurations are @@ -259,9 +260,9 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error { streamedSpanCfgEntry := streampb.StreamedSpanConfigEntry{ SpanConfig: roachpb.SpanConfigEntry{ Target: target.ToProto(), - Config: spcfgEvent.Update.GetConfig(), + Config: ev.Update.GetConfig(), }, - Timestamp: spcfgEvent.Timestamp(), + Timestamp: ev.Timestamp(), FromFullScan: fromFullScan, } bufferedEvents = append(bufferedEvents, streamedSpanCfgEntry) diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go index cdac92e28a34..73fb3f8814a7 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go @@ -24,7 +24,7 @@ import ( // events than the limit the buffer is configured with. var ErrBufferLimitExceeded = errors.New("rangefeed buffer limit exceeded") -// Event is the unit of what can be added to the buffer. +// Event is a contract for the unit of what can be added to the buffer. 
type Event interface { Timestamp() hlc.Timestamp } @@ -33,25 +33,25 @@ type Event interface { // accumulates raw events which can then be flushed out in timestamp sorted // order en-masse whenever the rangefeed frontier is bumped. If we accumulate // more events than the limit allows for, we error out to the caller. -type Buffer struct { +type Buffer[E Event] struct { mu struct { syncutil.Mutex - events + events[E] frontier hlc.Timestamp limit int } } // New constructs a Buffer with the provided limit. -func New(limit int) *Buffer { - b := &Buffer{} +func New[E Event](limit int) *Buffer[E] { + b := &Buffer[E]{} b.mu.limit = limit return b } // Add adds the given entry to the buffer. -func (b *Buffer) Add(ev Event) error { +func (b *Buffer[E]) Add(ev E) error { b.mu.Lock() defer b.mu.Unlock() @@ -73,7 +73,7 @@ func (b *Buffer) Add(ev Event) error { // less than or equal to the provided frontier timestamp. The timestamp is // recorded (expected to monotonically increase), and future events with // timestamps less than or equal to it are discarded. -func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Event) { +func (b *Buffer[E]) Flush(ctx context.Context, frontier hlc.Timestamp) (events []E) { b.mu.Lock() defer b.mu.Unlock() @@ -97,17 +97,17 @@ func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Ev // SetLimit is used to limit the number of events the buffer internally tracks. // If already in excess of the limit, future additions will error out (until the // buffer is Flush()-ed at least). -func (b *Buffer) SetLimit(limit int) { +func (b *Buffer[E]) SetLimit(limit int) { b.mu.Lock() defer b.mu.Unlock() b.mu.limit = limit } -type events []Event +type events[E Event] []E -var _ sort.Interface = (*events)(nil) +var _ sort.Interface = (*events[Event])(nil) -func (es *events) Len() int { return len(*es) } -func (es *events) Less(i, j int) bool { return (*es)[i].Timestamp().Less((*es)[j].Timestamp()) } -func (es *events) Swap(i, j int) { (*es)[i], (*es)[j] = (*es)[j], (*es)[i] } +func (es *events[E]) Len() int { return len(*es) } +func (es *events[E]) Less(i, j int) bool { return (*es)[i].Timestamp().Less((*es)[j].Timestamp()) } +func (es *events[E]) Swap(i, j int) { (*es)[i], (*es)[j] = (*es)[j], (*es)[i] } diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go index ae125c442cc3..2b088ac6dd7a 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go @@ -30,7 +30,7 @@ func TestBuffer(t *testing.T) { ctx := context.Background() const limit = 25 - buffer := rangefeedbuffer.New(limit) + buffer := rangefeedbuffer.New[*testEvent](limit) { // Sanity check the newly initialized rangefeed buffer. events := buffer.Flush(ctx, ts(0)) @@ -62,16 +62,16 @@ func TestBuffer(t *testing.T) { events := buffer.Flush(ctx, ts(14)) require.True(t, len(events) == 3) - require.Equal(t, events[0].(*testEvent).data, "b") // b@11 - require.Equal(t, events[1].(*testEvent).data, "d") // d@12 - require.Equal(t, events[2].(*testEvent).data, "a") // a@13 + require.Equal(t, events[0].data, "b") // b@11 + require.Equal(t, events[1].data, "d") // d@12 + require.Equal(t, events[2].data, "a") // a@13 } { // Incremental advances should only surface the events until the given timestamp. 
events := buffer.Flush(ctx, ts(15)) require.True(t, len(events) == 1) - require.Equal(t, events[0].(*testEvent).data, "c") // c@15 + require.Equal(t, events[0].data, "c") // c@15 } { // Adding events with timestamps <= the last flush are discarded. @@ -90,14 +90,14 @@ func TestBuffer(t *testing.T) { events := buffer.Flush(ctx, ts(20)) require.True(t, len(events) == 2) - require.Equal(t, events[0].(*testEvent).data, "e") // e@18 - require.Equal(t, events[1].(*testEvent).data, "f") // f@19 + require.Equal(t, events[0].data, "e") // e@18 + require.Equal(t, events[1].data, "f") // f@19 } { // Ensure that a timestamp greater than any previous event flushes everything. events := buffer.Flush(ctx, ts(100)) require.True(t, len(events) == 1) - require.Equal(t, events[0].(*testEvent).data, "g") // g@21 + require.Equal(t, events[0].data, "g") // g@21 } { // Sanity check that there are no events left over. @@ -138,6 +138,6 @@ func (t *testEvent) Timestamp() hlc.Timestamp { var _ rangefeedbuffer.Event = &testEvent{} -func makeEvent(data string, ts hlc.Timestamp) rangefeedbuffer.Event { +func makeEvent(data string, ts hlc.Timestamp) *testEvent { return &testEvent{data: data, ts: ts} } diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs.go index b18ac18e270f..247ebd187d68 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs.go @@ -19,13 +19,12 @@ import ( // RangeFeedValueEventToKV is a function to type assert an Event into a // *kvpb.RangeFeedValue and then convert it to a roachpb.KeyValue. -func RangeFeedValueEventToKV(event Event) roachpb.KeyValue { - rfv := event.(*kvpb.RangeFeedValue) +func RangeFeedValueEventToKV(rfv *kvpb.RangeFeedValue) roachpb.KeyValue { return roachpb.KeyValue{Key: rfv.Key, Value: rfv.Value} } // EventsToKVs converts a slice of Events to a slice of KeyValue pairs. -func EventsToKVs(events []Event, f func(ev Event) roachpb.KeyValue) []roachpb.KeyValue { +func EventsToKVs[E Event](events []E, f func(ev E) roachpb.KeyValue) []roachpb.KeyValue { kvs := make([]roachpb.KeyValue, 0, len(events)) for _, ev := range events { kvs = append(kvs, f(ev)) diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs_test.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs_test.go index db1af09e4ed9..2d59b1d4c869 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs_test.go @@ -58,8 +58,8 @@ func TestMergeKVs(t *testing.T) { } return kvs } - toBuffer := func(t *testing.T, rows []row) *Buffer { - buf := New(len(rows)) + toBuffer := func(t *testing.T, rows []row) *Buffer[*kvpb.RangeFeedValue] { + buf := New[*kvpb.RangeFeedValue](len(rows)) for _, r := range rows { require.NoError(t, buf.Add(toRangeFeedEvent(r))) } diff --git a/pkg/kv/kvclient/rangefeed/rangefeedcache/cache_impl_test.go b/pkg/kv/kvclient/rangefeed/rangefeedcache/cache_impl_test.go index b4338c0331cb..95a977dfd839 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedcache/cache_impl_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedcache/cache_impl_test.go @@ -27,7 +27,7 @@ import ( // cache provides a consistent snapshot when available, but the snapshot // may be stale. 
type Cache struct { - w *Watcher + w *Watcher[*kvpb.RangeFeedValue] mu struct { syncutil.RWMutex @@ -75,7 +75,7 @@ func (c *Cache) GetSnapshot() ([]roachpb.KeyValue, hlc.Timestamp, bool) { return c.mu.data, c.mu.timestamp, true } -func (c *Cache) handleUpdate(ctx context.Context, update Update) { +func (c *Cache) handleUpdate(ctx context.Context, update Update[*kvpb.RangeFeedValue]) { updateKVs := rangefeedbuffer.EventsToKVs(update.Events, rangefeedbuffer.RangeFeedValueEventToKV) var updatedData []roachpb.KeyValue @@ -96,6 +96,8 @@ func (c *Cache) handleUpdate(ctx context.Context, update Update) { c.mu.timestamp = update.Timestamp } -func passThroughTranslation(ctx context.Context, value *kvpb.RangeFeedValue) rangefeedbuffer.Event { - return value +func passThroughTranslation( + ctx context.Context, value *kvpb.RangeFeedValue, +) (*kvpb.RangeFeedValue, bool) { + return value, value != nil } diff --git a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go index 17c41112d2a7..983162dc1b7a 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go @@ -63,7 +63,7 @@ import ( // Users seeking to leverage the Updates which arrive with that delay but also // react to the row-level events as they arrive can hijack the translateEvent // function to trigger some non-blocking action. -type Watcher struct { +type Watcher[E rangefeedbuffer.Event] struct { name redact.SafeString clock *hlc.Clock rangefeedFactory *rangefeed.Factory @@ -74,8 +74,8 @@ type Watcher struct { started int32 // accessed atomically - translateEvent TranslateEventFunc - onUpdate OnUpdateFunc + translateEvent TranslateEventFunc[E] + onUpdate OnUpdateFunc[E] lastFrontierTS hlc.Timestamp // used to assert monotonicity across rangefeed attempts @@ -107,20 +107,20 @@ func (u UpdateType) String() string { } // TranslateEventFunc is used by the client to translate a low-level event -// into an event for buffering. If nil is returned, the event is skipped. -type TranslateEventFunc func( +// into an event for buffering. If false is returned, the event is skipped. +type TranslateEventFunc[E rangefeedbuffer.Event] func( context.Context, *kvpb.RangeFeedValue, -) rangefeedbuffer.Event +) (_ E, ok bool) // OnUpdateFunc is used by the client to receive an Update, which is a batch // of events which represent either a CompleteUpdate or an IncrementalUpdate. -type OnUpdateFunc func(context.Context, Update) +type OnUpdateFunc[E rangefeedbuffer.Event] func(context.Context, Update[E]) // Update corresponds to a set of events derived from the underlying RangeFeed. -type Update struct { +type Update[E rangefeedbuffer.Event] struct { Type UpdateType Timestamp hlc.Timestamp - Events []rangefeedbuffer.Event + Events []E } // TestingKnobs allows tests to inject behavior into the Watcher. @@ -162,7 +162,7 @@ var _ base.ModuleTestingKnobs = (*TestingKnobs)(nil) // Callers can control whether the values passed to translateEvent carry a // populated PrevValue using the withPrevValue parameter. See // rangefeed.WithDiff for more details. 
-func NewWatcher( +func NewWatcher[E rangefeedbuffer.Event]( name redact.SafeString, clock *hlc.Clock, rangeFeedFactory *rangefeed.Factory, @@ -170,11 +170,11 @@ func NewWatcher( spans []roachpb.Span, withPrevValue bool, withRowTSInInitialScan bool, - translateEvent TranslateEventFunc, - onUpdate OnUpdateFunc, + translateEvent TranslateEventFunc[E], + onUpdate OnUpdateFunc[E], knobs *TestingKnobs, -) *Watcher { - w := &Watcher{ +) *Watcher[E] { + w := &Watcher[E]{ name: name, clock: clock, rangefeedFactory: rangeFeedFactory, @@ -194,7 +194,9 @@ func NewWatcher( // Start calls Run on the Watcher as an async task with exponential backoff. // If the Watcher ran for what is deemed long enough, the backoff is reset. -func Start(ctx context.Context, stopper *stop.Stopper, c *Watcher, onError func(error)) error { +func Start[E rangefeedbuffer.Event]( + ctx context.Context, stopper *stop.Stopper, c *Watcher[E], onError func(error), +) error { return stopper.RunAsyncTask(ctx, string(c.name), func(ctx context.Context) { ctx, cancel := stopper.WithCancelOnQuiesce(ctx) defer cancel() @@ -231,7 +233,7 @@ func Start(ctx context.Context, stopper *stop.Stopper, c *Watcher, onError func( // This is a blocking operation, returning only when the context is canceled, // or when an error occurs. For the latter, it's expected that callers will // re-run the watcher. -func (s *Watcher) Run(ctx context.Context) error { +func (s *Watcher[E]) Run(ctx context.Context) error { if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { log.Fatal(ctx, "currently started: only allowed once at any point in time") } @@ -252,7 +254,7 @@ func (s *Watcher) Run(ctx context.Context) error { // transparently query the backing table if the record requested is not found. // We could also have the initial scan operate in chunks, handing off results // to the caller incrementally, all within the "initial scan" phase. - buffer := rangefeedbuffer.New(math.MaxInt) + buffer := rangefeedbuffer.New[E](math.MaxInt) frontierBumpedCh, initialScanDoneCh, errCh := make(chan struct{}), make(chan struct{}), make(chan error) mu := struct { // serializes access between the rangefeed and the main thread here syncutil.Mutex @@ -266,8 +268,8 @@ func (s *Watcher) Run(ctx context.Context) error { }() onValue := func(ctx context.Context, ev *kvpb.RangeFeedValue) { - bEv := s.translateEvent(ctx, ev) - if bEv == nil { + bEv, ok := s.translateEvent(ctx, ev) + if !ok { return } @@ -375,14 +377,14 @@ var restartErr = errors.New("testing restart requested") // it to restart. This is separate from the testing knob so that we // can force a restart from test infrastructure without overriding the // user-provided testing knobs. 
-func (s *Watcher) TestingRestart() { +func (s *Watcher[E]) TestingRestart() { s.restartErrCh <- restartErr } -func (s *Watcher) handleUpdate( - ctx context.Context, buffer *rangefeedbuffer.Buffer, ts hlc.Timestamp, updateType UpdateType, +func (s *Watcher[E]) handleUpdate( + ctx context.Context, buffer *rangefeedbuffer.Buffer[E], ts hlc.Timestamp, updateType UpdateType, ) { - s.onUpdate(ctx, Update{ + s.onUpdate(ctx, Update[E]{ Type: updateType, Timestamp: ts, Events: buffer.Flush(ctx, ts), diff --git a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go index c1f6906e5300..29ce7ccbcf62 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -56,11 +55,11 @@ func TestWatchAuthErr(t *testing.T) { []roachpb.Span{hostScratchSpan}, false, /* withPrevValue */ true, /* withRowTSInInitialScan */ - func(ctx context.Context, kv *kvpb.RangeFeedValue) rangefeedbuffer.Event { + func(ctx context.Context, kv *kvpb.RangeFeedValue) (*kvpb.RangeFeedValue, bool) { t.Fatalf("rangefeed should fail before producing results") - return nil + return nil, false }, - func(ctx context.Context, update rangefeedcache.Update) { + func(ctx context.Context, update rangefeedcache.Update[*kvpb.RangeFeedValue]) { t.Fatalf("rangefeed should fail before producing results") }, &rangefeedcache.TestingKnobs{}) diff --git a/pkg/kv/kvserver/client_replica_backpressure_test.go b/pkg/kv/kvserver/client_replica_backpressure_test.go index 2f99f2d9373f..699998de26db 100644 --- a/pkg/kv/kvserver/client_replica_backpressure_test.go +++ b/pkg/kv/kvserver/client_replica_backpressure_test.go @@ -42,7 +42,6 @@ func TestBackpressureNotAppliedWhenReducingRangeSize(t *testing.T) { defer log.Scope(t).Close(t) skip.UnderRace(t, "takes >1m under race") - skip.UnderRemoteExecutionWithIssue(t, 113032, "probable OOM") rRand, _ := randutil.NewTestRand() ctx := context.Background() diff --git a/pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher/watcher.go b/pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher/watcher.go index 2342f92a32a0..ce254c17fdcb 100644 --- a/pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher/watcher.go +++ b/pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher/watcher.go @@ -65,7 +65,7 @@ type Watcher struct { // rfc provides access to the underlying // rangefeedcache.Watcher for testing. - rfc *rangefeedcache.Watcher + rfc *rangefeedcache.Watcher[rangefeedbuffer.Event] // initialScan is used to synchronize the Start() method with the // reception of the initial batch of values from the rangefeed @@ -282,7 +282,9 @@ func (w *Watcher) onError(err error) { } } -func (w *Watcher) handleRangefeedCacheEvent(ctx context.Context, u rangefeedcache.Update) { +func (w *Watcher) handleRangefeedCacheEvent( + ctx context.Context, u rangefeedcache.Update[rangefeedbuffer.Event], +) { switch u.Type { case rangefeedcache.CompleteUpdate: log.Info(ctx, "received results of a full table scan for tenant capabilities") @@ -335,10 +337,10 @@ func (w *Watcher) onAnyChangeLocked() { // callback. 
func (w *Watcher) handleIncrementalUpdate( ctx context.Context, ev *kvpb.RangeFeedValue, -) rangefeedbuffer.Event { +) (rangefeedbuffer.Event, bool) { hasEvent, update := w.decoder.translateEvent(ctx, ev) if !hasEvent { - return nil + return nil, false } if fn := w.knobs.WatcherUpdatesInterceptor; fn != nil { @@ -352,7 +354,7 @@ func (w *Watcher) handleIncrementalUpdate( if prevTs, ok := w.mu.lastUpdate[tid]; ok && ts.Less(prevTs) { // Skip updates which have an earlier timestamp to avoid // regressing on the contents of an entry. - return nil + return nil, false } w.mu.lastUpdate[tid] = ts @@ -376,7 +378,7 @@ func (w *Watcher) handleIncrementalUpdate( } } w.onAnyChangeLocked() - return nil + return nil, false } func (w *Watcher) removeEntriesBeforeTimestamp(ctx context.Context, ts hlc.Timestamp) { diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index db2b1a16d34b..fa428ed31f1e 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -93,7 +93,7 @@ type SettingsWatcher struct { // rfc provides access to the underlying rangefeedcache.Watcher for // testing. - rfc *rangefeedcache.Watcher + rfc *rangefeedcache.Watcher[*kvpb.RangeFeedValue] } // Storage is used to write a snapshot of KVs out to disk for use upon restart. @@ -182,7 +182,7 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { }{ ch: make(chan struct{}), } - noteUpdate := func(update rangefeedcache.Update) { + noteUpdate := func(update rangefeedcache.Update[*kvpb.RangeFeedValue]) { if update.Type != rangefeedcache.CompleteUpdate { return } @@ -250,10 +250,10 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { []roachpb.Span{settingsTableSpan}, false, // withPrevValue true, // withRowTSInInitialScan - func(ctx context.Context, kv *kvpb.RangeFeedValue) rangefeedbuffer.Event { + func(ctx context.Context, kv *kvpb.RangeFeedValue) (*kvpb.RangeFeedValue, bool) { return s.handleKV(ctx, kv) }, - func(ctx context.Context, update rangefeedcache.Update) { + func(ctx context.Context, update rangefeedcache.Update[*kvpb.RangeFeedValue]) { noteUpdate(update) }, s.testingWatcherKnobs, @@ -340,7 +340,7 @@ func (s *SettingsWatcher) TestingRestart() { func (s *SettingsWatcher) handleKV( ctx context.Context, kv *kvpb.RangeFeedValue, -) rangefeedbuffer.Event { +) (*kvpb.RangeFeedValue, bool) { rkv := roachpb.KeyValue{ Key: kv.Key, Value: kv.Value, @@ -352,19 +352,19 @@ func (s *SettingsWatcher) handleKV( // This should never happen: the rangefeed should only ever deliver valid SQL rows. 
err = errors.NewAssertionErrorWithWrappedErrf(err, "failed to decode settings row %v", kv.Key) logcrash.ReportOrPanic(ctx, &s.settings.SV, "%w", err) - return nil + return nil, false } settingKey := settings.InternalKey(settingKeyS) setting, ok := settings.LookupForLocalAccessByKey(settingKey, s.codec.ForSystemTenant()) if !ok { log.Warningf(ctx, "unknown setting %s, skipping update", settingKey) - return nil + return nil, false } if !s.codec.ForSystemTenant() { if setting.Class() != settings.ApplicationLevel { log.Warningf(ctx, "ignoring read-only setting %s", settingKey) - return nil + return nil, false } } @@ -389,9 +389,9 @@ func (s *SettingsWatcher) handleKV( tombstone: tombstone, }, setting.Class()) if s.storage != nil { - return kv + return kv, true } - return nil + return nil, false } // maybeSet will update the stored value and the corresponding setting diff --git a/pkg/server/systemconfigwatcher/cache.go b/pkg/server/systemconfigwatcher/cache.go index 7f01e78306f0..71aca8494494 100644 --- a/pkg/server/systemconfigwatcher/cache.go +++ b/pkg/server/systemconfigwatcher/cache.go @@ -31,7 +31,7 @@ import ( // cache provides a consistent snapshot when available, but the snapshot // may be stale. type Cache struct { - w *rangefeedcache.Watcher + w *rangefeedcache.Watcher[*kvpb.RangeFeedValue] defaultZoneConfig *zonepb.ZoneConfig additionalKVsSource config.SystemConfigProvider mu struct { @@ -222,7 +222,9 @@ func (k keyValues) Less(i, j int) bool { return k[i].Key.Compare(k[j].Key) < 0 } var _ sort.Interface = (keyValues)(nil) -func (c *Cache) handleUpdate(_ context.Context, update rangefeedcache.Update) { +func (c *Cache) handleUpdate( + _ context.Context, update rangefeedcache.Update[*kvpb.RangeFeedValue], +) { updateKVs := rangefeedbuffer.EventsToKVs(update.Events, rangefeedbuffer.RangeFeedValueEventToKV) c.mu.Lock() @@ -256,8 +258,10 @@ func (c *Cache) setUpdatedConfigLocked(updated *config.SystemConfig) { } } -func passThroughTranslation(ctx context.Context, value *kvpb.RangeFeedValue) rangefeedbuffer.Event { - return value +func passThroughTranslation( + ctx context.Context, value *kvpb.RangeFeedValue, +) (*kvpb.RangeFeedValue, bool) { + return value, value != nil } var _ config.SystemConfigProvider = (*Cache)(nil) diff --git a/pkg/server/tenantsettingswatcher/watcher.go b/pkg/server/tenantsettingswatcher/watcher.go index d5ee4643b3fe..a44ccda8a0a0 100644 --- a/pkg/server/tenantsettingswatcher/watcher.go +++ b/pkg/server/tenantsettingswatcher/watcher.go @@ -64,7 +64,7 @@ type Watcher struct { // rfc provides access to the underlying rangefeedcache.Watcher for // testing. - rfc *rangefeedcache.Watcher + rfc *rangefeedcache.Watcher[rangefeedbuffer.Event] mu struct { syncutil.Mutex // Used by TestingRestart. 
@@ -134,30 +134,30 @@ func (w *Watcher) startRangeFeed( allOverrides := make(map[roachpb.TenantID][]kvpb.TenantSetting) - translateEvent := func(ctx context.Context, kv *kvpb.RangeFeedValue) rangefeedbuffer.Event { + translateEvent := func(ctx context.Context, kv *kvpb.RangeFeedValue) (rangefeedbuffer.Event, bool) { tenantID, setting, tombstone, err := w.dec.DecodeRow(roachpb.KeyValue{ Key: kv.Key, Value: kv.Value, }) if err != nil { log.Warningf(ctx, "failed to decode settings row %v: %v", kv.Key, err) - return nil + return nil, false } if allOverrides != nil { // We are in the process of doing a full table scan if tombstone { log.Warning(ctx, "unexpected empty value during rangefeed scan") - return nil + return nil, false } allOverrides[tenantID] = append(allOverrides[tenantID], setting) } else { // We are processing incremental changes. w.store.setTenantOverride(ctx, tenantID, setting) } - return nil + return nil, false } - onUpdate := func(ctx context.Context, update rangefeedcache.Update) { + onUpdate := func(ctx context.Context, update rangefeedcache.Update[rangefeedbuffer.Event]) { if update.Type == rangefeedcache.CompleteUpdate { // The CompleteUpdate indicates that the table scan is complete. // Henceforth, all calls to translateEvent will be incremental changes, diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go index 7f458cf8aff9..c2a82146d2b0 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go +++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go @@ -123,7 +123,7 @@ type KVSubscriber struct { knobs *spanconfig.TestingKnobs settings *cluster.Settings - rfc *rangefeedcache.Watcher + rfc *rangefeedcache.Watcher[*BufferEvent] mu struct { // serializes between Start and external threads syncutil.RWMutex @@ -401,7 +401,7 @@ func (s *KVSubscriber) GetProtectionTimestamps( return protectionTimestamps, s.mu.lastUpdated, nil } -func (s *KVSubscriber) handleUpdate(ctx context.Context, u rangefeedcache.Update) { +func (s *KVSubscriber) handleUpdate(ctx context.Context, u rangefeedcache.Update[*BufferEvent]) { switch u.Type { case rangefeedcache.CompleteUpdate: s.handleCompleteUpdate(ctx, u.Timestamp, u.Events) @@ -411,11 +411,11 @@ func (s *KVSubscriber) handleUpdate(ctx context.Context, u rangefeedcache.Update } func (s *KVSubscriber) handleCompleteUpdate( - ctx context.Context, ts hlc.Timestamp, events []rangefeedbuffer.Event, + ctx context.Context, ts hlc.Timestamp, events []*BufferEvent, ) { freshStore := spanconfigstore.New(s.fallback, s.settings, s.boundsReader, s.knobs) for _, ev := range events { - freshStore.Apply(ctx, false /* dryrun */, ev.(*BufferEvent).Update) + freshStore.Apply(ctx, false /* dryrun */, ev.Update) } handlers := func() []handler { s.mu.Lock() @@ -438,7 +438,7 @@ func (s *KVSubscriber) setLastUpdatedLocked(ts hlc.Timestamp) { } func (s *KVSubscriber) handlePartialUpdate( - ctx context.Context, ts hlc.Timestamp, events []rangefeedbuffer.Event, + ctx context.Context, ts hlc.Timestamp, events []*BufferEvent, ) { // The events we've received from the rangefeed buffer are sorted in // increasing timestamp order. 
However, any updates with the same timestamp @@ -456,7 +456,7 @@ func (s *KVSubscriber) handlePartialUpdate( case 1: // ts(i) > ts(j) return false case 0: // ts(i) == ts(j); deletions sort before additions - return events[i].(*BufferEvent).Deletion() // no need to worry about the sort being stable + return events[i].Deletion() // no need to worry about the sort being stable default: panic("unexpected") } @@ -469,7 +469,7 @@ func (s *KVSubscriber) handlePartialUpdate( // atomically, the updates need to be non-overlapping. That's not the case // here because we can have deletion events followed by additions for // overlapping spans. - s.mu.internal.Apply(ctx, false /* dryrun */, ev.(*BufferEvent).Update) + s.mu.internal.Apply(ctx, false /* dryrun */, ev.Update) } s.setLastUpdatedLocked(ts) return s.mu.handlers @@ -478,7 +478,7 @@ func (s *KVSubscriber) handlePartialUpdate( for i := range handlers { handler := &handlers[i] // mutated by invoke for _, ev := range events { - target := ev.(*BufferEvent).Update.GetTarget() + target := ev.Update.GetTarget() handler.invoke(ctx, target.KeyspaceTargeted()) } } diff --git a/pkg/spanconfig/spanconfigkvsubscriber/spanconfigdecoder.go b/pkg/spanconfig/spanconfigkvsubscriber/spanconfigdecoder.go index 23160c024449..eb9f68360f5e 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/spanconfigdecoder.go +++ b/pkg/spanconfig/spanconfigkvsubscriber/spanconfigdecoder.go @@ -14,7 +14,6 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -92,7 +91,7 @@ func (sd *SpanConfigDecoder) decode(kv roachpb.KeyValue) (spanconfig.Record, err func (sd *SpanConfigDecoder) TranslateEvent( ctx context.Context, ev *kvpb.RangeFeedValue, -) rangefeedbuffer.Event { +) (*BufferEvent, bool) { deleted := !ev.Value.IsPresent() var value roachpb.Value if deleted { @@ -100,7 +99,7 @@ func (sd *SpanConfigDecoder) TranslateEvent( // It's possible to write a KV tombstone on top of another KV // tombstone -- both the new and old value will be empty. We simply // ignore these events. - return nil + return nil, false } // Since the end key is not part of the primary key, we need to @@ -132,7 +131,7 @@ func (sd *SpanConfigDecoder) TranslateEvent( update = spanconfig.Update(record) } - return &BufferEvent{update, ev.Value.Timestamp} + return &BufferEvent{update, ev.Value.Timestamp}, true } // TestingDecoderFn exports the decoding routine for testing purposes. diff --git a/pkg/spanconfig/spanconfigsqlwatcher/buffer.go b/pkg/spanconfig/spanconfigsqlwatcher/buffer.go index 92faf7eb3a14..05edf20a3eb3 100644 --- a/pkg/spanconfig/spanconfigsqlwatcher/buffer.go +++ b/pkg/spanconfig/spanconfigsqlwatcher/buffer.go @@ -34,7 +34,7 @@ type buffer struct { syncutil.Mutex // rangefeed.Buffer stores spanconfigsqlwatcher.Events. - buffer *rangefeedbuffer.Buffer + buffer *rangefeedbuffer.Buffer[event] // rangefeedFrontiers tracks the frontier timestamps of individual // rangefeeds established by the SQLWatcher. @@ -75,7 +75,7 @@ const ( // newBuffer constructs a new buffer initialized with a starting frontier // timestamp. 
func newBuffer(limit int, initialFrontierTS hlc.Timestamp) *buffer { - rangefeedBuffer := rangefeedbuffer.New(limit) + rangefeedBuffer := rangefeedbuffer.New[event](limit) eventBuffer := &buffer{} eventBuffer.mu.buffer = rangefeedBuffer for i := range eventBuffer.mu.rangefeedFrontiers { @@ -167,7 +167,7 @@ var _ sort.Interface = &events{} // returns a list of relevant events which were buffered up to that timestamp. func (b *buffer) flushEvents( ctx context.Context, -) (updates []rangefeedbuffer.Event, combinedFrontierTS hlc.Timestamp) { +) (updates []event, combinedFrontierTS hlc.Timestamp) { b.mu.Lock() defer b.mu.Unlock() // First we determine the checkpoint timestamp, which is the minimum @@ -196,7 +196,7 @@ func (b *buffer) flush( ev, prevEv) } } - evs = append(evs, ev.(event)) + evs = append(evs, ev) } // Nil out the underlying slice since we have copied over the events. bufferedEvents = nil diff --git a/pkg/sql/opt/exec/execbuilder/cascades.go b/pkg/sql/opt/exec/execbuilder/cascades.go index 467e5d0cb167..82deabf8621b 100644 --- a/pkg/sql/opt/exec/execbuilder/cascades.go +++ b/pkg/sql/opt/exec/execbuilder/cascades.go @@ -71,39 +71,39 @@ import ( // processed (in a queue). At this time the PlanFn method is called and the // following happens: // -// 1. We set up a new empty memo and add metadata for the columns of the -// buffer node (binding &1). +// 1: We set up a new empty memo and add metadata for the columns of the +// buffer node (binding &1). // -// 2. We invoke the memo.CascadeBuilder to optbuild the cascading query. At this -// point, the new memo will contain the following expression: +// 2: We invoke the memo.CascadeBuilder to optbuild the cascading query. At +// this point, the new memo will contain the following expression: // -// delete child -// ├── columns: -// ├── fetch columns: c:4 child.p:5 -// └── semi-join (hash) -// ├── columns: c:4!null child.p:5!null -// ├── scan child -// │ └── columns: c:4!null child.p:5!null -// ├── with-scan &1 -// │ ├── columns: p:6!null -// │ └── mapping: -// │ └── parent.p:1 => p:6 -// └── filters -// └── child.p:5 = p:6 +// delete child +// ├── columns: +// ├── fetch columns: c:4 child.p:5 +// └── semi-join (hash) +// ├── columns: c:4!null child.p:5!null +// ├── scan child +// │ └── columns: c:4!null child.p:5!null +// ├── with-scan &1 +// │ ├── columns: p:6!null +// │ └── mapping: +// │ └── parent.p:1 => p:6 +// └── filters +// └── child.p:5 = p:6 // -// Notes: -// - normally, a WithScan can only refer to an ancestor mutation or With -// operator. In this case we are creating a reference "out of the void". -// This works just fine; we can consider adding a special dummy root -// operator but so far it hasn't been necessary; -// - the binding &1 column ID has changed: it used to be 2, it is now 1. -// This is because we are starting with a fresh memo. We need to take into -// account this remapping when referring to the foreign key columns. +// Notes: +// - normally, a WithScan can only refer to an ancestor mutation or With +// operator. In this case we are creating a reference "out of the void". +// This works just fine; we can consider adding a special dummy root +// operator but so far it hasn't been necessary; +// - the binding &1 column ID has changed: it used to be 2, it is now 1. +// This is because we are starting with a fresh memo. We need to take into +// account this remapping when referring to the foreign key columns. // -// 3. We optimize the newly built expression. +// 3: We optimize the newly built expression. // -// 4. 
We execbuild the optimizer expression. We have to be careful to set up -// the "With" reference before starting. +// 4: We execbuild the optimizer expression. We have to be careful to set up +// the "With" reference before starting. // // After PlanFn is called, the resulting plan is executed. Note that this plan // could itself have more exec.Cascades; these are queued and handled in the
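For reference, a minimal sketch of how a caller uses the now-generic rangefeedbuffer.Buffer. The myEvent type, the limit, and the timestamps are illustrative (modeled on testEvent in buffer_test.go above); only the New/Add/Flush signatures come from the change itself.

package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// myEvent is a hypothetical event type; it satisfies rangefeedbuffer.Event by
// exposing a Timestamp, mirroring testEvent in buffer_test.go.
type myEvent struct {
	data string
	ts   hlc.Timestamp
}

func (e *myEvent) Timestamp() hlc.Timestamp { return e.ts }

func main() {
	ctx := context.Background()

	// New is parameterized by the concrete event type, so Flush returns
	// []*myEvent directly and callers no longer type-assert each element.
	buf := rangefeedbuffer.New[*myEvent](10 /* limit */)

	// Add can return ErrBufferLimitExceeded once the limit is hit; errors are
	// discarded here only to keep the sketch short.
	_ = buf.Add(&myEvent{data: "a", ts: hlc.Timestamp{WallTime: 2}})
	_ = buf.Add(&myEvent{data: "b", ts: hlc.Timestamp{WallTime: 1}})

	// Flush returns the buffered events with timestamps <= the frontier,
	// sorted in timestamp order.
	for _, ev := range buf.Flush(ctx, hlc.Timestamp{WallTime: 2}) {
		fmt.Println(ev.data, ev.Timestamp())
	}
}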
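Similarly, a sketch of the consumer-side shape for the generic rangefeedcache.Watcher: translateEvent now returns (E, bool) rather than a possibly-nil rangefeedbuffer.Event, and onUpdate receives a typed Update[E]. The function name, the buffer-size value, and the position of the buffer-size parameter (elided by the hunk above) are assumptions; the callback signatures and the Start call mirror the patch.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// startKVWatcher is a hypothetical caller wiring typed callbacks into a
// Watcher[*kvpb.RangeFeedValue], in the pass-through style used by
// systemconfigwatcher and settingswatcher above.
func startKVWatcher(
	ctx context.Context,
	stopper *stop.Stopper,
	clock *hlc.Clock,
	factory *rangefeed.Factory,
	spans []roachpb.Span,
) error {
	translate := func(ctx context.Context, v *kvpb.RangeFeedValue) (*kvpb.RangeFeedValue, bool) {
		// ok=false skips the event; the old contract signalled this by returning nil.
		return v, v != nil
	}
	onUpdate := func(ctx context.Context, u rangefeedcache.Update[*kvpb.RangeFeedValue]) {
		// u.Events is already []*kvpb.RangeFeedValue, so no type assertions are needed.
		kvs := rangefeedbuffer.EventsToKVs(u.Events, rangefeedbuffer.RangeFeedValueEventToKV)
		log.Infof(ctx, "%s update at %s carrying %d kvs", u.Type, u.Timestamp, len(kvs))
	}

	// The type parameter E is inferred from the callbacks, as at the unchanged
	// NewWatcher call sites in this patch.
	w := rangefeedcache.NewWatcher(
		"example-watcher", clock, factory,
		1024, // buffer-size argument; this parameter is elided by the hunk above and assumed here
		spans,
		false, /* withPrevValue */
		true,  /* withRowTSInInitialScan */
		translate, onUpdate, nil, /* knobs */
	)
	return rangefeedcache.Start(ctx, stopper, w, func(err error) {
		log.Warningf(ctx, "watcher terminated: %v", err)
	})
}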