Skip to content

Commit

Permalink
feat(rest): use cache for relay API
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Nov 10, 2023
1 parent b6d9e3d commit 773fa14
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 151 deletions.
141 changes: 141 additions & 0 deletions cmd/waku/server/rest/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package rest

import (
"fmt"
"sync"

"github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
)

type MessageCache struct {
capacity int
mu sync.RWMutex
log *zap.Logger
messages map[string]map[string][]*RestWakuMessage
messagesByPubsubTopic map[string][]*RestWakuMessage
}

func NewMessageCache(capacity int, log *zap.Logger) *MessageCache {
return &MessageCache{
capacity: capacity,
messages: make(map[string]map[string][]*RestWakuMessage),
messagesByPubsubTopic: make(map[string][]*RestWakuMessage),
log: log.Named("cache"),
}
}

func (c *MessageCache) Subscribe(contentFilter protocol.ContentFilter) error {
c.mu.Lock()
defer c.mu.Unlock()

if contentFilter.PubsubTopic != "" && len(contentFilter.ContentTopics) == 0 {
// Cache all messages that match a pubsub topic (but no content topic specified)
// Used with named sharding
// Eventually this must be modified once API changes to receive content topics too
if _, ok := c.messages[contentFilter.PubsubTopic]; !ok {
c.messagesByPubsubTopic[contentFilter.PubsubTopic] = []*RestWakuMessage{}
}
} else {
// Cache messages that match a content topic, or pubsub topic + content topic
pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)
if err != nil {
return err
}

for pubsubTopic, contentTopics := range pubSubTopicMap {
if c.messages[pubsubTopic] == nil {
c.messages[pubsubTopic] = make(map[string][]*RestWakuMessage)
}

for _, topic := range contentTopics {
if c.messages[pubsubTopic][topic] == nil {
c.messages[pubsubTopic][topic] = []*RestWakuMessage{}
}
}
}
}

return nil
}

func (c *MessageCache) Unsubscribe(contentFilter protocol.ContentFilter) error {
c.mu.Lock()
defer c.mu.Unlock()

if contentFilter.PubsubTopic != "" && len(contentFilter.ContentTopics) == 0 {
// Stop caching all messages that match a pubsub topic
// Used with named sharding
// Eventually this must be modified once API changes to receive content topics too
delete(c.messagesByPubsubTopic, contentFilter.PubsubTopic)
} else {
pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)
if err != nil {
return err
}

for pubsubTopic, contentTopics := range pubSubTopicMap {
for _, contentTopic := range contentTopics {
delete(c.messages[pubsubTopic], contentTopic)
}
}
}

return nil
}

func (c *MessageCache) AddMessage(envelope *protocol.Envelope) {
c.mu.Lock()
defer c.mu.Unlock()

pubsubTopic := envelope.PubsubTopic()
contentTopic := envelope.Message().ContentTopic

message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
c.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
return
}

if _, ok := c.messagesByPubsubTopic[pubsubTopic]; ok {
c.messagesByPubsubTopic[pubsubTopic] = append(c.messagesByPubsubTopic[pubsubTopic], message)
// Keep a specific max number of message per topic
if len(c.messagesByPubsubTopic[pubsubTopic]) >= c.capacity {
c.messagesByPubsubTopic[pubsubTopic] = c.messagesByPubsubTopic[pubsubTopic][1:]
}
}

if c.messages[pubsubTopic] == nil || c.messages[pubsubTopic][contentTopic] == nil {
return
}

// Keep a specific max number of message per topic
if len(c.messages[pubsubTopic][contentTopic]) >= c.capacity {
c.messages[pubsubTopic][contentTopic] = c.messages[pubsubTopic][contentTopic][1:]
}

c.messages[pubsubTopic][contentTopic] = append(c.messages[pubsubTopic][contentTopic], message)
}

func (c *MessageCache) GetMessages(pubsubTopic string, contentTopic string) ([]*RestWakuMessage, error) {
c.mu.RLock()
defer c.mu.RUnlock()

if c.messages[pubsubTopic] == nil || c.messages[pubsubTopic][contentTopic] == nil {
return nil, fmt.Errorf("not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic)
}
msgs := c.messages[pubsubTopic][contentTopic]
c.messages[pubsubTopic][contentTopic] = []*RestWakuMessage{}
return msgs, nil
}

func (c *MessageCache) GetMessagesWithPubsubTopic(pubsubTopic string) ([]*RestWakuMessage, error) {
c.mu.RLock()
defer c.mu.RUnlock()

if c.messagesByPubsubTopic[pubsubTopic] == nil {
return nil, fmt.Errorf("not subscribed to pubsubTopic:%s", pubsubTopic)
}

return c.messagesByPubsubTopic[pubsubTopic], nil
}
27 changes: 18 additions & 9 deletions cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,17 @@ type FilterService struct {

log *zap.Logger

cache *filterCache
cache *MessageCache
runner *runnerService
}

// Start starts the RelayService
// Start starts the FilterService
func (s *FilterService) Start(ctx context.Context) {

for _, sub := range s.node.FilterLightnode().Subscriptions() {
s.cache.subscribe(sub.ContentFilter)
err := s.cache.Subscribe(sub.ContentFilter)
if err != nil {
s.log.Error("subscribing cache failed", zap.Error(err))
}
}

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -81,7 +83,7 @@ func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *z
s := &FilterService{
node: node,
log: logger,
cache: newFilterCache(cacheCapacity, logger),
cache: NewMessageCache(cacheCapacity, logger),
}

m.Get(filterv2Ping, s.ping)
Expand All @@ -91,7 +93,7 @@ func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *z
m.Get(filterv2MessagesByContentTopic, s.getMessagesByContentTopic)
m.Get(filterv2MessagesByPubsubTopic, s.getMessagesByPubsubTopic)

s.runner = newRunnerService(node.Broadcaster(), s.cache.addMessage)
s.runner = newRunnerService(node.Broadcaster(), s.cache.AddMessage)

return s
}
Expand Down Expand Up @@ -180,7 +182,11 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) {
}

// on success
s.cache.subscribe(contentFilter)
err = s.cache.Subscribe(contentFilter)
if err != nil {
s.log.Error("subscribing cache failed", zap.Error(err))
}

writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
StatusDesc: http.StatusText(http.StatusOK),
Expand Down Expand Up @@ -224,7 +230,10 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
// on success
for cTopic := range contentFilter.ContentTopics {
if !s.node.FilterLightnode().IsListening(contentFilter.PubsubTopic, cTopic) {
s.cache.unsubscribe(contentFilter.PubsubTopic, cTopic)
err = s.cache.Unsubscribe(protocol.NewContentFilter(contentFilter.PubsubTopic, cTopic))
if err != nil {
s.log.Error("unsubscribe cache failed", zap.Error(err))
}
}
}
writeResponse(w, filterSubscriptionResponse{
Expand Down Expand Up @@ -357,7 +366,7 @@ func (s *FilterService) getMessagesByPubsubTopic(w http.ResponseWriter, req *htt
// 200 on all successful unsubscribe
// unsubscribe all subscriptions for a given peer
func (s *FilterService) getMessages(w http.ResponseWriter, req *http.Request, pubsubTopic, contentTopic string) {
msgs, err := s.cache.getMessages(pubsubTopic, contentTopic)
msgs, err := s.cache.GetMessages(pubsubTopic, contentTopic)
if err != nil {
writeGetMessageErr(w, err, http.StatusNotFound, s.log)
return
Expand Down
84 changes: 0 additions & 84 deletions cmd/waku/server/rest/filter_cache.go

This file was deleted.

Loading

0 comments on commit 773fa14

Please sign in to comment.