From 82bf439f62ca6b25dd5f250e515785e4595b730e Mon Sep 17 00:00:00 2001
From: Crystal Lemire
Date: Wed, 16 Oct 2024 17:50:08 -0700
Subject: [PATCH] Add metrics and telemetry to scrubbing (#1262)

Scrubbing was pushed with no metrics or telemetry. This follow-up PR
establishes telemetry on time to scrub streams and process individual
stream members, as well as metrics on # pool queue length, # streams
scrubbed, entitlement losses and user boots, sliceable by space id,
channel id, and user id where applicable.
---
 core/node/rpc/scrub_test.go          |   3 +
 core/node/rpc/server.go              |   3 +
 core/node/scrub/stream_scrub_task.go | 251 ++++++++++++++++++++-------
 3 files changed, 193 insertions(+), 64 deletions(-)

diff --git a/core/node/rpc/scrub_test.go b/core/node/rpc/scrub_test.go
index 62606bb14..805903c47 100644
--- a/core/node/rpc/scrub_test.go
+++ b/core/node/rpc/scrub_test.go
@@ -294,6 +294,9 @@ func TestScrubStreamTaskProcessor(t *testing.T) {
 				eventAdder,
 				tc.mockChainAuth,
 				service.config,
+				nil,
+				nil,
+				common.Address{},
 			)
 			require.NoError(err)
 
diff --git a/core/node/rpc/server.go b/core/node/rpc/server.go
index 533d26fdb..9e6be5af1 100644
--- a/core/node/rpc/server.go
+++ b/core/node/rpc/server.go
@@ -597,6 +597,9 @@ func (s *Service) initScrubbing(ctx context.Context) (err error) {
 		s,
 		s.chainAuth,
 		s.config,
+		s.metrics,
+		s.otelTracer,
+		s.wallet.Address,
 	)
 	if err != nil {
 		return AsRiverError(err, Err_BAD_CONFIG).Message("Unable to instantiate stream scrub task processor")
diff --git a/core/node/scrub/stream_scrub_task.go b/core/node/scrub/stream_scrub_task.go
index 3cc9279b3..a2608be3d 100644
--- a/core/node/scrub/stream_scrub_task.go
+++ b/core/node/scrub/stream_scrub_task.go
@@ -5,12 +5,20 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/gammazero/workerpool"
+	"github.com/prometheus/client_golang/prometheus"
+
+	"go.opentelemetry.io/otel/attribute"
+	otelCodes "go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/trace"
 
 	"github.com/river-build/river/core/config"
 	"github.com/river-build/river/core/node/auth"
+	"github.com/river-build/river/core/node/base"
 	"github.com/river-build/river/core/node/dlog"
 	"github.com/river-build/river/core/node/events"
+	"github.com/river-build/river/core/node/infra"
 	. "github.com/river-build/river/core/node/protocol"
 	. "github.com/river-build/river/core/node/shared"
 )
@@ -40,6 +48,12 @@ type streamScrubTaskProcessorImpl struct {
 	eventAdder   EventAdder
 	chainAuth    auth.ChainAuth
 	config       *config.Config
+	tracer       trace.Tracer
+
+	streamsScrubbed   prometheus.Counter
+	entitlementLosses prometheus.Counter
+	userBoots         prometheus.Counter
+	scrubQueueLength  prometheus.GaugeFunc
 }
 
 func NewStreamScrubTasksProcessor(
@@ -48,6 +62,9 @@ func NewStreamScrubTasksProcessor(
 	eventAdder EventAdder,
 	chainAuth auth.ChainAuth,
 	cfg *config.Config,
+	metrics infra.MetricsFactory,
+	tracer trace.Tracer,
+	nodeAddress common.Address,
 ) (StreamScrubTaskProcessor, error) {
 	proc := &streamScrubTaskProcessorImpl{
 		ctx:        ctx,
@@ -56,15 +73,174 @@ func NewStreamScrubTasksProcessor(
 		eventAdder: eventAdder,
 		chainAuth:  chainAuth,
 		config:     cfg,
+
+		tracer: tracer,
 	}
+
+	if metrics != nil {
+		streamsScrubbed := metrics.NewCounterEx(
+			"streams_scrubbed",
+			"Number of streams scrubbed",
+		)
+		entitlementLosses := metrics.NewCounterEx(
+			"entitlement_losses",
+			"Number of entitlement losses detected",
+		)
+		userBoots := metrics.NewCounterEx(
+			"user_boots",
+			"Number of users booted due to stream scrubbing",
+		)
+		scrubQueueLength := metrics.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Name: "scrub_queue_length",
+				Help: "Number of streams with a pending scrub scheduled",
+			},
+			func() float64 {
+				return float64(proc.workerPool.WaitingQueueSize())
+			},
+		)
+		proc.scrubQueueLength = scrubQueueLength
+		proc.streamsScrubbed = streamsScrubbed
+		proc.entitlementLosses = entitlementLosses
+		proc.userBoots = userBoots
+	}
+
 	return proc, nil
 }
 
+// processMember checks the individual member for entitlement and attempts to boot them if
+// they no longer meet entitlement requirements. This method returns an error for the sake
+// of annotating the telemetry span, but in practice it is not used by the caller.
+func (tp *streamScrubTaskProcessorImpl) processMember(
+	task *streamScrubTask,
+	ctx context.Context,
+	member string,
+) (err error) {
+	log := dlog.FromCtx(ctx).
+		With("Func", "streamScrubTask.processMember").
+		With("channelId", task.channelId).
+		With("spaceId", task.spaceId).
+		With("userId", member)
+
+	var span trace.Span
+
+	if tp.tracer != nil {
+		ctx, span = tp.tracer.Start(ctx, "member_scrub")
+		span.SetAttributes(
+			attribute.String("spaceId", task.spaceId.String()),
+			attribute.String("channelId", task.channelId.String()),
+			attribute.String("userId", member),
+		)
+		defer func() {
+			span.RecordError(err)
+			if err != nil {
+				span.SetStatus(otelCodes.Error, err.Error())
+			} else {
+				span.SetStatus(otelCodes.Ok, "")
+			}
+			span.End()
+		}()
+	}
+
+	var isEntitled bool
+	if isEntitled, err = tp.chainAuth.IsEntitled(
+		ctx,
+		tp.config,
+		auth.NewChainAuthArgsForChannel(
+			task.spaceId,
+			task.channelId,
+			member,
+			auth.PermissionRead,
+		),
+	); err != nil {
+		err = base.AsRiverError(err).
+			Message("unable to evaluate user entitlement").
+			Func("StreamScrubTaskProcessor.processMember").
+			Tag("user", member).
+			LogError(log)
+		return
+	}
+
+	if span != nil {
+		span.SetAttributes(attribute.Bool("isEntitled", isEntitled))
+	}
+
+	// In the case that the user is not entitled, they must have lost their entitlement
+	// after joining the channel, so let's go ahead and boot them.
+	if !isEntitled {
+		if tp.entitlementLosses != nil {
+			tp.entitlementLosses.Inc()
+		}
+
+		var userId []byte
+		if userId, err = AddressFromUserId(member); err != nil {
+			err = base.AsRiverError(err).
+				Message("error converting user id into address").
+				Func("StreamScrubTaskProcessor.processMember").
+				Tag("user", member).
+				LogError(log)
+			return
+		}
+
+		var userStreamId StreamId
+		if userStreamId, err = UserStreamIdFromBytes(userId); err != nil {
+			err = base.AsRiverError(err).
+				Message("error constructing userid stream from user address").
+				Func("StreamScrubTaskProcessor.processMember").
+				Tag("userId", userId).
+				LogError(log)
+			return
+		}
+
+		log.Info("Entitlement loss detected; adding LEAVE event for user",
+			"user",
+			member,
+			"userStreamId",
+			userStreamId,
+		)
+
+		if err = tp.eventAdder.AddEventPayload(
+			ctx,
+			userStreamId,
+			events.Make_UserPayload_Membership(
+				MembershipOp_SO_LEAVE,
+				task.channelId,
+				&member,
+				task.spaceId[:],
+			),
+		); err != nil {
+			err = base.AsRiverError(err).
+				Message("unable to add channel leave event to user stream").
+				Func("StreamScrubTaskProcessor.processMember").
+				Tag("userStreamId", userStreamId).
+				LogError(log)
+			return
+		}
+
+		// Count a boot only after the LEAVE event was added; entitled members are never boots.
+		if tp.userBoots != nil {
+			tp.userBoots.Inc()
+		}
+	}
+	return err
+}
+
 func (tp *streamScrubTaskProcessorImpl) processTask(task *streamScrubTask) {
 	log := dlog.FromCtx(tp.ctx).
 		With("Func", "streamScrubTask.process").
 		With("channelId", task.channelId).
 		With("spaceId", task.spaceId)
+	var span trace.Span
+	ctx := tp.ctx
+	if tp.tracer != nil {
+		ctx, span = tp.tracer.Start(tp.ctx, "streamScrubTaskProcess.processTask")
+		span.SetAttributes(
+			attribute.String("spaceId", task.spaceId.String()),
+			attribute.String("channelId", task.channelId.String()),
+		)
+		defer span.End()
+	}
+
 	stream, err := tp.cache.GetStream(tp.ctx, task.channelId)
 	if err != nil {
 		log.Error("Unable to get stream from cache", "error", err)
@@ -92,70 +268,17 @@ func (tp *streamScrubTaskProcessorImpl) processTask(task *streamScrubTask) {
 		log.Error("Failed to fetch stream members", "error", err)
 		return
 	}
+
 	for member := range (*members).Iter() {
-		isEntitled, err := tp.chainAuth.IsEntitled(
-			tp.ctx,
-			tp.config,
-			auth.NewChainAuthArgsForChannel(
-				task.spaceId,
-				task.channelId,
-				member,
-				auth.PermissionRead,
-			),
-		)
-		if err != nil {
-			log.Error("Scrubbing error: unable to evaluate user entitlement",
-				"user",
-				member,
-				"error",
-				err,
-			)
-			continue
-		}
-		// In the case that the user is not entitled, they must have lost their entitlement
-		// after joining the channel, so let's go ahead and boot them.
-		if !isEntitled {
-			userId, err := AddressFromUserId(member)
-			if err != nil {
-				log.Error("Error converting user id into address", "member", member, "error", err)
-				continue
-			}
-			userStreamId, err := UserStreamIdFromBytes(userId)
-			if err != nil {
-				log.Error(
-					"Error constructing user id stream from user address",
-					"userAddress",
-					userId,
-					"error",
-					err,
-				)
-			}
-			log.Info("Entitlement loss detected; adding LEAVE event for user",
-				"user",
-				member,
-				"userStreamId",
-				userStreamId,
-			)
-			err = tp.eventAdder.AddEventPayload(
-				tp.ctx,
-				userStreamId,
-				events.Make_UserPayload_Membership(
-					MembershipOp_SO_LEAVE,
-					task.channelId,
-					&member,
-					task.spaceId[:],
-				),
-			)
-			if err != nil {
-				log.Error(
-					"scrub error: unable to add channel leave event to user stream",
-					"userStreamId",
-					userStreamId,
-					"error",
-					err,
-				)
-			}
-		}
+		_ = tp.processMember(task, ctx, member)
+	}
+
+	if span != nil {
+		span.SetStatus(otelCodes.Ok, "")
+	}
+
+	if tp.streamsScrubbed != nil {
+		tp.streamsScrubbed.Inc()
 	}
 }
 
@@ -209,7 +332,7 @@ func (tp *streamScrubTaskProcessorImpl) TryScheduleScrub(
 	task := &streamScrubTask{channelId: *streamId, spaceId: *view.StreamParentId(), taskProcessor: tp}
 	_, alreadyScheduled := tp.pendingTasks.LoadOrStore(streamId, task)
 	if !alreadyScheduled {
-		log.Info("Scheduling scrub for stream", "lastScrubbedTime", stream.LastScrubbedTime())
+		log.Debug("Scheduling scrub for stream", "lastScrubbedTime", stream.LastScrubbedTime())
 		tp.workerPool.Submit(func() {
 			task.process()
 			tp.pendingTasks.Delete(task.channelId)