Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pkg] feat: add channel observable #31

Merged
merged 45 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
8ac9ae1
feat: add notifiable observable
bryanchriswhite Oct 9, 2023
316db40
fixup: observable
bryanchriswhite Oct 10, 2023
e50b83c
refactor/fix: notifiable observable improvements
bryanchriswhite Oct 12, 2023
6fe991d
chore: more review improvements
bryanchriswhite Oct 12, 2023
59b4b94
refactor: renaming
bryanchriswhite Oct 13, 2023
84ed42d
chore: update comments
bryanchriswhite Oct 13, 2023
da475a4
refactor: simplify drainCh test helper
bryanchriswhite Oct 13, 2023
2fae1bf
test: fix timeout
bryanchriswhite Oct 13, 2023
5a4f244
test: rename observable test functions
bryanchriswhite Oct 13, 2023
c1d92be
test: add test TODOs
bryanchriswhite Oct 13, 2023
bf8bc8a
chore: update comments
bryanchriswhite Oct 13, 2023
8e17e83
refactor: simplify observable & observer
bryanchriswhite Oct 13, 2023
83d4a20
test: fix & add observable tests
bryanchriswhite Oct 13, 2023
c3b8cea
Merge remote-tracking branch 'pokt/main' into feat/observable
bryanchriswhite Oct 13, 2023
c035b5c
test: cleanup & comment observable tests
bryanchriswhite Oct 13, 2023
3a99221
fixup: observable
bryanchriswhite Oct 17, 2023
aaea7b9
fixup: observable test
bryanchriswhite Oct 17, 2023
678d25e
refactor: simplify & cleanup
bryanchriswhite Oct 17, 2023
631c453
chore: cleanup logs & comments
bryanchriswhite Oct 17, 2023
9340e82
chore: improve comments
bryanchriswhite Oct 18, 2023
b072eeb
refactor: DrainChannel test helper
bryanchriswhite Oct 18, 2023
a487fe0
shore: cleanup & simplify
bryanchriswhite Oct 18, 2023
8df9cbc
test: comment out flaky test cases
bryanchriswhite Oct 18, 2023
6c98752
Merge remote-tracking branch 'pokt/main' into feat/observable
bryanchriswhite Oct 18, 2023
466c508
fixup: drain channel helper
bryanchriswhite Oct 18, 2023
7a399f8
chore: improve var name
bryanchriswhite Oct 18, 2023
d27e114
fixup: drain channel helper
bryanchriswhite Oct 18, 2023
cf8507d
test: shorten timeout
bryanchriswhite Oct 18, 2023
c288595
chore: cleanup
bryanchriswhite Oct 18, 2023
a22f813
chore: cleanup, simplification, review improvements
bryanchriswhite Oct 19, 2023
bad5eef
chore: improve comments
bryanchriswhite Oct 19, 2023
c449f14
chore: improve comments
bryanchriswhite Oct 19, 2023
b97e6e2
refactor: rename `Observable#Close()` to `#UnsubscribeAll()`
bryanchriswhite Oct 19, 2023
bb6055e
chore: improve comments
bryanchriswhite Oct 19, 2023
70b3e59
chore: misc. review feedback improvements
bryanchriswhite Oct 19, 2023
26f1d13
chore: improve comment
bryanchriswhite Oct 19, 2023
c6fe88a
chore: review improvements
bryanchriswhite Oct 19, 2023
835bad0
chore: last minute improvements
bryanchriswhite Oct 19, 2023
ce3b055
Merge branch 'main' into feat/observable
bryanchriswhite Oct 19, 2023
194c59a
docs: add initial docs/pkg/observable docs
bryanchriswhite Oct 20, 2023
fe1b860
chore: add go package README.md template
bryanchriswhite Oct 20, 2023
0d2b090
chore: fix TODO comments
bryanchriswhite Oct 20, 2023
7d4a6a6
Merge remote-tracking branch 'pokt/main' into feat/observable
bryanchriswhite Oct 20, 2023
e60fd35
chore: move & rename pkg readme template
bryanchriswhite Oct 20, 2023
a75576e
test: refactor async test errors
bryanchriswhite Oct 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/observable/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package observable

bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
import "context"

// Observable is a generic interface that allows multiple subscribers to be
// notified of new values asynchronously.
type Observable[V any] interface {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
Subscribe(context.Context) Subscription[V]
}

// Subscription is a generic interface that provides access to the notified
// channel and allows unsubscribing from an observable.
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
type Subscription[V any] interface {
Unsubscribe()
Ch() <-chan V
}
93 changes: 93 additions & 0 deletions pkg/observable/notifiable/observable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package notifiable

import (
"context"
"sync"

"pocket/pkg/observable"
)

bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
// Observable implements the observable.Observable interface and can be notified
// via its corresponding notifier channel.
type Observable[V any] struct {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
mu sync.RWMutex
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
ch <-chan V // private channel that is used to emit values to subscribers
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
subscribers []chan V // subscribers is a list of channels that are subscribed to the observable
closed bool
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
}

// NewObservable creates a new observable is notified when the notifier channel
// receives a value.
func NewObservable[V any](notifier chan V) (observable.Observable[V], chan V) {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
// If the caller does not provide a notifier, create a new one and return it
if notifier == nil {
notifier = make(chan V)
}
notifee := &Observable[V]{sync.RWMutex{}, notifier, []chan V{}, false}
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved

// Start listening to the notifier and emit values to subscribers
go notifee.listen(notifier)

return notifee, notifier
}

// Subscribe gets a subscription to the observable.
func (obs *Observable[V]) Subscribe(ctx context.Context) observable.Subscription[V] {
obs.mu.Lock()
defer obs.mu.Unlock()

// Create a channel for the subscriber and append it to the subscribers list
ch := make(chan V, 1)
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
obs.subscribers = append(obs.subscribers, ch)

// Removal function used when unsubscribing from the observable
removeFromObservable := func() {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
obs.mu.Lock()
defer obs.mu.Unlock()

for i, s := range obs.subscribers {
if ch == s {
obs.subscribers = append(obs.subscribers[:i], obs.subscribers[i+1:]...)
break
}
}
}

// Subscription gets its closed state from the observable
subscription := &Subscription[V]{ch, obs.closed, removeFromObservable}

go unsubscribeOnDone[V](ctx, subscription)
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved

return subscription
}

// listen to the notifier and notify subscribers when values are received. This
// function is blocking and should be run in a goroutine.
func (obs *Observable[V]) listen(notifier <-chan V) {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OPTIONAL NIT: Thoughts on renaming notifier to publisher given that we're using the subsctiber terminology?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave it some thought and gave renaming Subscription to Observer a try. I also had this conversation w/ chatGPT and am now considering doing the following as well:

  1. rename notifier to producer
  2. rename the notifier package to channel
  3. rename notifiableObservable to channelObservable
  4. rename any references called notifee to observable (or something similar)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you still contemplating these? I'd personally be in support of not having the notifier/notifiee naming convention as it's too generic IMO but don't want to block things too much.

Also, read through your thread, for future reference, consider just giving it the code rather than explaining the types & names you have: https://chat.openai.com/share/d901a206-f152-4230-9c85-59ec08cde8fb

for v := range notifier {
// Lock for obs.subscribers slice as it can be modified by subscribers
obs.mu.RLock()
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
for _, ch := range obs.subscribers {
ch <- v
}
obs.mu.RUnlock()
}

// Here we know that the notifier has been closed, all subscribers should be closed as well
obs.mu.Lock()
obs.closed = true
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
for _, ch := range obs.subscribers {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
close(ch)
obs.subscribers = []chan V{}
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
}
obs.mu.Lock()
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
}

// unsubscribeOnDone unsubscribes from the subscription when the context is.
// It is blocking and intended to be called in a goroutine.
func unsubscribeOnDone[V any](ctx context.Context, subscription observable.Subscription[V]) {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
if ctx != nil {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
<-ctx.Done()
subscription.Unsubscribe()
}
}
162 changes: 162 additions & 0 deletions pkg/observable/notifiable/observable_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package notifiable_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"pocket/pkg/observable"
"pocket/pkg/observable/notifiable"
)

const (
notifyTimeout = 100 * time.Millisecond
unsubscribeSleepDuration = notifyTimeout * 2
)

func TestNewNotifiableObservable(t *testing.T) {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
type test struct {
name string
notifier chan int
}

input := 123
nonEmptyBufferedNotifier := make(chan int, 1)
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
nonEmptyBufferedNotifier <- input

tests := []test{
{name: "nil notifier", notifier: nil},
{name: "empty non-buffered notifier", notifier: make(chan int)},
{name: "empty buffered len 1 notifier", notifier: make(chan int, 1)},
{name: "non-empty buffered notifier", notifier: nonEmptyBufferedNotifier},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
notifee, notifier := notifiable.NewObservable[int](tt.notifier)
require.NotNil(t, notifee)
require.NotNil(t, notifier)

// construct 3 distinct subscriptions, each with its own channel
subscriptions := make([]observable.Subscription[int], 3)
for i := range subscriptions {
subscriptions[i] = notifee.Subscribe(ctx)
}

group := errgroup.Group{}
notifiedOrTimedOut := func(subscriptionCh <-chan int) func() error {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
return func() error {
// subscriptionCh should receive notified input
select {
case output := <-subscriptionCh:
require.Equal(t, input, output)
case <-time.After(notifyTimeout):
return fmt.Errorf("timed out waiting for subscription to be notified")
}
return nil
}
}

// ensure all subscription channels are notified
for _, subscription := range subscriptions {
// concurrently await notification or timeout to avoid blocking on
// empty and/or non-buffered notifiers.
group.Go(notifiedOrTimedOut(subscription.Ch()))
}
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved

// notify with test input
notifier <- input

// wait for notifee to be notified or timeout
err := group.Wait()
require.NoError(t, err)

// unsubscribing should close subscription channel(s)
for _, subscription := range subscriptions {
subscription.Unsubscribe()

select {
case <-subscription.Ch():
default:
t.Fatal("subscription channel left open")
}
}
})
}
}

func TestSubscription_Unsubscribe(t *testing.T) {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(context.Background())
notifee, notifier := notifiable.NewObservable[int](nil)
require.NotNil(t, notifee)
require.NotNil(t, notifier)

tests := []struct {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
name string
lifecycleFn func() observable.Subscription[int]
}{
{
name: "nil context",
lifecycleFn: func() observable.Subscription[int] {
subscription := notifee.Subscribe(nil)
subscription.Unsubscribe()
return subscription
},
},
{
name: "only unsubscribe",
lifecycleFn: func() observable.Subscription[int] {
subscription := notifee.Subscribe(ctx)
subscription.Unsubscribe()
return subscription
},
},
{
name: "only cancel",
lifecycleFn: func() observable.Subscription[int] {
subscription := notifee.Subscribe(ctx)
cancel()
return subscription
},
},
{
name: "cancel then unsubscribe",
lifecycleFn: func() observable.Subscription[int] {
subscription := notifee.Subscribe(ctx)
cancel()
time.Sleep(unsubscribeSleepDuration)
subscription.Unsubscribe()
return subscription
},
},
{
name: "unsubscribe then cancel",
lifecycleFn: func() observable.Subscription[int] {
subscription := notifee.Subscribe(ctx)
subscription.Unsubscribe()
time.Sleep(unsubscribeSleepDuration)
cancel()
return subscription
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
subscription := tt.lifecycleFn()

select {
case value, ok := <-subscription.Ch():
require.Empty(t, value)
require.False(t, ok)
case <-time.After(notifyTimeout):
t.Fatal("subscription channel left open")
}
})
}
}
29 changes: 29 additions & 0 deletions pkg/observable/notifiable/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package notifiable

import "pocket/pkg/observable"

var _ observable.Subscription[any] = &Subscription[any]{}

// Subscription implements the observable.Subscription interface.
type Subscription[V any] struct {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
ch chan V
closed bool
removeFromObservable func()
}

// Unsubscribe closes the subscription channel and removes the subscription from
// the observable.
func (sub *Subscription[V]) Unsubscribe() {
if sub.closed {
return
}

close(sub.ch)
sub.closed = true
sub.removeFromObservable()
}

// Ch returns the subscription channel.
func (sub *Subscription[V]) Ch() <-chan V {
return sub.ch
}
Loading