Skip to content

Commit

Permalink
[Observable] chore: observable touchup (#83)
Browse files Browse the repository at this point in the history
* chore: add `Observer#IsClosed()` to prevent  redundant unsubscription

(cherry picked from commit 78a9946b3f14353e79b123919416903d4622da4d)

* chore: simplify channel observable

(cherry picked from commit a2629c8bc3decfb5a787e453af67aa78fc8ca1ea)

* test: add case for publisher w/ large buffer size, comment, & cleanup

(cherry picked from commit e97b691e39af8fa1654b8d697a3b34095b32ed82)

* docs: update observable pkg README.md

(cherry picked from commit d5442c7062630d847e048850fa71806086f84172)

* doc: fix pkg README template

* chore: add `Observable#Next()`

(cherry picked from commit cb4142f673fee37ead8520394e314f1fcb9d0dc9)

* chore: update godoc comments
  • Loading branch information
bryanchriswhite authored Oct 23, 2023
1 parent 6e840d6 commit 6c973b1
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 16 deletions.
4 changes: 2 additions & 2 deletions docs/pkg/observable/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## `pkg/observable` Package
## `pocket/pkg/observable` Package

The `pkg/observable` package provides a lightweight and straightforward mechanism to handle asynchronous notifications using the Observer pattern. This is achieved through two primary interfaces: `Observable` and `Observer`.
The `pocket/pkg/observable` package provides a lightweight and straightforward mechanism to handle asynchronous notifications using the Observer pattern. This is achieved through two primary interfaces: `Observable` and `Observer`.

## Overview

Expand Down
17 changes: 12 additions & 5 deletions docs/template/pkg/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
Certainly! I've added a section named "Architecture Diagrams" in the documentation template below:

```markdown
# Package [PackageName]

> Brief one-liner or quote about what this package does.
Expand All @@ -16,11 +13,21 @@ Provide a few sentences about the purpose and functionality of this package. Con

Visual representations often make it easier to understand the design and flow of a package. Below are the architecture diagrams that explain the high-level structure and interactions in this package:

![Architecture Overview](./path-to-diagram1.png)
```mermaid
---
title: Architecture Overview
---
flowchart
```

> **Figure 1**: Brief description about what this diagram represents.
![Another Diagram](./path-to-diagram2.png)
```mermaid
---
title: Another Diagram
---
flowchart
```

> **Figure 2**: Brief description about what this other diagram represents.
Expand Down
25 changes: 19 additions & 6 deletions pkg/observable/channel/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package channel

import (
"context"
"pocket/pkg/observable"
"sync"

"pocket/pkg/observable"
)

// TODO_DISCUSS: what should this be? should it be configurable? It seems to be most
Expand Down Expand Up @@ -50,7 +51,7 @@ func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V
}

// start listening to the publishCh and emit values to observers
go obs.goPublish(obs.publishCh)
go obs.goPublish()

return obs, obs.publishCh
}
Expand All @@ -63,6 +64,15 @@ func WithPublisher[V any](publishCh chan V) option[V] {
}
}

// Next synchronously returns the next value from the observable.
func (obsvbl *channelObservable[V]) Next(ctx context.Context) V {
tempObserver := obsvbl.Subscribe(ctx)
defer tempObserver.Unsubscribe()

val := <-tempObserver.Ch()
return val
}

// Subscribe returns an observer which is notified when the publishCh channel
// receives a value.
func (obsvbl *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
Expand Down Expand Up @@ -110,8 +120,8 @@ func (obsvbl *channelObservable[V]) unsubscribeAll() {

// goPublish to the publishCh and notify observers when values are received.
// This function is blocking and should be run in a goroutine.
func (obsvbl *channelObservable[V]) goPublish(publisher <-chan V) {
for notification := range publisher {
func (obsvbl *channelObservable[V]) goPublish() {
for notification := range obsvbl.publishCh {
// Copy currentObservers to avoid holding the lock while notifying them.
// New or existing Observers may (un)subscribe while this notification
// is being fanned out.
Expand Down Expand Up @@ -154,9 +164,12 @@ func (obsvbl *channelObservable[V]) copyObservers() (observers []*channelObserve

// goUnsubscribeOnDone unsubscribes from the subscription when the context is done.
// It is a blocking function and intended to be called in a goroutine.
func goUnsubscribeOnDone[V any](ctx context.Context, subscription observable.Observer[V]) {
func goUnsubscribeOnDone[V any](ctx context.Context, observer observable.Observer[V]) {
<-ctx.Done()
subscription.Unsubscribe()
if observer.IsClosed() {
return
}
observer.Unsubscribe()
}

// onUnsubscribe returns a function that removes a given observer from the
Expand Down
16 changes: 13 additions & 3 deletions pkg/observable/channel/observable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestChannelObservable_NotifyObservers(t *testing.T) {

tests := []test{
{
name: "nil publisher",
name: "nil publisher (default buffer size)",
publishCh: nil,
inputs: inputs,
expectedOutputs: inputs,
Expand All @@ -55,7 +55,13 @@ func TestChannelObservable_NotifyObservers(t *testing.T) {
inputs: inputs,
expectedOutputs: inputs,
},
// TODO_INCOMPLETE: publisher channels which are full are proving harder to test
{
name: "empty buffered len 1000 publisher",
publishCh: make(chan int, 1000),
inputs: inputs,
expectedOutputs: inputs,
},
// TODO_INCOMPLETE(#81): publisher channels which are full are proving harder to test
// robustly (no flakiness); perhaps it has to do with the lack of some
// kind of guarantee about the receiver order on the consumer side.
//
Expand Down Expand Up @@ -156,7 +162,7 @@ func TestChannelObservable_NotifyObservers(t *testing.T) {
err := group.Wait()
require.NoError(t, err)

// unsubscribing should unsubscribeAll obsvr channel(s)
// unsubscribing should close observer channel(s)
for _, observer := range observers {
observer.Unsubscribe()

Expand Down Expand Up @@ -205,6 +211,8 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) {
},
},
{
// NOTE: this will log a warning that can be ignored:
// > redundant unsubscribe: observer is closed
name: "cancel then unsubscribe",
lifecycleFn: func() observable.Observer[int] {
observer := obsvbl.Subscribe(ctx)
Expand All @@ -215,6 +223,8 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) {
},
},
{
// NOTE: this will log a warning that can be ignored:
// > redundant unsubscribe: observer is closed
name: "unsubscribe then cancel",
lifecycleFn: func() observable.Observer[int] {
observer := obsvbl.Subscribe(ctx)
Expand Down
9 changes: 9 additions & 0 deletions pkg/observable/channel/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ func (obsvr *channelObserver[V]) Ch() <-chan V {
return obsvr.observerCh
}

// IsClosed returns true if the observer has been unsubscribed.
// A closed observer cannot be reused.
func (obsvr *channelObserver[V]) IsClosed() bool {
obsvr.observerMu.Lock()
defer obsvr.observerMu.Unlock()

return obsvr.isClosed
}

// unsubscribe closes the subscription channel, marks the observer as isClosed, and
// removes the subscription from its observable's observers list via onUnsubscribe.
func (obsvr *channelObserver[V]) unsubscribe() {
Expand Down
11 changes: 11 additions & 0 deletions pkg/observable/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,25 @@ import "context"
// notified of new values asynchronously.
// It is analogous to a publisher in a "Fan-Out" system design.
type Observable[V any] interface {
// Next synchronously returns the next value from the observable.
Next(context.Context) V
// Subscribe returns an observer which is notified when the publishCh channel
// receives a value.
Subscribe(context.Context) Observer[V]
// UnsubscribeAll unsubscribes and removes all observers from the observable.
UnsubscribeAll()
}

// Observer is a generic interface that provides access to the notified
// channel and allows unsubscribing from an Observable.
// It is analogous to a subscriber in a "Fan-Out" system design.
type Observer[V any] interface {
// Unsubscribe closes the subscription channel and removes the subscription from
// the observable.
Unsubscribe()
// Ch returns a receive-only subscription channel.
Ch() <-chan V
// IsClosed returns true if the observer has been unsubscribed.
// A closed observer cannot be reused.
IsClosed() bool
}

0 comments on commit 6c973b1

Please sign in to comment.