Skip to content

Commit

Permalink
Metrics for shared channels (mattermost#26199)
Browse files Browse the repository at this point in the history
* add metrics definitions for shared channels
  • Loading branch information
wiggin77 authored Feb 21, 2024
1 parent f90b3d4 commit 38bbf04
Show file tree
Hide file tree
Showing 9 changed files with 318 additions and 8 deletions.
8 changes: 8 additions & 0 deletions server/einterfaces/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ type MetricsInterface interface {
ObserveRemoteClusterClockSkew(remoteID string, skew float64)
IncrementRemoteClusterConnStateChangeCounter(remoteID string, online bool)

IncrementSharedChannelsSyncCounter(remoteID string)
ObserveSharedChannelsTaskInQueueDuration(elapsed float64)
ObserveSharedChannelsQueueSize(size int64)
ObserveSharedChannelsSyncCollectionDuration(remoteID string, elapsed float64)
ObserveSharedChannelsSyncSendDuration(remoteID string, elapsed float64)
ObserveSharedChannelsSyncCollectionStepDuration(remoteID string, step string, elapsed float64)
ObserveSharedChannelsSyncSendStepDuration(remoteID string, step string, elapsed float64)

IncrementJobActive(jobType string)
DecrementJobActive(jobType string)

Expand Down
35 changes: 35 additions & 0 deletions server/einterfaces/mocks/MetricsInterface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

135 changes: 134 additions & 1 deletion server/enterprise/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
MetricsSubsystemSearch = "search"
MetricsSubsystemLogging = "logging"
MetricsSubsystemRemoteCluster = "remote_cluster"
MetricsSubsystemSharedChannels = "shared_channels"
MetricsSubsystemSystem = "system"
MetricsSubsystemJobs = "jobs"
MetricsCloudInstallationLabel = "installationId"
Expand Down Expand Up @@ -189,6 +190,14 @@ type MetricsInterfaceImpl struct {
RemoteClusterClockSkewHistograms *prometheus.HistogramVec
RemoteClusterConnStateChangeCounter *prometheus.CounterVec

SharedChannelsSyncCount *prometheus.CounterVec
SharedChannelsTaskInQueueHistogram prometheus.Histogram
SharedChannelsQueueSize prometheus.Gauge
SharedChannelsSyncCollectionHistogram *prometheus.HistogramVec
SharedChannelsSyncSendHistogram *prometheus.HistogramVec
SharedChannelsSyncCollectionStepHistogram *prometheus.HistogramVec
SharedChannelsSyncSendStepHistogram *prometheus.HistogramVec

ServerStartTime prometheus.Gauge

JobsActive *prometheus.GaugeVec
Expand Down Expand Up @@ -853,7 +862,7 @@ func New(ps *platform.PlatformService, driver, dataSource string) *MetricsInterf
)
m.Registry.MustRegister(m.LoggerBlockedCounters.counter)

// Remote Cluster subsystem
// Remote Cluster service

m.RemoteClusterMsgSentCounters = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -927,6 +936,90 @@ func New(ps *platform.PlatformService, driver, dataSource string) *MetricsInterf
)
m.Registry.MustRegister(m.RemoteClusterConnStateChangeCounter)

// Shared Channel service

m.SharedChannelsSyncCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "sync_count",
Help: "Count of sync events processed for each remote",
ConstLabels: additionalLabels,
},
[]string{"remote_id"},
)
m.Registry.MustRegister(m.SharedChannelsSyncCount)

m.SharedChannelsTaskInQueueHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "task_in_queue_duration_seconds",
Help: "Duration tasks spend in queue (seconds)",
ConstLabels: additionalLabels,
},
)
m.Registry.MustRegister(m.SharedChannelsTaskInQueueHistogram)

m.SharedChannelsQueueSize = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "task_queue_size",
Help: "Current number of tasks in queue",
ConstLabels: additionalLabels,
},
)
m.Registry.MustRegister(m.SharedChannelsQueueSize)

m.SharedChannelsSyncCollectionHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "sync_collection_duration_seconds",
Help: "Duration tasks spend collecting sync data (seconds)",
ConstLabels: additionalLabels,
},
[]string{"remote_id"},
)
m.Registry.MustRegister(m.SharedChannelsSyncCollectionHistogram)

m.SharedChannelsSyncSendHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "sync_send_duration_seconds",
Help: "Duration tasks spend sending sync data (seconds)",
ConstLabels: additionalLabels,
},
[]string{"remote_id"},
)
m.Registry.MustRegister(m.SharedChannelsSyncSendHistogram)

m.SharedChannelsSyncCollectionStepHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "sync_collection_step_duration_seconds",
Help: "Duration tasks spend in each step collecting data (seconds)",
ConstLabels: additionalLabels,
},
[]string{"remote_id", "step"},
)
m.Registry.MustRegister(m.SharedChannelsSyncCollectionStepHistogram)

m.SharedChannelsSyncSendStepHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "sync_send_step_duration_seconds",
Help: "Duration tasks spend in each step sending data (seconds)",
ConstLabels: additionalLabels,
},
[]string{"remote_id", "step"},
)
m.Registry.MustRegister(m.SharedChannelsSyncSendStepHistogram)

m.ServerStartTime = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSystem,
Expand Down Expand Up @@ -1324,6 +1417,46 @@ func (mi *MetricsInterfaceImpl) IncrementRemoteClusterConnStateChangeCounter(rem
}).Inc()
}

func (mi *MetricsInterfaceImpl) IncrementSharedChannelsSyncCounter(remoteID string) {
mi.SharedChannelsSyncCount.With(prometheus.Labels{
"remote_id": remoteID,
}).Inc()
}

func (mi *MetricsInterfaceImpl) ObserveSharedChannelsTaskInQueueDuration(elapsed float64) {
mi.SharedChannelsTaskInQueueHistogram.Observe(elapsed)
}

func (mi *MetricsInterfaceImpl) ObserveSharedChannelsQueueSize(size int64) {
mi.SharedChannelsQueueSize.Set(float64(size))
}

func (mi *MetricsInterfaceImpl) ObserveSharedChannelsSyncCollectionDuration(remoteID string, elapsed float64) {
mi.SharedChannelsSyncCollectionHistogram.With(prometheus.Labels{
"remote_id": remoteID,
}).Observe(elapsed)
}

func (mi *MetricsInterfaceImpl) ObserveSharedChannelsSyncSendDuration(remoteID string, elapsed float64) {
mi.SharedChannelsSyncSendHistogram.With(prometheus.Labels{
"remote_id": remoteID,
}).Observe(elapsed)
}

func (mi *MetricsInterfaceImpl) ObserveSharedChannelsSyncCollectionStepDuration(remoteID string, step string, elapsed float64) {
mi.SharedChannelsSyncCollectionStepHistogram.With(prometheus.Labels{
"remote_id": remoteID,
"step": step,
}).Observe(elapsed)
}

func (mi *MetricsInterfaceImpl) ObserveSharedChannelsSyncSendStepDuration(remoteID string, step string, elapsed float64) {
mi.SharedChannelsSyncSendStepHistogram.With(prometheus.Labels{
"remote_id": remoteID,
"step": step,
}).Observe(elapsed)
}

// SetReplicaLagAbsolute sets the absolute replica lag for a given node.
func (mi *MetricsInterfaceImpl) SetReplicaLagAbsolute(node string, value float64) {
mi.DbReplicaLagGaugeAbs.With(prometheus.Labels{"node": node}).Set(value)
Expand Down
7 changes: 7 additions & 0 deletions server/platform/services/remotecluster/sendfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func (rcs *Service) sendFile(task sendFileTask) {
}

func (rcs *Service) sendFileToRemote(timeout time.Duration, task sendFileTask) (*model.FileInfo, error) {
start := time.Now()
defer func() {
if metrics := rcs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendStepDuration(task.rc.RemoteId, "Attachments", time.Since(start).Seconds())
}
}()

rcs.server.Log().Log(mlog.LvlRemoteClusterServiceDebug, "sending file to remote...",
mlog.String("remote", task.rc.DisplayName),
mlog.String("uploadId", task.us.Id),
Expand Down
7 changes: 7 additions & 0 deletions server/platform/services/remotecluster/sendprofileImage.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ func (rcs *Service) sendProfileImage(task sendProfileImageTask) {
}

func (rcs *Service) sendProfileImageToRemote(timeout time.Duration, task sendProfileImageTask) error {
start := time.Now()
defer func() {
if metrics := rcs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendStepDuration(task.rc.RemoteId, "ProfileImages", time.Since(start).Seconds())
}
}()

rcs.server.Log().Log(mlog.LvlRemoteClusterServiceDebug, "sending profile image to remote...",
mlog.String("remote", task.rc.DisplayName),
mlog.String("UserId", task.userID),
Expand Down
18 changes: 18 additions & 0 deletions server/platform/services/sharedchannel/mock_ServerIface_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions server/platform/services/sharedchannel/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/v8/channels/store"
"github.com/mattermost/mattermost/server/v8/einterfaces"
"github.com/mattermost/mattermost/server/v8/platform/services/remotecluster"
"github.com/mattermost/mattermost/server/v8/platform/shared/filestore"
)
Expand Down Expand Up @@ -42,6 +43,7 @@ type ServerIface interface {
GetStore() store.Store
Log() *mlog.Logger
GetRemoteClusterService() remotecluster.RemoteClusterServiceIFace
GetMetrics() einterfaces.MetricsInterface
}

type PlatformIface interface {
Expand Down
31 changes: 25 additions & 6 deletions server/platform/services/sharedchannel/sync_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,25 @@ func (scs *Service) doSync() time.Duration {
var task syncTask
var ok bool
var shortestWait time.Duration
metrics := scs.server.GetMetrics()

if metrics != nil {
scs.mux.Lock()
size := len(scs.tasks)
scs.mux.Unlock()
metrics.ObserveSharedChannelsQueueSize(int64(size))
}

for {
task, ok, shortestWait = scs.removeOldestTask()
if !ok {
break
}

if metrics != nil {
metrics.ObserveSharedChannelsTaskInQueueDuration(time.Since(task.AddedAt).Seconds())
}

if err := scs.processTask(task); err != nil {
// put task back into map so it will update again
if task.incRetry() {
Expand Down Expand Up @@ -236,18 +249,22 @@ func (scs *Service) removeOldestTask() (syncTask, bool, time.Duration) {

// processTask updates one or more remote clusters with any new channel content.
func (scs *Service) processTask(task syncTask) error {
var err error
var remotes []*model.RemoteCluster
// map is used to ensure remotes don't get sync'd twice, such as when
// they have the autoinvited flag and have explicitly subscribed to a channel.
remotesMap := make(map[string]*model.RemoteCluster)

if task.remoteID == "" {
filter := model.RemoteClusterQueryFilter{
InChannel: task.channelID,
OnlyConfirmed: true,
}
remotes, err = scs.server.GetStore().RemoteCluster().GetAll(filter)
remotes, err := scs.server.GetStore().RemoteCluster().GetAll(filter)
if err != nil {
return err
}
for _, r := range remotes {
remotesMap[r.RemoteId] = r
}

// add all remotes that have the autoinvited option.
filter = model.RemoteClusterQueryFilter{
Expand All @@ -257,7 +274,9 @@ func (scs *Service) processTask(task syncTask) error {
if err != nil {
return err
}
remotes = append(remotes, remotesAutoInvited...)
for _, r := range remotesAutoInvited {
remotesMap[r.RemoteId] = r
}
} else {
rc, err := scs.server.GetStore().RemoteCluster().Get(task.remoteID)
if err != nil {
Expand All @@ -266,10 +285,10 @@ func (scs *Service) processTask(task syncTask) error {
if !rc.IsOnline() {
return fmt.Errorf("Failed updating shared channel '%s' for offline remote cluster '%s'", task.channelID, rc.DisplayName)
}
remotes = []*model.RemoteCluster{rc}
remotesMap[rc.RemoteId] = rc
}

for _, rc := range remotes {
for _, rc := range remotesMap {
rtask := task
rtask.remoteID = rc.RemoteId
if err := scs.syncForRemote(rtask, rc); err != nil {
Expand Down
Loading

0 comments on commit 38bbf04

Please sign in to comment.