Skip to content

Commit

Permalink
fix(rest): use custom struct for messages instead of protobuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Nov 9, 2023
1 parent 3d217ed commit 3d7d4db
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 50 deletions.
9 changes: 4 additions & 5 deletions cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@ func (r *FilterService) Stop() {

// NewFilterService returns an instance of FilterService
func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *FilterService {
logger := log.Named("filter")

s := &FilterService{
node: node,
log: log.Named("filter"),
cache: newFilterCache(cacheCapacity),
log: logger,
cache: newFilterCache(cacheCapacity, logger),
}

m.Get(filterv2Ping, s.ping)
Expand Down Expand Up @@ -130,9 +132,6 @@ func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) {
}, http.StatusOK)
}

///////////////////////
///////////////////////

// same for FilterUnsubscribeRequest
type filterSubscriptionRequest struct {
RequestId filterRequestId `json:"requestId"`
Expand Down
26 changes: 17 additions & 9 deletions cmd/waku/server/rest/filter_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@ import (
"sync"

"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"go.uber.org/zap"
)

type filterCache struct {
capacity int
mu sync.RWMutex
data map[string]map[string][]*pb.WakuMessage
log *zap.Logger
data map[string]map[string][]*RestWakuMessage
}

func newFilterCache(capacity int) *filterCache {
func newFilterCache(capacity int, log *zap.Logger) *filterCache {
return &filterCache{
capacity: capacity,
data: make(map[string]map[string][]*pb.WakuMessage),
data: make(map[string]map[string][]*RestWakuMessage),
log: log.Named("cache"),
}
}

Expand All @@ -28,11 +30,11 @@ func (c *filterCache) subscribe(contentFilter protocol.ContentFilter) {
pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter)
for pubsubTopic, contentTopics := range pubSubTopicMap {
if c.data[pubsubTopic] == nil {
c.data[pubsubTopic] = make(map[string][]*pb.WakuMessage)
c.data[pubsubTopic] = make(map[string][]*RestWakuMessage)
}
for _, topic := range contentTopics {
if c.data[pubsubTopic][topic] == nil {
c.data[pubsubTopic][topic] = []*pb.WakuMessage{}
c.data[pubsubTopic][topic] = []*RestWakuMessage{}
}
}
}
Expand Down Expand Up @@ -60,17 +62,23 @@ func (c *filterCache) addMessage(envelope *protocol.Envelope) {
c.data[pubsubTopic][contentTopic] = c.data[pubsubTopic][contentTopic][1:]
}

c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], envelope.Message())
message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
c.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
return
}

c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], message)
}

func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*pb.WakuMessage, error) {
func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*RestWakuMessage, error) {
c.mu.RLock()
defer c.mu.RUnlock()

if c.data[pubsubTopic] == nil || c.data[pubsubTopic][contentTopic] == nil {
return nil, fmt.Errorf("not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic)
}
msgs := c.data[pubsubTopic][contentTopic]
c.data[pubsubTopic][contentTopic] = []*pb.WakuMessage{}
c.data[pubsubTopic][contentTopic] = []*RestWakuMessage{}
return msgs, nil
}
14 changes: 9 additions & 5 deletions cmd/waku/server/rest/lightpush_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -38,8 +37,8 @@ func (msg lightpushRequest) Check() error {
}

type lightpushRequest struct {
PubSubTopic string `json:"pubsubTopic"`
Message *pb.WakuMessage `json:"message"`
PubSubTopic string `json:"pubsubTopic"`
Message *RestWakuMessage `json:"message"`
}

// handled error codes are 200, 400, 500, 503
Expand All @@ -58,14 +57,19 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req
serv.log.Error("writing response", zap.Error(err))
return
}
//

if serv.node.Lightpush() == nil {
w.WriteHeader(http.StatusServiceUnavailable)
return
}

_, err := serv.node.Lightpush().Publish(req.Context(), msg.Message, lightpush.WithPubSubTopic(msg.PubSubTopic))
message, err := msg.Message.ToProto()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}

_, err = serv.node.Lightpush().Publish(req.Context(), message, lightpush.WithPubSubTopic(msg.PubSubTopic))
if err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
_, err = w.Write([]byte(err.Error()))
Expand Down
3 changes: 1 addition & 2 deletions cmd/waku/server/rest/lightpush_rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/node"
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)

Expand Down Expand Up @@ -46,7 +45,7 @@ func TestLightpushMessagev1(t *testing.T) {

msg := lightpushRequest{
PubSubTopic: pubSubTopic,
Message: &pb.WakuMessage{
Message: &RestWakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Timestamp: utils.GetUnixEpoch(),
Expand Down
46 changes: 46 additions & 0 deletions cmd/waku/server/rest/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package rest

import (
"errors"

"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

type RestWakuMessage struct {
Payload server.Base64URLByte `json:"payload"`
ContentTopic string `json:"contentTopic"`
Version *uint32 `json:"version,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
Meta []byte `json:"meta,omitempty"`
}

func (r *RestWakuMessage) FromProto(input *pb.WakuMessage) error {
if err := input.Validate(); err != nil {
return err
}

r.Payload = input.Payload
r.ContentTopic = input.ContentTopic
r.Timestamp = input.Timestamp
r.Version = input.Version
r.Meta = input.Meta

return nil
}

func (r *RestWakuMessage) ToProto() (*pb.WakuMessage, error) {
if r == nil {
return nil, errors.New("wakumessage is missing")
}

msg := &pb.WakuMessage{
Payload: r.Payload,
ContentTopic: r.ContentTopic,
Version: r.Version,
Timestamp: r.Timestamp,
Meta: r.Meta,
}

return msg, nil
}
49 changes: 36 additions & 13 deletions cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -124,9 +123,9 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
r.log.Error("writing response", zap.Error(err))
return
}
var response []*pb.WakuMessage
var response []*RestWakuMessage
select {
case msg, open := <-sub.Ch:
case envelope, open := <-sub.Ch:
if !open {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic))
w.WriteHeader(http.StatusNotFound)
Expand All @@ -136,7 +135,14 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
}
return
}
response = append(response, msg.Message())

message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
} else {
response = append(response, message)
}

default:
break
}
Expand All @@ -150,9 +156,9 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
return
}

var message *pb.WakuMessage
var restMessage *RestWakuMessage
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&message); err != nil {
if err := decoder.Decode(&restMessage); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -162,12 +168,18 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
topic = relay.DefaultWakuTopic
}

message, err := restMessage.ToProto()
if err != nil {
writeErrOrResponse(w, err, nil)
return
}

if err := server.AppendRLNProof(r.node, message); err != nil {
writeErrOrResponse(w, err, nil)
return
}

_, err := r.node.Relay().Publish(req.Context(), message, relay.WithPubSubTopic(strings.Replace(topic, "\n", "", -1)))
_, err = r.node.Relay().Publish(req.Context(), message, relay.WithPubSubTopic(strings.Replace(topic, "\n", "", -1)))
if err != nil {
r.log.Error("publishing message", zap.Error(err))
}
Expand Down Expand Up @@ -227,10 +239,15 @@ func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Reques
r.log.Error("writing response", zap.Error(err))
return
}
var response []*pb.WakuMessage
var response []*RestWakuMessage
select {
case msg := <-sub.Ch:
response = append(response, msg.Message())
case envelope := <-sub.Ch:
message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
} else {
response = append(response, message)
}
default:
break
}
Expand All @@ -240,15 +257,21 @@ func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Reques

func (r *RelayService) postV1AutoMessage(w http.ResponseWriter, req *http.Request) {

var message *pb.WakuMessage
var restMessage *RestWakuMessage
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&message); err != nil {
if err := decoder.Decode(&restMessage); err != nil {
r.log.Error("decoding message failure", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
defer req.Body.Close()
var err error

message, err := restMessage.ToProto()
if err != nil {
writeErrOrResponse(w, err, nil)
return
}

if err = server.AppendRLNProof(r.node, message); err != nil {
writeErrOrResponse(w, err, nil)
return
Expand Down
20 changes: 10 additions & 10 deletions cmd/waku/server/rest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ type StoreResponse struct {
}

type HistoryCursor struct {
PubsubTopic string `json:"pubsub_topic"`
SenderTime string `json:"sender_time"`
StoreTime string `json:"store_time"`
PubsubTopic string `json:"pubsubTopic"`
SenderTime string `json:"senderTime"`
StoreTime string `json:"storeTime"`
Digest []byte `json:"digest"`
}

type StoreWakuMessage struct {
Payload []byte `json:"payload"`
ContentTopic string `json:"content_topic"`
Version uint32 `json:"version"`
Timestamp int64 `json:"timestamp"`
Meta []byte `json:"meta"`
Payload []byte `json:"payload"`
ContentTopic string `json:"contentTopic"`
Version *uint32 `json:"version,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
Meta []byte `json:"meta,omitempty"`
}

const routeStoreMessagesV1 = "/store/v1/messages"
Expand Down Expand Up @@ -180,8 +180,8 @@ func toStoreResponse(result *store.Result) StoreResponse {
response.Messages = append(response.Messages, StoreWakuMessage{
Payload: m.Payload,
ContentTopic: m.ContentTopic,
Version: m.GetVersion(),
Timestamp: m.GetTimestamp(),
Version: m.Version,
Timestamp: m.Timestamp,
Meta: m.Meta,
})
}
Expand Down
13 changes: 7 additions & 6 deletions cmd/waku/server/rpc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"errors"

"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
rlnpb "github.com/waku-org/go-waku/waku/v2/protocol/rln/pb"

Expand All @@ -20,12 +21,12 @@ type RateLimitProof struct {
}

type RPCWakuMessage struct {
Payload Base64URLByte `json:"payload,omitempty"`
ContentTopic string `json:"contentTopic,omitempty"`
Version uint32 `json:"version"`
Timestamp int64 `json:"timestamp,omitempty"`
RateLimitProof *RateLimitProof `json:"rateLimitProof,omitempty"`
Ephemeral bool `json:"ephemeral,omitempty"`
Payload server.Base64URLByte `json:"payload,omitempty"`
ContentTopic string `json:"contentTopic,omitempty"`
Version uint32 `json:"version"`
Timestamp int64 `json:"timestamp,omitempty"`
RateLimitProof *RateLimitProof `json:"rateLimitProof,omitempty"`
Ephemeral bool `json:"ephemeral,omitempty"`
}

func ProtoToRPC(input *pb.WakuMessage) (*RPCWakuMessage, error) {
Expand Down
Loading

0 comments on commit 3d7d4db

Please sign in to comment.