Skip to content

Commit

Permalink
refactor: renaming
Browse files Browse the repository at this point in the history
- `notifiable` pkg to `channel`
- `notifiableObservable` struct to `channelObservable`
- `observer` struct to `channelObserver`
- `notifier` vars to `producer`
- `notifee` vars to `observable` (or similar)
  • Loading branch information
bryanchriswhite committed Oct 13, 2023
1 parent 6fe991d commit 59b4b94
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 91 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package notifiable
package channel

import (
"context"
Expand All @@ -8,52 +8,50 @@ import (
"pocket/pkg/observable"
)

var _ observable.Observable[any] = &notifiableObservable[any]{}
var _ observable.Observable[any] = &channelObservable[any]{}

type option[V any] func(obs *notifiableObservable[V])
type option[V any] func(obs *channelObservable[V])

// notifiableObservable implements the observable.Observable interface and can be notified
// via its corresponding notifier channel.
type notifiableObservable[V any] struct {
notifier chan V // private channel that is used to emit values to observers
// channelObservable implements the observable.Observable interface and can be notified
// via its corresponding producer channel.
type channelObservable[V any] struct {
producer chan V // private channel that is used to emit values to observers
observersMu sync.RWMutex
// TODO_THIS_COMMIT: update comment(s)
// TODO_THIS_COMMIT: consider using interface type
observers *[]*observer[V] // observers is a list of channels that are subscribed to the observable
observers *[]*channelObserver[V] // observers is a list of channels that are subscribed to the observable
}

// NewObservable creates a new observable is notified when the notifier channel
// NewObservable creates a new observable is notified when the producer channel
// receives a value.
// func NewObservable[V any](notifier chan V) (observable.Observable[V], chan<- V) {
// func NewObservable[V any](producer chan V) (observable.Observable[V], chan<- V) {
func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V) {
observable := &notifiableObservable[V]{
obs := &channelObservable[V]{
observersMu: sync.RWMutex{},
observers: new([]*observer[V]),
observers: new([]*channelObserver[V]),
}

for _, opt := range opts {
opt(observable)
opt(obs)
}

// If the caller does not provide a notifier, create a new one and return it
if observable.notifier == nil {
observable.notifier = make(chan V)
// If the caller does not provide a producer, create a new one and return it
if obs.producer == nil {
obs.producer = make(chan V)
}

// Start listening to the notifier and emit values to observers
go observable.goListen(observable.notifier)
// Start listening to the producer and emit values to observers
go obs.goListen(obs.producer)

return observable, observable.notifier
return obs, obs.producer
}

func WithNotifier[V any](notifier chan V) option[V] {
return func(obs *notifiableObservable[V]) {
obs.notifier = notifier
func WithProducer[V any](producer chan V) option[V] {
return func(obs *channelObservable[V]) {
obs.producer = producer
}
}

// Subscribe returns an observer which is notified when notifier receives.
func (obs *notifiableObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
// Subscribe returns an observer which is notified when producer receives.
func (obs *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
obs.observersMu.Lock()
defer func() {
obs.observersMu.Unlock()
Expand All @@ -70,31 +68,31 @@ func (obs *notifiableObservable[V]) Subscribe(ctx context.Context) observable.Ob
return observer
}

func (obs *notifiableObservable[V]) Close() {
func (obs *channelObservable[V]) Close() {
obs.close()
}

// TODO_THIS_COMMIT: decide whether this closes the notifier channel; perhaps not
// TODO_THIS_COMMIT: decide whether this closes the producer channel; perhaps not
// at oll or only if it was provided...
func (obs *notifiableObservable[V]) close() {
func (obs *channelObservable[V]) close() {
obs.observersMu.RLock()
observers := *obs.observers
obs.observersMu.RUnlock()

for _, sub := range observers {
fmt.Printf("notifiableObservable#goListen: unsubscribing %p\n", sub)
fmt.Printf("channelObservable#goListen: unsubscribing %p\n", sub)
sub.Unsubscribe()
}

obs.observersMu.Lock()
defer obs.observersMu.Unlock()
obs.observers = new([]*observer[V])
obs.observers = new([]*channelObserver[V])
}

// goListen to the notifier and notify observers when values are received. This
// goListen to the producer and notify observers when values are received. This
// function is blocking and should be run in a goroutine.
func (obs *notifiableObservable[V]) goListen(notifier <-chan V) {
for notification := range notifier {
func (obs *channelObservable[V]) goListen(producer <-chan V) {
for notification := range producer {
obs.observersMu.RLock()
observers := *obs.observers
obs.observersMu.RUnlock()
Expand All @@ -107,7 +105,7 @@ func (obs *notifiableObservable[V]) goListen(notifier <-chan V) {
}
}

// Here we know that the notifier has been closed, all observers should be closed as well
// Here we know that the producer has been closed, all observers should be closed as well
obs.close()
}

Expand All @@ -118,10 +116,10 @@ func goUnsubscribeOnDone[V any](ctx context.Context, subscription observable.Obs
subscription.Unsubscribe()
}

// onUnsubscribeFactory returns a function that removes a given observer from the
// onUnsubscribeFactory returns a function that removes a given channelObserver from the
// observable's list of observers.
func (obs *notifiableObservable[V]) onUnsubscribeFactory() UnsubscribeFunc[V] {
return func(toRemove *observer[V]) {
func (obs *channelObservable[V]) onUnsubscribeFactory() UnsubscribeFunc[V] {
return func(toRemove *channelObserver[V]) {
obs.observersMu.Lock()
defer obs.observersMu.Unlock()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package notifiable_test
package channel_test

import (
"context"
Expand All @@ -10,7 +10,7 @@ import (
"golang.org/x/sync/errgroup"

"pocket/pkg/observable"
"pocket/pkg/observable/notifiable"
"pocket/pkg/observable/channel"
)

const (
Expand All @@ -21,55 +21,55 @@ const (
func TestNewObservable_NotifyObservers(t *testing.T) {
type test struct {
name string
notifier chan *int
producer chan *int
inputs []int
expectedOutputs []int
setupFn func(t test)
}

inputs := []int{123, 456, 789}
queuedNotifier := make(chan *int, 1)
nonEmptyBufferedNotifier := make(chan *int, 1)
queuedProducer := make(chan *int, 1)
nonEmptyBufferedProducer := make(chan *int, 1)

tests := []test{
{
name: "nil notifier",
notifier: nil,
name: "nil producer",
producer: nil,
inputs: inputs,
expectedOutputs: inputs,
},
{
name: "empty non-buffered notifier",
notifier: make(chan *int),
name: "empty non-buffered producer",
producer: make(chan *int),
inputs: inputs,
expectedOutputs: inputs,
},
{
name: "queued non-buffered notifier",
notifier: queuedNotifier,
name: "queued non-buffered producer",
producer: queuedProducer,
inputs: inputs[1:],
expectedOutputs: inputs,
setupFn: func(t test) {
go func() {
// blocking send
t.notifier <- &inputs[0]
t.producer <- &inputs[0]
}()
},
},
{
name: "empty buffered len 1 notifier",
notifier: make(chan *int, 1),
name: "empty buffered len 1 producer",
producer: make(chan *int, 1),
inputs: inputs,
expectedOutputs: inputs,
},
{
name: "non-empty buffered notifier",
notifier: nonEmptyBufferedNotifier,
name: "non-empty buffered producer",
producer: nonEmptyBufferedProducer,
inputs: inputs[1:],
expectedOutputs: inputs,
setupFn: func(t test) {
// non-blocking send
t.notifier <- &inputs[0]
t.producer <- &inputs[0]
},
},
}
Expand All @@ -85,17 +85,17 @@ func TestNewObservable_NotifyObservers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

t.Logf("notifier: %p", tt.notifier)
notifee, notifier := notifiable.NewObservable[*int](
notifiable.WithNotifier(tt.notifier),
t.Logf("producer: %p", tt.producer)
testObs, producer := channel.NewObservable[*int](
channel.WithProducer(tt.producer),
)
require.NotNil(t, notifee)
require.NotNil(t, notifier)
require.NotNil(t, testObs)
require.NotNil(t, producer)

// construct 3 distinct observers, each with its own channel
observers := make([]observable.Observer[*int], 3)
for i := range observers {
observers[i] = notifee.Subscribe(ctx)
observers[i] = testObs.Subscribe(ctx)
}

group, ctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -124,22 +124,22 @@ func TestNewObservable_NotifyObservers(t *testing.T) {
// ensure all observer channels are notified
for _, observer := range observers {
// concurrently await notification or timeout to avoid blocking on
// empty and/or non-buffered notifiers.
// empty and/or non-buffered producers.
group.Go(notifiedOrTimedOut(observer))
}

// notify with test input
t.Logf("sending to notifier %p", notifier)
t.Logf("sending to producer %p", producer)
for i, input := range tt.inputs[:] {
inputPtr := new(int)
*inputPtr = input
t.Logf("sending input ptr: %d %p", input, inputPtr)
notifier <- inputPtr
producer <- inputPtr
t.Logf("send input %d", i)
}
cancel()

// wait for notifee to be notified or timeout
// wait for testObs to be notified or timeout
err := group.Wait()
require.NoError(t, err)
t.Log("errgroup done")
Expand All @@ -165,13 +165,13 @@ func TestNewObservable_NotifyObservers(t *testing.T) {
// TECHDEBT/INCOMPLETE: add coverage for multiple observers, unsubscribe from one
// and ensure the rest are still notified.

// TECHDEBT\INCOMPLETE: add coverage for active observers closing when notifier closes.
// TECHDEBT\INCOMPLETE: add coverage for active observers closing when producer closes.

func TestNewObservable_UnsubscribeObservers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
notifee, notifier := notifiable.NewObservable[int]()
require.NotNil(t, notifee)
require.NotNil(t, notifier)
testObs, producer := channel.NewObservable[int]()
require.NotNil(t, testObs)
require.NotNil(t, producer)

type test struct {
name string
Expand All @@ -182,31 +182,31 @@ func TestNewObservable_UnsubscribeObservers(t *testing.T) {
{
name: "nil context",
lifecycleFn: func() observable.Observer[int] {
observer := notifee.Subscribe(nil)
observer := testObs.Subscribe(nil)
observer.Unsubscribe()
return observer
},
},
{
name: "only unsubscribe",
lifecycleFn: func() observable.Observer[int] {
observer := notifee.Subscribe(ctx)
observer := testObs.Subscribe(ctx)
observer.Unsubscribe()
return observer
},
},
{
name: "only cancel",
lifecycleFn: func() observable.Observer[int] {
observer := notifee.Subscribe(ctx)
observer := testObs.Subscribe(ctx)
cancel()
return observer
},
},
{
name: "cancel then unsubscribe",
lifecycleFn: func() observable.Observer[int] {
observer := notifee.Subscribe(ctx)
observer := testObs.Subscribe(ctx)
cancel()
time.Sleep(unsubscribeSleepDuration)
observer.Unsubscribe()
Expand All @@ -216,7 +216,7 @@ func TestNewObservable_UnsubscribeObservers(t *testing.T) {
{
name: "unsubscribe then cancel",
lifecycleFn: func() observable.Observer[int] {
observer := notifee.Subscribe(ctx)
observer := testObs.Subscribe(ctx)
observer.Unsubscribe()
time.Sleep(unsubscribeSleepDuration)
cancel()
Expand Down
Loading

0 comments on commit 59b4b94

Please sign in to comment.