Skip to content

Commit

Permalink
Add metrics and telemetry to scrubbing (#1262)
Browse files Browse the repository at this point in the history
scrubbing was pushed with no metrics or telemetry. This follow-up PR
establishes telemetry on time to scrub streams and process individual
stream members, as well as metrics on # pool queue length, # streams
scrubbed, entitlement losses and user boots, slicable by space id,
channel id, and user id where applicable.
  • Loading branch information
clemire authored Oct 17, 2024
1 parent 1282e53 commit 82bf439
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 64 deletions.
3 changes: 3 additions & 0 deletions core/node/rpc/scrub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ func TestScrubStreamTaskProcessor(t *testing.T) {
eventAdder,
tc.mockChainAuth,
service.config,
nil,
nil,
common.Address{},
)
require.NoError(err)

Expand Down
3 changes: 3 additions & 0 deletions core/node/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,9 @@ func (s *Service) initScrubbing(ctx context.Context) (err error) {
s,
s.chainAuth,
s.config,
s.metrics,
s.otelTracer,
s.wallet.Address,
)
if err != nil {
return AsRiverError(err, Err_BAD_CONFIG).Message("Unable to instantiate stream scrub task processor")
Expand Down
251 changes: 187 additions & 64 deletions core/node/scrub/stream_scrub_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/gammazero/workerpool"
"github.com/prometheus/client_golang/prometheus"

"go.opentelemetry.io/otel/attribute"
otelCodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/river-build/river/core/config"
"github.com/river-build/river/core/node/auth"
"github.com/river-build/river/core/node/base"
"github.com/river-build/river/core/node/dlog"
"github.com/river-build/river/core/node/events"
"github.com/river-build/river/core/node/infra"
. "github.com/river-build/river/core/node/protocol"
. "github.com/river-build/river/core/node/shared"
)
Expand Down Expand Up @@ -40,6 +48,12 @@ type streamScrubTaskProcessorImpl struct {
eventAdder EventAdder
chainAuth auth.ChainAuth
config *config.Config
tracer trace.Tracer

streamsScrubbed prometheus.Counter
entitlementLosses prometheus.Counter
userBoots prometheus.Counter
scrubQueueLength prometheus.GaugeFunc
}

func NewStreamScrubTasksProcessor(
Expand All @@ -48,6 +62,9 @@ func NewStreamScrubTasksProcessor(
eventAdder EventAdder,
chainAuth auth.ChainAuth,
cfg *config.Config,
metrics infra.MetricsFactory,
tracer trace.Tracer,
nodeAddress common.Address,
) (StreamScrubTaskProcessor, error) {
proc := &streamScrubTaskProcessorImpl{
ctx: ctx,
Expand All @@ -56,15 +73,174 @@ func NewStreamScrubTasksProcessor(
eventAdder: eventAdder,
chainAuth: chainAuth,
config: cfg,

tracer: tracer,
}

if metrics != nil {
streamsScrubbed := metrics.NewCounterEx(
"streams_scrubbed",
"Number of streams scrubbed",
)
entitlementLosses := metrics.NewCounterEx(
"entitlement_losses",
"Number of entitlement losses detected",
)
userBoots := metrics.NewCounterEx(
"user_boots",
"Number of users booted due to stream scrubbing",
)
scrubQueueLength := metrics.NewGaugeFunc(
prometheus.GaugeOpts{
Name: "scrub_queue_length",
Help: "Number of streams with a pending scrub scheduled",
},
func() float64 {
return float64(proc.workerPool.WaitingQueueSize())
},
)
proc.scrubQueueLength = scrubQueueLength
proc.streamsScrubbed = streamsScrubbed
proc.entitlementLosses = entitlementLosses
proc.userBoots = userBoots
}

return proc, nil
}

// processMember checks the individual member for entitlement and attempts to boot them if
// they no longer meet entitlement requirements. This method returns an error for the sake
// of annotating the telemetry span, but in practice it is not used by the caller.
func (tp *streamScrubTaskProcessorImpl) processMember(
task *streamScrubTask,
ctx context.Context,
member string,
) (err error) {
log := dlog.FromCtx(ctx).
With("Func", "streamScrubTask.processMember").
With("channelId", task.channelId).
With("spaceId", task.spaceId).
With("userId", member)

var span trace.Span

if tp.tracer != nil {
ctx, span = tp.tracer.Start(ctx, "member_scrub")
span.SetAttributes(
attribute.String("spaceId", task.spaceId.String()),
attribute.String("channelId", task.channelId.String()),
attribute.String("userId", member),
)
defer func() {
span.RecordError(err)
if err != nil {
span.SetStatus(otelCodes.Error, err.Error())
} else {
span.SetStatus(otelCodes.Ok, "")
}
span.End()
}()
}

var isEntitled bool
if isEntitled, err = tp.chainAuth.IsEntitled(
ctx,
tp.config,
auth.NewChainAuthArgsForChannel(
task.spaceId,
task.channelId,
member,
auth.PermissionRead,
),
); err != nil {
err = base.AsRiverError(err).
Message("unable to evaluate user entitlement").
Func("StreamScrubTaskProcessor.processMember").
Tag("user", member).
LogError(log)
return
}

if span != nil {
span.SetAttributes(attribute.Bool("isEntitled", isEntitled))
}

// In the case that the user is not entitled, they must have lost their entitlement
// after joining the channel, so let's go ahead and boot them.
if !isEntitled {
if tp.entitlementLosses != nil {
tp.entitlementLosses.Inc()
}

var userId []byte
if userId, err = AddressFromUserId(member); err != nil {
err = base.AsRiverError(err).
Message("error converting user id into address").
Func("StreamScrubTaskProcessor.processMember").
Tag("user", member).
LogError(log)
return
}

var userStreamId StreamId
if userStreamId, err = UserStreamIdFromBytes(userId); err != nil {
err = base.AsRiverError(err).
Message("error constructing userid stream from user address").
Func("StreamScrubTaskProcessor.processMember").
Tag("userId", userId).
LogError(log)
return
}

log.Info("Entitlement loss detected; adding LEAVE event for user",
"user",
member,
"userStreamId",
userStreamId,
)

if err = tp.eventAdder.AddEventPayload(
ctx,
userStreamId,
events.Make_UserPayload_Membership(
MembershipOp_SO_LEAVE,
task.channelId,
&member,
task.spaceId[:],
),
); err != nil {
err = base.AsRiverError(err).
Message("unable to add channel leave event to user stream").
Func("StreamScrubTaskProcessor.processMember").
Tag("userStreamId", userStreamId).
LogError(log)
return
}
}

if tp.userBoots != nil {
tp.userBoots.Inc()
}

return err
}

func (tp *streamScrubTaskProcessorImpl) processTask(task *streamScrubTask) {
log := dlog.FromCtx(tp.ctx).
With("Func", "streamScrubTask.process").
With("channelId", task.channelId).
With("spaceId", task.spaceId)
var span trace.Span
ctx := tp.ctx
if tp.tracer != nil {
ctx, span = tp.tracer.Start(tp.ctx, "streamScrubTaskProcess.processTask")
span.SetAttributes(
attribute.String("spaceId", task.spaceId.String()),
attribute.String("channelId", task.channelId.String()),
)
defer span.End()
}

stream, err := tp.cache.GetStream(tp.ctx, task.channelId)
if err != nil {
log.Error("Unable to get stream from cache", "error", err)
Expand Down Expand Up @@ -92,70 +268,17 @@ func (tp *streamScrubTaskProcessorImpl) processTask(task *streamScrubTask) {
log.Error("Failed to fetch stream members", "error", err)
return
}

for member := range (*members).Iter() {
isEntitled, err := tp.chainAuth.IsEntitled(
tp.ctx,
tp.config,
auth.NewChainAuthArgsForChannel(
task.spaceId,
task.channelId,
member,
auth.PermissionRead,
),
)
if err != nil {
log.Error("Scrubbing error: unable to evaluate user entitlement",
"user",
member,
"error",
err,
)
continue
}
// In the case that the user is not entitled, they must have lost their entitlement
// after joining the channel, so let's go ahead and boot them.
if !isEntitled {
userId, err := AddressFromUserId(member)
if err != nil {
log.Error("Error converting user id into address", "member", member, "error", err)
continue
}
userStreamId, err := UserStreamIdFromBytes(userId)
if err != nil {
log.Error(
"Error constructing user id stream from user address",
"userAddress",
userId,
"error",
err,
)
}
log.Info("Entitlement loss detected; adding LEAVE event for user",
"user",
member,
"userStreamId",
userStreamId,
)
err = tp.eventAdder.AddEventPayload(
tp.ctx,
userStreamId,
events.Make_UserPayload_Membership(
MembershipOp_SO_LEAVE,
task.channelId,
&member,
task.spaceId[:],
),
)
if err != nil {
log.Error(
"scrub error: unable to add channel leave event to user stream",
"userStreamId",
userStreamId,
"error",
err,
)
}
}
_ = tp.processMember(task, ctx, member)
}

if span != nil {
span.SetStatus(otelCodes.Ok, "")
}

if tp.streamsScrubbed != nil {
tp.streamsScrubbed.Inc()
}
}

Expand Down Expand Up @@ -209,7 +332,7 @@ func (tp *streamScrubTaskProcessorImpl) TryScheduleScrub(
task := &streamScrubTask{channelId: *streamId, spaceId: *view.StreamParentId(), taskProcessor: tp}
_, alreadyScheduled := tp.pendingTasks.LoadOrStore(streamId, task)
if !alreadyScheduled {
log.Info("Scheduling scrub for stream", "lastScrubbedTime", stream.LastScrubbedTime())
log.Debug("Scheduling scrub for stream", "lastScrubbedTime", stream.LastScrubbedTime())
tp.workerPool.Submit(func() {
task.process()
tp.pendingTasks.Delete(task.channelId)
Expand Down

0 comments on commit 82bf439

Please sign in to comment.