From 83d4a2049cb63e413e61d879572ccecebe46f69c Mon Sep 17 00:00:00 2001 From: Bryan White Date: Sat, 14 Oct 2023 01:04:33 +0200 Subject: [PATCH] test: fix & add observable tests --- pkg/observable/channel/observable_test.go | 282 +++++++++++++++++----- pkg/observable/channel/observer_test.go | 10 +- 2 files changed, 232 insertions(+), 60 deletions(-) diff --git a/pkg/observable/channel/observable_test.go b/pkg/observable/channel/observable_test.go index b822c1fde..186540fb4 100644 --- a/pkg/observable/channel/observable_test.go +++ b/pkg/observable/channel/observable_test.go @@ -3,9 +3,11 @@ package channel_test import ( "context" "fmt" + "sync" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -14,7 +16,8 @@ import ( ) const ( - notifyTimeout = 100 * time.Millisecond + productionDelay = 10 * time.Millisecond + notifyTimeout = productionDelay * 4 unsubscribeSleepDuration = notifyTimeout * 2 ) @@ -28,8 +31,8 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { } inputs := []int{123, 456, 789} - queuedProducer := make(chan *int, 1) - nonEmptyBufferedProducer := make(chan *int, 1) + fullBlockingProducer := make(chan *int) + fullBufferedProducer := make(chan *int, 1) tests := []test{ { @@ -45,8 +48,8 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { expectedOutputs: inputs, }, { - name: "queued non-buffered producer", - producer: queuedProducer, + name: "full non-buffered producer", + producer: fullBlockingProducer, inputs: inputs[1:], expectedOutputs: inputs, setupFn: func(t test) { @@ -63,8 +66,8 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { expectedOutputs: inputs, }, { - name: "non-empty buffered producer", - producer: nonEmptyBufferedProducer, + name: "full buffered len 1 producer", + producer: fullBufferedProducer, inputs: inputs[1:], expectedOutputs: inputs, setupFn: func(t test) { @@ -74,7 +77,7 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { }, } - for _, tt := range tests[:] { + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if tt.setupFn != nil { tt.setupFn(tt) @@ -86,64 +89,70 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { t.Cleanup(cancel) t.Logf("producer: %p", tt.producer) - testObs, producer := channel.NewObservable[*int]( + obsvbl, producer := channel.NewObservable[*int]( channel.WithProducer(tt.producer), ) - require.NotNil(t, testObs) + require.NotNil(t, obsvbl) require.NotNil(t, producer) // construct 3 distinct observers, each with its own channel - observers := make([]observable.Observer[*int], 3) + observers := make([]observable.Observer[*int], 1) for i := range observers { - observers[i] = testObs.Subscribe(ctx) + observers[i] = obsvbl.Subscribe(ctx) } group, ctx := errgroup.WithContext(ctx) - notifiedOrTimedOut := func(sub observable.Observer[*int]) func() error { - var outputIndex int - return func() error { - for { - select { - case output, ok := <-sub.Ch(): - if !ok { - return nil - } - - // observer channel should receive notified input - t.Logf("output: %d | %p", *output, output) - require.Equal(t, tt.expectedOutputs[outputIndex], *output) - outputIndex++ - case <-time.After(notifyTimeout): - return fmt.Errorf("timed out waiting for observer to be notified") - } + + // ensure all obsvr channels are notified + for obsvrIdx, obsvr := range observers { + next := func(outputIndex int, output *int) error { + // obsvr channel should receive notified input + t.Logf("output: %d | %p", *output, output) + if !assert.Equalf( + t, tt.expectedOutputs[outputIndex], + *output, + "obsvr Idx: %d", obsvrIdx, + ) { + return fmt.Errorf("unexpected output") } + return nil + } + + done := func(outputs []*int) error { + if !assert.Equalf( + t, len(tt.expectedOutputs), + len(outputs), + "obsvr addr: %p", obsvr, + ) { + return fmt.Errorf("unexpected number of outputs") + } + return nil } - } - // ensure all observer channels are notified - for _, observer := range observers { // concurrently await notification or timeout to avoid blocking on // empty and/or non-buffered producers. - group.Go(notifiedOrTimedOut(observer)) + group.Go(goNotifiedOrTimedOutFactory(obsvr, next, done)) } // notify with test input t.Logf("sending to producer %p", producer) - for i, input := range tt.inputs[:] { + for i, input := range tt.inputs { inputPtr := new(int) *inputPtr = input t.Logf("sending input ptr: %d %p", input, inputPtr) + time.Sleep(productionDelay) producer <- inputPtr + time.Sleep(productionDelay) t.Logf("send input %d", i) } cancel() - // wait for testObs to be notified or timeout + // wait for obsvbl to be notified or timeout err := group.Wait() require.NoError(t, err) t.Log("errgroup done") - // unsubscribing should close observer channel(s) + // unsubscribing should close obsvr channel(s) for i, observer := range observers { observer.Unsubscribe() t.Logf("unsusbscribed %d", i) @@ -157,21 +166,10 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { } } -// TECHDEBT/INCOMPLETE: add coverage for multiple observers, unsubscribe from one -// and ensure the rest are still notified. -func TestChannelObservable_IndependentObservers(t *testing.T) { - t.Skip("add coverage: unsubscribing one observer should not impact the rest") -} - -// TECHDEBT\INCOMPLETE: add coverage for active observers closing when producer closes. -func TestChannelObservable_ObserversCloseOnProducerClose(t *testing.T) { - t.Skip("add coverage: all observers should close when producer closes") -} - func TestChannelObservable_UnsubscribeObservers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - testObs, producer := channel.NewObservable[int]() - require.NotNil(t, testObs) + obsvbl, producer := channel.NewObservable[int]() + require.NotNil(t, obsvbl) require.NotNil(t, producer) type test struct { @@ -183,7 +181,7 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) { { name: "nil context", lifecycleFn: func() observable.Observer[int] { - observer := testObs.Subscribe(nil) + observer := obsvbl.Subscribe(nil) observer.Unsubscribe() return observer }, @@ -191,7 +189,7 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) { { name: "only unsubscribe", lifecycleFn: func() observable.Observer[int] { - observer := testObs.Subscribe(ctx) + observer := obsvbl.Subscribe(ctx) observer.Unsubscribe() return observer }, @@ -199,7 +197,7 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) { { name: "only cancel", lifecycleFn: func() observable.Observer[int] { - observer := testObs.Subscribe(ctx) + observer := obsvbl.Subscribe(ctx) cancel() return observer }, @@ -207,7 +205,7 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) { { name: "cancel then unsubscribe", lifecycleFn: func() observable.Observer[int] { - observer := testObs.Subscribe(ctx) + observer := obsvbl.Subscribe(ctx) cancel() time.Sleep(unsubscribeSleepDuration) observer.Unsubscribe() @@ -217,7 +215,7 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) { { name: "unsubscribe then cancel", lifecycleFn: func() observable.Observer[int] { - observer := testObs.Subscribe(ctx) + observer := obsvbl.Subscribe(ctx) observer.Unsubscribe() time.Sleep(unsubscribeSleepDuration) cancel() @@ -247,7 +245,6 @@ func drainCh[V any](ch <-chan V) (closed bool, err error) { case _, ok := <-ch: if !ok { return true, nil - return } continue default: @@ -255,3 +252,178 @@ func drainCh[V any](ch <-chan V) (closed bool, err error) { } } } + +func TestChannelObservable_ConcurrentSubUnSub(t *testing.T) { + t.Skip("add coverage: subscribing and unsubscribing concurrently should not race") +} + +// TECHDEBT/INCOMPLETE: add coverage for multiple observers, unsubscribe from one +// and ensure the rest are still notified. +// TODO_THIS_COMMIT: consider renaming, also has to do with sequential notificaions +func TestChannelObservable_ObserversUnsubscribeIndependently(t *testing.T) { + //t.Skip("add coverage: unsubscribing one observer should not impact the rest") + + observations := new([]*observation[int]) + expectedNotifications := [][]int{ + {123, 456, 789}, + {456, 789, 987}, + {789, 987, 654}, + {987, 654, 321}, + } + + //type observation struct { + // observerIndex int + // notifications []int + //} + + obsvbl, producer := channel.NewObservable[int]() + require.NotNil(t, obsvbl) + require.NotNil(t, producer) + produceWithDelay := syncSendWithDelayFactory(producer, productionDelay) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + observation0 := newObservation(ctx, obsvbl) + *observations = append(*observations, observation0) + go goReceiveNotifications(observation0) + produceWithDelay(123) + + observation1 := newObservation(ctx, obsvbl) + *observations = append(*observations, observation1) + go goReceiveNotifications(observation1) + produceWithDelay(456) + + observation2 := newObservation(ctx, obsvbl) + *observations = append(*observations, observation2) + go goReceiveNotifications(observation2) + produceWithDelay(789) + + observation3 := newObservation(ctx, obsvbl) + *observations = append(*observations, observation3) + go goReceiveNotifications(observation3) + + observation0.Unsubscribe() + produceWithDelay(987) + + observation1.Unsubscribe() + produceWithDelay(654) + + observation2.Unsubscribe() + produceWithDelay(321) + + observation3.Unsubscribe() + + for obsnIdx, obsrvn := range *observations { + t.Run(fmt.Sprintf("observation%d", obsnIdx), func(t *testing.T) { + msg := "observer %d channel left open" + select { + case _, ok := <-obsrvn.Ch(): + require.Falsef(t, ok, msg, obsnIdx) + default: + t.Fatalf(msg, obsnIdx) + } + + obsrvn.Lock() + defer obsrvn.Unlock() + require.Equalf( + t, len(expectedNotifications[obsnIdx]), + len(*obsrvn.Notifications), + "observation index: %d, expected: %+v, actual: %+v", + obsnIdx, expectedNotifications[obsnIdx], *obsrvn.Notifications, + ) + for notificationIdx, expected := range expectedNotifications[obsnIdx] { + require.Equalf( + t, expected, + (*obsrvn.Notifications)[notificationIdx], + "allExpected: %+v, allActual: %+v", + expectedNotifications[obsnIdx], *obsrvn.Notifications, + ) + } + }) + } +} + +// TECHDEBT/INCOMPLETE: add coverage for active observers closing when producer closes. +func TestChannelObservable_ObserversCloseOnProducerClose(t *testing.T) { + t.Skip("add coverage: all observers should close when producer closes") +} + +// TECHDEBT/INCOMPLETE: add coverage for observers (un)subscribing in-between notifications. +func TestChannelObservable_ObserversSubscribeSerially(t *testing.T) { + t.Skip("add coverage: observers which subscribe in-between notifications should receive specific value ranges") +} + +type observation[V any] struct { + sync.Mutex + observable.Observer[V] + Notifications *[]V +} + +func newObservation[V any]( + ctx context.Context, + observable observable.Observable[V], +) *observation[V] { + return &observation[V]{ + Observer: observable.Subscribe(ctx), + Notifications: new([]V), + } +} + +func (o *observation[V]) notify(value V) { + o.Lock() + defer o.Unlock() + + *o.Notifications = append(*o.Notifications, value) +} + +func goReceiveNotifications[V any](obsvn *observation[V]) { + for notification := range obsvn.Ch() { + obsvn.notify(notification) + } +} + +func syncSendWithDelayFactory[V any](producer chan<- V, delay time.Duration) func(value V) { + return func(value V) { + time.Sleep(delay) + producer <- value + time.Sleep(delay) + } +} + +func goNotifiedOrTimedOutFactory[V any]( + obsvr observable.Observer[V], + next func(index int, output V) error, + done func(outputs []V) error, +) func() error { + var ( + outputIndex int + outputs []V + ) + return func() error { + for { + select { + case output, ok := <-obsvr.Ch(): + if !ok { + break + } + + if err := next(outputIndex, output); err != nil { + return err + } + + outputs = append(outputs, output) + outputIndex++ + continue + case <-time.After(notifyTimeout): + return fmt.Errorf("timed out waiting for observer to be notified") + } + + if err := done(outputs); err != nil { + return err + } + return nil + } + + } +} diff --git a/pkg/observable/channel/observer_test.go b/pkg/observable/channel/observer_test.go index b70e5bd9d..ffc105c17 100644 --- a/pkg/observable/channel/observer_test.go +++ b/pkg/observable/channel/observer_test.go @@ -12,7 +12,7 @@ func TestObserver_Unsubscribe(t *testing.T) { onUnsubscribeCalled = false inputCh = make(chan int, 1) ) - sub := &channelObserver[int]{ + obsvr := &channelObserver[int]{ observerMu: &sync.RWMutex{}, // using a buffered channel to keep the test synchronous observerCh: inputCh, @@ -22,13 +22,13 @@ func TestObserver_Unsubscribe(t *testing.T) { } // should initially be open - require.Equal(t, false, sub.closed) + require.Equal(t, false, obsvr.closed) inputCh <- 1 - require.Equal(t, false, sub.closed) + require.Equal(t, false, obsvr.closed) - sub.Unsubscribe() + obsvr.Unsubscribe() // should be closed after `#Unsubscribe()` - require.Equal(t, true, sub.closed) + require.Equal(t, true, obsvr.closed) require.True(t, onUnsubscribeCalled) }