-
Notifications
You must be signed in to change notification settings - Fork 42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rest): use cache for relay API #890
Conversation
Jenkins BuildsClick to see older builds (6)
|
465a0ba
to
21440f6
Compare
} else { | ||
err = r.cache.Unsubscribe(contentFilter) | ||
if err != nil { | ||
r.log.Error("unsubscribing cache", zap.Strings("contentTopics", cTopics), zap.Error(err)) |
Check failure
Code scanning / CodeQL
Log entries created from user input High
user-provided value
cmd/waku/server/rest/relay.go
Outdated
if err != nil { | ||
w.WriteHeader(http.StatusNotFound) | ||
_, err = w.Write([]byte("not subscribed to topic")) | ||
r.log.Error("writing response", zap.Error(err)) | ||
return | ||
} | ||
var response []*pb.WakuMessage |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we just run a loop here until the channel is drained or we reached maxCacheCapacity and return?
This way we can avoid maintaining this additional cache?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The difficulty with trying that approach is that this concept of draining a channel is not something that is used in go. Reading from channels is a blocking operation up to the moment you close the channel, or add some timeout like:
ctx, cancel := context.WithTimeout(request.ctx, 2 * time.Second)
defer cancel()
var messages []*pb.WakuMessage
for {
select {
case msg := <-myCh:
messages = append(messages, msg)
case <-ctx.Done():
return messages
}
}
With the drawback of having the requests potentially take more time than required when there is no messages in the channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, Got it.... how about something like this?
Wouldn't this take care of the case where channel doesn't have anything to be read, we return or once we hit cache capacity we return. I may be missing something here though.
var messages []*pb.WakuMessage
for {
if len(messages) > cacheCapacity {
break
}
select {
case msg := <-myCh:
messages = append(messages, msg)
case <-ctx.Done():
break
default:
break
}
}
return messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd have to test it, but I believe it would still suffer the same delay, in case the number of messages you retrieve from the channel are less than the cacheCapacity, and also because the length of the function execution would depend on the request context.
So for example if you have a cache limit of 20, and only have received 3 messages, you'd have to wait until ctx.Done()
is triggered, and then, probably there's no one to reply, since the requestor will have closed the connection by then
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So for example if you have a cache limit of 20, and only have received 3 messages, you'd have to wait until ctx.Done() is triggered, and then, probably there's no one to reply, since the requestor will have closed the connection by then
Wouldn't the default case hit once channel is empty? That should take care of not having to wait till end of context?
I had done this change and tested with defaultTopic and i could see 30 messages(which is the default capacity set) returned in 1 call and that too without waiting for request context to be done.
So looks like this approach works and is simple without having to maintain additional cache layer.
Also increased the default cache capacity from 30 to 1000 or something as underlying subscription's default is that if not set. But since in REST we are setting it to 30, it is very low.
Raised a new PR #898
The relay Subscribe API in waky.relay already uses contentFilters. Do you mean relay REST API? |
yes |
21440f6
to
773fa14
Compare
Replaced by #898 |
Description
Getting relay messages via REST API was not working correctly as it would only return a single message due to having a
select
with a default case ingo-waku/cmd/waku/server/rest/relay.go
Line 129 in 3d217ed
for
would not have helped as the channel on which the subscription receives the message is never closed.I ended up reusing the same cache from filter since it solves the problem for now, and I also added some extra functionality to the cache to store messages for subscriptions that only include pubsub topics. This code will need to be modified once the API for relay is changed to use content filters instead of just a pubsub topic