Skip to content

Commit

Permalink
Add messages logging subsystem
Browse files Browse the repository at this point in the history
  • Loading branch information
vitvly committed Nov 9, 2023
1 parent 3d217ed commit 2bbfb7a
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 35 deletions.
2 changes: 1 addition & 1 deletion cmd/waku/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func bridgeTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNo
env.Message().Meta = append(env.Message().Meta, fwdMetaTag...)
_, err := wakuNode.Relay().Publish(ctx, env.Message(), relay.WithPubSubTopic(topic))
if err != nil {
utils.Logger().Warn("could not bridge message", logging.HexString("hash", env.Hash()),
utils.Logger().Warn("could not bridge message", logging.HexBytes("hash", env.Hash()),
zap.String("fromTopic", env.PubsubTopic()), zap.String("toTopic", topic),
zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err))
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/waku/rlngenerate/command_rln.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ func execute(ctx context.Context) error {

if logger.Level() == zap.DebugLevel {
logger.Info("registered credentials into the membership contract",
logging.HexString("IDCommitment", identityCredential.IDCommitment[:]),
logging.HexString("IDNullifier", identityCredential.IDNullifier[:]),
logging.HexString("IDSecretHash", identityCredential.IDSecretHash[:]),
logging.HexString("IDTrapDoor", identityCredential.IDTrapdoor[:]),
logging.HexBytes("IDCommitment", identityCredential.IDCommitment[:]),
logging.HexBytes("IDNullifier", identityCredential.IDNullifier[:]),
logging.HexBytes("IDSecretHash", identityCredential.IDSecretHash[:]),
logging.HexBytes("IDTrapDoor", identityCredential.IDTrapdoor[:]),
zap.Uint("index", membershipIndex),
)
} else {
logger.Info("registered credentials into the membership contract", logging.HexString("idCommitment", identityCredential.IDCommitment[:]), zap.Uint("index", membershipIndex))
logger.Info("registered credentials into the membership contract", logging.HexBytes("idCommitment", identityCredential.IDCommitment[:]), zap.Uint("index", membershipIndex))
}

web3Config.ETHClient.Close()
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/rlngenerate/web3.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func register(ctx context.Context, web3Config *web3.Config, idComm rln.IDCommitm

var eventIDComm rln.IDCommitment = rln.BigIntToBytes32(evt.IdCommitment)

log.Debug("information extracted from tx log", zap.Uint64("blockNumber", evt.Raw.BlockNumber), logging.HexString("idCommitment", eventIDComm[:]), zap.Uint64("index", evt.Index.Uint64()))
log.Debug("information extracted from tx log", zap.Uint64("blockNumber", evt.Raw.BlockNumber), logging.HexBytes("idCommitment", eventIDComm[:]), zap.Uint64("index", evt.Index.Uint64()))

if eventIDComm != idComm {
return 0, errors.New("invalid id commitment key")
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep
}
rpcMsg, err := ProtoToRPC(msg.Message())
if err != nil {
r.log.Warn("could not include message in response", logging.HexString("hash", msg.Hash()), zap.Error(err))
r.log.Warn("could not include message in response", logging.HexBytes("hash", msg.Hash()), zap.Error(err))
} else {
*reply = append(*reply, rpcMsg)
}
Expand Down
10 changes: 0 additions & 10 deletions logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@ func (bArr byteArr) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
return nil
}

type hexByte []byte

func HexString(key string, byteVal hexByte) zapcore.Field {
return zap.Stringer(key, hexByte(byteVal))
}

func (h hexByte) String() string {
return "0x" + hex.EncodeToString(h)
}

// List of multiaddrs
type multiaddrs []multiaddr.Multiaddr

Expand Down
21 changes: 10 additions & 11 deletions waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -217,20 +218,24 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
handle := func(envelope *protocol.Envelope) error {
msg := envelope.Message()
pubsubTopic := envelope.PubsubTopic()
logger := wf.log.With(logging.HexBytes("envelopeHash", envelope.Hash()))
logger := utils.MessagesLogger().With(logging.HexBytes("hash", envelope.Hash()),
zap.String("pubsubTopic", envelope.PubsubTopic()),
zap.String("contentTopic", envelope.Message().ContentTopic),
)
logger.Debug("push message to filter subscribers")

// Each subscriber is a light node that earlier on invoked
// a FilterRequest on this node
for subscriber := range wf.subscriptions.Items(pubsubTopic, msg.ContentTopic) {
logger := logger.With(logging.HostID("subscriber", subscriber))
logger := logger.With(logging.HostID("peer", subscriber))
subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines
// Do a message push to light node
logger.Info("pushing message to light node")
logger.Debug("pushing message to light node")
wf.WaitGroup().Add(1)
go func(subscriber peer.ID) {
defer wf.WaitGroup().Done()
start := time.Now()
err := wf.pushMessage(ctx, subscriber, envelope)
err := wf.pushMessage(ctx, logger, subscriber, envelope)
if err != nil {
logger.Error("pushing message", zap.Error(err))
return
Expand All @@ -249,13 +254,7 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
}
}

func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, env *protocol.Envelope) error {
logger := wf.log.With(
logging.HostID("peer", peerID),
logging.HexBytes("envelopeHash", env.Hash()),
zap.String("pubsubTopic", env.PubsubTopic()),
zap.String("contentTopic", env.Message().ContentTopic),
)
func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logger, peerID peer.ID, env *protocol.Envelope) error {
pubSubTopic := env.PubsubTopic()
messagePush := &pb.MessagePush{
PubsubTopic: &pubSubTopic,
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa

if response.IsSuccess {
hash := message.Hash(params.pubsubTopic)
wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash))
wakuLP.log.Info("waku.lightpush published", logging.HexBytes("hash", hash))
return hash, nil
}

Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/relay/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ func (m *metricsImpl) RecordMessage(envelope *waku_proto.Envelope) {
messageSize.Observe(payloadSizeInKb)
pubsubTopic := envelope.PubsubTopic()
messages.WithLabelValues(pubsubTopic).Inc()
m.log.Debug("waku.relay received", zap.String("pubsubTopic", pubsubTopic), logging.HexString("hash", envelope.Hash()), zap.Int64("receivedTime", envelope.Index().ReceiverTime), zap.Int("payloadSizeBytes", payloadSizeInBytes))
m.log.Debug("waku.relay received", zap.String("pubsubTopic", pubsubTopic), logging.HexBytes("envelopeHash", envelope.Hash()), zap.Int64("receivedTime", envelope.Index().ReceiverTime), zap.Int("payloadSizeBytes", payloadSizeInBytes))
}()
}
9 changes: 6 additions & 3 deletions waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)

// WakuRelayID_v200 is the current protocol ID used for WakuRelay
Expand All @@ -41,7 +42,8 @@ type WakuRelay struct {
timesource timesource.Timesource
metrics Metrics

log *zap.Logger
log *zap.Logger
logMessages *zap.Logger

bcaster Broadcaster

Expand Down Expand Up @@ -80,8 +82,9 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
w.minPeersToPublish = minPeersToPublish
w.CommonService = service.NewCommonService()
w.log = log.Named("relay")
w.logMessages = utils.MessagesLogger()
w.events = eventbus.NewBus()
w.metrics = newMetrics(reg, w.log)
w.metrics = newMetrics(reg, w.logMessages)

// default options required by WakuRelay
w.opts = append(w.defaultPubsubOptions(), opts...)
Expand Down Expand Up @@ -276,7 +279,7 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts .

hash := message.Hash(params.pubsubTopic)

w.log.Debug("waku.relay published", zap.String("pubsubTopic", params.pubsubTopic), logging.HexString("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload)))
w.logMessages.Debug("waku.relay published", zap.String("pubsubTopic", params.pubsubTopic), logging.HexBytes("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload)))

return hash, nil
}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/relay/waku_relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestWakuRelay(t *testing.T) {
go func() {
defer cancel()
env := <-subs[0].Ch
t.Log("received msg", logging.HexString("hash", env.Hash()))
t.Log("received msg", logging.HexBytes("hash", env.Hash()))
}()

msg := &pb.WakuMessage{
Expand Down
10 changes: 10 additions & 0 deletions waku/v2/utils/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

var log *zap.Logger
var logMessages *zap.Logger

// Logger creates a zap.Logger with some reasonable defaults
func Logger() *zap.Logger {
Expand All @@ -18,6 +19,15 @@ func Logger() *zap.Logger {
return log
}

// MessagesLogger returns a logger used for debug logging of receivent/sent messages
func MessagesLogger() *zap.Logger {
if logMessages == nil {
logMessages = logging.Logger("messages").Desugar()
}

return logMessages
}

// InitLogger initializes a global logger using an specific encoding
func InitLogger(encoding string, output string) {
cfg := logging.GetConfig()
Expand Down

0 comments on commit 2bbfb7a

Please sign in to comment.