diff --git a/docs/pkg/observable/README.md b/docs/pkg/observable/README.md index 9523ff457..3234b627b 100644 --- a/docs/pkg/observable/README.md +++ b/docs/pkg/observable/README.md @@ -1,6 +1,6 @@ -## `pkg/observable` Package +## `pocket/pkg/observable` Package -The `pkg/observable` package provides a lightweight and straightforward mechanism to handle asynchronous notifications using the Observer pattern. This is achieved through two primary interfaces: `Observable` and `Observer`. +The `pocket/pkg/observable` package provides a lightweight and straightforward mechanism to handle asynchronous notifications using the Observer pattern. This is achieved through two primary interfaces: `Observable` and `Observer`. ## Overview diff --git a/docs/template/pkg/README.md b/docs/template/pkg/README.md index c7f915e9a..44f41885a 100644 --- a/docs/template/pkg/README.md +++ b/docs/template/pkg/README.md @@ -1,6 +1,3 @@ -Certainly! I've added a section named "Architecture Diagrams" in the documentation template below: - -```markdown # Package [PackageName] > Brief one-liner or quote about what this package does. @@ -16,11 +13,21 @@ Provide a few sentences about the purpose and functionality of this package. Con Visual representations often make it easier to understand the design and flow of a package. Below are the architecture diagrams that explain the high-level structure and interactions in this package: -![Architecture Overview](./path-to-diagram1.png) +```mermaid +--- +title: Architecture Overview +--- +flowchart +``` > **Figure 1**: Brief description about what this diagram represents. -![Another Diagram](./path-to-diagram2.png) +```mermaid +--- +title: Another Diagram +--- +flowchart +``` > **Figure 2**: Brief description about what this other diagram represents. diff --git a/pkg/observable/channel/observable.go b/pkg/observable/channel/observable.go index a173d8072..8e17ad9fb 100644 --- a/pkg/observable/channel/observable.go +++ b/pkg/observable/channel/observable.go @@ -2,8 +2,9 @@ package channel import ( "context" - "pocket/pkg/observable" "sync" + + "pocket/pkg/observable" ) // TODO_DISCUSS: what should this be? should it be configurable? It seems to be most @@ -50,7 +51,7 @@ func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V } // start listening to the publishCh and emit values to observers - go obs.goPublish(obs.publishCh) + go obs.goPublish() return obs, obs.publishCh } @@ -63,6 +64,15 @@ func WithPublisher[V any](publishCh chan V) option[V] { } } +// Next synchronously returns the next value from the observable. +func (obsvbl *channelObservable[V]) Next(ctx context.Context) V { + tempObserver := obsvbl.Subscribe(ctx) + defer tempObserver.Unsubscribe() + + val := <-tempObserver.Ch() + return val +} + // Subscribe returns an observer which is notified when the publishCh channel // receives a value. func (obsvbl *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] { @@ -110,8 +120,8 @@ func (obsvbl *channelObservable[V]) unsubscribeAll() { // goPublish to the publishCh and notify observers when values are received. // This function is blocking and should be run in a goroutine. -func (obsvbl *channelObservable[V]) goPublish(publisher <-chan V) { - for notification := range publisher { +func (obsvbl *channelObservable[V]) goPublish() { + for notification := range obsvbl.publishCh { // Copy currentObservers to avoid holding the lock while notifying them. // New or existing Observers may (un)subscribe while this notification // is being fanned out. @@ -154,9 +164,12 @@ func (obsvbl *channelObservable[V]) copyObservers() (observers []*channelObserve // goUnsubscribeOnDone unsubscribes from the subscription when the context is done. // It is a blocking function and intended to be called in a goroutine. -func goUnsubscribeOnDone[V any](ctx context.Context, subscription observable.Observer[V]) { +func goUnsubscribeOnDone[V any](ctx context.Context, observer observable.Observer[V]) { <-ctx.Done() - subscription.Unsubscribe() + if observer.IsClosed() { + return + } + observer.Unsubscribe() } // onUnsubscribe returns a function that removes a given observer from the diff --git a/pkg/observable/channel/observable_test.go b/pkg/observable/channel/observable_test.go index c6f27929f..6ec301cfa 100644 --- a/pkg/observable/channel/observable_test.go +++ b/pkg/observable/channel/observable_test.go @@ -38,7 +38,7 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { tests := []test{ { - name: "nil publisher", + name: "nil publisher (default buffer size)", publishCh: nil, inputs: inputs, expectedOutputs: inputs, @@ -55,7 +55,13 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { inputs: inputs, expectedOutputs: inputs, }, - // TODO_INCOMPLETE: publisher channels which are full are proving harder to test + { + name: "empty buffered len 1000 publisher", + publishCh: make(chan int, 1000), + inputs: inputs, + expectedOutputs: inputs, + }, + // TODO_INCOMPLETE(#81): publisher channels which are full are proving harder to test // robustly (no flakiness); perhaps it has to do with the lack of some // kind of guarantee about the receiver order on the consumer side. // @@ -156,7 +162,7 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { err := group.Wait() require.NoError(t, err) - // unsubscribing should unsubscribeAll obsvr channel(s) + // unsubscribing should close observer channel(s) for _, observer := range observers { observer.Unsubscribe() @@ -205,6 +211,8 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) { }, }, { + // NOTE: this will log a warning that can be ignored: + // > redundant unsubscribe: observer is closed name: "cancel then unsubscribe", lifecycleFn: func() observable.Observer[int] { observer := obsvbl.Subscribe(ctx) @@ -215,6 +223,8 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) { }, }, { + // NOTE: this will log a warning that can be ignored: + // > redundant unsubscribe: observer is closed name: "unsubscribe then cancel", lifecycleFn: func() observable.Observer[int] { observer := obsvbl.Subscribe(ctx) diff --git a/pkg/observable/channel/observer.go b/pkg/observable/channel/observer.go index 7bd7ddbd3..f90bec563 100644 --- a/pkg/observable/channel/observer.go +++ b/pkg/observable/channel/observer.go @@ -69,6 +69,15 @@ func (obsvr *channelObserver[V]) Ch() <-chan V { return obsvr.observerCh } +// IsClosed returns true if the observer has been unsubscribed. +// A closed observer cannot be reused. +func (obsvr *channelObserver[V]) IsClosed() bool { + obsvr.observerMu.Lock() + defer obsvr.observerMu.Unlock() + + return obsvr.isClosed +} + // unsubscribe closes the subscription channel, marks the observer as isClosed, and // removes the subscription from its observable's observers list via onUnsubscribe. func (obsvr *channelObserver[V]) unsubscribe() { diff --git a/pkg/observable/interface.go b/pkg/observable/interface.go index 3d4894339..452c18dcd 100644 --- a/pkg/observable/interface.go +++ b/pkg/observable/interface.go @@ -11,7 +11,12 @@ import "context" // notified of new values asynchronously. // It is analogous to a publisher in a "Fan-Out" system design. type Observable[V any] interface { + // Next synchronously returns the next value from the observable. + Next(context.Context) V + // Subscribe returns an observer which is notified when the publishCh channel + // receives a value. Subscribe(context.Context) Observer[V] + // UnsubscribeAll unsubscribes and removes all observers from the observable. UnsubscribeAll() } @@ -19,6 +24,12 @@ type Observable[V any] interface { // channel and allows unsubscribing from an Observable. // It is analogous to a subscriber in a "Fan-Out" system design. type Observer[V any] interface { + // Unsubscribe closes the subscription channel and removes the subscription from + // the observable. Unsubscribe() + // Ch returns a receive-only subscription channel. Ch() <-chan V + // IsClosed returns true if the observer has been unsubscribed. + // A closed observer cannot be reused. + IsClosed() bool }