-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Observable] chore: observable touchup #83
Changes from all commits
ad77da4
f88b070
dfb2aae
0ea010a
17b0ce5
ebb30f9
d90c3d7
1cca119
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,3 @@ | ||
Certainly! I've added a section named "Architecture Diagrams" in the documentation template below: | ||
|
||
```markdown | ||
# Package [PackageName] | ||
|
||
> Brief one-liner or quote about what this package does. | ||
|
@@ -16,11 +13,21 @@ Provide a few sentences about the purpose and functionality of this package. Con | |
|
||
Visual representations often make it easier to understand the design and flow of a package. Below are the architecture diagrams that explain the high-level structure and interactions in this package: | ||
|
||
![Architecture Overview](./path-to-diagram1.png) | ||
```mermaid | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are these changes intended? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this file is intended to be a template for generating go package READMEs with LLMs. |
||
--- | ||
title: Architecture Overview | ||
--- | ||
flowchart | ||
``` | ||
|
||
> **Figure 1**: Brief description about what this diagram represents. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Plaeholder? |
||
|
||
![Another Diagram](./path-to-diagram2.png) | ||
```mermaid | ||
--- | ||
title: Another Diagram | ||
--- | ||
flowchart | ||
``` | ||
|
||
> **Figure 2**: Brief description about what this other diagram represents. | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,9 @@ package channel | |
|
||
import ( | ||
"context" | ||
"pocket/pkg/observable" | ||
"sync" | ||
|
||
"pocket/pkg/observable" | ||
) | ||
|
||
// TODO_DISCUSS: what should this be? should it be configurable? It seems to be most | ||
|
@@ -50,7 +51,7 @@ func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V | |
} | ||
|
||
// start listening to the publishCh and emit values to observers | ||
go obs.goPublish(obs.publishCh) | ||
go obs.goPublish() | ||
|
||
return obs, obs.publishCh | ||
} | ||
|
@@ -63,6 +64,15 @@ func WithPublisher[V any](publishCh chan V) option[V] { | |
} | ||
} | ||
|
||
// Next synchronously returns the next value from the observable. | ||
func (obsvbl *channelObservable[V]) Next(ctx context.Context) V { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a godoc on what this is or why its needed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a godoc comment. It's not strictly necessary but will be convenient in forthcoming PRs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I'm a bit confused as to when we should or shouldn't use it. #PUC when you have time, but not a blocker in this PR. |
||
tempObserver := obsvbl.Subscribe(ctx) | ||
defer tempObserver.Unsubscribe() | ||
|
||
val := <-tempObserver.Ch() | ||
return val | ||
} | ||
|
||
// Subscribe returns an observer which is notified when the publishCh channel | ||
// receives a value. | ||
func (obsvbl *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] { | ||
|
@@ -110,8 +120,8 @@ func (obsvbl *channelObservable[V]) unsubscribeAll() { | |
|
||
// goPublish to the publishCh and notify observers when values are received. | ||
// This function is blocking and should be run in a goroutine. | ||
func (obsvbl *channelObservable[V]) goPublish(publisher <-chan V) { | ||
for notification := range publisher { | ||
func (obsvbl *channelObservable[V]) goPublish() { | ||
for notification := range obsvbl.publishCh { | ||
// Copy currentObservers to avoid holding the lock while notifying them. | ||
// New or existing Observers may (un)subscribe while this notification | ||
// is being fanned out. | ||
|
@@ -154,9 +164,12 @@ func (obsvbl *channelObservable[V]) copyObservers() (observers []*channelObserve | |
|
||
// goUnsubscribeOnDone unsubscribes from the subscription when the context is done. | ||
// It is a blocking function and intended to be called in a goroutine. | ||
func goUnsubscribeOnDone[V any](ctx context.Context, subscription observable.Observer[V]) { | ||
func goUnsubscribeOnDone[V any](ctx context.Context, observer observable.Observer[V]) { | ||
<-ctx.Done() | ||
subscription.Unsubscribe() | ||
if observer.IsClosed() { | ||
return | ||
} | ||
observer.Unsubscribe() | ||
} | ||
|
||
// onUnsubscribe returns a function that removes a given observer from the | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean to add TODOs here?