From 3d7d4dba2a46464b4e1dcfdc2fdeea0fb43ec9ba Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 9 Nov 2023 09:34:29 -0400 Subject: [PATCH] fix(rest): use custom struct for messages instead of protobuffer --- cmd/waku/server/rest/filter.go | 9 ++-- cmd/waku/server/rest/filter_cache.go | 26 +++++++---- cmd/waku/server/rest/lightpush_rest.go | 14 +++--- cmd/waku/server/rest/lightpush_rest_test.go | 3 +- cmd/waku/server/rest/message.go | 46 +++++++++++++++++++ cmd/waku/server/rest/relay.go | 49 +++++++++++++++------ cmd/waku/server/rest/store.go | 20 ++++----- cmd/waku/server/rpc/utils.go | 13 +++--- cmd/waku/server/utils.go | 31 +++++++++++++ 9 files changed, 161 insertions(+), 50 deletions(-) create mode 100644 cmd/waku/server/rest/message.go diff --git a/cmd/waku/server/rest/filter.go b/cmd/waku/server/rest/filter.go index 6394e9b45..7718171c4 100644 --- a/cmd/waku/server/rest/filter.go +++ b/cmd/waku/server/rest/filter.go @@ -76,10 +76,12 @@ func (r *FilterService) Stop() { // NewFilterService returns an instance of FilterService func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *FilterService { + logger := log.Named("filter") + s := &FilterService{ node: node, - log: log.Named("filter"), - cache: newFilterCache(cacheCapacity), + log: logger, + cache: newFilterCache(cacheCapacity, logger), } m.Get(filterv2Ping, s.ping) @@ -130,9 +132,6 @@ func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) { }, http.StatusOK) } -/////////////////////// -/////////////////////// - // same for FilterUnsubscribeRequest type filterSubscriptionRequest struct { RequestId filterRequestId `json:"requestId"` diff --git a/cmd/waku/server/rest/filter_cache.go b/cmd/waku/server/rest/filter_cache.go index 6e684ab8e..77144435e 100644 --- a/cmd/waku/server/rest/filter_cache.go +++ b/cmd/waku/server/rest/filter_cache.go @@ -5,19 +5,21 @@ import ( "sync" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "go.uber.org/zap" ) type filterCache struct { capacity int mu sync.RWMutex - data map[string]map[string][]*pb.WakuMessage + log *zap.Logger + data map[string]map[string][]*RestWakuMessage } -func newFilterCache(capacity int) *filterCache { +func newFilterCache(capacity int, log *zap.Logger) *filterCache { return &filterCache{ capacity: capacity, - data: make(map[string]map[string][]*pb.WakuMessage), + data: make(map[string]map[string][]*RestWakuMessage), + log: log.Named("cache"), } } @@ -28,11 +30,11 @@ func (c *filterCache) subscribe(contentFilter protocol.ContentFilter) { pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter) for pubsubTopic, contentTopics := range pubSubTopicMap { if c.data[pubsubTopic] == nil { - c.data[pubsubTopic] = make(map[string][]*pb.WakuMessage) + c.data[pubsubTopic] = make(map[string][]*RestWakuMessage) } for _, topic := range contentTopics { if c.data[pubsubTopic][topic] == nil { - c.data[pubsubTopic][topic] = []*pb.WakuMessage{} + c.data[pubsubTopic][topic] = []*RestWakuMessage{} } } } @@ -60,10 +62,16 @@ func (c *filterCache) addMessage(envelope *protocol.Envelope) { c.data[pubsubTopic][contentTopic] = c.data[pubsubTopic][contentTopic][1:] } - c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], envelope.Message()) + message := &RestWakuMessage{} + if err := message.FromProto(envelope.Message()); err != nil { + c.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) + return + } + + c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], message) } -func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*pb.WakuMessage, error) { +func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*RestWakuMessage, error) { c.mu.RLock() defer c.mu.RUnlock() @@ -71,6 +79,6 @@ func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*p return nil, fmt.Errorf("not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic) } msgs := c.data[pubsubTopic][contentTopic] - c.data[pubsubTopic][contentTopic] = []*pb.WakuMessage{} + c.data[pubsubTopic][contentTopic] = []*RestWakuMessage{} return msgs, nil } diff --git a/cmd/waku/server/rest/lightpush_rest.go b/cmd/waku/server/rest/lightpush_rest.go index e33f4b7a2..b52460d6c 100644 --- a/cmd/waku/server/rest/lightpush_rest.go +++ b/cmd/waku/server/rest/lightpush_rest.go @@ -8,7 +8,6 @@ import ( "github.com/go-chi/chi/v5" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" "go.uber.org/zap" ) @@ -38,8 +37,8 @@ func (msg lightpushRequest) Check() error { } type lightpushRequest struct { - PubSubTopic string `json:"pubsubTopic"` - Message *pb.WakuMessage `json:"message"` + PubSubTopic string `json:"pubsubTopic"` + Message *RestWakuMessage `json:"message"` } // handled error codes are 200, 400, 500, 503 @@ -58,14 +57,19 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req serv.log.Error("writing response", zap.Error(err)) return } - // if serv.node.Lightpush() == nil { w.WriteHeader(http.StatusServiceUnavailable) return } - _, err := serv.node.Lightpush().Publish(req.Context(), msg.Message, lightpush.WithPubSubTopic(msg.PubSubTopic)) + message, err := msg.Message.ToProto() + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + _, err = serv.node.Lightpush().Publish(req.Context(), message, lightpush.WithPubSubTopic(msg.PubSubTopic)) if err != nil { w.WriteHeader(http.StatusServiceUnavailable) _, err = w.Write([]byte(err.Error())) diff --git a/cmd/waku/server/rest/lightpush_rest_test.go b/cmd/waku/server/rest/lightpush_rest_test.go index 69a71d497..95b89e660 100644 --- a/cmd/waku/server/rest/lightpush_rest_test.go +++ b/cmd/waku/server/rest/lightpush_rest_test.go @@ -14,7 +14,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/node" wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -46,7 +45,7 @@ func TestLightpushMessagev1(t *testing.T) { msg := lightpushRequest{ PubSubTopic: pubSubTopic, - Message: &pb.WakuMessage{ + Message: &RestWakuMessage{ Payload: []byte{1, 2, 3}, ContentTopic: "abc", Timestamp: utils.GetUnixEpoch(), diff --git a/cmd/waku/server/rest/message.go b/cmd/waku/server/rest/message.go new file mode 100644 index 000000000..55521ac09 --- /dev/null +++ b/cmd/waku/server/rest/message.go @@ -0,0 +1,46 @@ +package rest + +import ( + "errors" + + "github.com/waku-org/go-waku/cmd/waku/server" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" +) + +type RestWakuMessage struct { + Payload server.Base64URLByte `json:"payload"` + ContentTopic string `json:"contentTopic"` + Version *uint32 `json:"version,omitempty"` + Timestamp *int64 `json:"timestamp,omitempty"` + Meta []byte `json:"meta,omitempty"` +} + +func (r *RestWakuMessage) FromProto(input *pb.WakuMessage) error { + if err := input.Validate(); err != nil { + return err + } + + r.Payload = input.Payload + r.ContentTopic = input.ContentTopic + r.Timestamp = input.Timestamp + r.Version = input.Version + r.Meta = input.Meta + + return nil +} + +func (r *RestWakuMessage) ToProto() (*pb.WakuMessage, error) { + if r == nil { + return nil, errors.New("wakumessage is missing") + } + + msg := &pb.WakuMessage{ + Payload: r.Payload, + ContentTopic: r.ContentTopic, + Version: r.Version, + Timestamp: r.Timestamp, + Meta: r.Meta, + } + + return msg, nil +} diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index 0da7cc0f6..cefcbf27b 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -9,7 +9,6 @@ import ( "github.com/waku-org/go-waku/cmd/waku/server" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) @@ -124,9 +123,9 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) { r.log.Error("writing response", zap.Error(err)) return } - var response []*pb.WakuMessage + var response []*RestWakuMessage select { - case msg, open := <-sub.Ch: + case envelope, open := <-sub.Ch: if !open { r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic)) w.WriteHeader(http.StatusNotFound) @@ -136,7 +135,14 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) { } return } - response = append(response, msg.Message()) + + message := &RestWakuMessage{} + if err := message.FromProto(envelope.Message()); err != nil { + r.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) + } else { + response = append(response, message) + } + default: break } @@ -150,9 +156,9 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) { return } - var message *pb.WakuMessage + var restMessage *RestWakuMessage decoder := json.NewDecoder(req.Body) - if err := decoder.Decode(&message); err != nil { + if err := decoder.Decode(&restMessage); err != nil { w.WriteHeader(http.StatusBadRequest) return } @@ -162,12 +168,18 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) { topic = relay.DefaultWakuTopic } + message, err := restMessage.ToProto() + if err != nil { + writeErrOrResponse(w, err, nil) + return + } + if err := server.AppendRLNProof(r.node, message); err != nil { writeErrOrResponse(w, err, nil) return } - _, err := r.node.Relay().Publish(req.Context(), message, relay.WithPubSubTopic(strings.Replace(topic, "\n", "", -1))) + _, err = r.node.Relay().Publish(req.Context(), message, relay.WithPubSubTopic(strings.Replace(topic, "\n", "", -1))) if err != nil { r.log.Error("publishing message", zap.Error(err)) } @@ -227,10 +239,15 @@ func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Reques r.log.Error("writing response", zap.Error(err)) return } - var response []*pb.WakuMessage + var response []*RestWakuMessage select { - case msg := <-sub.Ch: - response = append(response, msg.Message()) + case envelope := <-sub.Ch: + message := &RestWakuMessage{} + if err := message.FromProto(envelope.Message()); err != nil { + r.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) + } else { + response = append(response, message) + } default: break } @@ -240,15 +257,21 @@ func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Reques func (r *RelayService) postV1AutoMessage(w http.ResponseWriter, req *http.Request) { - var message *pb.WakuMessage + var restMessage *RestWakuMessage decoder := json.NewDecoder(req.Body) - if err := decoder.Decode(&message); err != nil { + if err := decoder.Decode(&restMessage); err != nil { r.log.Error("decoding message failure", zap.Error(err)) w.WriteHeader(http.StatusBadRequest) return } defer req.Body.Close() - var err error + + message, err := restMessage.ToProto() + if err != nil { + writeErrOrResponse(w, err, nil) + return + } + if err = server.AppendRLNProof(r.node, message); err != nil { writeErrOrResponse(w, err, nil) return diff --git a/cmd/waku/server/rest/store.go b/cmd/waku/server/rest/store.go index b50bc8f1a..ce31e8c35 100644 --- a/cmd/waku/server/rest/store.go +++ b/cmd/waku/server/rest/store.go @@ -30,18 +30,18 @@ type StoreResponse struct { } type HistoryCursor struct { - PubsubTopic string `json:"pubsub_topic"` - SenderTime string `json:"sender_time"` - StoreTime string `json:"store_time"` + PubsubTopic string `json:"pubsubTopic"` + SenderTime string `json:"senderTime"` + StoreTime string `json:"storeTime"` Digest []byte `json:"digest"` } type StoreWakuMessage struct { - Payload []byte `json:"payload"` - ContentTopic string `json:"content_topic"` - Version uint32 `json:"version"` - Timestamp int64 `json:"timestamp"` - Meta []byte `json:"meta"` + Payload []byte `json:"payload"` + ContentTopic string `json:"contentTopic"` + Version *uint32 `json:"version,omitempty"` + Timestamp *int64 `json:"timestamp,omitempty"` + Meta []byte `json:"meta,omitempty"` } const routeStoreMessagesV1 = "/store/v1/messages" @@ -180,8 +180,8 @@ func toStoreResponse(result *store.Result) StoreResponse { response.Messages = append(response.Messages, StoreWakuMessage{ Payload: m.Payload, ContentTopic: m.ContentTopic, - Version: m.GetVersion(), - Timestamp: m.GetTimestamp(), + Version: m.Version, + Timestamp: m.Timestamp, Meta: m.Meta, }) } diff --git a/cmd/waku/server/rpc/utils.go b/cmd/waku/server/rpc/utils.go index 2b509aa5b..912718979 100644 --- a/cmd/waku/server/rpc/utils.go +++ b/cmd/waku/server/rpc/utils.go @@ -3,6 +3,7 @@ package rpc import ( "errors" + "github.com/waku-org/go-waku/cmd/waku/server" "github.com/waku-org/go-waku/waku/v2/protocol/pb" rlnpb "github.com/waku-org/go-waku/waku/v2/protocol/rln/pb" @@ -20,12 +21,12 @@ type RateLimitProof struct { } type RPCWakuMessage struct { - Payload Base64URLByte `json:"payload,omitempty"` - ContentTopic string `json:"contentTopic,omitempty"` - Version uint32 `json:"version"` - Timestamp int64 `json:"timestamp,omitempty"` - RateLimitProof *RateLimitProof `json:"rateLimitProof,omitempty"` - Ephemeral bool `json:"ephemeral,omitempty"` + Payload server.Base64URLByte `json:"payload,omitempty"` + ContentTopic string `json:"contentTopic,omitempty"` + Version uint32 `json:"version"` + Timestamp int64 `json:"timestamp,omitempty"` + RateLimitProof *RateLimitProof `json:"rateLimitProof,omitempty"` + Ephemeral bool `json:"ephemeral,omitempty"` } func ProtoToRPC(input *pb.WakuMessage) (*RPCWakuMessage, error) { diff --git a/cmd/waku/server/utils.go b/cmd/waku/server/utils.go index 121dd084e..f11cee5c4 100644 --- a/cmd/waku/server/utils.go +++ b/cmd/waku/server/utils.go @@ -1,6 +1,9 @@ package server import ( + "encoding/base64" + "strings" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" @@ -17,3 +20,31 @@ func IsWakuProtocol(protocol protocol.ID) bool { protocol == lightpush.LightPushID_v20beta1 || protocol == store.StoreID_v20beta4 } + +type Base64URLByte []byte + +// UnmarshalText is used by json.Unmarshal to decode both url-safe and standard +// base64 encoded strings with and without padding +func (h *Base64URLByte) UnmarshalText(b []byte) error { + inputValue := "" + if b != nil { + inputValue = string(b) + } + + enc := base64.StdEncoding + if strings.ContainsAny(inputValue, "-_") { + enc = base64.URLEncoding + } + if len(inputValue)%4 != 0 { + enc = enc.WithPadding(base64.NoPadding) + } + + decodedBytes, err := enc.DecodeString(inputValue) + if err != nil { + return err + } + + *h = decodedBytes + + return nil +}