Skip to content

Commit

Permalink
close subscription if publisher channel is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Jun 5, 2024
1 parent c433a28 commit ca612ce
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions internal/db/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ func (db *db) handleSubscription(ctx context.Context, r *request.Request) (<-cha
for {
var evt events.Update
select {
case val := <-pub.Event():
evt = val
case <-ctx.Done():
return // context cancelled
case val, ok := <-pub.Event():
if !ok {
return // channel closed
}
evt = val
}

txn, err := db.NewTxn(ctx, false)
Expand Down Expand Up @@ -80,11 +83,11 @@ func (db *db) handleSubscription(ctx context.Context, r *request.Request) (<-cha
}

select {
case resCh <- res:
txn.Discard(ctx)
case <-ctx.Done():
txn.Discard(ctx)
return // context cancelled
case resCh <- res:
txn.Discard(ctx)
}
}
}()
Expand Down

0 comments on commit ca612ce

Please sign in to comment.