Skip to content

Commit

Permalink
refactor: Replace subscription events publisher (#2686)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #2685 

## Description

This PR replaces subscription event publishers with a simple go channel.
This is a pre-requisite to a follow up events package refactor.

## Tasks

- [x] I made sure the code is well commented, particularly
hard-to-understand areas.
- [x] I made sure the repository-held documentation is changed
accordingly.
- [x] I made sure the pull request title adheres to the conventional
commit style (the subset used in the project can be found in
[tools/configs/chglog/config.yml](tools/configs/chglog/config.yml)).
- [x] I made sure to discuss its limitations such as threats to
validity, vulnerability to mistake and misuse, robustness to
invalidation of assumptions, resource requirements, ...

## How has this been tested?

`make test`

Specify the platform(s) on which this was tested:
- MacOS
  • Loading branch information
nasdf authored Jun 5, 2024
1 parent dd0e5af commit a7004b2
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 116 deletions.
4 changes: 2 additions & 2 deletions cli/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ To learn more about the DefraDB GraphQL Query Language, refer to https://docs.so
for _, err := range result.GQL.Errors {
errors = append(errors, err.Error())
}
if result.Pub == nil {
if result.Subscription == nil {
cmd.Print(REQ_RESULTS_HEADER)
return writeJSON(cmd, map[string]any{"data": result.GQL.Data, "errors": errors})
}
cmd.Print(SUB_RESULTS_HEADER)
for item := range result.Pub.Stream() {
for item := range result.Subscription {
writeJSON(cmd, item) //nolint:errcheck
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ type RequestResult struct {
// GQL contains the immediate results of the GQL request.
GQL GQLResult

// Pub contains a pointer to an event stream which channels any subscription results
// if the request was a GQL subscription.
Pub *events.Publisher[events.Update]
// Subscription is an optional channel which returns results
// from a subscription request.
Subscription <-chan GQLResult
}

// CollectionFetchOptions represents a set of options used for fetching collections.
Expand Down
28 changes: 13 additions & 15 deletions http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (c *Client) ExecRequest(
return result
}
if res.Header.Get("Content-Type") == "text/event-stream" {
result.Pub = c.execRequestSubscription(res.Body)
result.Subscription = c.execRequestSubscription(res.Body)
return result
}
// ignore close errors because they have
Expand All @@ -389,19 +389,17 @@ func (c *Client) ExecRequest(
return result
}

func (c *Client) execRequestSubscription(r io.ReadCloser) *events.Publisher[events.Update] {
pubCh := events.New[events.Update](0, 0)
pub, err := events.NewPublisher[events.Update](pubCh, 0)
if err != nil {
return nil
}

func (c *Client) execRequestSubscription(r io.ReadCloser) chan client.GQLResult {
resCh := make(chan client.GQLResult)
go func() {
eventReader := sse.NewReadCloser(r)
// ignore close errors because the status
// and body of the request are already
// checked and it cannot be handled properly
defer eventReader.Close() //nolint:errcheck
defer func() {
// ignore close errors because the status
// and body of the request are already
// checked and it cannot be handled properly
eventReader.Close() //nolint:errcheck
close(resCh)
}()

for {
evt, err := eventReader.Next()
Expand All @@ -412,14 +410,14 @@ func (c *Client) execRequestSubscription(r io.ReadCloser) *events.Publisher[even
if err := json.Unmarshal(evt.Data, &response); err != nil {
return
}
pub.Publish(client.GQLResult{
resCh <- client.GQLResult{
Errors: response.Errors,
Data: response.Data,
})
}
}
}()

return pub
return resCh
}

func (c *Client) PrintDump(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion http/handler_ccip.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c *ccipHandler) ExecCCIP(rw http.ResponseWriter, req *http.Request) {
}

result := store.ExecRequest(req.Context(), request.Query)
if result.Pub != nil {
if result.Subscription != nil {
responseJSON(rw, http.StatusBadRequest, errorResponse{ErrStreamingNotSupported})
return
}
Expand Down
4 changes: 2 additions & 2 deletions http/handler_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (s *storeHandler) ExecRequest(rw http.ResponseWriter, req *http.Request) {

result := store.ExecRequest(req.Context(), request.Query)

if result.Pub == nil {
if result.Subscription == nil {
responseJSON(rw, http.StatusOK, GraphQLResponse{result.GQL.Data, result.GQL.Errors})
return
}
Expand All @@ -335,7 +335,7 @@ func (s *storeHandler) ExecRequest(rw http.ResponseWriter, req *http.Request) {
select {
case <-req.Context().Done():
return
case item, open := <-result.Pub.Stream():
case item, open := <-result.Subscription:
if !open {
return
}
Expand Down
13 changes: 3 additions & 10 deletions internal/db/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,20 @@ func (db *db) execRequest(ctx context.Context, request string) *client.RequestRe
return res
}

pub, subRequest, err := db.checkForClientSubscriptions(parsedRequest)
pub, err := db.handleSubscription(ctx, parsedRequest)
if err != nil {
res.GQL.Errors = []error{err}
return res
}

if pub != nil {
res.Pub = pub
go db.handleSubscription(ctx, pub, subRequest)
res.Subscription = pub
return res
}

txn := mustGetContextTxn(ctx)
identity := GetContextIdentity(ctx)
planner := planner.New(
ctx,
identity,
db.acp,
db,
txn,
)
planner := planner.New(ctx, identity, db.acp, db, txn)

results, err := planner.RunRequest(ctx, parsedRequest)
if err != nil {
Expand Down
127 changes: 61 additions & 66 deletions internal/db/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,83 +19,78 @@ import (
"github.com/sourcenetwork/defradb/internal/planner"
)

func (db *db) checkForClientSubscriptions(r *request.Request) (
*events.Publisher[events.Update],
*request.ObjectSubscription,
error,
) {
// handleSubscription checks for a subscription within the given request and
// starts a new go routine that will return all subscription results on the returned
// channel. If a subscription does not exist on the given request nil will be returned.
func (db *db) handleSubscription(ctx context.Context, r *request.Request) (<-chan client.GQLResult, error) {
if len(r.Subscription) == 0 || len(r.Subscription[0].Selections) == 0 {
// This is not a subscription request and we have nothing to do here
return nil, nil, nil
return nil, nil // This is not a subscription request and we have nothing to do here
}

if !db.events.Updates.HasValue() {
return nil, nil, ErrSubscriptionsNotAllowed
return nil, ErrSubscriptionsNotAllowed
}

s := r.Subscription[0].Selections[0]
if subRequest, ok := s.(*request.ObjectSubscription); ok {
pub, err := events.NewPublisher(db.events.Updates.Value(), 5)
if err != nil {
return nil, nil, err
}

return pub, subRequest, nil
selections := r.Subscription[0].Selections[0]
subRequest, ok := selections.(*request.ObjectSubscription)
if !ok {
return nil, client.NewErrUnexpectedType[request.ObjectSubscription]("SubscriptionSelection", selections)
}
// unsubscribing from this publisher will cause a race condition
// https://github.com/sourcenetwork/defradb/issues/2687
pub, err := events.NewPublisher(db.events.Updates.Value(), 5)
if err != nil {
return nil, err
}

return nil, nil, client.NewErrUnexpectedType[request.ObjectSubscription]("SubscriptionSelection", s)
}
resCh := make(chan client.GQLResult)
go func() {
defer close(resCh)

func (db *db) handleSubscription(
ctx context.Context,
pub *events.Publisher[events.Update],
r *request.ObjectSubscription,
) {
for evt := range pub.Event() {
txn, err := db.NewTxn(ctx, false)
if err != nil {
log.ErrorContext(ctx, err.Error())
continue
}
// listen for events and send to the result channel
for {
var evt events.Update
select {
case <-ctx.Done():
return // context cancelled
case val, ok := <-pub.Event():
if !ok {
return // channel closed
}
evt = val
}

ctx := SetContextTxn(ctx, txn)
db.handleEvent(ctx, pub, evt, r)
txn.Discard(ctx)
}
}
txn, err := db.NewTxn(ctx, false)
if err != nil {
log.ErrorContext(ctx, err.Error())
continue
}

func (db *db) handleEvent(
ctx context.Context,
pub *events.Publisher[events.Update],
evt events.Update,
r *request.ObjectSubscription,
) {
txn := mustGetContextTxn(ctx)
identity := GetContextIdentity(ctx)
p := planner.New(
ctx,
identity,
db.acp,
db,
txn,
)
ctx := SetContextTxn(ctx, txn)
identity := GetContextIdentity(ctx)

s := r.ToSelect(evt.DocID, evt.Cid.String())
p := planner.New(ctx, identity, db.acp, db, txn)
s := subRequest.ToSelect(evt.DocID, evt.Cid.String())

result, err := p.RunSubscriptionRequest(ctx, s)
if err != nil {
pub.Publish(client.GQLResult{
Errors: []error{err},
})
return
}
result, err := p.RunSubscriptionRequest(ctx, s)
if err == nil && len(result) == 0 {
txn.Discard(ctx)
continue // Don't send anything back to the client if the request yields an empty dataset.
}
res := client.GQLResult{
Data: result,
}
if err != nil {
res.Errors = []error{err}
}

// Don't send anything back to the client if the request yields an empty dataset.
if len(result) == 0 {
return
}
select {
case <-ctx.Done():
txn.Discard(ctx)
return // context cancelled
case resCh <- res:
txn.Discard(ctx)
}
}
}()

pub.Publish(client.GQLResult{
Data: result,
})
return resCh, nil
}
19 changes: 7 additions & 12 deletions tests/clients/cli/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (w *Wrapper) ExecRequest(
return result
}
if header == cli.SUB_RESULTS_HEADER {
result.Pub = w.execRequestSubscription(buffer)
result.Subscription = w.execRequestSubscription(buffer)
return result
}
data, err := io.ReadAll(buffer)
Expand Down Expand Up @@ -439,29 +439,24 @@ func (w *Wrapper) ExecRequest(
return result
}

func (w *Wrapper) execRequestSubscription(r io.Reader) *events.Publisher[events.Update] {
pubCh := events.New[events.Update](0, 0)
pub, err := events.NewPublisher[events.Update](pubCh, 0)
if err != nil {
return nil
}

func (w *Wrapper) execRequestSubscription(r io.Reader) chan client.GQLResult {
resCh := make(chan client.GQLResult)
go func() {
dec := json.NewDecoder(r)
defer close(resCh)

for {
var response http.GraphQLResponse
if err := dec.Decode(&response); err != nil {
return
}
pub.Publish(client.GQLResult{
resCh <- client.GQLResult{
Errors: response.Errors,
Data: response.Data,
})
}
}
}()

return pub
return resCh
}

func (w *Wrapper) NewTxn(ctx context.Context, readOnly bool) (datastore.Txn, error) {
Expand Down
8 changes: 3 additions & 5 deletions tests/integration/utils2.go
Original file line number Diff line number Diff line change
Expand Up @@ -1718,13 +1718,11 @@ func executeSubscriptionRequest(

allActionsAreDone := false
expectedDataRecieved := len(action.Results) == 0
stream := result.Pub.Stream()
for {
select {
case s := <-stream:
sResult, _ := s.(client.GQLResult)
sData, _ := sResult.Data.([]map[string]any)
errs = append(errs, sResult.Errors...)
case s := <-result.Subscription:
sData, _ := s.Data.([]map[string]any)
errs = append(errs, s.Errors...)
data = append(data, sData...)

if len(data) >= len(action.Results) {
Expand Down

0 comments on commit a7004b2

Please sign in to comment.