diff --git a/cmd/waku/server/rest/cache.go b/cmd/waku/server/rest/cache.go new file mode 100644 index 000000000..4a5f22a64 --- /dev/null +++ b/cmd/waku/server/rest/cache.go @@ -0,0 +1,141 @@ +package rest + +import ( + "fmt" + "sync" + + "github.com/waku-org/go-waku/waku/v2/protocol" + "go.uber.org/zap" +) + +type MessageCache struct { + capacity int + mu sync.RWMutex + log *zap.Logger + messages map[string]map[string][]*RestWakuMessage + messagesByPubsubTopic map[string][]*RestWakuMessage +} + +func NewMessageCache(capacity int, log *zap.Logger) *MessageCache { + return &MessageCache{ + capacity: capacity, + messages: make(map[string]map[string][]*RestWakuMessage), + messagesByPubsubTopic: make(map[string][]*RestWakuMessage), + log: log.Named("cache"), + } +} + +func (c *MessageCache) Subscribe(contentFilter protocol.ContentFilter) error { + c.mu.Lock() + defer c.mu.Unlock() + + if contentFilter.PubsubTopic != "" && len(contentFilter.ContentTopics) == 0 { + // Cache all messages that match a pubsub topic (but no content topic specified) + // Used with named sharding + // Eventually this must be modified once API changes to receive content topics too + if _, ok := c.messages[contentFilter.PubsubTopic]; !ok { + c.messagesByPubsubTopic[contentFilter.PubsubTopic] = []*RestWakuMessage{} + } + } else { + // Cache messages that match a content topic, or pubsub topic + content topic + pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter) + if err != nil { + return err + } + + for pubsubTopic, contentTopics := range pubSubTopicMap { + if c.messages[pubsubTopic] == nil { + c.messages[pubsubTopic] = make(map[string][]*RestWakuMessage) + } + + for _, topic := range contentTopics { + if c.messages[pubsubTopic][topic] == nil { + c.messages[pubsubTopic][topic] = []*RestWakuMessage{} + } + } + } + } + + return nil +} + +func (c *MessageCache) Unsubscribe(contentFilter protocol.ContentFilter) error { + c.mu.Lock() + defer c.mu.Unlock() + + if contentFilter.PubsubTopic != "" && len(contentFilter.ContentTopics) == 0 { + // Stop caching all messages that match a pubsub topic + // Used with named sharding + // Eventually this must be modified once API changes to receive content topics too + delete(c.messagesByPubsubTopic, contentFilter.PubsubTopic) + } else { + pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter) + if err != nil { + return err + } + + for pubsubTopic, contentTopics := range pubSubTopicMap { + for _, contentTopic := range contentTopics { + delete(c.messages[pubsubTopic], contentTopic) + } + } + } + + return nil +} + +func (c *MessageCache) AddMessage(envelope *protocol.Envelope) { + c.mu.Lock() + defer c.mu.Unlock() + + pubsubTopic := envelope.PubsubTopic() + contentTopic := envelope.Message().ContentTopic + + message := &RestWakuMessage{} + if err := message.FromProto(envelope.Message()); err != nil { + c.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) + return + } + + if _, ok := c.messagesByPubsubTopic[pubsubTopic]; ok { + c.messagesByPubsubTopic[pubsubTopic] = append(c.messagesByPubsubTopic[pubsubTopic], message) + // Keep a specific max number of message per topic + if len(c.messagesByPubsubTopic[pubsubTopic]) >= c.capacity { + c.messagesByPubsubTopic[pubsubTopic] = c.messagesByPubsubTopic[pubsubTopic][1:] + } + } + + if c.messages[pubsubTopic] == nil || c.messages[pubsubTopic][contentTopic] == nil { + return + } + + // Keep a specific max number of message per topic + if len(c.messages[pubsubTopic][contentTopic]) >= c.capacity { + c.messages[pubsubTopic][contentTopic] = c.messages[pubsubTopic][contentTopic][1:] + } + + c.messages[pubsubTopic][contentTopic] = append(c.messages[pubsubTopic][contentTopic], message) +} + +func (c *MessageCache) GetMessages(pubsubTopic string, contentTopic string) ([]*RestWakuMessage, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + if c.messages[pubsubTopic] == nil || c.messages[pubsubTopic][contentTopic] == nil { + return nil, fmt.Errorf("not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic) + } + msgs := c.messages[pubsubTopic][contentTopic] + c.messages[pubsubTopic][contentTopic] = []*RestWakuMessage{} + return msgs, nil +} + +func (c *MessageCache) GetMessagesWithPubsubTopic(pubsubTopic string) ([]*RestWakuMessage, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + if c.messagesByPubsubTopic[pubsubTopic] == nil { + return nil, fmt.Errorf("not subscribed to pubsubTopic:%s", pubsubTopic) + } + + return c.messagesByPubsubTopic[pubsubTopic], nil +} diff --git a/cmd/waku/server/rest/filter.go b/cmd/waku/server/rest/filter.go index 7718171c4..8d38a3981 100644 --- a/cmd/waku/server/rest/filter.go +++ b/cmd/waku/server/rest/filter.go @@ -50,15 +50,17 @@ type FilterService struct { log *zap.Logger - cache *filterCache + cache *MessageCache runner *runnerService } -// Start starts the RelayService +// Start starts the FilterService func (s *FilterService) Start(ctx context.Context) { - for _, sub := range s.node.FilterLightnode().Subscriptions() { - s.cache.subscribe(sub.ContentFilter) + err := s.cache.Subscribe(sub.ContentFilter) + if err != nil { + s.log.Error("subscribing cache failed", zap.Error(err)) + } } ctx, cancel := context.WithCancel(ctx) @@ -81,7 +83,7 @@ func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *z s := &FilterService{ node: node, log: logger, - cache: newFilterCache(cacheCapacity, logger), + cache: NewMessageCache(cacheCapacity, logger), } m.Get(filterv2Ping, s.ping) @@ -91,7 +93,7 @@ func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *z m.Get(filterv2MessagesByContentTopic, s.getMessagesByContentTopic) m.Get(filterv2MessagesByPubsubTopic, s.getMessagesByPubsubTopic) - s.runner = newRunnerService(node.Broadcaster(), s.cache.addMessage) + s.runner = newRunnerService(node.Broadcaster(), s.cache.AddMessage) return s } @@ -180,7 +182,11 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) { } // on success - s.cache.subscribe(contentFilter) + err = s.cache.Subscribe(contentFilter) + if err != nil { + s.log.Error("subscribing cache failed", zap.Error(err)) + } + writeResponse(w, filterSubscriptionResponse{ RequestId: message.RequestId, StatusDesc: http.StatusText(http.StatusOK), @@ -224,7 +230,10 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) { // on success for cTopic := range contentFilter.ContentTopics { if !s.node.FilterLightnode().IsListening(contentFilter.PubsubTopic, cTopic) { - s.cache.unsubscribe(contentFilter.PubsubTopic, cTopic) + err = s.cache.Unsubscribe(protocol.NewContentFilter(contentFilter.PubsubTopic, cTopic)) + if err != nil { + s.log.Error("unsubscribe cache failed", zap.Error(err)) + } } } writeResponse(w, filterSubscriptionResponse{ @@ -357,7 +366,7 @@ func (s *FilterService) getMessagesByPubsubTopic(w http.ResponseWriter, req *htt // 200 on all successful unsubscribe // unsubscribe all subscriptions for a given peer func (s *FilterService) getMessages(w http.ResponseWriter, req *http.Request, pubsubTopic, contentTopic string) { - msgs, err := s.cache.getMessages(pubsubTopic, contentTopic) + msgs, err := s.cache.GetMessages(pubsubTopic, contentTopic) if err != nil { writeGetMessageErr(w, err, http.StatusNotFound, s.log) return diff --git a/cmd/waku/server/rest/filter_cache.go b/cmd/waku/server/rest/filter_cache.go deleted file mode 100644 index 77144435e..000000000 --- a/cmd/waku/server/rest/filter_cache.go +++ /dev/null @@ -1,84 +0,0 @@ -package rest - -import ( - "fmt" - "sync" - - "github.com/waku-org/go-waku/waku/v2/protocol" - "go.uber.org/zap" -) - -type filterCache struct { - capacity int - mu sync.RWMutex - log *zap.Logger - data map[string]map[string][]*RestWakuMessage -} - -func newFilterCache(capacity int, log *zap.Logger) *filterCache { - return &filterCache{ - capacity: capacity, - data: make(map[string]map[string][]*RestWakuMessage), - log: log.Named("cache"), - } -} - -func (c *filterCache) subscribe(contentFilter protocol.ContentFilter) { - c.mu.Lock() - defer c.mu.Unlock() - - pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter) - for pubsubTopic, contentTopics := range pubSubTopicMap { - if c.data[pubsubTopic] == nil { - c.data[pubsubTopic] = make(map[string][]*RestWakuMessage) - } - for _, topic := range contentTopics { - if c.data[pubsubTopic][topic] == nil { - c.data[pubsubTopic][topic] = []*RestWakuMessage{} - } - } - } -} - -func (c *filterCache) unsubscribe(pubsubTopic string, contentTopic string) { - c.mu.Lock() - defer c.mu.Unlock() - - delete(c.data[pubsubTopic], contentTopic) -} - -func (c *filterCache) addMessage(envelope *protocol.Envelope) { - c.mu.Lock() - defer c.mu.Unlock() - - pubsubTopic := envelope.PubsubTopic() - contentTopic := envelope.Message().ContentTopic - if c.data[pubsubTopic] == nil || c.data[pubsubTopic][contentTopic] == nil { - return - } - - // Keep a specific max number of message per topic - if len(c.data[pubsubTopic][contentTopic]) >= c.capacity { - c.data[pubsubTopic][contentTopic] = c.data[pubsubTopic][contentTopic][1:] - } - - message := &RestWakuMessage{} - if err := message.FromProto(envelope.Message()); err != nil { - c.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) - return - } - - c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], message) -} - -func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*RestWakuMessage, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - if c.data[pubsubTopic] == nil || c.data[pubsubTopic][contentTopic] == nil { - return nil, fmt.Errorf("not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic) - } - msgs := c.data[pubsubTopic][contentTopic] - c.data[pubsubTopic][contentTopic] = []*RestWakuMessage{} - return msgs, nil -} diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index cefcbf27b..f5afb5e69 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -25,15 +25,17 @@ type RelayService struct { log *zap.Logger - cacheCapacity uint + cache *MessageCache } // NewRelayService returns an instance of RelayService -func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity uint, log *zap.Logger) *RelayService { +func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *RelayService { + logger := log.Named("relay") + s := &RelayService{ - node: node, - log: log.Named("relay"), - cacheCapacity: cacheCapacity, + node: node, + log: logger, + cache: NewMessageCache(cacheCapacity, logger), } m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions) @@ -63,9 +65,15 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re var err error for _, topic := range topics { - err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic)) + contentFilter := protocol.NewContentFilter(topic) + err = r.node.Relay().Unsubscribe(req.Context(), contentFilter) if err != nil { r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err)) + } else { + err = r.cache.Unsubscribe(contentFilter) + if err != nil { + r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err)) + } } } @@ -90,13 +98,30 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ } else { topicToSubscribe = topic } - _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity)) + contentFilter := protocol.NewContentFilter(topicToSubscribe) + subscriptions, err := r.node.Relay().Subscribe(r.node.Relay().Context(), contentFilter) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err)) continue } + + err = r.cache.Subscribe(contentFilter) + if err != nil { + r.log.Error("subscribing cache to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err)) + continue + } + successCnt++ + + for _, sub := range subscriptions { + go func(sub *relay.Subscription) { + for msg := range sub.Ch { + r.cache.AddMessage(msg) + } + }(sub) + } + } // on partial subscribe failure @@ -111,43 +136,21 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ } func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) { - topic := topicFromPath(w, req, "topic", r.log) - if topic == "" { + pubsubTopic := topicFromPath(w, req, "topic", r.log) + if pubsubTopic == "" { return } + //TODO: Update the API to also take a contentTopic since relay now supports filtering based on contentTopic as well. - sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(topic, "") + messages, err := r.cache.GetMessagesWithPubsubTopic(pubsubTopic) if err != nil { w.WriteHeader(http.StatusNotFound) _, err = w.Write([]byte("not subscribed to topic")) r.log.Error("writing response", zap.Error(err)) return } - var response []*RestWakuMessage - select { - case envelope, open := <-sub.Ch: - if !open { - r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic)) - w.WriteHeader(http.StatusNotFound) - _, err = w.Write([]byte("consume channel is closed for subscription")) - if err != nil { - r.log.Error("writing response", zap.Error(err)) - } - return - } - - message := &RestWakuMessage{} - if err := message.FromProto(envelope.Message()); err != nil { - r.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) - } else { - response = append(response, message) - } - - default: - break - } - writeErrOrResponse(w, nil, response) + writeErrOrResponse(w, nil, messages) } func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) { @@ -196,9 +199,15 @@ func (r *RelayService) deleteV1AutoSubscriptions(w http.ResponseWriter, req *htt } defer req.Body.Close() - err := r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter("", cTopics...)) + contentFilter := protocol.NewContentFilter("", cTopics...) + err := r.node.Relay().Unsubscribe(req.Context(), contentFilter) if err != nil { r.log.Error("unsubscribing from topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) + } else { + err = r.cache.Unsubscribe(contentFilter) + if err != nil { + r.log.Error("unsubscribing cache", zap.Strings("contentTopics", cTopics), zap.Error(err)) + } } writeErrOrResponse(w, err, true) @@ -213,46 +222,53 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http. } defer req.Body.Close() - var err error - _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...), relay.WithCacheSize(r.cacheCapacity)) + contentFilter := protocol.NewContentFilter("", cTopics...) + subscriptions, err := r.node.Relay().Subscribe(r.node.Relay().Context(), contentFilter) if err != nil { r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) - } - - if err != nil { w.WriteHeader(http.StatusBadRequest) _, err := w.Write([]byte(err.Error())) r.log.Error("writing response", zap.Error(err)) - } else { - writeErrOrResponse(w, err, true) + return + } + + err = r.cache.Subscribe(contentFilter) + if err != nil { + r.log.Error("subscribing cache failed", zap.Error(err)) + } + + for _, sub := range subscriptions { + go func(sub *relay.Subscription) { + for msg := range sub.Ch { + r.cache.AddMessage(msg) + } + }(sub) } + writeErrOrResponse(w, nil, true) + } func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) { + contentTopic := topicFromPath(w, req, "contentTopic", r.log) + + pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + _, err = w.Write([]byte("could not determine pubsubtopic")) + r.log.Error("writing response", zap.Error(err)) + return + } - cTopic := topicFromPath(w, req, "contentTopic", r.log) - sub, err := r.node.Relay().GetSubscription(cTopic) + messages, err := r.cache.GetMessages(pubsubTopic, contentTopic) if err != nil { w.WriteHeader(http.StatusNotFound) _, err = w.Write([]byte("not subscribed to topic")) r.log.Error("writing response", zap.Error(err)) return } - var response []*RestWakuMessage - select { - case envelope := <-sub.Ch: - message := &RestWakuMessage{} - if err := message.FromProto(envelope.Message()); err != nil { - r.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) - } else { - response = append(response, message) - } - default: - break - } - writeErrOrResponse(w, nil, response) + writeErrOrResponse(w, nil, messages) } func (r *RelayService) postV1AutoMessage(w http.ResponseWriter, req *http.Request) { diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go index 3126f37eb..c1aeda1c4 100644 --- a/cmd/waku/server/rest/waku_rest.go +++ b/cmd/waku/server/rest/waku_rest.go @@ -59,7 +59,7 @@ func NewWakuRest(node *node.WakuNode, config RestConfig, log *zap.Logger) *WakuR wrpc.server = server if node.Relay() != nil { - relayService := NewRelayService(node, mux, config.RelayCacheCapacity, log) + relayService := NewRelayService(node, mux, int(config.RelayCacheCapacity), log) wrpc.relayService = relayService }