diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index d8eeae71b..56a8def3c 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -549,7 +549,7 @@ var ( }) RESTRelayCacheCapacity = altsrc.NewIntFlag(&cli.IntFlag{ Name: "rest-relay-cache-capacity", - Value: 30, + Value: 1000, Usage: "Capacity of the Relay REST API message cache", Destination: &options.RESTServer.RelayCacheCapacity, EnvVars: []string{"WAKUNODE2_REST_RELAY_CACHE_CAPACITY"}, diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index cefcbf27b..63922b539 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -124,27 +124,34 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) { return } var response []*RestWakuMessage - select { - case envelope, open := <-sub.Ch: - if !open { - r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic)) - w.WriteHeader(http.StatusNotFound) - _, err = w.Write([]byte("consume channel is closed for subscription")) - if err != nil { - r.log.Error("writing response", zap.Error(err)) - } - return + done := false + for { + if done || len(response) > int(r.cacheCapacity) { + break } + select { + case envelope, open := <-sub.Ch: + if !open { + r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic)) + w.WriteHeader(http.StatusNotFound) + _, err = w.Write([]byte("consume channel is closed for subscription")) + if err != nil { + r.log.Error("writing response", zap.Error(err)) + } + return + } - message := &RestWakuMessage{} - if err := message.FromProto(envelope.Message()); err != nil { - r.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) - } else { - response = append(response, message) + message := &RestWakuMessage{} + if err := message.FromProto(envelope.Message()); err != nil { + r.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) + } else { + response = append(response, message) + } + case <-req.Context().Done(): + done = true + default: + done = true } - - default: - break } writeErrOrResponse(w, nil, response) @@ -240,16 +247,24 @@ func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Reques return } var response []*RestWakuMessage - select { - case envelope := <-sub.Ch: - message := &RestWakuMessage{} - if err := message.FromProto(envelope.Message()); err != nil { - r.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) - } else { - response = append(response, message) + done := false + for { + if done || len(response) > int(r.cacheCapacity) { + break + } + select { + case envelope := <-sub.Ch: + message := &RestWakuMessage{} + if err := message.FromProto(envelope.Message()); err != nil { + r.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) + } else { + response = append(response, message) + } + case <-req.Context().Done(): + done = true + default: + done = true } - default: - break } writeErrOrResponse(w, nil, response)