Skip to content

Commit

Permalink
feat(rest): use cache for relay API
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Nov 9, 2023
1 parent 3d217ed commit 465a0ba
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 133 deletions.
133 changes: 133 additions & 0 deletions cmd/waku/server/rest/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package rest

import (
"fmt"
"sync"

"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

type MessageCache struct {
capacity int
mu sync.RWMutex
messages map[string]map[string][]*pb.WakuMessage
messagesByPubsubTopic map[string][]*pb.WakuMessage
}

func NewMessageCache(capacity int) *MessageCache {
return &MessageCache{
capacity: capacity,
messages: make(map[string]map[string][]*pb.WakuMessage),
messagesByPubsubTopic: make(map[string][]*pb.WakuMessage),
}
}

func (c *MessageCache) Subscribe(contentFilter protocol.ContentFilter) error {
c.mu.Lock()
defer c.mu.Unlock()

if contentFilter.PubsubTopic != "" && len(contentFilter.ContentTopics) == 0 {
// Cache all messages that match a pubsub topic (but no content topic specified)
// Used with named sharding
// Eventually this must be modified once API changes to receive content topics too
if _, ok := c.messages[contentFilter.PubsubTopic]; !ok {
c.messagesByPubsubTopic[contentFilter.PubsubTopic] = []*pb.WakuMessage{}
}
} else {
// Cache messages that match a content topic, or pubsub topic + content topic
pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)
if err != nil {
return err
}

for pubsubTopic, contentTopics := range pubSubTopicMap {
if c.messages[pubsubTopic] == nil {
c.messages[pubsubTopic] = make(map[string][]*pb.WakuMessage)
}

for _, topic := range contentTopics {
if c.messages[pubsubTopic][topic] == nil {
c.messages[pubsubTopic][topic] = []*pb.WakuMessage{}
}
}
}
}

return nil
}

func (c *MessageCache) Unsubscribe(contentFilter protocol.ContentFilter) error {
c.mu.Lock()
defer c.mu.Unlock()

if contentFilter.PubsubTopic != "" && len(contentFilter.ContentTopics) == 0 {
// Stop caching all messages that match a pubsub topic
// Used with named sharding
// Eventually this must be modified once API changes to receive content topics too
delete(c.messagesByPubsubTopic, contentFilter.PubsubTopic)
} else {
pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)
if err != nil {
return err
}

for pubsubTopic, contentTopics := range pubSubTopicMap {
for _, contentTopic := range contentTopics {
delete(c.messages[pubsubTopic], contentTopic)
}
}
}

return nil
}

func (c *MessageCache) AddMessage(envelope *protocol.Envelope) {
c.mu.Lock()
defer c.mu.Unlock()

pubsubTopic := envelope.PubsubTopic()
contentTopic := envelope.Message().ContentTopic

if _, ok := c.messagesByPubsubTopic[pubsubTopic]; ok {
c.messagesByPubsubTopic[pubsubTopic] = append(c.messagesByPubsubTopic[pubsubTopic], envelope.Message())
// Keep a specific max number of message per topic
if len(c.messagesByPubsubTopic[pubsubTopic]) >= c.capacity {
c.messagesByPubsubTopic[pubsubTopic] = c.messagesByPubsubTopic[pubsubTopic][1:]
}
}

if c.messages[pubsubTopic] == nil || c.messages[pubsubTopic][contentTopic] == nil {
return
}

// Keep a specific max number of message per topic
if len(c.messages[pubsubTopic][contentTopic]) >= c.capacity {
c.messages[pubsubTopic][contentTopic] = c.messages[pubsubTopic][contentTopic][1:]
}

c.messages[pubsubTopic][contentTopic] = append(c.messages[pubsubTopic][contentTopic], envelope.Message())
}

func (c *MessageCache) GetMessages(pubsubTopic string, contentTopic string) ([]*pb.WakuMessage, error) {
c.mu.RLock()
defer c.mu.RUnlock()

if c.messages[pubsubTopic] == nil || c.messages[pubsubTopic][contentTopic] == nil {
return nil, fmt.Errorf("not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic)
}
msgs := c.messages[pubsubTopic][contentTopic]
c.messages[pubsubTopic][contentTopic] = []*pb.WakuMessage{}
return msgs, nil
}

func (c *MessageCache) GetMessagesWithPubsubTopic(pubsubTopic string) ([]*pb.WakuMessage, error) {
c.mu.RLock()
defer c.mu.RUnlock()

if c.messagesByPubsubTopic[pubsubTopic] == nil {
return nil, fmt.Errorf("not subscribed to pubsubTopic:%s", pubsubTopic)
}

return c.messagesByPubsubTopic[pubsubTopic], nil
}
18 changes: 7 additions & 11 deletions cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,14 @@ type FilterService struct {

log *zap.Logger

cache *filterCache
cache *MessageCache
runner *runnerService
}

// Start starts the RelayService
func (s *FilterService) Start(ctx context.Context) {

for _, sub := range s.node.FilterLightnode().Subscriptions() {
s.cache.subscribe(sub.ContentFilter)
s.cache.Subscribe(sub.ContentFilter)
}

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -79,7 +78,7 @@ func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *z
s := &FilterService{
node: node,
log: log.Named("filter"),
cache: newFilterCache(cacheCapacity),
cache: NewMessageCache(cacheCapacity),
}

m.Get(filterv2Ping, s.ping)
Expand All @@ -89,7 +88,7 @@ func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *z
m.Get(filterv2MessagesByContentTopic, s.getMessagesByContentTopic)
m.Get(filterv2MessagesByPubsubTopic, s.getMessagesByPubsubTopic)

s.runner = newRunnerService(node.Broadcaster(), s.cache.addMessage)
s.runner = newRunnerService(node.Broadcaster(), s.cache.AddMessage)

return s
}
Expand Down Expand Up @@ -130,9 +129,6 @@ func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) {
}, http.StatusOK)
}

///////////////////////
///////////////////////

// same for FilterUnsubscribeRequest
type filterSubscriptionRequest struct {
RequestId filterRequestId `json:"requestId"`
Expand Down Expand Up @@ -181,7 +177,7 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) {
}

// on success
s.cache.subscribe(contentFilter)
s.cache.Subscribe(contentFilter)
writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
StatusDesc: http.StatusText(http.StatusOK),
Expand Down Expand Up @@ -225,7 +221,7 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
// on success
for cTopic := range contentFilter.ContentTopics {
if !s.node.FilterLightnode().IsListening(contentFilter.PubsubTopic, cTopic) {
s.cache.unsubscribe(contentFilter.PubsubTopic, cTopic)
s.cache.Unsubscribe(protocol.NewContentFilter(contentFilter.PubsubTopic, cTopic))
}
}
writeResponse(w, filterSubscriptionResponse{
Expand Down Expand Up @@ -358,7 +354,7 @@ func (s *FilterService) getMessagesByPubsubTopic(w http.ResponseWriter, req *htt
// 200 on all successful unsubscribe
// unsubscribe all subscriptions for a given peer
func (s *FilterService) getMessages(w http.ResponseWriter, req *http.Request, pubsubTopic, contentTopic string) {
msgs, err := s.cache.getMessages(pubsubTopic, contentTopic)
msgs, err := s.cache.GetMessages(pubsubTopic, contentTopic)
if err != nil {
writeGetMessageErr(w, err, http.StatusNotFound, s.log)
return
Expand Down
76 changes: 0 additions & 76 deletions cmd/waku/server/rest/filter_cache.go

This file was deleted.

Loading

0 comments on commit 465a0ba

Please sign in to comment.