Skip to content

Commit

Permalink
[CONTROLLER/PROMETHEUS] supports updating label index
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengya authored and ZhengYa-0110 committed Aug 30, 2023
1 parent b3bd039 commit 77b4bef
Show file tree
Hide file tree
Showing 10 changed files with 402 additions and 74 deletions.
9 changes: 8 additions & 1 deletion server/controller/controller/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ func checkAndStartMasterFunctions(
// - 控制器和数据节点检查
// - license分配和检查
// - resource id manager
// - clean deleted resources
// - clean deleted/dirty resource data
// - prometheus encoder
// - prometheus app label layout updater
// - http resource refresh task manager

// 从区域控制器无需判断是否为master controller
if !IsMasterRegion(cfg) {
Expand All @@ -95,6 +98,8 @@ func checkAndStartMasterFunctions(
recorderResource := recorder.GetSingletonResource()
domainChecker := resoureservice.NewDomainCheck(ctx)
prometheus := prometheus.GetSingleton()
prometheus.APPLabelLayoutUpdater.Init(ctx, &cfg.PrometheusCfg)

httpService := http.GetSingleton()

masterController := ""
Expand Down Expand Up @@ -146,6 +151,7 @@ func checkAndStartMasterFunctions(
domainChecker.Start()

prometheus.Encoder.Start()
prometheus.APPLabelLayoutUpdater.Start()

if cfg.DFWebService.Enabled {
httpService.TaskManager.Start(ctx, cfg.FPermit, cfg.RedisCfg)
Expand Down Expand Up @@ -176,6 +182,7 @@ func checkAndStartMasterFunctions(
recorderResource.IDManager.Stop()

prometheus.Encoder.Stop()
prometheus.APPLabelLayoutUpdater.Stop()

if cfg.DFWebService.Enabled {
httpService.TaskManager.Stop()
Expand Down
4 changes: 4 additions & 0 deletions server/controller/prometheus/cache/metric_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (ml *metricLabel) Add(batch []MetricLabelDetailKey) {
}
}

func (ml *metricLabel) GetMetricLabelDetailKeys() mapset.Set[MetricLabelDetailKey] {
return ml.metricLabelDetailKeys
}

func (ml *metricLabel) refresh(args ...interface{}) error {
metricLabels, err := ml.load()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions server/controller/prometheus/cache/metric_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func (mt *metricTarget) IfLabelIsTargetType(mn, ln string) bool {
return false
}

func (mt *metricTarget) GetMetricNameToTargetIDs() map[string]mapset.Set[int] {
return mt.metricNameToTargetIDs.Get()
}

func (mt *metricTarget) refresh(args ...interface{}) error {
mts, err := mt.load()
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion server/controller/prometheus/cache/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ func (t *target) Add(batch []*controller.PrometheusTarget) {
}
}

func (t *target) GetTargetIDToLabelNames() map[int]mapset.Set[string] {
return t.targetIDToLabelNames.Get()
}

func (t *target) refresh(args ...interface{}) error {
recorderTargets, selfTargets, err := t.load()
if err != nil {
Expand Down Expand Up @@ -212,5 +216,5 @@ func (t *target) load() (recorderTargets, selfTargets []*mysql.PrometheusTarget,
}

func (t *target) dedup(ids []int) error {
return mysql.Db.Where("id in (?)", ids).Delete(&mysql.PrometheusTarget{}).Error // TODO 强删?
return mysql.Db.Where("id in (?)", ids).Delete(&mysql.PrometheusTarget{}).Error
}
1 change: 1 addition & 0 deletions server/controller/prometheus/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ type Config struct {
EncoderCacheRefreshInterval int `default:"3600" yaml:"encoder_cache_refresh_interval"`
ResourceMaxID0 int `default:"64000" yaml:"resource_max_id_0"`
ResourceMaxID1 int `default:"499999" yaml:"resource_max_id_1"`
APPLabelIndexMax int `default:"256" yaml:"app_label_index"`
}
14 changes: 9 additions & 5 deletions server/controller/prometheus/encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Encoder struct {
metricName *metricName
labelName *labelName
labelValue *labelValue
labelLayout *labelLayout
LabelLayout *labelLayout
label *label
metricLabel *metricLabel
metricTarget *metricTarget
Expand All @@ -70,7 +70,7 @@ func (e *Encoder) Init(ctx context.Context, cfg *prometheuscfg.Config) {
e.labelName = newLabelName(cfg.ResourceMaxID0)
e.labelValue = newLabelValue(cfg.ResourceMaxID1)
e.label = newLabel()
e.labelLayout = newLabelLayout()
e.LabelLayout = newLabelLayout(cfg)
e.metricLabel = newMetricLabel(e.label)
e.target = newTarget(cfg.ResourceMaxID1)
e.metricTarget = newMetricTarget(e.target)
Expand All @@ -90,6 +90,7 @@ func (e *Encoder) Start() error {
e.refresh()
go func() {
ticker := time.NewTicker(e.refreshInterval)
defer ticker.Stop()
for {
select {
case <-e.ctx.Done():
Expand All @@ -113,16 +114,19 @@ func (e *Encoder) Stop() {
}

func (e *Encoder) refresh() error {
log.Info("prometheus encoder refresh started")
e.label.refresh()
eg := &errgroup.Group{}
AppendErrGroup(eg, e.metricName.refresh)
AppendErrGroup(eg, e.labelName.refresh)
AppendErrGroup(eg, e.labelValue.refresh)
AppendErrGroup(eg, e.labelLayout.refresh)
AppendErrGroup(eg, e.LabelLayout.refresh)
AppendErrGroup(eg, e.metricLabel.refresh)
AppendErrGroup(eg, e.metricTarget.refresh)
AppendErrGroup(eg, e.target.refresh)
return eg.Wait()
err := eg.Wait()
log.Info("prometheus encoder refresh completed")
return err
}

func (e *Encoder) Encode(req *controller.SyncPrometheusRequest) (*controller.SyncPrometheusResponse, error) {
Expand Down Expand Up @@ -181,7 +185,7 @@ func (e *Encoder) encodeLabelValue(args ...interface{}) error {
func (e *Encoder) encodeLabelIndex(args ...interface{}) error {
resp := args[0].(*controller.SyncPrometheusResponse)
layouts := args[1].([]*controller.PrometheusMetricAPPLabelLayoutRequest)
lis, err := e.labelLayout.encode(layouts)
lis, err := e.LabelLayout.encode(layouts)
if err != nil {
return err
}
Expand Down
20 changes: 13 additions & 7 deletions server/controller/prometheus/encoder/id_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ type sorter interface {
getUsedIDSet(sortedUsableIDs []int, inUseIDSet mapset.Set[int]) mapset.Set[int]
}

type rawDataProvider interface {
load() (mapset.Set[int], error)
check([]int) ([]int, error)
}

type idAllocator struct {
resourceType string
min int
Expand Down Expand Up @@ -73,14 +78,20 @@ func (ia *idAllocator) allocate(count int) (ids []int, err error) {
return
}

func (p *idAllocator) recycle(ids []int) {
p.sorter.sort(ids)
p.usableIDs = append(p.usableIDs, ids...)
log.Infof("recycle %s ids: %v", p.resourceType, ids)
}

func (ia *idAllocator) refresh() error {
log.Infof("refresh %s id pools started", ia.resourceType)
log.Debugf("refresh %s id pools started", ia.resourceType)
inUseIDSet, err := ia.rawDataProvider.load()
if err != nil {
return err
}
ia.usableIDs = ia.getSortedUsableIDs(ia.getAllIDSet(), inUseIDSet)
log.Infof("refresh %s id pools (usable ids count: %d) completed", ia.resourceType, len(ia.usableIDs))
log.Debugf("refresh %s id pools (usable ids count: %d) completed", ia.resourceType, len(ia.usableIDs))
return nil
}

Expand Down Expand Up @@ -155,11 +166,6 @@ func (ia *ascIDAllocator) sort(ints []int) {
sort.Ints(ints)
}

type rawDataProvider interface {
load() (mapset.Set[int], error)
check([]int) ([]int, error)
}

type descIDAllocator struct {
idAllocator
}
Expand Down
Loading

0 comments on commit 77b4bef

Please sign in to comment.