Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Observable] chore: observable touchup #83

Merged
merged 8 commits into from
Oct 23, 2023
4 changes: 2 additions & 2 deletions docs/pkg/observable/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## `pkg/observable` Package
## `pocket/pkg/observable` Package

The `pkg/observable` package provides a lightweight and straightforward mechanism to handle asynchronous notifications using the Observer pattern. This is achieved through two primary interfaces: `Observable` and `Observer`.
The `pocket/pkg/observable` package provides a lightweight and straightforward mechanism to handle asynchronous notifications using the Observer pattern. This is achieved through two primary interfaces: `Observable` and `Observer`.

## Overview

Expand Down
17 changes: 12 additions & 5 deletions docs/template/pkg/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
Certainly! I've added a section named "Architecture Diagrams" in the documentation template below:

```markdown
# Package [PackageName]

> Brief one-liner or quote about what this package does.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean to add TODOs here?

Expand All @@ -16,11 +13,21 @@ Provide a few sentences about the purpose and functionality of this package. Con

Visual representations often make it easier to understand the design and flow of a package. Below are the architecture diagrams that explain the high-level structure and interactions in this package:

![Architecture Overview](./path-to-diagram1.png)
```mermaid
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these changes intended?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this file is intended to be a template for generating go package READMEs with LLMs.

---
title: Architecture Overview
---
flowchart
```

> **Figure 1**: Brief description about what this diagram represents.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plaeholder?

![Another Diagram](./path-to-diagram2.png)
```mermaid
---
title: Another Diagram
---
flowchart
```

> **Figure 2**: Brief description about what this other diagram represents.
Expand Down
25 changes: 19 additions & 6 deletions pkg/observable/channel/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package channel

import (
"context"
"pocket/pkg/observable"
"sync"

"pocket/pkg/observable"
)

// TODO_DISCUSS: what should this be? should it be configurable? It seems to be most
Expand Down Expand Up @@ -50,7 +51,7 @@ func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V
}

// start listening to the publishCh and emit values to observers
go obs.goPublish(obs.publishCh)
go obs.goPublish()

return obs, obs.publishCh
}
Expand All @@ -63,6 +64,15 @@ func WithPublisher[V any](publishCh chan V) option[V] {
}
}

// Next synchronously returns the next value from the observable.
func (obsvbl *channelObservable[V]) Next(ctx context.Context) V {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a godoc on what this is or why its needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a godoc comment. It's not strictly necessary but will be convenient in forthcoming PRs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not strictly necessary but will be convenient in forthcoming PRs.

I'm a bit confused as to when we should or shouldn't use it. #PUC when you have time, but not a blocker in this PR.

tempObserver := obsvbl.Subscribe(ctx)
defer tempObserver.Unsubscribe()

val := <-tempObserver.Ch()
return val
}

// Subscribe returns an observer which is notified when the publishCh channel
// receives a value.
func (obsvbl *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
Expand Down Expand Up @@ -110,8 +120,8 @@ func (obsvbl *channelObservable[V]) unsubscribeAll() {

// goPublish to the publishCh and notify observers when values are received.
// This function is blocking and should be run in a goroutine.
func (obsvbl *channelObservable[V]) goPublish(publisher <-chan V) {
for notification := range publisher {
func (obsvbl *channelObservable[V]) goPublish() {
for notification := range obsvbl.publishCh {
// Copy currentObservers to avoid holding the lock while notifying them.
// New or existing Observers may (un)subscribe while this notification
// is being fanned out.
Expand Down Expand Up @@ -154,9 +164,12 @@ func (obsvbl *channelObservable[V]) copyObservers() (observers []*channelObserve

// goUnsubscribeOnDone unsubscribes from the subscription when the context is done.
// It is a blocking function and intended to be called in a goroutine.
func goUnsubscribeOnDone[V any](ctx context.Context, subscription observable.Observer[V]) {
func goUnsubscribeOnDone[V any](ctx context.Context, observer observable.Observer[V]) {
<-ctx.Done()
subscription.Unsubscribe()
if observer.IsClosed() {
return
}
observer.Unsubscribe()
}

// onUnsubscribe returns a function that removes a given observer from the
Expand Down
16 changes: 13 additions & 3 deletions pkg/observable/channel/observable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestChannelObservable_NotifyObservers(t *testing.T) {

tests := []test{
{
name: "nil publisher",
name: "nil publisher (default buffer size)",
publishCh: nil,
inputs: inputs,
expectedOutputs: inputs,
Expand All @@ -55,7 +55,13 @@ func TestChannelObservable_NotifyObservers(t *testing.T) {
inputs: inputs,
expectedOutputs: inputs,
},
// TODO_INCOMPLETE: publisher channels which are full are proving harder to test
{
name: "empty buffered len 1000 publisher",
publishCh: make(chan int, 1000),
inputs: inputs,
expectedOutputs: inputs,
},
// TODO_INCOMPLETE(#81): publisher channels which are full are proving harder to test
// robustly (no flakiness); perhaps it has to do with the lack of some
// kind of guarantee about the receiver order on the consumer side.
//
Expand Down Expand Up @@ -156,7 +162,7 @@ func TestChannelObservable_NotifyObservers(t *testing.T) {
err := group.Wait()
require.NoError(t, err)

// unsubscribing should unsubscribeAll obsvr channel(s)
// unsubscribing should close observer channel(s)
for _, observer := range observers {
observer.Unsubscribe()

Expand Down Expand Up @@ -205,6 +211,8 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) {
},
},
{
// NOTE: this will log a warning that can be ignored:
// > redundant unsubscribe: observer is closed
name: "cancel then unsubscribe",
lifecycleFn: func() observable.Observer[int] {
observer := obsvbl.Subscribe(ctx)
Expand All @@ -215,6 +223,8 @@ func TestChannelObservable_UnsubscribeObservers(t *testing.T) {
},
},
{
// NOTE: this will log a warning that can be ignored:
// > redundant unsubscribe: observer is closed
name: "unsubscribe then cancel",
lifecycleFn: func() observable.Observer[int] {
observer := obsvbl.Subscribe(ctx)
Expand Down
9 changes: 9 additions & 0 deletions pkg/observable/channel/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ func (obsvr *channelObserver[V]) Ch() <-chan V {
return obsvr.observerCh
}

// IsClosed returns true if the observer has been unsubscribed.
// A closed observer cannot be reused.
func (obsvr *channelObserver[V]) IsClosed() bool {
obsvr.observerMu.Lock()
defer obsvr.observerMu.Unlock()

return obsvr.isClosed
}

// unsubscribe closes the subscription channel, marks the observer as isClosed, and
// removes the subscription from its observable's observers list via onUnsubscribe.
func (obsvr *channelObserver[V]) unsubscribe() {
Expand Down
11 changes: 11 additions & 0 deletions pkg/observable/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,25 @@ import "context"
// notified of new values asynchronously.
// It is analogous to a publisher in a "Fan-Out" system design.
type Observable[V any] interface {
// Next synchronously returns the next value from the observable.
Next(context.Context) V
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
// Subscribe returns an observer which is notified when the publishCh channel
// receives a value.
Subscribe(context.Context) Observer[V]
// UnsubscribeAll unsubscribes and removes all observers from the observable.
UnsubscribeAll()
}

// Observer is a generic interface that provides access to the notified
// channel and allows unsubscribing from an Observable.
// It is analogous to a subscriber in a "Fan-Out" system design.
type Observer[V any] interface {
// Unsubscribe closes the subscription channel and removes the subscription from
// the observable.
Unsubscribe()
// Ch returns a receive-only subscription channel.
Ch() <-chan V
// IsClosed returns true if the observer has been unsubscribed.
// A closed observer cannot be reused.
IsClosed() bool
}