Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Replace subscription events publisher #2686

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ To learn more about the DefraDB GraphQL Query Language, refer to https://docs.so
for _, err := range result.GQL.Errors {
errors = append(errors, err.Error())
}
if result.Pub == nil {
if result.Subscription == nil {
cmd.Print(REQ_RESULTS_HEADER)
return writeJSON(cmd, map[string]any{"data": result.GQL.Data, "errors": errors})
}
cmd.Print(SUB_RESULTS_HEADER)
for item := range result.Pub.Stream() {
for item := range result.Subscription {
writeJSON(cmd, item) //nolint:errcheck
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ type RequestResult struct {
// GQL contains the immediate results of the GQL request.
GQL GQLResult

// Pub contains a pointer to an event stream which channels any subscription results
// if the request was a GQL subscription.
Pub *events.Publisher[events.Update]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussion: I think you are removing functionality here - IIRC the events package stuff allows for multiple concurrent readers of the stream. The simple Go chan that you are replacing it lacks this functionality and will mean any concurrent readers will be competing with each other for each item.

The CLI/http clients are unaffected, but the embedded Go client is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes more sense to me that we assume one reader per request by default. The consumer of the channel then has the option to decide if they want to create an events.Publisher and publish the channel results to it.

// Subscription is an optional channel which returns results
// from a subscription request.
Subscription <-chan GQLResult
}

// CollectionFetchOptions represents a set of options used for fetching collections.
Expand Down
28 changes: 13 additions & 15 deletions http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (c *Client) ExecRequest(
return result
}
if res.Header.Get("Content-Type") == "text/event-stream" {
result.Pub = c.execRequestSubscription(res.Body)
result.Subscription = c.execRequestSubscription(res.Body)
return result
}
// ignore close errors because they have
Expand All @@ -389,19 +389,17 @@ func (c *Client) ExecRequest(
return result
}

func (c *Client) execRequestSubscription(r io.ReadCloser) *events.Publisher[events.Update] {
pubCh := events.New[events.Update](0, 0)
pub, err := events.NewPublisher[events.Update](pubCh, 0)
if err != nil {
return nil
}

func (c *Client) execRequestSubscription(r io.ReadCloser) chan client.GQLResult {
resCh := make(chan client.GQLResult)
go func() {
eventReader := sse.NewReadCloser(r)
// ignore close errors because the status
// and body of the request are already
// checked and it cannot be handled properly
defer eventReader.Close() //nolint:errcheck
defer func() {
// ignore close errors because the status
// and body of the request are already
// checked and it cannot be handled properly
eventReader.Close() //nolint:errcheck
close(resCh)
}()

for {
evt, err := eventReader.Next()
Expand All @@ -412,14 +410,14 @@ func (c *Client) execRequestSubscription(r io.ReadCloser) *events.Publisher[even
if err := json.Unmarshal(evt.Data, &response); err != nil {
return
}
pub.Publish(client.GQLResult{
resCh <- client.GQLResult{
Errors: response.Errors,
Data: response.Data,
})
}
}
}()

return pub
return resCh
}

func (c *Client) PrintDump(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion http/handler_ccip.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c *ccipHandler) ExecCCIP(rw http.ResponseWriter, req *http.Request) {
}

result := store.ExecRequest(req.Context(), request.Query)
if result.Pub != nil {
if result.Subscription != nil {
responseJSON(rw, http.StatusBadRequest, errorResponse{ErrStreamingNotSupported})
return
}
Expand Down
4 changes: 2 additions & 2 deletions http/handler_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (s *storeHandler) ExecRequest(rw http.ResponseWriter, req *http.Request) {

result := store.ExecRequest(req.Context(), request.Query)

if result.Pub == nil {
if result.Subscription == nil {
responseJSON(rw, http.StatusOK, GraphQLResponse{result.GQL.Data, result.GQL.Errors})
return
}
Expand All @@ -335,7 +335,7 @@ func (s *storeHandler) ExecRequest(rw http.ResponseWriter, req *http.Request) {
select {
case <-req.Context().Done():
return
case item, open := <-result.Pub.Stream():
case item, open := <-result.Subscription:
if !open {
return
}
Expand Down
13 changes: 3 additions & 10 deletions internal/db/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,20 @@ func (db *db) execRequest(ctx context.Context, request string) *client.RequestRe
return res
}

pub, subRequest, err := db.checkForClientSubscriptions(parsedRequest)
pub, err := db.handleSubscription(ctx, parsedRequest)
if err != nil {
res.GQL.Errors = []error{err}
return res
}

if pub != nil {
res.Pub = pub
go db.handleSubscription(ctx, pub, subRequest)
res.Subscription = pub
return res
}

txn := mustGetContextTxn(ctx)
identity := GetContextIdentity(ctx)
planner := planner.New(
ctx,
identity,
db.acp,
db,
txn,
)
planner := planner.New(ctx, identity, db.acp, db, txn)

results, err := planner.RunRequest(ctx, parsedRequest)
if err != nil {
Expand Down
124 changes: 58 additions & 66 deletions internal/db/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,83 +19,75 @@
"github.com/sourcenetwork/defradb/internal/planner"
)

func (db *db) checkForClientSubscriptions(r *request.Request) (
*events.Publisher[events.Update],
*request.ObjectSubscription,
error,
) {
// handleSubscription checks for a subscription within the given request and
// starts a new go routine that will return all subscription results on the returned
// channel. If a subscription does not exist on the given request nil will be returned.
func (db *db) handleSubscription(ctx context.Context, r *request.Request) (<-chan client.GQLResult, error) {
if len(r.Subscription) == 0 || len(r.Subscription[0].Selections) == 0 {
// This is not a subscription request and we have nothing to do here
return nil, nil, nil
return nil, nil // This is not a subscription request and we have nothing to do here
}

if !db.events.Updates.HasValue() {
return nil, nil, ErrSubscriptionsNotAllowed
return nil, ErrSubscriptionsNotAllowed

Check warning on line 30 in internal/db/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

internal/db/subscriptions.go#L30

Added line #L30 was not covered by tests
}

s := r.Subscription[0].Selections[0]
if subRequest, ok := s.(*request.ObjectSubscription); ok {
pub, err := events.NewPublisher(db.events.Updates.Value(), 5)
if err != nil {
return nil, nil, err
}

return pub, subRequest, nil
selections := r.Subscription[0].Selections[0]
subRequest, ok := selections.(*request.ObjectSubscription)
if !ok {
return nil, client.NewErrUnexpectedType[request.ObjectSubscription]("SubscriptionSelection", selections)

Check warning on line 35 in internal/db/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

internal/db/subscriptions.go#L35

Added line #L35 was not covered by tests
}
// unsubscribing from this publisher will cause a race condition
// https://github.com/sourcenetwork/defradb/issues/2687
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Is this a new issue, or was the old code affected to?

And where is the race condition? Is it a test artefact, or production code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is an old issue. The previous code never called Unsubscribe. The race happens between the db.Close and the pub.Unsubscribe so it's possible to happen in production, but likely would cause no issues.

pub, err := events.NewPublisher(db.events.Updates.Value(), 5)
if err != nil {
return nil, err

Check warning on line 41 in internal/db/subscriptions.go

View check run for this annotation

Codecov / codecov/patch

internal/db/subscriptions.go#L41

Added line #L41 was not covered by tests
}

return nil, nil, client.NewErrUnexpectedType[request.ObjectSubscription]("SubscriptionSelection", s)
}
resCh := make(chan client.GQLResult)
go func() {
defer close(resCh)

func (db *db) handleSubscription(
ctx context.Context,
pub *events.Publisher[events.Update],
r *request.ObjectSubscription,
) {
for evt := range pub.Event() {
txn, err := db.NewTxn(ctx, false)
if err != nil {
log.ErrorContext(ctx, err.Error())
continue
}
// listen for events and send to the result channel
for {
var evt events.Update
select {
case val := <-pub.Event():
evt = val
case <-ctx.Done():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: By not looping through the pub.Event() with a direct for loop, I think you may have removed the handling of a timeout - it looks like in the event of a timeout evt will be default/empty and bad things might happen later in this func.

Can you please confirm the behaviour, and make any necessary adjustments please?

(note: IIRC testing of subscriptions is not as comprehensive as most of our other queries, and it uses an old framework)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I've updated it to handle the channel closing.

return // context cancelled
}

ctx := SetContextTxn(ctx, txn)
db.handleEvent(ctx, pub, evt, r)
txn.Discard(ctx)
}
}
txn, err := db.NewTxn(ctx, false)
if err != nil {
log.ErrorContext(ctx, err.Error())
continue
}

func (db *db) handleEvent(
ctx context.Context,
pub *events.Publisher[events.Update],
evt events.Update,
r *request.ObjectSubscription,
) {
txn := mustGetContextTxn(ctx)
identity := GetContextIdentity(ctx)
p := planner.New(
ctx,
identity,
db.acp,
db,
txn,
)
ctx := SetContextTxn(ctx, txn)
identity := GetContextIdentity(ctx)

s := r.ToSelect(evt.DocID, evt.Cid.String())
p := planner.New(ctx, identity, db.acp, db, txn)
s := subRequest.ToSelect(evt.DocID, evt.Cid.String())

result, err := p.RunSubscriptionRequest(ctx, s)
if err != nil {
pub.Publish(client.GQLResult{
Errors: []error{err},
})
return
}
result, err := p.RunSubscriptionRequest(ctx, s)
if err == nil && len(result) == 0 {
txn.Discard(ctx)
continue // Don't send anything back to the client if the request yields an empty dataset.
}
res := client.GQLResult{
Data: result,
}
if err != nil {
res.Errors = []error{err}
}

// Don't send anything back to the client if the request yields an empty dataset.
if len(result) == 0 {
return
}
select {
case resCh <- res:
txn.Discard(ctx)
case <-ctx.Done():
txn.Discard(ctx)
return // context cancelled
}
}
}()

pub.Publish(client.GQLResult{
Data: result,
})
return resCh, nil
}
19 changes: 7 additions & 12 deletions tests/clients/cli/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (w *Wrapper) ExecRequest(
return result
}
if header == cli.SUB_RESULTS_HEADER {
result.Pub = w.execRequestSubscription(buffer)
result.Subscription = w.execRequestSubscription(buffer)
return result
}
data, err := io.ReadAll(buffer)
Expand Down Expand Up @@ -439,29 +439,24 @@ func (w *Wrapper) ExecRequest(
return result
}

func (w *Wrapper) execRequestSubscription(r io.Reader) *events.Publisher[events.Update] {
pubCh := events.New[events.Update](0, 0)
pub, err := events.NewPublisher[events.Update](pubCh, 0)
if err != nil {
return nil
}

func (w *Wrapper) execRequestSubscription(r io.Reader) chan client.GQLResult {
resCh := make(chan client.GQLResult)
go func() {
dec := json.NewDecoder(r)
defer close(resCh)

for {
var response http.GraphQLResponse
if err := dec.Decode(&response); err != nil {
return
}
pub.Publish(client.GQLResult{
resCh <- client.GQLResult{
Errors: response.Errors,
Data: response.Data,
})
}
}
}()

return pub
return resCh
}

func (w *Wrapper) NewTxn(ctx context.Context, readOnly bool) (datastore.Txn, error) {
Expand Down
8 changes: 3 additions & 5 deletions tests/integration/utils2.go
Original file line number Diff line number Diff line change
Expand Up @@ -1718,13 +1718,11 @@ func executeSubscriptionRequest(

allActionsAreDone := false
expectedDataRecieved := len(action.Results) == 0
stream := result.Pub.Stream()
for {
select {
case s := <-stream:
sResult, _ := s.(client.GQLResult)
sData, _ := sResult.Data.([]map[string]any)
errs = append(errs, sResult.Errors...)
case s := <-result.Subscription:
sData, _ := s.Data.([]map[string]any)
errs = append(errs, s.Errors...)
data = append(data, sData...)

if len(data) >= len(action.Results) {
Expand Down
Loading