Skip to content

Commit

Permalink
chore: update comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Oct 13, 2023
1 parent 59b4b94 commit 84ed42d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 12 deletions.
26 changes: 18 additions & 8 deletions pkg/observable/channel/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,27 @@ import (

var _ observable.Observable[any] = &channelObservable[any]{}

// option is a function which receives and can modify the channelObservable state.
type option[V any] func(obs *channelObservable[V])

// channelObservable implements the observable.Observable interface and can be notified
// via its corresponding producer channel.
type channelObservable[V any] struct {
producer chan V // private channel that is used to emit values to observers
// producer is an observable-wide channel that is used to receive values
// which are subsequently re-sent to observers.
producer chan V
// observersMu protects observers from concurrent access/updates
observersMu sync.RWMutex
observers *[]*channelObserver[V] // observers is a list of channels that are subscribed to the observable
// observers is a list of channelObservers that will be notified with producer
// receives a value.
observers *[]*channelObserver[V]
}

// NewObservable creates a new observable is notified when the producer channel
// receives a value.
// func NewObservable[V any](producer chan V) (observable.Observable[V], chan<- V) {
func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V) {
// initialize an observer that publishes messages from 1 producer to N observers
obs := &channelObservable[V]{
observersMu: sync.RWMutex{},
observers: new([]*channelObserver[V]),
Expand All @@ -33,24 +40,27 @@ func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V
opt(obs)
}

// If the caller does not provide a producer, create a new one and return it
// if the caller does not provide a producer, create a new one and return it
if obs.producer == nil {
obs.producer = make(chan V)
}

// Start listening to the producer and emit values to observers
// start listening to the producer and emit values to observers
go obs.goListen(obs.producer)

return obs, obs.producer
}

// WithProducer returns an option function sets the given producer in an observable
// when passed to NewObservable().
func WithProducer[V any](producer chan V) option[V] {
return func(obs *channelObservable[V]) {
obs.producer = producer
}
}

// Subscribe returns an observer which is notified when producer receives.
// Subscribe returns an observer which is notified when the producer channel
// receives a value.
func (obs *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
obs.observersMu.Lock()
defer func() {
Expand Down Expand Up @@ -98,9 +108,9 @@ func (obs *channelObservable[V]) goListen(producer <-chan V) {
obs.observersMu.RUnlock()

for _, sub := range observers {
// CONSIDERATION: perhaps try to avoid making this notification async
// as it effectively uses goroutines in memory as a buffer (with
// little control surface).
// CONSIDERATION: perhaps continue trying to avoid making this
// notification async as it would effectively use goroutines
// in memory as a buffer (with little control surface).
sub.notify(notification)
}
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/observable/channel/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@ var _ observable.Observer[any] = &channelObserver[any]{}

// channelObserver implements the observable.Observer interface.
type channelObserver[V any] struct {
ctx context.Context
ctx context.Context
// onUnsubscribe is called in Observer#Unsubscribe, removing the respective
// observer from observers in a concurrency-safe manner.
onUnsubscribe func(toRemove *channelObserver[V])
// observerMu protects the observerCh and closed fields.
observerMu *sync.RWMutex
// observerCh is the channel that is used to emit values to the observer.
// I.e. on the "N" side of the 1:N relationship between observable and
// observer.
observerCh chan V
// TODO_THIS_COMMIT: add comment
onUnsubscribe func(toRemove *channelObserver[V])
closed bool
// closed indicates whether the observer has been closed. It's set in
// unsubscribe; closed observers can't be reused.
closed bool
}

type UnsubscribeFactory[V any] func() UnsubscribeFunc[V]
Expand Down
5 changes: 5 additions & 0 deletions pkg/observable/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package observable

import "context"

// NOTE: We explicitly decided to write a small and custom notifications package
// to keep logic simple and minimal. If the needs & requirements of this library ever
// grow, other packages (e.g. https://github.com/ReactiveX/RxGo) can be considered.
// (see: https://github.com/ReactiveX/RxGo/pull/377)

// Observable is a generic interface that allows multiple subscribers to be
// notified of new values asynchronously.
type Observable[V any] interface {
Expand Down

0 comments on commit 84ed42d

Please sign in to comment.