From ad77da42d5e3fa06974642c8679dd20cd077e98a Mon Sep 17 00:00:00 2001 From: Bryan White Date: Fri, 20 Oct 2023 11:38:00 +0200 Subject: [PATCH 1/7] chore: add `Observer#IsClosed()` to prevent redundant unsubscription (cherry picked from commit 78a9946b3f14353e79b123919416903d4622da4d) --- pkg/observable/channel/observable.go | 7 +++++-- pkg/observable/channel/observer.go | 9 +++++++++ pkg/observable/interface.go | 1 + 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/observable/channel/observable.go b/pkg/observable/channel/observable.go index a173d8072..06d2f52fd 100644 --- a/pkg/observable/channel/observable.go +++ b/pkg/observable/channel/observable.go @@ -154,9 +154,12 @@ func (obsvbl *channelObservable[V]) copyObservers() (observers []*channelObserve // goUnsubscribeOnDone unsubscribes from the subscription when the context is done. // It is a blocking function and intended to be called in a goroutine. -func goUnsubscribeOnDone[V any](ctx context.Context, subscription observable.Observer[V]) { +func goUnsubscribeOnDone[V any](ctx context.Context, observer observable.Observer[V]) { <-ctx.Done() - subscription.Unsubscribe() + if observer.IsClosed() { + return + } + observer.Unsubscribe() } // onUnsubscribe returns a function that removes a given observer from the diff --git a/pkg/observable/channel/observer.go b/pkg/observable/channel/observer.go index 7bd7ddbd3..f90bec563 100644 --- a/pkg/observable/channel/observer.go +++ b/pkg/observable/channel/observer.go @@ -69,6 +69,15 @@ func (obsvr *channelObserver[V]) Ch() <-chan V { return obsvr.observerCh } +// IsClosed returns true if the observer has been unsubscribed. +// A closed observer cannot be reused. +func (obsvr *channelObserver[V]) IsClosed() bool { + obsvr.observerMu.Lock() + defer obsvr.observerMu.Unlock() + + return obsvr.isClosed +} + // unsubscribe closes the subscription channel, marks the observer as isClosed, and // removes the subscription from its observable's observers list via onUnsubscribe. func (obsvr *channelObserver[V]) unsubscribe() { diff --git a/pkg/observable/interface.go b/pkg/observable/interface.go index 3d4894339..4005190f0 100644 --- a/pkg/observable/interface.go +++ b/pkg/observable/interface.go @@ -21,4 +21,5 @@ type Observable[V any] interface { type Observer[V any] interface { Unsubscribe() Ch() <-chan V + IsClosed() bool } From f88b070f434785668a5102f80160e2e1a44958ad Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 23 Oct 2023 14:04:22 +0200 Subject: [PATCH 2/7] chore: simplify channel observable (cherry picked from commit a2629c8bc3decfb5a787e453af67aa78fc8ca1ea) --- pkg/observable/channel/observable.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/observable/channel/observable.go b/pkg/observable/channel/observable.go index 06d2f52fd..7b8e2d04e 100644 --- a/pkg/observable/channel/observable.go +++ b/pkg/observable/channel/observable.go @@ -2,8 +2,9 @@ package channel import ( "context" - "pocket/pkg/observable" "sync" + + "pocket/pkg/observable" ) // TODO_DISCUSS: what should this be? should it be configurable? It seems to be most @@ -50,7 +51,7 @@ func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V } // start listening to the publishCh and emit values to observers - go obs.goPublish(obs.publishCh) + go obs.goPublish() return obs, obs.publishCh } @@ -110,8 +111,8 @@ func (obsvbl *channelObservable[V]) unsubscribeAll() { // goPublish to the publishCh and notify observers when values are received. // This function is blocking and should be run in a goroutine. -func (obsvbl *channelObservable[V]) goPublish(publisher <-chan V) { - for notification := range publisher { +func (obsvbl *channelObservable[V]) goPublish() { + for notification := range obsvbl.publishCh { // Copy currentObservers to avoid holding the lock while notifying them. // New or existing Observers may (un)subscribe while this notification // is being fanned out. From dfb2aaefaaff18572c975340525b7ceff97698c6 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Fri, 20 Oct 2023 11:39:15 +0200 Subject: [PATCH 3/7] test: add case for publisher w/ large buffer size, comment, & cleanup (cherry picked from commit e97b691e39af8fa1654b8d697a3b34095b32ed82) --- pkg/observable/channel/observable_test.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/observable/channel/observable_test.go b/pkg/observable/channel/observable_test.go index c6f27929f..6ec301cfa 100644 --- a/pkg/observable/channel/observable_test.go +++ b/pkg/observable/channel/observable_test.go @@ -38,7 +38,7 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { tests := []test{ { - name: "nil publisher", + name: "nil publisher (default buffer size)", publishCh: nil, inputs: inputs, expectedOutputs: inputs, @@ -55,7 +55,13 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { inputs: inputs, expectedOutputs: inputs, }, - // TODO_INCOMPLETE: publisher channels which are full are proving harder to test + { + name: "empty buffered len 1000 publisher", + publishCh: make(chan int, 1000), + inputs: inputs, + expectedOutputs: inputs, + }, + // TODO_INCOMPLETE(#81): publisher channels which are full are proving harder to test // robustly (no flakiness); perhaps it has to do with the lack of some // kind of guarantee about the receiver order on the consumer side. // @@ -156,7 +162,7 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { err := group.Wait() require.NoError(t, err) - // unsubscribing should unsubscribeAll obsvr channel(s) + // unsubscribing should close observer channel(s) for _, observer := range observers { observer.Unsubscribe() @@ -205,6 +211,8 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) { }, }, { + // NOTE: this will log a warning that can be ignored: + // > redundant unsubscribe: observer is closed name: "cancel then unsubscribe", lifecycleFn: func() observable.Observer[int] { observer := obsvbl.Subscribe(ctx) @@ -215,6 +223,8 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) { }, }, { + // NOTE: this will log a warning that can be ignored: + // > redundant unsubscribe: observer is closed name: "unsubscribe then cancel", lifecycleFn: func() observable.Observer[int] { observer := obsvbl.Subscribe(ctx) From 0ea010ac63ff6bdde9e763673b6c471e67f79944 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Fri, 20 Oct 2023 11:39:58 +0200 Subject: [PATCH 4/7] docs: update observable pkg README.md (cherry picked from commit d5442c7062630d847e048850fa71806086f84172) --- docs/pkg/observable/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/pkg/observable/README.md b/docs/pkg/observable/README.md index 9523ff457..3234b627b 100644 --- a/docs/pkg/observable/README.md +++ b/docs/pkg/observable/README.md @@ -1,6 +1,6 @@ -## `pkg/observable` Package +## `pocket/pkg/observable` Package -The `pkg/observable` package provides a lightweight and straightforward mechanism to handle asynchronous notifications using the Observer pattern. This is achieved through two primary interfaces: `Observable` and `Observer`. +The `pocket/pkg/observable` package provides a lightweight and straightforward mechanism to handle asynchronous notifications using the Observer pattern. This is achieved through two primary interfaces: `Observable` and `Observer`. ## Overview From 17b0ce56c35d21a7863609b568176193e61c39bf Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 23 Oct 2023 17:41:48 +0200 Subject: [PATCH 5/7] doc: fix pkg README template --- docs/template/pkg/README.md | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/docs/template/pkg/README.md b/docs/template/pkg/README.md index c7f915e9a..44f41885a 100644 --- a/docs/template/pkg/README.md +++ b/docs/template/pkg/README.md @@ -1,6 +1,3 @@ -Certainly! I've added a section named "Architecture Diagrams" in the documentation template below: - -```markdown # Package [PackageName] > Brief one-liner or quote about what this package does. @@ -16,11 +13,21 @@ Provide a few sentences about the purpose and functionality of this package. Con Visual representations often make it easier to understand the design and flow of a package. Below are the architecture diagrams that explain the high-level structure and interactions in this package: -![Architecture Overview](./path-to-diagram1.png) +```mermaid +--- +title: Architecture Overview +--- +flowchart +``` > **Figure 1**: Brief description about what this diagram represents. -![Another Diagram](./path-to-diagram2.png) +```mermaid +--- +title: Another Diagram +--- +flowchart +``` > **Figure 2**: Brief description about what this other diagram represents. From ebb30f96f41920a5c1b6463fc26c12a0567f7269 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 23 Oct 2023 14:03:58 +0200 Subject: [PATCH 6/7] chore: add `Observable#Next()` (cherry picked from commit cb4142f673fee37ead8520394e314f1fcb9d0dc9) --- pkg/observable/channel/observable.go | 8 ++++++++ pkg/observable/interface.go | 1 + 2 files changed, 9 insertions(+) diff --git a/pkg/observable/channel/observable.go b/pkg/observable/channel/observable.go index 7b8e2d04e..48539622f 100644 --- a/pkg/observable/channel/observable.go +++ b/pkg/observable/channel/observable.go @@ -64,6 +64,14 @@ func WithPublisher[V any](publishCh chan V) option[V] { } } +func (obsvbl *channelObservable[V]) Next(ctx context.Context) V { + tempObserver := obsvbl.Subscribe(ctx) + defer tempObserver.Unsubscribe() + + val := <-tempObserver.Ch() + return val +} + // Subscribe returns an observer which is notified when the publishCh channel // receives a value. func (obsvbl *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] { diff --git a/pkg/observable/interface.go b/pkg/observable/interface.go index 4005190f0..45384b7b4 100644 --- a/pkg/observable/interface.go +++ b/pkg/observable/interface.go @@ -11,6 +11,7 @@ import "context" // notified of new values asynchronously. // It is analogous to a publisher in a "Fan-Out" system design. type Observable[V any] interface { + Next(context.Context) V Subscribe(context.Context) Observer[V] UnsubscribeAll() } From 1cca119cfbcf13746866634d35a6d5827b71e789 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 23 Oct 2023 18:25:31 +0200 Subject: [PATCH 7/7] chore: update godoc comments --- pkg/observable/channel/observable.go | 1 + pkg/observable/interface.go | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/pkg/observable/channel/observable.go b/pkg/observable/channel/observable.go index 48539622f..8e17ad9fb 100644 --- a/pkg/observable/channel/observable.go +++ b/pkg/observable/channel/observable.go @@ -64,6 +64,7 @@ func WithPublisher[V any](publishCh chan V) option[V] { } } +// Next synchronously returns the next value from the observable. func (obsvbl *channelObservable[V]) Next(ctx context.Context) V { tempObserver := obsvbl.Subscribe(ctx) defer tempObserver.Unsubscribe() diff --git a/pkg/observable/interface.go b/pkg/observable/interface.go index 45384b7b4..452c18dcd 100644 --- a/pkg/observable/interface.go +++ b/pkg/observable/interface.go @@ -11,8 +11,12 @@ import "context" // notified of new values asynchronously. // It is analogous to a publisher in a "Fan-Out" system design. type Observable[V any] interface { + // Next synchronously returns the next value from the observable. Next(context.Context) V + // Subscribe returns an observer which is notified when the publishCh channel + // receives a value. Subscribe(context.Context) Observer[V] + // UnsubscribeAll unsubscribes and removes all observers from the observable. UnsubscribeAll() } @@ -20,7 +24,12 @@ type Observable[V any] interface { // channel and allows unsubscribing from an Observable. // It is analogous to a subscriber in a "Fan-Out" system design. type Observer[V any] interface { + // Unsubscribe closes the subscription channel and removes the subscription from + // the observable. Unsubscribe() + // Ch returns a receive-only subscription channel. Ch() <-chan V + // IsClosed returns true if the observer has been unsubscribed. + // A closed observer cannot be reused. IsClosed() bool }