Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context propogation to subscribers for local pub-sub #487

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

yashb042
Copy link

@yashb042 yashb042 commented Sep 3, 2024

Solves Issue - https://github.com/ThreeDotsLabs/watermill/issues/464

Update 1 -

Have not added extra variables in Message struct, because the Message struct is used by every publisher-subscriber

Update 2 -
Couldn't test the func - TestMessageCtx in test_pubsub.go
It's mentioned that "ExactlyOnceDelivery test is not supported yet"

pubsub/gochannel/pubsub.go Outdated Show resolved Hide resolved
var cancelCtx context.CancelFunc

if preserveContext {
ctx, cancelCtx = context.WithCancel(msg.Context())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you describe what's the use case here? 🤔 It seems to be a bit more than just preserving context among messages.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent find, removed the WithCancel when preserving context.

WithCancel makes sense when we are using the subscriber context, and done is called on the subscriber context itself.

A quick question though - Even if the subscriber main context is cancelled, that time the messages will still be processed further.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @yashb042 after considering it, I think your original idea made more sense: The message's context should also be wrapped with the cancel, otherwise closing the subscriber wouldn't close the message's context. Sorry about the confusion!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m110 Could you please tell me what is the point of keeping cancelCtx here, for both the cases
As I see it, the messages are still being passed to the subscriber irrespective of whether the context was cancelled or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yashb042 It's because the context is passed to the message and is likely to be used in the command handler. For example, when executing a database query, you would pass msg.Context(). If the subscriber closes, the context gets canceled, which also cancels the database query.

Let me know if this makes sense?

Copy link

@ccoVeille ccoVeille Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does make sense

But let me ask something.

Is there a case where there is a need for the context not to be the one passed to the message ?

I'm asking because the previous code was creating a context. And now we add an option to preserve it.

But why do we need such option? Why not simply fix the code to preserve the context.

I mean unless I'm wrong the code has a bug, and we could simply fix it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most Watermill Pub/Subs work over the network. You publish a message, it gets marshalled, then it's delivered to the subscriber. But obviously, the context wouldn't survive the marshaling, so you can't rely on it.

The GoChannel Pub/Sub works in memory, so it is possible to pass the context. I suppose it could be useful for passing some arbitrary data instead of marshaling it to []byte, but it behaves by design the same as other Pub/Subs. Because you should be able to replace one Pub/Sub implementation with another with similar results. For example, it's handy to use GoChannel for tests, but you would use something different for production.

A scenario where passing the context may be confusing:

  • An HTTP handler publishes a message passing the request's context.
  • The message is delivered to a subscriber, which appends some data to the database, using the message's context.
  • Meanwhile, the HTTP request is done, and the context is canceled. If you preserved the context in the message, it would also cancel the database query, which is not what you want.

In general, the publishing and subscribing sides should be considered as something separate, working concurrently. Preserving the context can make things confusing.

If you really know what you're doing, I guess it's fine to change this behavior, but you should explicitly flip the config flag then and proceed with caution (e.g., use context.WithoutCancel when passing it to the Publish).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your detailed reply!

Copy link
Author

@yashb042 yashb042 Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yashb042 It's because the context is passed to the message and is likely to be used in the command handler. For example, when executing a database query, you would pass msg.Context(). If the subscriber closes, the context gets canceled, which also cancels the database query.

Let me know if this makes sense?

@m110
Agreed, and the database case makes sense as well.
Though, is it there for all the cases, because as I see it, context usually is considered whenever either you do a network call (in database case you mentioned) or you call a library which has put up an exclusive check on the context.Done() field.

For our local pub-sub, I don't see any of these happening that's why a bit confused.

Copy link

codecov bot commented Sep 4, 2024

Codecov Report

Attention: Patch coverage is 95.16129% with 3 lines in your changes missing coverage. Please review.

Project coverage is 67.15%. Comparing base (8d78dbe) to head (867fb77).
Report is 9 commits behind head on master.

Files with missing lines Patch % Lines
pubsub/tests/test_asserts.go 72.72% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #487      +/-   ##
==========================================
+ Coverage   66.96%   67.15%   +0.18%     
==========================================
  Files          64       64              
  Lines        4289     4344      +55     
==========================================
+ Hits         2872     2917      +45     
- Misses       1242     1250       +8     
- Partials      175      177       +2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@m110
Copy link
Member

m110 commented Sep 4, 2024

Thank you @yashb042! This looks solid to me, just a couple of minor comments.

@yashb042
Copy link
Author

yashb042 commented Sep 5, 2024

Also, @m110

Update 2 -
Couldn't test the func - TestMessageCtx in test_pubsub.go
It's mentioned that "ExactlyOnceDelivery test is not supported yet"

How would we proceed testing this func - TestMessageCtx

Comment on lines 359 to 366
var ctx context.Context

//This is getting the context from the message, not the subscriber
if s.preserveContext {
ctx = msg.Context()
} else {
var cancelCtx context.CancelFunc
ctx, cancelCtx = context.WithCancel(s.ctx)
defer cancelCtx()
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to suggest this

Suggested change
var ctx context.Context
//This is getting the context from the message, not the subscriber
if s.preserveContext {
ctx = msg.Context()
} else {
var cancelCtx context.CancelFunc
ctx, cancelCtx = context.WithCancel(s.ctx)
defer cancelCtx()
}
ctx := msg.Context()
//This is getting the context from the message, not the subscriber
if !s.preserveContext {
var cancelCtx context.CancelFunc
ctx, cancelCtx = context.WithCancel(s.ctx)
defer cancelCtx()
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, changed

A quick question though :-

When we are using the subscriber context, even if the subscriber main context is cancelled, that time the messages will still be processed further.
So why are we even using cancelCtx here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I was surprised to see an option to change it, for me the context should be always the one of the message

The lib is in charge to pass the context to the message, so if the context is cancelled, the message one is too, no ?

I mean couldn't we simply fix it, and not adding the preserveContext boolean ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, however @m110 has a point here -
#464 (comment)

Nonetheless, according to your statement -
The lib is in charge to pass the context to the message, so if the context is cancelled, the message one is too, no ?

I don't see that happening in the code though, even if the main context is cancelled, the messages are passed to the subscriber nonetheless

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they are, but the context of the message would be canceled I think/hope

Anyway this comment might change everything #487 (comment)

So, let's wait

Comment on lines 98 to 105
func AssertAllMessagesHaveSameContext(t *testing.T, contextKeyString string, expectedValues map[string]context.Context, received []*message.Message) bool {
assert.Len(t, received, len(expectedValues))

ok := true
for _, msg := range received {
expectedValue := expectedValues[msg.UUID].Value(contextKey(contextKeyString)).(string)
actualValue := msg.Context().Value(contextKeyString)
if !assert.Equal(t, expectedValue, actualValue) {
ok = false
}
}

return ok
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to return a boolean, calling t.Error is enough to invalidate the test

also your code is not using the returned value when you call it

So this would be better I think

Suggested change
func AssertAllMessagesHaveSameContext(t *testing.T, contextKeyString string, expectedValues map[string]context.Context, received []*message.Message) bool {
assert.Len(t, received, len(expectedValues))
ok := true
for _, msg := range received {
expectedValue := expectedValues[msg.UUID].Value(contextKey(contextKeyString)).(string)
actualValue := msg.Context().Value(contextKeyString)
if !assert.Equal(t, expectedValue, actualValue) {
ok = false
}
}
return ok
}
func AssertAllMessagesHaveSameContext(t *testing.T, contextKeyString string, expectedValues map[string]context.Context, received []*message.Message) {
assert.Len(t, received, len(expectedValues))
for _, msg := range received {
expectedValue := expectedValues[msg.UUID].Value(contextKey(contextKeyString)).(string)
actualValue := msg.Context().Value(contextKeyString)
assert.Equal(t, expectedValue, actualValue)
}
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, changed
Thanks for the suggestion

@ccoVeille
Copy link

Before reviewing, I will wait for you to reply to this message, because I feel like the code might change again #487 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants