119467: opt: fix formatting of cascadeBuilder doc comment r=mgartner a=mgartner

Epic: None

Release note: None

119870: rangefeedbuffer: parameterize by generic Event type r=nvanbenschoten a=nvanbenschoten

This commit replaces the rangefeedbuffer package's use of Event interface objects with a generic type parameter. In doing so, it avoids requiring users of the package (e.g. spanconfigkvsubscriber) to type assert the Event interface objects back to the specific event type that their TranslateEventFunc decoded them into.

Epic: None
Release note: None
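
For illustration, here is a minimal self-contained Go sketch of the difference this commit makes; the buffer types and names below are hypothetical stand-ins, not the actual package code:

package main

import "fmt"

// Event mirrors the rangefeedbuffer.Event contract: anything with a timestamp.
type Event interface{ Timestamp() int64 }

type kvEvent struct {
	key string
	ts  int64
}

func (e *kvEvent) Timestamp() int64 { return e.ts }

// Before: an interface-typed buffer forces callers to type-assert on read.
type ifaceBuffer struct{ events []Event }

func (b *ifaceBuffer) Add(ev Event) { b.events = append(b.events, ev) }

// After: a generic buffer hands the concrete event type straight back.
type genBuffer[E Event] struct{ events []E }

func (b *genBuffer[E]) Add(ev E) { b.events = append(b.events, ev) }

func main() {
	ib := &ifaceBuffer{}
	ib.Add(&kvEvent{key: "a", ts: 1})
	ev := ib.events[0].(*kvEvent) // assertion required; panics if the type is wrong
	fmt.Println(ev.key)

	gb := &genBuffer[*kvEvent]{}
	gb.Add(&kvEvent{key: "b", ts: 2})
	fmt.Println(gb.events[0].key) // concrete *kvEvent; no assertion needed
}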

119876: kvserver: unskip `TestBackpressureNotApplied...` under remote exec r=rail a=rickystewart

There should be enough RAM now.

Epic: CRDB-8308
Release note: None

Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
4 people committed Mar 4, 2024
4 parents 61ad745 + 0d01f6a + 78c81b2 + 20ac603 commit c52d468
Showing 17 changed files with 144 additions and 137 deletions.
17 changes: 9 additions & 8 deletions pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go
@@ -40,10 +40,10 @@ type spanConfigEventStream struct {
 	data tree.Datums // Data to send to the consumer
 
 	// Fields below initialized when Start called.
-	rfc *rangefeedcache.Watcher
+	rfc *rangefeedcache.Watcher[*spanconfigkvsubscriber.BufferEvent]
 	streamGroup ctxgroup.Group // Context group controlling stream execution.
 	doneChan chan struct{} // Channel signaled to close the stream loop.
-	updateCh chan rangefeedcache.Update
+	updateCh chan rangefeedcache.Update[*spanconfigkvsubscriber.BufferEvent]
 	errCh chan error // Signaled when error occurs in rangefeed.
 	streamCh chan tree.Datums // Channel signaled to forward datums to consumer.
 	sp *tracing.Span // Span representing the lifetime of the eventStream.
@@ -74,7 +74,7 @@ func (s *spanConfigEventStream) Start(ctx context.Context, txn *kv.Txn) error {
 	s.errCh = make(chan error, 1)
 
 	// updateCh gets buffered RangeFeedEvents and is consumed by the ValueGenerator.
-	s.updateCh = make(chan rangefeedcache.Update)
+	s.updateCh = make(chan rangefeedcache.Update[*spanconfigkvsubscriber.BufferEvent])
 
 	// Stream channel receives datums to be sent to the consumer.
 	s.streamCh = make(chan tree.Datums)
@@ -178,7 +178,9 @@ func (s *spanConfigEventStream) Close(ctx context.Context) {
 	s.sp.Finish()
 }
 
-func (s *spanConfigEventStream) handleUpdate(ctx context.Context, update rangefeedcache.Update) {
+func (s *spanConfigEventStream) handleUpdate(
+	ctx context.Context, update rangefeedcache.Update[*spanconfigkvsubscriber.BufferEvent],
+) {
 	select {
 	case <-ctx.Done():
 		log.Warningf(ctx, "rangefeedcache context cancelled with error %s", ctx.Err())
@@ -233,8 +235,7 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
 			batcher.spanConfigFrontier = hlc.MinTimestamp
 		}
 		for _, ev := range update.Events {
-			spcfgEvent := ev.(*spanconfigkvsubscriber.BufferEvent)
-			target := spcfgEvent.Update.GetTarget()
+			target := ev.Update.GetTarget()
 			if target.IsSystemTarget() {
 				// We skip replicating SystemTarget Span configs as they are not
 				// necessary to replicate. System target span configurations are
@@ -259,9 +260,9 @@
 			streamedSpanCfgEntry := streampb.StreamedSpanConfigEntry{
 				SpanConfig: roachpb.SpanConfigEntry{
 					Target: target.ToProto(),
-					Config: spcfgEvent.Update.GetConfig(),
+					Config: ev.Update.GetConfig(),
 				},
-				Timestamp: spcfgEvent.Timestamp(),
+				Timestamp: ev.Timestamp(),
 				FromFullScan: fromFullScan,
 			}
 			bufferedEvents = append(bufferedEvents, streamedSpanCfgEntry)
26 changes: 13 additions & 13 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go
@@ -24,7 +24,7 @@ import (
 // events than the limit the buffer is configured with.
 var ErrBufferLimitExceeded = errors.New("rangefeed buffer limit exceeded")
 
-// Event is the unit of what can be added to the buffer.
+// Event is a contract for the unit of what can be added to the buffer.
 type Event interface {
 	Timestamp() hlc.Timestamp
 }
@@ -33,25 +33,25 @@ type Event interface {
 // accumulates raw events which can then be flushed out in timestamp sorted
 // order en-masse whenever the rangefeed frontier is bumped. If we accumulate
 // more events than the limit allows for, we error out to the caller.
-type Buffer struct {
+type Buffer[E Event] struct {
 	mu struct {
 		syncutil.Mutex
 
-		events
+		events[E]
 		frontier hlc.Timestamp
 		limit int
 	}
 }
 
 // New constructs a Buffer with the provided limit.
-func New(limit int) *Buffer {
-	b := &Buffer{}
+func New[E Event](limit int) *Buffer[E] {
+	b := &Buffer[E]{}
 	b.mu.limit = limit
 	return b
 }
 
 // Add adds the given entry to the buffer.
-func (b *Buffer) Add(ev Event) error {
+func (b *Buffer[E]) Add(ev E) error {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
@@ -73,7 +73,7 @@ func (b *Buffer) Add(ev Event) error {
 // less than or equal to the provided frontier timestamp. The timestamp is
 // recorded (expected to monotonically increase), and future events with
 // timestamps less than or equal to it are discarded.
-func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Event) {
+func (b *Buffer[E]) Flush(ctx context.Context, frontier hlc.Timestamp) (events []E) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
@@ -97,17 +97,17 @@ func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Event) {
 // SetLimit is used to limit the number of events the buffer internally tracks.
 // If already in excess of the limit, future additions will error out (until the
 // buffer is Flush()-ed at least).
-func (b *Buffer) SetLimit(limit int) {
+func (b *Buffer[E]) SetLimit(limit int) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
 	b.mu.limit = limit
 }
 
-type events []Event
+type events[E Event] []E
 
-var _ sort.Interface = (*events)(nil)
+var _ sort.Interface = (*events[Event])(nil)
 
-func (es *events) Len() int { return len(*es) }
-func (es *events) Less(i, j int) bool { return (*es)[i].Timestamp().Less((*es)[j].Timestamp()) }
-func (es *events) Swap(i, j int) { (*es)[i], (*es)[j] = (*es)[j], (*es)[i] }
+func (es *events[E]) Len() int { return len(*es) }
+func (es *events[E]) Less(i, j int) bool { return (*es)[i].Timestamp().Less((*es)[j].Timestamp()) }
+func (es *events[E]) Swap(i, j int) { (*es)[i], (*es)[j] = (*es)[j], (*es)[i] }
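
Taken together, the generic buffer.go API above is used roughly like the following sketch (testEvent mirrors the helper type from buffer_test.go below; this is an illustrative sketch, not code from the commit):

package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// testEvent mirrors the helper in buffer_test.go below.
type testEvent struct {
	data string
	ts   hlc.Timestamp
}

func (t *testEvent) Timestamp() hlc.Timestamp { return t.ts }

func main() {
	ctx := context.Background()
	buf := rangefeedbuffer.New[*testEvent](25 /* limit */)

	// Add returns ErrBufferLimitExceeded once the configured limit is hit.
	_ = buf.Add(&testEvent{data: "a", ts: hlc.Timestamp{WallTime: 13}})
	_ = buf.Add(&testEvent{data: "b", ts: hlc.Timestamp{WallTime: 11}})

	// Flush returns events at or below the frontier in timestamp order,
	// typed as []*testEvent, with no interface assertions at the call site.
	for _, ev := range buf.Flush(ctx, hlc.Timestamp{WallTime: 14}) {
		fmt.Println(ev.data)
	}
}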
18 changes: 9 additions & 9 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go
@@ -30,7 +30,7 @@ func TestBuffer(t *testing.T) {
 
 	ctx := context.Background()
 	const limit = 25
-	buffer := rangefeedbuffer.New(limit)
+	buffer := rangefeedbuffer.New[*testEvent](limit)
 
 	{ // Sanity check the newly initialized rangefeed buffer.
 		events := buffer.Flush(ctx, ts(0))
@@ -62,16 +62,16 @@
 		events := buffer.Flush(ctx, ts(14))
 
 		require.True(t, len(events) == 3)
-		require.Equal(t, events[0].(*testEvent).data, "b") // b@11
-		require.Equal(t, events[1].(*testEvent).data, "d") // d@12
-		require.Equal(t, events[2].(*testEvent).data, "a") // a@13
+		require.Equal(t, events[0].data, "b") // b@11
+		require.Equal(t, events[1].data, "d") // d@12
+		require.Equal(t, events[2].data, "a") // a@13
 	}
 
 	{ // Incremental advances should only surface the events until the given timestamp.
 		events := buffer.Flush(ctx, ts(15))
 
 		require.True(t, len(events) == 1)
-		require.Equal(t, events[0].(*testEvent).data, "c") // c@15
+		require.Equal(t, events[0].data, "c") // c@15
 	}
 
 	{ // Adding events with timestamps <= the last flush are discarded.
@@ -90,14 +90,14 @@
 
 		events := buffer.Flush(ctx, ts(20))
 		require.True(t, len(events) == 2)
-		require.Equal(t, events[0].(*testEvent).data, "e") // e@18
-		require.Equal(t, events[1].(*testEvent).data, "f") // f@19
+		require.Equal(t, events[0].data, "e") // e@18
+		require.Equal(t, events[1].data, "f") // f@19
 	}
 
 	{ // Ensure that a timestamp greater than any previous event flushes everything.
 		events := buffer.Flush(ctx, ts(100))
 		require.True(t, len(events) == 1)
-		require.Equal(t, events[0].(*testEvent).data, "g") // g@21
+		require.Equal(t, events[0].data, "g") // g@21
 	}
 
 	{ // Sanity check that there are no events left over.
@@ -138,6 +138,6 @@ func (t *testEvent) Timestamp() hlc.Timestamp {
 
 var _ rangefeedbuffer.Event = &testEvent{}
 
-func makeEvent(data string, ts hlc.Timestamp) rangefeedbuffer.Event {
+func makeEvent(data string, ts hlc.Timestamp) *testEvent {
 	return &testEvent{data: data, ts: ts}
 }
5 changes: 2 additions & 3 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs.go
@@ -19,13 +19,12 @@ import (
 
 // RangeFeedValueEventToKV is a function to type assert an Event into a
 // *kvpb.RangeFeedValue and then convert it to a roachpb.KeyValue.
-func RangeFeedValueEventToKV(event Event) roachpb.KeyValue {
-	rfv := event.(*kvpb.RangeFeedValue)
+func RangeFeedValueEventToKV(rfv *kvpb.RangeFeedValue) roachpb.KeyValue {
	return roachpb.KeyValue{Key: rfv.Key, Value: rfv.Value}
 }
 
 // EventsToKVs converts a slice of Events to a slice of KeyValue pairs.
-func EventsToKVs(events []Event, f func(ev Event) roachpb.KeyValue) []roachpb.KeyValue {
+func EventsToKVs[E Event](events []E, f func(ev E) roachpb.KeyValue) []roachpb.KeyValue {
 	kvs := make([]roachpb.KeyValue, 0, len(events))
 	for _, ev := range events {
 		kvs = append(kvs, f(ev))
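
A usage note on the pair of helpers above: with the narrowed converter signature, RangeFeedValueEventToKV now plugs directly into the generic EventsToKVs when E is *kvpb.RangeFeedValue, which is how cache_impl_test.go consumes it below. A sketch under that assumption (flushToKVs is a hypothetical helper, not part of the commit):

package demo

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// flushToKVs drains a buffer of raw rangefeed values into KV pairs. The
// generic instantiation removes the interface assertion on both sides.
func flushToKVs(
	ctx context.Context,
	buf *rangefeedbuffer.Buffer[*kvpb.RangeFeedValue],
	frontier hlc.Timestamp,
) []roachpb.KeyValue {
	return rangefeedbuffer.EventsToKVs(
		buf.Flush(ctx, frontier),
		rangefeedbuffer.RangeFeedValueEventToKV,
	)
}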
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs_test.go
@@ -58,8 +58,8 @@ func TestMergeKVs(t *testing.T) {
 		}
 		return kvs
 	}
-	toBuffer := func(t *testing.T, rows []row) *Buffer {
-		buf := New(len(rows))
+	toBuffer := func(t *testing.T, rows []row) *Buffer[*kvpb.RangeFeedValue] {
+		buf := New[*kvpb.RangeFeedValue](len(rows))
 		for _, r := range rows {
 			require.NoError(t, buf.Add(toRangeFeedEvent(r)))
 		}
10 changes: 6 additions & 4 deletions pkg/kv/kvclient/rangefeed/rangefeedcache/cache_impl_test.go
@@ -27,7 +27,7 @@ import (
 // cache provides a consistent snapshot when available, but the snapshot
 // may be stale.
 type Cache struct {
-	w *Watcher
+	w *Watcher[*kvpb.RangeFeedValue]
 
 	mu struct {
 		syncutil.RWMutex
@@ -75,7 +75,7 @@ func (c *Cache) GetSnapshot() ([]roachpb.KeyValue, hlc.Timestamp, bool) {
 	return c.mu.data, c.mu.timestamp, true
 }
 
-func (c *Cache) handleUpdate(ctx context.Context, update Update) {
+func (c *Cache) handleUpdate(ctx context.Context, update Update[*kvpb.RangeFeedValue]) {
 	updateKVs := rangefeedbuffer.EventsToKVs(update.Events,
 		rangefeedbuffer.RangeFeedValueEventToKV)
 	var updatedData []roachpb.KeyValue
@@ -96,6 +96,8 @@ func (c *Cache) handleUpdate(ctx context.Context, update Update) {
 	c.mu.timestamp = update.Timestamp
 }
 
-func passThroughTranslation(ctx context.Context, value *kvpb.RangeFeedValue) rangefeedbuffer.Event {
-	return value
+func passThroughTranslation(
+	ctx context.Context, value *kvpb.RangeFeedValue,
+) (*kvpb.RangeFeedValue, bool) {
+	return value, value != nil
 }
48 changes: 25 additions & 23 deletions pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go
@@ -63,7 +63,7 @@ import (
 // Users seeking to leverage the Updates which arrive with that delay but also
 // react to the row-level events as they arrive can hijack the translateEvent
 // function to trigger some non-blocking action.
-type Watcher struct {
+type Watcher[E rangefeedbuffer.Event] struct {
 	name redact.SafeString
 	clock *hlc.Clock
 	rangefeedFactory *rangefeed.Factory
@@ -74,8 +74,8 @@ type Watcher struct {
 
 	started int32 // accessed atomically
 
-	translateEvent TranslateEventFunc
-	onUpdate OnUpdateFunc
+	translateEvent TranslateEventFunc[E]
+	onUpdate OnUpdateFunc[E]
 
 	lastFrontierTS hlc.Timestamp // used to assert monotonicity across rangefeed attempts
 
@@ -107,20 +107,20 @@ func (u UpdateType) String() string {
 }
 
 // TranslateEventFunc is used by the client to translate a low-level event
-// into an event for buffering. If nil is returned, the event is skipped.
-type TranslateEventFunc func(
+// into an event for buffering. If false is returned, the event is skipped.
+type TranslateEventFunc[E rangefeedbuffer.Event] func(
 	context.Context, *kvpb.RangeFeedValue,
-) rangefeedbuffer.Event
+) (_ E, ok bool)
 
 // OnUpdateFunc is used by the client to receive an Update, which is a batch
 // of events which represent either a CompleteUpdate or an IncrementalUpdate.
-type OnUpdateFunc func(context.Context, Update)
+type OnUpdateFunc[E rangefeedbuffer.Event] func(context.Context, Update[E])
 
 // Update corresponds to a set of events derived from the underlying RangeFeed.
-type Update struct {
+type Update[E rangefeedbuffer.Event] struct {
 	Type UpdateType
 	Timestamp hlc.Timestamp
-	Events []rangefeedbuffer.Event
+	Events []E
 }
 
 // TestingKnobs allows tests to inject behavior into the Watcher.
@@ -162,19 +162,19 @@ var _ base.ModuleTestingKnobs = (*TestingKnobs)(nil)
 // Callers can control whether the values passed to translateEvent carry a
 // populated PrevValue using the withPrevValue parameter. See
 // rangefeed.WithDiff for more details.
-func NewWatcher(
+func NewWatcher[E rangefeedbuffer.Event](
 	name redact.SafeString,
 	clock *hlc.Clock,
 	rangeFeedFactory *rangefeed.Factory,
 	bufferSize int,
 	spans []roachpb.Span,
 	withPrevValue bool,
 	withRowTSInInitialScan bool,
-	translateEvent TranslateEventFunc,
-	onUpdate OnUpdateFunc,
+	translateEvent TranslateEventFunc[E],
+	onUpdate OnUpdateFunc[E],
 	knobs *TestingKnobs,
-) *Watcher {
-	w := &Watcher{
+) *Watcher[E] {
+	w := &Watcher[E]{
 		name: name,
 		clock: clock,
 		rangefeedFactory: rangeFeedFactory,
@@ -194,7 +194,9 @@ func NewWatcher(
 
 // Start calls Run on the Watcher as an async task with exponential backoff.
 // If the Watcher ran for what is deemed long enough, the backoff is reset.
-func Start(ctx context.Context, stopper *stop.Stopper, c *Watcher, onError func(error)) error {
+func Start[E rangefeedbuffer.Event](
+	ctx context.Context, stopper *stop.Stopper, c *Watcher[E], onError func(error),
+) error {
 	return stopper.RunAsyncTask(ctx, string(c.name), func(ctx context.Context) {
 		ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
 		defer cancel()
@@ -231,7 +233,7 @@ func Start(ctx context.Context, stopper *stop.Stopper, c *Watcher, onError func(error)) error {
 // This is a blocking operation, returning only when the context is canceled,
 // or when an error occurs. For the latter, it's expected that callers will
 // re-run the watcher.
-func (s *Watcher) Run(ctx context.Context) error {
+func (s *Watcher[E]) Run(ctx context.Context) error {
 	if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
 		log.Fatal(ctx, "currently started: only allowed once at any point in time")
 	}
@@ -252,7 +254,7 @@ func (s *Watcher) Run(ctx context.Context) error {
 	// transparently query the backing table if the record requested is not found.
 	// We could also have the initial scan operate in chunks, handing off results
 	// to the caller incrementally, all within the "initial scan" phase.
-	buffer := rangefeedbuffer.New(math.MaxInt)
+	buffer := rangefeedbuffer.New[E](math.MaxInt)
 	frontierBumpedCh, initialScanDoneCh, errCh := make(chan struct{}), make(chan struct{}), make(chan error)
 	mu := struct { // serializes access between the rangefeed and the main thread here
 		syncutil.Mutex
@@ -266,8 +268,8 @@ func (s *Watcher) Run(ctx context.Context) error {
 	}()
 
 	onValue := func(ctx context.Context, ev *kvpb.RangeFeedValue) {
-		bEv := s.translateEvent(ctx, ev)
-		if bEv == nil {
+		bEv, ok := s.translateEvent(ctx, ev)
+		if !ok {
 			return
 		}
 
@@ -375,14 +377,14 @@ var restartErr = errors.New("testing restart requested")
 // it to restart. This is separate from the testing knob so that we
 // can force a restart from test infrastructure without overriding the
 // user-provided testing knobs.
-func (s *Watcher) TestingRestart() {
+func (s *Watcher[E]) TestingRestart() {
 	s.restartErrCh <- restartErr
 }
 
-func (s *Watcher) handleUpdate(
-	ctx context.Context, buffer *rangefeedbuffer.Buffer, ts hlc.Timestamp, updateType UpdateType,
+func (s *Watcher[E]) handleUpdate(
+	ctx context.Context, buffer *rangefeedbuffer.Buffer[E], ts hlc.Timestamp, updateType UpdateType,
 ) {
-	s.onUpdate(ctx, Update{
+	s.onUpdate(ctx, Update[E]{
 		Type: updateType,
 		Timestamp: ts,
 		Events: buffer.Flush(ctx, ts),
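
With the generic watcher API above, client wiring looks roughly like the sketch below, modeled on passThroughTranslation and TestWatchAuthErr elsewhere in this commit; startDemoWatcher is hypothetical, and the clock, factory, spans, and stopper are assumed to come from the caller's environment:

package demo

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// startDemoWatcher shows the generic wiring; E is inferred as
// *kvpb.RangeFeedValue from the two function literals.
func startDemoWatcher(
	ctx context.Context,
	stopper *stop.Stopper,
	clock *hlc.Clock,
	factory *rangefeed.Factory,
	spans []roachpb.Span,
) error {
	w := rangefeedcache.NewWatcher(
		"demo-watcher",
		clock,
		factory,
		1024, /* bufferSize */
		spans,
		false, /* withPrevValue */
		true,  /* withRowTSInInitialScan */
		// TranslateEventFunc: pass raw values through; the (E, bool) return
		// replaces the old convention of skipping events by returning nil.
		func(ctx context.Context, v *kvpb.RangeFeedValue) (*kvpb.RangeFeedValue, bool) {
			return v, v != nil
		},
		// OnUpdateFunc: events arrive concretely typed, with no assertions.
		func(ctx context.Context, u rangefeedcache.Update[*kvpb.RangeFeedValue]) {
			for _, ev := range u.Events {
				log.Infof(ctx, "key %s updated at %s", ev.Key, ev.Timestamp())
			}
		},
		&rangefeedcache.TestingKnobs{},
	)
	return rangefeedcache.Start(ctx, stopper, w, func(err error) {})
}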
7 changes: 3 additions & 4 deletions pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go
@@ -16,7 +16,6 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
-	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -56,11 +55,11 @@ func TestWatchAuthErr(t *testing.T) {
 		[]roachpb.Span{hostScratchSpan},
 		false, /* withPrevValue */
 		true, /* withRowTSInInitialScan */
-		func(ctx context.Context, kv *kvpb.RangeFeedValue) rangefeedbuffer.Event {
+		func(ctx context.Context, kv *kvpb.RangeFeedValue) (*kvpb.RangeFeedValue, bool) {
 			t.Fatalf("rangefeed should fail before producing results")
-			return nil
+			return nil, false
 		},
-		func(ctx context.Context, update rangefeedcache.Update) {
+		func(ctx context.Context, update rangefeedcache.Update[*kvpb.RangeFeedValue]) {
 			t.Fatalf("rangefeed should fail before producing results")
 		},
 		&rangefeedcache.TestingKnobs{})