Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nwaku): add protected topics #6220

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 63 additions & 14 deletions wakuv2/nwaku.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ package wakuv2
resp) );
}

static void cGoWakuRelayAddProtectedShard(void* wakuCtx, int clusterId, int shardId, char* publicKey, void* resp) {
WAKU_CALL ( waku_relay_add_protected_shard(wakuCtx,
clusterId,
shardId,
publicKey,
(WakuCallBack) GoCallback,
resp) );
}

static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {

WAKU_CALL ( waku_relay_unsubscribe(wakuCtx,
Expand Down Expand Up @@ -685,13 +694,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuCon
opts = append(opts, node.WithMessageProvider(dbStore))
}

if appDB != nil {
waku.protectedTopicStore, err = persistence.NewProtectedTopicsStore(logger, appDB)
if err != nil {
return nil, err
}
}

waku.options = opts

waku.logger.Info("setup the go-waku node successfully")
Expand Down Expand Up @@ -1001,17 +1003,24 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P

topic = w.GetPubsubTopic(topic)

/* TODO nwaku
rs, err := protocol.TopicsToRelayShards(topic)
if err != nil {
return err
}

if len(rs) == 0 {
w.logger.Warn("could not obtain shards from topic", zap.String("topic", topic))
return nil
}

if pubkey != nil {
err := w.node.Relay().AddSignedTopicValidator(topic, pubkey)
err := w.node.RelayAddProtectedShard(rs[0].ClusterID, rs[0].ShardIDs[0], pubkey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should also make sure rs[0].ShardIDs.len != 0

if err != nil {
return err
}
}
*/

err := w.node.RelaySubscribe(topic)
err = w.node.RelaySubscribe(topic)
if err != nil {
return err
}
Expand Down Expand Up @@ -1729,14 +1738,14 @@ func (w *Waku) Stop() error {
return err
}

/* TODO-nwaku
if w.protectedTopicStore != nil {
err := w.protectedTopicStore.Close()
if err != nil {
return err
}
}

/* TODO-nwaku
close(w.goingOnline)*/

w.wg.Wait()
Expand Down Expand Up @@ -1971,7 +1980,6 @@ func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
return w.protectedTopicStore.FetchPrivateKey(topic)
}

/* TODO-nwaku
func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {
topic = w.GetPubsubTopic(topic)
if w.protectedTopicStore == nil {
Expand All @@ -1988,7 +1996,7 @@ func (w *Waku) RemovePubsubTopicKey(topic string) error {
}

return w.protectedTopicStore.Delete(topic)
} */
}

func (w *Waku) handleNetworkChangeFromApp(state connection.State) {
// TODO-nwaku
Expand Down Expand Up @@ -2355,6 +2363,15 @@ func wakuNew(nodeKey *ecdsa.PrivateKey,
return nil, err
}

var protectedTopicStore *persistence.ProtectedTopicsStore
if appDB != nil {
protectedTopicStore, err = persistence.NewProtectedTopicsStore(logger, appDB)
if err != nil {
cancel()
return nil, err
}
}

// Notice that the events for self node are handled by the 'MyEventCallback' method

return &Waku{
Expand Down Expand Up @@ -2384,6 +2401,7 @@ func wakuNew(nodeKey *ecdsa.PrivateKey,
onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker),
sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish),
filters: common.NewFilters(cfg.DefaultShardPubsubTopic, logger),
protectedTopicStore: protectedTopicStore,
}, nil

}
Expand Down Expand Up @@ -2642,6 +2660,37 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
return errors.New(errMsg)
}

func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubkey *ecdsa.PublicKey) error {
if pubkey == nil {
return nil // Nothing to do here
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it better to return an error in this case?

}

keyHexStr := hex.EncodeToString(crypto.FromECDSAPub(pubkey))

wg := sync.WaitGroup{}

var resp = C.allocResp(unsafe.Pointer(&wg))
var cPublicKey = C.CString(keyHexStr)

defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPublicKey))

if n.wakuCtx == nil {
return errors.New("wakuCtx is nil")
}

wg.Add(1)
C.cGoWakuRelayAddProtectedShard(n.wakuCtx, C.int(clusterId), C.int(shardId), cPublicKey, resp)
wg.Wait()

if C.getRet(resp) == C.RET_OK {
return nil
}

errMsg := "error WakuRelayAddProtectedShard: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}

func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error {
if pubsubTopic == "" {
return errors.New("pubsub topic is empty")
Expand Down
2 changes: 1 addition & 1 deletion wakuv2/nwaku_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestBasicWakuV2(t *testing.T) {
}

nwakuConfig := WakuConfig{
Port: 30303,
TcpPort: 30303,
NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710",
EnableRelay: true,
LogLevel: "DEBUG",
Expand Down
Loading