Skip to content

Commit

Permalink
feat(nwaku): add protected topics
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Dec 16, 2024
1 parent a87bed5 commit a17c2bc
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 15 deletions.
72 changes: 58 additions & 14 deletions wakuv2/nwaku.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ package wakuv2
resp) );
}
static void cGoWakuRelayAddProtectedTopic(void* wakuCtx, char* pubSubTopic, char* publicKey, void* resp) {
WAKU_CALL ( waku_relay_add_protected_topic(wakuCtx,
pubSubTopic,
publicKey,
(WakuCallBack) GoCallback,
resp) );
}
static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {
WAKU_CALL ( waku_relay_unsubscribe(wakuCtx,
Expand Down Expand Up @@ -645,13 +653,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuCon
opts = append(opts, node.WithLightPush(lightpush.WithRateLimiter(1, 1)))
}
if appDB != nil {
waku.protectedTopicStore, err = persistence.NewProtectedTopicsStore(logger, appDB)
if err != nil {
return nil, err
}
}
if cfg.EnablePeerExchangeServer {
opts = append(opts, node.WithPeerExchange(peer_exchange.WithRateLimiter(1, 1)))
}
Expand Down Expand Up @@ -965,15 +966,12 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P

topic = w.GetPubsubTopic(topic)

/* TODO nwaku
if pubkey != nil {
err := w.node.Relay().AddSignedTopicValidator(topic, pubkey)
err := w.node.RelayAddProtectedTopic(topic, pubkey)
if err != nil {
return err
}
}
*/

err := w.node.RelaySubscribe(topic)
if err != nil {
Expand Down Expand Up @@ -1693,14 +1691,14 @@ func (w *Waku) Stop() error {
return err
}

/* TODO-nwaku
if w.protectedTopicStore != nil {
err := w.protectedTopicStore.Close()
if err != nil {
return err
}
}

/* TODO-nwaku
close(w.goingOnline)*/

w.wg.Wait()
Expand Down Expand Up @@ -1935,7 +1933,6 @@ func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
return w.protectedTopicStore.FetchPrivateKey(topic)
}

/* TODO-nwaku
func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {
topic = w.GetPubsubTopic(topic)
if w.protectedTopicStore == nil {
Expand All @@ -1952,7 +1949,7 @@ func (w *Waku) RemovePubsubTopicKey(topic string) error {
}

return w.protectedTopicStore.Delete(topic)
} */
}

func (w *Waku) handleNetworkChangeFromApp(state connection.State) {
// TODO-nwaku
Expand Down Expand Up @@ -2306,6 +2303,15 @@ func wakuNew(nodeKey *ecdsa.PrivateKey,
return nil, err
}

var protectedTopicStore *persistence.ProtectedTopicsStore
if appDB != nil {
protectedTopicStore, err = persistence.NewProtectedTopicsStore(logger, appDB)
if err != nil {
cancel()
return nil, err
}
}

// Notice that the events for self node are handled by the 'MyEventCallback' method

return &Waku{
Expand Down Expand Up @@ -2335,6 +2341,7 @@ func wakuNew(nodeKey *ecdsa.PrivateKey,
onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker),
sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish),
filters: common.NewFilters(cfg.DefaultShardPubsubTopic, logger),
protectedTopicStore: protectedTopicStore,
}, nil

}
Expand Down Expand Up @@ -2593,6 +2600,43 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
return errors.New(errMsg)
}

func (n *WakuNode) RelayAddProtectedTopic(pubsubTopic string, pubkey *ecdsa.PublicKey) error {
if pubsubTopic == "" {
return errors.New("pubsub topic is empty")
}

if pubkey == nil {
return nil // Nothing to do here
}

keyHexStr := hex.EncodeToString(crypto.FromECDSAPub(pubkey))

wg := sync.WaitGroup{}

var resp = C.allocResp(unsafe.Pointer(&wg))
var cPubsubTopic = C.CString(pubsubTopic)
var cPublicKey = C.CString(keyHexStr)

defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPubsubTopic))
defer C.free(unsafe.Pointer(cPublicKey))

if n.wakuCtx == nil {
return errors.New("wakuCtx is nil")
}

wg.Add(1)
C.cGoWakuRelayAddProtectedTopic(n.wakuCtx, cPubsubTopic, cPublicKey, resp)
wg.Wait()

if C.getRet(resp) == C.RET_OK {
return nil
}

errMsg := "error WakuRelayAddProtectedTopic: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}

func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error {
if pubsubTopic == "" {
return errors.New("pubsub topic is empty")
Expand Down

0 comments on commit a17c2bc

Please sign in to comment.