diff --git a/docs/pkg/observable/README.md b/docs/pkg/observable/README.md new file mode 100644 index 000000000..9523ff457 --- /dev/null +++ b/docs/pkg/observable/README.md @@ -0,0 +1,191 @@ +## `pkg/observable` Package + +The `pkg/observable` package provides a lightweight and straightforward mechanism to handle asynchronous notifications using the Observer pattern. This is achieved through two primary interfaces: `Observable` and `Observer`. + +## Overview + +The `Observable` interface is responsible for notifying multiple subscribers about new values asynchronously, while the `Observer` interface allows access to the notified channel and facilitates unsubscribing from an `Observable`. + +## Interfaces and Structures + +### `Observable` Interface + +Represents a publisher in a "Fan-Out" system design, allowing multiple subscribers to be notified of new values asynchronously. + +- **Methods**: + - **Subscribe**: Used to subscribe an observer to the observable. Returns an instance of the `Observer` interface. + ```go + func (o *MyObservableType) Subscribe(ctx context.Context) Observer[MyValueType] + ``` + - **UnsubscribeAll**: Unsubscribes all observers from the observable. + ```go + func (o *MyObservableType) UnsubscribeAll() + ``` + +### `Observer` Interface + +Represents a subscriber in a "Fan-Out" system design, providing access to the notified channel and capabilities to unsubscribe. + +- **Methods**: + - **Unsubscribe**: Used to unsubscribe the observer from its associated observable. + ```go + func (obs *MyObserverType) Unsubscribe() + ``` + - **Ch**: Returns the channel through which the observer receives notifications. + ```go + func (obs *MyObserverType) Ch() <-chan MyValueType + ``` + +## Architecture Diagrams + +Visual representations often make it easier to understand the design and flow of a package. Below are the architecture diagrams that explain the high-level structure and interactions in this package: + +### Observable Synchronization + +```mermaid +flowchart + subgraph observable + mu1[read/write mutex] + sub([#subscribe]) + close([#close]) + end + + subgraph observer + mu2[read/write mutex] + notify([#notify]) + unsub([#unsubscribe]) + end + + notify -."read-lock".-> mu2 +sub -."write-lock".-> mu1 +unsub -."write-lock".-> mu1 +unsub -."write-lock".-> mu2 +close -."read-lock".-> mu1 +close -."write-lock".-> mu1 +close --> unsub + +observable --> notify +``` + +> **Figure 1**: This diagram depicts the synchronization mechanisms between the observable and its observers. It specifically showcases the use of read and write locks for different operations in both observable and observer contexts. + +### Observable Buffering + +```mermaid +flowchart + + pub_ctx[publish context] + sub_ctx1[subscribe context 1] + sub_ctx2[subscribe context 2] + + subgraph observable + subgraph pub[publisher channel] + pb[publish buffer] + end + end + + subgraph observer2 + subgraph sub2[subscriber channel] + sb2[subscribe buffer] + end + + notify2([#notify]) + end + + subgraph observer1 + subgraph sub1[subscriber channel] + sb1[subscribe buffer] + end + + notify1([#notify]) + end + + pub_ctx -."source".-> pb +sb1 -."sink".-> sub_ctx1 +sb2 -."sink".-> sub_ctx2 + +pb -."sink".-> notify1 +notify1 -."source".-> sb1 +%% pb -."sink".-> sb2 +pb -."sink".-> notify2 +notify2 -."source".-> sb2 +``` + +> Figure 2: The diagram illustrates the buffering mechanisms within the observable and its observers. It highlights how published messages are buffered and how they propagate to the individual observers' buffers. + +## Usage + +### Basic Example + +```go +package main + +import ( + "context" + "fmt" + "time" + + "poktroll/pkg/observable/channel" +) + +func main() { + // Create a new context that can be cancelled + ctx, cancel := context.WithCancel(context.Background()) + // Ensure to cancel the context to release resources + defer cancel() + + // Create a new Observable and its corresponding publisher + obsvbl, publisher := channel.NewObservable[int]() + // Subscribe the first Observer to the Observable + observer1 := obsvbl.Subscribe(ctx) + + // Start observing with observer1 in a goroutine + go func() { + for v := range observer1.Ch() { + fmt.Println("Observer1 received:", v) + } + }() + + // Publish the first value to the Observable + publisher <- 10 + time.Sleep(time.Millisecond) + + // Now, subscribe the second Observer to the Observable + observer2 := obsvbl.Subscribe(ctx) + + // Start observing with observer2 in a goroutine + go func() { + for v := range observer2.Ch() { + fmt.Println("Observer2 received:", v) + } + }() + + // Publish the second value + publisher <- 20 + time.Sleep(time.Millisecond) + + // Unsubscribe observer1 before the last value is sent + observer1.Unsubscribe() + fmt.Println("Observer1 unsubscribed!") + + // Publish the third value + publisher <- 30 + time.Sleep(time.Millisecond) +} + +// Expected Output: +// Observer1 received: 10 +// Observer2 received: 20 +// Observer1 received: 20 +// Observer1 unsubscribed! +// Observer2 received: 30 + +``` + +## Considerations + +While the `pkg/observable` package is designed to be simple and minimal, developers with more complex requirements may need to consider extending its functionality or exploring other libraries like [RxGo](https://github.com/ReactiveX/RxGo). + +## Conclusion + +The `pkg/observable` package is an intuitive solution for handling asynchronous notifications in Go projects, ensuring efficient communication between observables and observers. diff --git a/docs/template/pkg/README.md b/docs/template/pkg/README.md new file mode 100644 index 000000000..c7f915e9a --- /dev/null +++ b/docs/template/pkg/README.md @@ -0,0 +1,98 @@ +Certainly! I've added a section named "Architecture Diagrams" in the documentation template below: + +```markdown +# Package [PackageName] + +> Brief one-liner or quote about what this package does. + +## Overview + +Provide a few sentences about the purpose and functionality of this package. Consider: +- What problems does it solve? +- Why would someone use this package as opposed to others or implementing their own solution? +- Any unique features or aspects that stand out. + +## Architecture Diagrams + +Visual representations often make it easier to understand the design and flow of a package. Below are the architecture diagrams that explain the high-level structure and interactions in this package: + +![Architecture Overview](./path-to-diagram1.png) + +> **Figure 1**: Brief description about what this diagram represents. + +![Another Diagram](./path-to-diagram2.png) + +> **Figure 2**: Brief description about what this other diagram represents. + +If you have multiple diagrams, you can explain each one separately or provide a list. + +## Installation + +```bash +go get github.com/yourusername/yourproject/[PackageName] +``` + +## Features + +- **Feature 1**: A brief description. +- **Feature 2**: Another description. +- ... + +## Usage + +### Basic Example + +```go +// A simple and concise code example showing the most common use case. +``` + +### Advanced Usage + +For complex features or functionalities, it's good to have a separate section: + +```go +// Advanced code example or usage. +``` + +### Configuration + +If the package can be configured in some way, describe it here: + +- **Config Option 1**: Explanation. +- **Config Option 2**: Explanation. + +## API Reference + +While `godoc` will provide the detailed API reference, you can highlight or briefly describe key functions, types, or methods here. + +- `FunctionOrType1()`: A short description of its purpose. +- `FunctionOrType2(param Type)`: Another brief description. + +For the complete API details, see the [godoc](https://pkg.go.dev/github.com/yourusername/yourproject/[PackageName]). + +## Best Practices + +- **Practice 1**: Description and rationale. +- **Practice 2**: Another helpful practice. + +## FAQ + +#### Question 1? + +Answer for question 1. + +#### Question 2? + +Answer for question 2. + +## Contributing + +Briefly describe how others can contribute to this package. Link to the main contributing guide if you have one. + +## Changelog + +For detailed release notes, see the [CHANGELOG](../CHANGELOG.md) at the root level or link to a separate CHANGELOG specific to this package. + +## License + +This package is released under the XYZ License. For more information, see the [LICENSE](../LICENSE) file at the root level. \ No newline at end of file diff --git a/go.mod b/go.mod index 5ab555b8f..3534e7fda 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 + golang.org/x/sync v0.3.0 google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 google.golang.org/grpc v1.56.1 gopkg.in/yaml.v2 v2.4.0 @@ -257,7 +258,6 @@ require ( golang.org/x/mod v0.11.0 // indirect golang.org/x/net v0.14.0 // indirect golang.org/x/oauth2 v0.7.0 // indirect - golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.11.0 // indirect golang.org/x/term v0.11.0 // indirect golang.org/x/text v0.12.0 // indirect diff --git a/internal/testchannel/drain.go b/internal/testchannel/drain.go new file mode 100644 index 000000000..4ea41a297 --- /dev/null +++ b/internal/testchannel/drain.go @@ -0,0 +1,26 @@ +package testchannel + +import ( + "time" + + "pocket/pkg/observable" +) + +// DrainChannel attempts to receive from the given channel, blocking, until it is +// empty. It returns an error if the channel is not closed by the time it's empty. +// TODO_CONSIDERATION: this function could easily take a timeout parameter and add +// a case which returns an error if the timeout is exceeded. This would prevent +// the case where the channel never stops receiving from looping indefinitely. +func DrainChannel[V any](ch <-chan V) error { + for { + select { + case _, ok := <-ch: + if ok { + continue + } + return nil + case <-time.After(time.Millisecond): + return observable.ErrObserverClosed + } + } +} diff --git a/internal/testerrors/require.go b/internal/testerrors/require.go new file mode 100644 index 000000000..624cdaf6e --- /dev/null +++ b/internal/testerrors/require.go @@ -0,0 +1,11 @@ +package testerrors + +import errorsmod "cosmossdk.io/errors" + +var ( + // ErrAsync is returned when a test assertion fails in a goroutine other than + // the main test goroutine. This is done to avoid concurrent usage of + // t.Fatal() which can cause the test binary to exit before cleanup is complete. + ErrAsync = errorsmod.Register(codespace, 1, "required assertion failed") + codespace = "testerrors" +) diff --git a/pkg/observable/channel/observable.go b/pkg/observable/channel/observable.go new file mode 100644 index 000000000..a173d8072 --- /dev/null +++ b/pkg/observable/channel/observable.go @@ -0,0 +1,175 @@ +package channel + +import ( + "context" + "pocket/pkg/observable" + "sync" +) + +// TODO_DISCUSS: what should this be? should it be configurable? It seems to be most +// relevant in the context of the behavior of the observable when it has multiple +// observers which consume at different rates. +// defaultSubscribeBufferSize is the buffer size of a observable's publish channel. +const defaultPublishBufferSize = 50 + +var _ observable.Observable[any] = &channelObservable[any]{} + +// option is a function which receives and can modify the channelObservable state. +type option[V any] func(obs *channelObservable[V]) + +// channelObservable implements the observable.Observable interface and can be notified +// by sending on its corresponding publishCh channel. +type channelObservable[V any] struct { + // publishCh is an observable-wide channel that is used to receive values + // which are subsequently fanned out to observers. + publishCh chan V + // observersMu protects observers from concurrent access/updates + observersMu *sync.RWMutex + // observers is a list of channelObservers that will be notified when publishCh + // receives a new value. + observers []*channelObserver[V] +} + +// NewObservable creates a new observable which is notified when the publishCh +// channel receives a value. +func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V) { + // initialize an observable that publishes messages from 1 publishCh to N observers + obs := &channelObservable[V]{ + observersMu: &sync.RWMutex{}, + observers: []*channelObserver[V]{}, + } + + for _, opt := range opts { + opt(obs) + } + + // If the caller does not provide a publishCh, create a new one using the + // defaultPublishBuffer size and return it. + if obs.publishCh == nil { + obs.publishCh = make(chan V, defaultPublishBufferSize) + } + + // start listening to the publishCh and emit values to observers + go obs.goPublish(obs.publishCh) + + return obs, obs.publishCh +} + +// WithPublisher returns an option function which sets the given publishCh of the +// resulting observable when passed to NewObservable(). +func WithPublisher[V any](publishCh chan V) option[V] { + return func(obs *channelObservable[V]) { + obs.publishCh = publishCh + } +} + +// Subscribe returns an observer which is notified when the publishCh channel +// receives a value. +func (obsvbl *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] { + // must (write) lock observersMu so that we can safely append to the observers list + obsvbl.observersMu.Lock() + defer obsvbl.observersMu.Unlock() + + observer := NewObserver[V](ctx, obsvbl.onUnsubscribe) + obsvbl.observers = append(obsvbl.observers, observer) + + // caller can rely on context cancellation or call UnsubscribeAll() to unsubscribe + // active observers + if ctx != nil { + // asynchronously wait for the context to be done and then unsubscribe + // this observer. + go goUnsubscribeOnDone[V](ctx, observer) + } + return observer +} + +// UnsubscribeAll unsubscribes and removes all observers from the observable. +func (obsvbl *channelObservable[V]) UnsubscribeAll() { + obsvbl.unsubscribeAll() +} + +// unsubscribeAll unsubscribes and removes all observers from the observable. +func (obsvbl *channelObservable[V]) unsubscribeAll() { + // Copy currentObservers to avoid holding the lock while unsubscribing them. + // The observers at the time of locking, prior to copying, are the canonical + // set of observers which are unsubscribed. + // New or existing Observers may (un)subscribe while the observable is closing. + // Any such observers won't be isClosed but will also stop receiving notifications + // immediately (if they receive any at all). + currentObservers := obsvbl.copyObservers() + for _, observer := range currentObservers { + observer.Unsubscribe() + } + + // Reset observers to an empty list. This purges any observers which might have + // subscribed while the observable was closing. + obsvbl.observersMu.Lock() + obsvbl.observers = []*channelObserver[V]{} + obsvbl.observersMu.Unlock() +} + +// goPublish to the publishCh and notify observers when values are received. +// This function is blocking and should be run in a goroutine. +func (obsvbl *channelObservable[V]) goPublish(publisher <-chan V) { + for notification := range publisher { + // Copy currentObservers to avoid holding the lock while notifying them. + // New or existing Observers may (un)subscribe while this notification + // is being fanned out. + // The observers at the time of locking, prior to copying, are the canonical + // set of observers which receive this notification. + currentObservers := obsvbl.copyObservers() + for _, obsvr := range currentObservers { + // TODO_CONSIDERATION: perhaps continue trying to avoid making this + // notification async as it would effectively use goroutines + // in memory as a buffer (unbounded). + obsvr.notify(notification) + } + } + + // Here we know that the publisher channel has been closed. + // Unsubscribe all observers as they can no longer receive notifications. + obsvbl.unsubscribeAll() +} + +// copyObservers returns a copy of the current observers list. It is safe to +// call concurrently. +func (obsvbl *channelObservable[V]) copyObservers() (observers []*channelObserver[V]) { + defer obsvbl.observersMu.RUnlock() + + // This loop blocks on acquiring a read lock on observersMu. If TryRLock + // fails, the loop continues until it succeeds. This is intended to give + // callers a guarantee that this copy operation won't contribute to a deadlock. + for { + // block until a read lock can be acquired + if obsvbl.observersMu.TryRLock() { + break + } + } + + observers = make([]*channelObserver[V], len(obsvbl.observers)) + copy(observers, obsvbl.observers) + + return observers +} + +// goUnsubscribeOnDone unsubscribes from the subscription when the context is done. +// It is a blocking function and intended to be called in a goroutine. +func goUnsubscribeOnDone[V any](ctx context.Context, subscription observable.Observer[V]) { + <-ctx.Done() + subscription.Unsubscribe() +} + +// onUnsubscribe returns a function that removes a given observer from the +// observable's list of observers. +func (obsvbl *channelObservable[V]) onUnsubscribe(toRemove *channelObserver[V]) { + // must (write) lock to iterate over and modify the observers list + obsvbl.observersMu.Lock() + defer obsvbl.observersMu.Unlock() + + for i, observer := range obsvbl.observers { + if observer == toRemove { + obsvbl.observers = append((obsvbl.observers)[:i], (obsvbl.observers)[i+1:]...) + break + } + } +} diff --git a/pkg/observable/channel/observable_test.go b/pkg/observable/channel/observable_test.go new file mode 100644 index 000000000..c6f27929f --- /dev/null +++ b/pkg/observable/channel/observable_test.go @@ -0,0 +1,373 @@ +package channel_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "pocket/internal/testchannel" + "pocket/internal/testerrors" + "pocket/pkg/observable" + "pocket/pkg/observable/channel" +) + +const ( + publishDelay = 100 * time.Microsecond + notifyTimeout = publishDelay * 20 + cancelUnsubscribeDelay = publishDelay * 2 +) + +func TestChannelObservable_NotifyObservers(t *testing.T) { + type test struct { + name string + publishCh chan int + inputs []int + expectedOutputs []int + setupFn func(t test) + } + + inputs := []int{123, 456, 789} + // NB: see TODO_INCOMPLETE comment below + //fullBlockingPublisher := make(chan *int) + //fullBufferedPublisher := make(chan *int, 1) + + tests := []test{ + { + name: "nil publisher", + publishCh: nil, + inputs: inputs, + expectedOutputs: inputs, + }, + { + name: "empty non-buffered publisher", + publishCh: make(chan int), + inputs: inputs, + expectedOutputs: inputs, + }, + { + name: "empty buffered len 1 publisher", + publishCh: make(chan int, 1), + inputs: inputs, + expectedOutputs: inputs, + }, + // TODO_INCOMPLETE: publisher channels which are full are proving harder to test + // robustly (no flakiness); perhaps it has to do with the lack of some + // kind of guarantee about the receiver order on the consumer side. + // + // The following scenarios should generally pass but are flaky: + // (see: docs/pkg/observable/README.md regarding synchronization and buffering) + // + // { + // name: "full non-buffered publisher", + // publishCh: fullBlockingPublisher, + // inputs: inputs[1:], + // expectedOutputs: inputs, + // setupFn: func(t test) { + // go func() { + // // blocking send + // t.publishCh <- &inputs[0] + // }() + // }, + // }, + // { + // name: "full buffered len 1 publisher", + // publishCh: fullBufferedPublisher, + // inputs: inputs[1:], + // expectedOutputs: inputs, + // setupFn: func(t test) { + // // non-blocking send + // t.publishCh <- &inputs[0] + // }, + // }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.setupFn != nil { + tt.setupFn(tt) + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + obsvbl, publisher := channel.NewObservable[int]( + channel.WithPublisher(tt.publishCh), + ) + require.NotNil(t, obsvbl) + require.NotNil(t, publisher) + + // construct 3 distinct observers, each with its own channel + observers := make([]observable.Observer[int], 1) + for i := range observers { + observers[i] = obsvbl.Subscribe(ctx) + } + + group, ctx := errgroup.WithContext(ctx) + + // ensure all observer channels are notified + for obsvrIdx, obsvr := range observers { + // onNext is called for each notification received by the observer + onNext := func(outputIndex int, output int) error { + // obsvr channel should receive notified input + if !assert.Equalf( + t, tt.expectedOutputs[outputIndex], + output, + "obsvr Idx: %d", obsvrIdx, + ) { + return testerrors.ErrAsync + } + return nil + } + + // onDone is called when the observer channel closes + onDone := func(outputs []int) error { + if !assert.Equalf( + t, len(tt.expectedOutputs), + len(outputs), + "obsvr addr: %p", obsvr, + ) { + return testerrors.ErrAsync + } + return nil + } + + // concurrently await notification or timeout to avoid blocking on + // empty and/or non-buffered publishers. + group.Go(goNotifiedOrTimedOutFactory(obsvr, onNext, onDone, notifyTimeout)) + } + + // notify with test input + publish := delayedPublishFactory(publisher, publishDelay) + for _, input := range tt.inputs { + inputPtr := new(int) + *inputPtr = input + + // simulating IO delay in sequential message publishing + publish(input) + } + cancel() + + // wait for obsvbl to be notified or timeout + err := group.Wait() + require.NoError(t, err) + + // unsubscribing should unsubscribeAll obsvr channel(s) + for _, observer := range observers { + observer.Unsubscribe() + + // must drain the channel first to ensure it is isClosed + err := testchannel.DrainChannel(observer.Ch()) + require.NoError(t, err) + } + }) + } +} + +func TestChannelObservable_UnsubscribeObservers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + obsvbl, publishCh := channel.NewObservable[int]() + require.NotNil(t, obsvbl) + require.NotNil(t, publishCh) + + type test struct { + name string + lifecycleFn func() observable.Observer[int] + } + + tests := []test{ + { + name: "nil context", + lifecycleFn: func() observable.Observer[int] { + observer := obsvbl.Subscribe(nil) + observer.Unsubscribe() + return observer + }, + }, + { + name: "only unsubscribe", + lifecycleFn: func() observable.Observer[int] { + observer := obsvbl.Subscribe(ctx) + observer.Unsubscribe() + return observer + }, + }, + { + name: "only cancel", + lifecycleFn: func() observable.Observer[int] { + observer := obsvbl.Subscribe(ctx) + cancel() + return observer + }, + }, + { + name: "cancel then unsubscribe", + lifecycleFn: func() observable.Observer[int] { + observer := obsvbl.Subscribe(ctx) + cancel() + time.Sleep(cancelUnsubscribeDelay) + observer.Unsubscribe() + return observer + }, + }, + { + name: "unsubscribe then cancel", + lifecycleFn: func() observable.Observer[int] { + observer := obsvbl.Subscribe(ctx) + observer.Unsubscribe() + time.Sleep(cancelUnsubscribeDelay) + cancel() + return observer + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + observer := tt.lifecycleFn() + + select { + case value, ok := <-observer.Ch(): + require.Empty(t, value) + require.False(t, ok) + case <-time.After(notifyTimeout): + t.Fatal("observer channel left open") + } + }) + } +} + +// TODO_INCOMPLETE/TODO_TECHDEBT: Implement `TestChannelObservable_ConcurrentSubUnSub` +func TestChannelObservable_ConcurrentSubUnSub(t *testing.T) { + t.Skip("add coverage: subscribing and unsubscribing concurrently should not race") +} + +func TestChannelObservable_SequentialPublishAndUnsubscription(t *testing.T) { + observations := new([]*observation[int]) + expectedNotifications := [][]int{ + {123, 456, 789}, + {456, 789, 987}, + {789, 987, 654}, + {987, 654, 321}, + } + + obsvbl, publishCh := channel.NewObservable[int]() + require.NotNil(t, obsvbl) + require.NotNil(t, publishCh) + // simulate IO delay in sequential message publishing + publish := delayedPublishFactory(publishCh, publishDelay) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + observation0 := newObservation(ctx, obsvbl) + *observations = append(*observations, observation0) + go goReceiveNotifications(observation0) + publish(123) + + observation1 := newObservation(ctx, obsvbl) + *observations = append(*observations, observation1) + go goReceiveNotifications(observation1) + publish(456) + + observation2 := newObservation(ctx, obsvbl) + *observations = append(*observations, observation2) + go goReceiveNotifications(observation2) + publish(789) + + observation3 := newObservation(ctx, obsvbl) + *observations = append(*observations, observation3) + go goReceiveNotifications(observation3) + + observation0.Unsubscribe() + publish(987) + + observation1.Unsubscribe() + publish(654) + + observation2.Unsubscribe() + publish(321) + + observation3.Unsubscribe() + + for obsnIdx, obsrvn := range *observations { + t.Run(fmt.Sprintf("observation%d", obsnIdx), func(t *testing.T) { + msg := "observer %d channel left open" + select { + case _, ok := <-obsrvn.Ch(): + require.Falsef(t, ok, msg, obsnIdx) + default: + t.Fatalf(msg, obsnIdx) + } + + obsrvn.Lock() + defer obsrvn.Unlock() + + require.Equalf( + t, len(expectedNotifications[obsnIdx]), + len(obsrvn.Notifications), + "observation index: %d, expected: %+v, actual: %+v", + obsnIdx, expectedNotifications[obsnIdx], obsrvn.Notifications, + ) + for notificationIdx, expected := range expectedNotifications[obsnIdx] { + require.Equalf( + t, expected, + (obsrvn.Notifications)[notificationIdx], + "allExpected: %+v, allActual: %+v", + expectedNotifications[obsnIdx], obsrvn.Notifications, + ) + } + }) + } +} + +// TODO_TECHDEBT/TODO_INCOMPLETE: add coverage for active observers closing when publishCh closes. +func TestChannelObservable_ObserversCloseOnPublishChannelClose(t *testing.T) { + t.Skip("add coverage: all observers should unsubscribeAll when publishCh closes") +} + +func delayedPublishFactory[V any](publishCh chan<- V, delay time.Duration) func(value V) { + return func(value V) { + publishCh <- value + // simulate IO delay in sequential message publishing + // NB: this make the test code safer as concurrent operations have more + // time to react; i.e. interact with the test harness. + time.Sleep(delay) + } +} + +func goNotifiedOrTimedOutFactory[V any]( + obsvr observable.Observer[V], + onNext func(index int, output V) error, + onDone func(outputs []V) error, + timeoutDuration time.Duration, +) func() error { + var ( + outputIndex int + outputs []V + ) + return func() error { + for { + select { + case output, ok := <-obsvr.Ch(): + if !ok { + return onDone(outputs) + } + + if err := onNext(outputIndex, output); err != nil { + return err + } + + outputs = append(outputs, output) + outputIndex++ + continue + case <-time.After(timeoutDuration): + return fmt.Errorf("timed out waiting for observer to be notified") + } + } + } +} diff --git a/pkg/observable/channel/observation_test.go b/pkg/observable/channel/observation_test.go new file mode 100644 index 000000000..17e20e393 --- /dev/null +++ b/pkg/observable/channel/observation_test.go @@ -0,0 +1,52 @@ +package channel_test + +import ( + "context" + "sync" + + "pocket/pkg/observable" +) + +// NOTE: this file does not contain any tests, only test helpers. + +// observation is a data structure that embeds an observer +// and keeps track of the received notifications. +// It uses generics with type parameter V. +type observation[V any] struct { + // Embeds a mutex for thread-safe operations + sync.Mutex + // Embeds an Observer of type V + observable.Observer[V] + // Notifications is a slice of type V to store received notifications + Notifications []V +} + +// newObservation is a constructor function that returns +// a new observation instance. It subscribes to the provided observable. +func newObservation[V any]( + ctx context.Context, + observable observable.Observable[V], +) *observation[V] { + return &observation[V]{ + Observer: observable.Subscribe(ctx), + Notifications: []V{}, + } +} + +// notify is a method on observation that safely +// appends a received value to the Notifications slice. +func (o *observation[V]) notify(value V) { + o.Lock() // Locks the mutex to prevent concurrent write access + defer o.Unlock() // Unlocks the mutex when the method returns + + o.Notifications = append(o.Notifications, value) // Appends the received value to the Notifications slice +} + +// goReceiveNotifications is a function that listens for +// notifications from the observer's channel and notifies +// the observation instance for each received value. +func goReceiveNotifications[V any](obsvn *observation[V]) { + for notification := range obsvn.Ch() { // Listens for notifications on the channel + obsvn.notify(notification) // Notifies the observation instance with the received value + } +} diff --git a/pkg/observable/channel/observer.go b/pkg/observable/channel/observer.go new file mode 100644 index 000000000..7bd7ddbd3 --- /dev/null +++ b/pkg/observable/channel/observer.go @@ -0,0 +1,138 @@ +package channel + +import ( + "context" + "log" + "sync" + "time" + + "pocket/pkg/observable" +) + +const ( + // TODO_DISCUSS: what should this be? should it be configurable? It seems to be most + // relevant in the context of the behavior of the observable when it has multiple + // observers which consume at different rates. + // defaultSubscribeBufferSize is the buffer size of a channelObserver's channel. + defaultSubscribeBufferSize = 50 + // sendRetryInterval is the duration between attempts to send on the observer's + // channel in the event that it's full. It facilitates a branch in a for loop + // which unlocks the observer's mutex and tries again. + // NOTE: setting this too low can cause the send retry loop to "slip", giving + // up on a send attempt before the channel is ready to receive for multiple + // iterations of the loop. + sendRetryInterval = 100 * time.Millisecond +) + +var _ observable.Observer[any] = &channelObserver[any]{} + +// channelObserver implements the observable.Observer interface. +type channelObserver[V any] struct { + ctx context.Context + // onUnsubscribe is called in Observer#Unsubscribe, removing the respective + // observer from observers in a concurrency-safe manner. + onUnsubscribe func(toRemove *channelObserver[V]) + // observerMu protects the observerCh and isClosed fields. + observerMu *sync.RWMutex + // observerCh is the channel that is used to emit values to the observer. + // I.e. on the "N" side of the 1:N relationship between observable and + // observer. + observerCh chan V + // isClosed indicates whether the observer has been isClosed. It's set in + // unsubscribe; isClosed observers can't be reused. + isClosed bool +} + +type UnsubscribeFunc[V any] func(toRemove *channelObserver[V]) + +func NewObserver[V any]( + ctx context.Context, + onUnsubscribe UnsubscribeFunc[V], +) *channelObserver[V] { + // Create a channel for the observer and append it to the observers list + return &channelObserver[V]{ + ctx: ctx, + observerMu: new(sync.RWMutex), + observerCh: make(chan V, defaultSubscribeBufferSize), + onUnsubscribe: onUnsubscribe, + } +} + +// Unsubscribe closes the subscription channel and removes the subscription from +// the observable. +func (obsvr *channelObserver[V]) Unsubscribe() { + obsvr.unsubscribe() +} + +// Ch returns a receive-only subscription channel. +func (obsvr *channelObserver[V]) Ch() <-chan V { + return obsvr.observerCh +} + +// unsubscribe closes the subscription channel, marks the observer as isClosed, and +// removes the subscription from its observable's observers list via onUnsubscribe. +func (obsvr *channelObserver[V]) unsubscribe() { + obsvr.observerMu.Lock() + defer obsvr.observerMu.Unlock() + + if obsvr.isClosed { + // log the fact that this case was encountered such that an extreme change + // in its frequency would be obvious. + // TODO_TECHDEBT: integrate with structured logger once available + // TODO_CONSIDERATION: alternative perspective: + // 1. this is library code; prefer fewer external dependencies, esp. I/O + // 2. the stdlib log pkg is pretty good, idiomatic, and globally + // configurable; perhaps it is sufficient + log.Printf("%s", observable.ErrObserverClosed.Wrap("redundant unsubscribe")) + return + } + + close(obsvr.observerCh) + obsvr.isClosed = true + obsvr.onUnsubscribe(obsvr) +} + +// notify is called by observable to send a msg on the observer's channel. +// We can't use channelObserver#Ch because it's intended to be a +// receive-only channel. The channel will block if it is full (determined by the buffer +// size) +// if the channel's buffer is full, we will retry after sendRetryInterval/s. +// The other half is spent holding the read-lock and waiting for the (full) channel +// to be ready to receive. +func (obsvr *channelObserver[V]) notify(value V) { + defer obsvr.observerMu.RUnlock() // defer releasing a read lock + + sendRetryTicker := time.NewTicker(sendRetryInterval) + for { + // observerMu must remain read-locked until the value is sent on observerCh + // in the event that it would be isClosed concurrently (i.e. this observer + // unsubscribes), which could cause a "send on isClosed channel" error. + if !obsvr.observerMu.TryRLock() { + continue + } + if obsvr.isClosed { + return + } + + select { + case <-obsvr.ctx.Done(): + // if the context is done just release the read-lock (deferred) + return + case obsvr.observerCh <- value: + // if observerCh has space in its buffer, the value is written to it + return + // if the context isn't done and channel is full (i.e. blocking), + // release the read-lock to give write-lockers a turn. This case + // continues the loop, re-read-locking and trying again. + case <-sendRetryTicker.C: + // TODO_IMPROVE/TODO_CONSIDERATION: this is where we would implement + // some backpressure strategy. It would be good to have a simple fail- + // safe strategy that can be used by default; e.g. dropping the oldest + // value if its buffer is full. + + // This case implies that the (read) lock was acquired, so it must + // be unlocked before continuing the send retry loop. + obsvr.observerMu.RUnlock() + } + } +} diff --git a/pkg/observable/channel/observer_test.go b/pkg/observable/channel/observer_test.go new file mode 100644 index 000000000..f8730a422 --- /dev/null +++ b/pkg/observable/channel/observer_test.go @@ -0,0 +1,81 @@ +package channel + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestObserver_Unsubscribe(t *testing.T) { + var ( + onUnsubscribeCalled = false + publishCh = make(chan int, 1) + ) + obsvr := &channelObserver[int]{ + observerMu: &sync.RWMutex{}, + // using a buffered channel to keep the test synchronous + observerCh: publishCh, + onUnsubscribe: func(toRemove *channelObserver[int]) { + onUnsubscribeCalled = true + }, + } + + // should initially be open + require.Equal(t, false, obsvr.isClosed) + + publishCh <- 1 + require.Equal(t, false, obsvr.isClosed) + + obsvr.Unsubscribe() + // should be isClosed after `#Unsubscribe()` + require.Equal(t, true, obsvr.isClosed) + require.True(t, onUnsubscribeCalled) +} + +func TestObserver_ConcurrentUnsubscribe(t *testing.T) { + var ( + onUnsubscribeCalled = false + publishCh = make(chan int, 1) + ) + obsvr := &channelObserver[int]{ + ctx: context.Background(), + observerMu: &sync.RWMutex{}, + // using a buffered channel to keep the test synchronous + observerCh: publishCh, + onUnsubscribe: func(toRemove *channelObserver[int]) { + onUnsubscribeCalled = true + }, + } + + require.Equal(t, false, obsvr.isClosed, "observer channel should initially be open") + + // concurrently & continuously publish until the test cleanup runs + done := make(chan struct{}, 1) + go func() { + for idx := 0; ; idx++ { + // return when done receives; otherwise, + select { + case <-done: + return + default: + } + + // publish a value + obsvr.notify(idx) + } + }() + // send on done when the test cleans up + t.Cleanup(func() { done <- struct{}{} }) + + // it should still be open after a bit of inactivity + time.Sleep(10 * time.Millisecond) + require.Equal(t, false, obsvr.isClosed) + + obsvr.Unsubscribe() + // should be isClosed after `#Unsubscribe()` + require.Equal(t, true, obsvr.isClosed) + require.True(t, onUnsubscribeCalled) +} diff --git a/pkg/observable/errors.go b/pkg/observable/errors.go new file mode 100644 index 000000000..c34f2f7d6 --- /dev/null +++ b/pkg/observable/errors.go @@ -0,0 +1,8 @@ +package observable + +import errorsmod "cosmossdk.io/errors" + +var ( + ErrObserverClosed = errorsmod.Register(codespace, 1, "observer is closed") + codespace = "observable" +) diff --git a/pkg/observable/interface.go b/pkg/observable/interface.go new file mode 100644 index 000000000..3d4894339 --- /dev/null +++ b/pkg/observable/interface.go @@ -0,0 +1,24 @@ +package observable + +import "context" + +// NOTE: We explicitly decided to write a small and custom notifications package +// to keep logic simple and minimal. If the needs & requirements of this library ever +// grow, other packages (e.g. https://github.com/ReactiveX/RxGo) can be considered. +// (see: https://github.com/ReactiveX/RxGo/pull/377) + +// Observable is a generic interface that allows multiple subscribers to be +// notified of new values asynchronously. +// It is analogous to a publisher in a "Fan-Out" system design. +type Observable[V any] interface { + Subscribe(context.Context) Observer[V] + UnsubscribeAll() +} + +// Observer is a generic interface that provides access to the notified +// channel and allows unsubscribing from an Observable. +// It is analogous to a subscriber in a "Fan-Out" system design. +type Observer[V any] interface { + Unsubscribe() + Ch() <-chan V +}