Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rest): use cache for relay API #890

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions cmd/waku/server/rest/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package rest

import (
"fmt"
"sync"

"github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
)

// MessageCache buffers received Waku messages for later retrieval through the
// REST API. Messages are stored in two independent indexes: by pubsub topic +
// content topic, and by pubsub topic alone (used with named sharding).
type MessageCache struct {
	// capacity is the maximum number of messages retained per topic bucket.
	capacity int
	// mu guards messages and messagesByPubsubTopic.
	mu  sync.RWMutex
	log *zap.Logger
	// messages indexes cached messages by pubsub topic, then content topic.
	messages map[string]map[string][]*RestWakuMessage
	// messagesByPubsubTopic indexes cached messages for subscriptions that
	// specified only a pubsub topic (no content topics).
	messagesByPubsubTopic map[string][]*RestWakuMessage
}

// NewMessageCache returns an empty MessageCache that retains at most capacity
// messages per topic bucket and logs through the given logger, named "cache".
func NewMessageCache(capacity int, log *zap.Logger) *MessageCache {
	c := &MessageCache{
		capacity: capacity,
		log:      log.Named("cache"),
	}
	c.messages = make(map[string]map[string][]*RestWakuMessage)
	c.messagesByPubsubTopic = make(map[string][]*RestWakuMessage)
	return c
}

// Subscribe registers a content filter so that matching messages start being
// cached. A filter with a pubsub topic and no content topics caches everything
// published on that pubsub topic; otherwise the filter is expanded into
// (pubsub topic, content topic) pairs. Returns an error only if the filter
// cannot be mapped to pubsub topics. Safe for concurrent use.
func (c *MessageCache) Subscribe(contentFilter protocol.ContentFilter) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if contentFilter.PubsubTopic != "" && len(contentFilter.ContentTopics) == 0 {
		// Cache all messages that match a pubsub topic (but no content topic specified)
		// Used with named sharding
		// Eventually this must be modified once API changes to receive content topics too
		// BUG FIX: the existence check must be on messagesByPubsubTopic (the map
		// being initialized), not on messages; the old check meant a repeated
		// Subscribe reset an already-populated bucket, losing cached messages.
		if _, ok := c.messagesByPubsubTopic[contentFilter.PubsubTopic]; !ok {
			c.messagesByPubsubTopic[contentFilter.PubsubTopic] = []*RestWakuMessage{}
		}
	} else {
		// Cache messages that match a content topic, or pubsub topic + content topic
		pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)
		if err != nil {
			return err
		}

		for pubsubTopic, contentTopics := range pubSubTopicMap {
			if c.messages[pubsubTopic] == nil {
				c.messages[pubsubTopic] = make(map[string][]*RestWakuMessage)
			}

			for _, topic := range contentTopics {
				if c.messages[pubsubTopic][topic] == nil {
					c.messages[pubsubTopic][topic] = []*RestWakuMessage{}
				}
			}
		}
	}

	return nil
}

// Unsubscribe removes a previously registered content filter, dropping any
// messages cached for it. A filter with a pubsub topic and no content topics
// removes the whole pubsub-topic bucket; otherwise the individual
// (pubsub topic, content topic) buckets are removed. Safe for concurrent use.
func (c *MessageCache) Unsubscribe(contentFilter protocol.ContentFilter) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if contentFilter.PubsubTopic != "" && len(contentFilter.ContentTopics) == 0 {
		// Stop caching all messages that match a pubsub topic
		// Used with named sharding
		// Eventually this must be modified once API changes to receive content topics too
		delete(c.messagesByPubsubTopic, contentFilter.PubsubTopic)
	} else {
		pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)
		if err != nil {
			return err
		}

		for pubsubTopic, contentTopics := range pubSubTopicMap {
			for _, contentTopic := range contentTopics {
				delete(c.messages[pubsubTopic], contentTopic)
			}
			// Drop the outer entry once its last content topic is gone, so
			// repeated subscribe/unsubscribe cycles do not accumulate empty
			// inner maps indefinitely.
			if len(c.messages[pubsubTopic]) == 0 {
				delete(c.messages, pubsubTopic)
			}
		}
	}

	return nil
}

// AddMessage converts the envelope's message to its REST representation and
// appends it to every matching bucket (pubsub-topic-only and
// pubsub+content-topic). Buckets act as bounded FIFOs: when a bucket is full
// the oldest message is evicted. Messages for topics with no registered
// subscription are silently dropped. Safe for concurrent use.
func (c *MessageCache) AddMessage(envelope *protocol.Envelope) {
	c.mu.Lock()
	defer c.mu.Unlock()

	pubsubTopic := envelope.PubsubTopic()
	contentTopic := envelope.Message().ContentTopic

	message := &RestWakuMessage{}
	if err := message.FromProto(envelope.Message()); err != nil {
		c.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
		return
	}

	if _, ok := c.messagesByPubsubTopic[pubsubTopic]; ok {
		// Keep a specific max number of messages per topic.
		// FIX: evict before appending (the old code appended first and then
		// trimmed on >= capacity, which capped this bucket at capacity-1,
		// inconsistent with the content-topic bucket below).
		if len(c.messagesByPubsubTopic[pubsubTopic]) >= c.capacity {
			c.messagesByPubsubTopic[pubsubTopic] = c.messagesByPubsubTopic[pubsubTopic][1:]
		}
		c.messagesByPubsubTopic[pubsubTopic] = append(c.messagesByPubsubTopic[pubsubTopic], message)
	}

	// Not subscribed to this pubsub topic + content topic combination.
	if c.messages[pubsubTopic] == nil || c.messages[pubsubTopic][contentTopic] == nil {
		return
	}

	// Keep a specific max number of messages per topic.
	if len(c.messages[pubsubTopic][contentTopic]) >= c.capacity {
		c.messages[pubsubTopic][contentTopic] = c.messages[pubsubTopic][contentTopic][1:]
	}

	c.messages[pubsubTopic][contentTopic] = append(c.messages[pubsubTopic][contentTopic], message)
}

// GetMessages drains and returns all cached messages for the given pubsub
// topic + content topic pair. Returns an error if that pair was never
// subscribed. The bucket is reset, so each message is delivered at most once.
func (c *MessageCache) GetMessages(pubsubTopic string, contentTopic string) ([]*RestWakuMessage, error) {
	// FIX: this method mutates the map (it resets the bucket), so it must
	// take the write lock; the old RLock allowed a data race with concurrent
	// readers and writers.
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.messages[pubsubTopic] == nil || c.messages[pubsubTopic][contentTopic] == nil {
		return nil, fmt.Errorf("not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic)
	}
	msgs := c.messages[pubsubTopic][contentTopic]
	c.messages[pubsubTopic][contentTopic] = []*RestWakuMessage{}
	return msgs, nil
}

// GetMessagesWithPubsubTopic returns the messages cached for a pubsub-topic-only
// subscription. Returns an error if that pubsub topic was never subscribed.
// Unlike GetMessages, the bucket is NOT drained.
func (c *MessageCache) GetMessagesWithPubsubTopic(pubsubTopic string) ([]*RestWakuMessage, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	msgs, ok := c.messagesByPubsubTopic[pubsubTopic]
	if !ok {
		return nil, fmt.Errorf("not subscribed to pubsubTopic:%s", pubsubTopic)
	}

	// FIX: return a copy instead of the internal slice. The old code handed
	// out a reference that AddMessage keeps appending to (and evicting from)
	// after the lock is released, racing with the caller's iteration.
	out := make([]*RestWakuMessage, len(msgs))
	copy(out, msgs)
	return out, nil
}
27 changes: 18 additions & 9 deletions cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,17 @@ type FilterService struct {

log *zap.Logger

cache *filterCache
cache *MessageCache
runner *runnerService
}

// Start starts the RelayService
// Start starts the FilterService
func (s *FilterService) Start(ctx context.Context) {

for _, sub := range s.node.FilterLightnode().Subscriptions() {
s.cache.subscribe(sub.ContentFilter)
err := s.cache.Subscribe(sub.ContentFilter)
if err != nil {
s.log.Error("subscribing cache failed", zap.Error(err))
}
}

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -81,7 +83,7 @@ func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *z
s := &FilterService{
node: node,
log: logger,
cache: newFilterCache(cacheCapacity, logger),
cache: NewMessageCache(cacheCapacity, logger),
}

m.Get(filterv2Ping, s.ping)
Expand All @@ -91,7 +93,7 @@ func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *z
m.Get(filterv2MessagesByContentTopic, s.getMessagesByContentTopic)
m.Get(filterv2MessagesByPubsubTopic, s.getMessagesByPubsubTopic)

s.runner = newRunnerService(node.Broadcaster(), s.cache.addMessage)
s.runner = newRunnerService(node.Broadcaster(), s.cache.AddMessage)

return s
}
Expand Down Expand Up @@ -180,7 +182,11 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) {
}

// on success
s.cache.subscribe(contentFilter)
err = s.cache.Subscribe(contentFilter)
if err != nil {
s.log.Error("subscribing cache failed", zap.Error(err))
}

writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
StatusDesc: http.StatusText(http.StatusOK),
Expand Down Expand Up @@ -224,7 +230,10 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
// on success
for cTopic := range contentFilter.ContentTopics {
if !s.node.FilterLightnode().IsListening(contentFilter.PubsubTopic, cTopic) {
s.cache.unsubscribe(contentFilter.PubsubTopic, cTopic)
err = s.cache.Unsubscribe(protocol.NewContentFilter(contentFilter.PubsubTopic, cTopic))
if err != nil {
s.log.Error("unsubscribe cache failed", zap.Error(err))
}
}
}
writeResponse(w, filterSubscriptionResponse{
Expand Down Expand Up @@ -357,7 +366,7 @@ func (s *FilterService) getMessagesByPubsubTopic(w http.ResponseWriter, req *htt
// 200 on all successful unsubscribe
// unsubscribe all subscriptions for a given peer
func (s *FilterService) getMessages(w http.ResponseWriter, req *http.Request, pubsubTopic, contentTopic string) {
msgs, err := s.cache.getMessages(pubsubTopic, contentTopic)
msgs, err := s.cache.GetMessages(pubsubTopic, contentTopic)
if err != nil {
writeGetMessageErr(w, err, http.StatusNotFound, s.log)
return
Expand Down
84 changes: 0 additions & 84 deletions cmd/waku/server/rest/filter_cache.go

This file was deleted.

Loading