diff --git a/server/controller/manager/task.go b/server/controller/manager/task.go index fe7995da120..aedc23c9c3d 100644 --- a/server/controller/manager/task.go +++ b/server/controller/manager/task.go @@ -155,6 +155,7 @@ func (t *Task) startSubDomainRefreshMonitor() { } func (t *Task) Stop() { + t.Recorder.Stop() t.Cloud.Stop() if t.tCancel != nil { t.tCancel() diff --git a/server/controller/recorder/cleaner.go b/server/controller/recorder/cleaner/cleaner.go similarity index 74% rename from server/controller/recorder/cleaner.go rename to server/controller/recorder/cleaner/cleaner.go index 56040d72eff..1aba9b5db20 100644 --- a/server/controller/recorder/cleaner.go +++ b/server/controller/recorder/cleaner/cleaner.go @@ -14,12 +14,10 @@ * limitations under the License. */ -// 永久删除MySQL中超过7天的软删除云平台资源数据 -package recorder +package cleaner import ( "context" - "fmt" "sync" "sync/atomic" "time" @@ -31,12 +29,11 @@ import ( mysqlmodel "github.com/deepflowio/deepflow/server/controller/db/mysql/model" "github.com/deepflowio/deepflow/server/controller/recorder/common" "github.com/deepflowio/deepflow/server/controller/recorder/config" - "github.com/deepflowio/deepflow/server/controller/recorder/constraint" - "github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message" - "github.com/deepflowio/deepflow/server/controller/tagrecorder" - "github.com/deepflowio/deepflow/server/libs/stats" + "github.com/deepflowio/deepflow/server/libs/logger" ) +var log = logger.MustGetLogger("recorder.cleaner") + var ( cleanersOnce sync.Once cleaners *Cleaners @@ -594,229 +591,3 @@ func (c *Cleaner) cleanVInterfaceDirty(domainLcuuid string) { } } } - -const ( - tagTypeDeviceIPConn = "device_ip_connection" - tagTypeCHostPodNodeConn = "chost_pod_node_connection" -) - -type domainStatsd struct { - org *common.ORG - lcuuid string - name string - teamID int - - deviceIPConn *CleanerCounter - chostPodNodeConn *CleanerCounter -} - -func newDomainStatsd(org *common.ORG, domain 
*mysqlmodel.Domain) *domainStatsd { - return &domainStatsd{ - org: org, - lcuuid: domain.Lcuuid, - name: domain.Name, - teamID: domain.TeamID, - - deviceIPConn: newCleanerCounter(), - chostPodNodeConn: newCleanerCounter(), - } -} - -func (d *domainStatsd) close() { - log.Info("close cleaner statsd of domain (lcuuid: %s)", d.lcuuid, d.org.LogPrefix) - d.deviceIPConn.Closed() - d.chostPodNodeConn.Closed() -} - -func (d *domainStatsd) get(tagType string) *CleanerCounter { - switch tagType { - case tagTypeDeviceIPConn: - return d.deviceIPConn - case tagTypeCHostPodNodeConn: - return d.chostPodNodeConn - } - return nil -} - -func (d *domainStatsd) start() { - log.Infof("start cleaner statsd of domain (lcuuid: %s)", d.lcuuid, d.org.LogPrefix) - err := stats.RegisterCountableWithModulePrefix( - "controller_", - "resource_relation_exception", - d.deviceIPConn, - stats.OptionStatTags{ - "tenant_org_id": fmt.Sprintf("%d", d.org.ID), - "tenant_team_id": fmt.Sprintf("%d", d.teamID), - "domain": d.name, - "type": tagTypeDeviceIPConn, - }, - ) - if err != nil { - log.Errorf("failed to register cleaner statsd of domain (lcuuid: %s) device_ip_connection: %s", d.lcuuid, err.Error(), d.org.LogPrefix) - } - - err = stats.RegisterCountableWithModulePrefix( - "controller_", - "resource_relation_exception", - d.chostPodNodeConn, - stats.OptionStatTags{ - "tenant_org_id": fmt.Sprintf("%d", d.org.ID), - "tenant_team_id": fmt.Sprintf("%d", d.teamID), - "domain": d.name, - "type": tagTypeCHostPodNodeConn, - }, - ) - if err != nil { - log.Errorf("failed to register cleaner statsd of domain (lcuuid: %s) chost_pod_node_connection: %s", d.lcuuid, err.Error(), d.org.LogPrefix) - } -} - -type TmpCounter struct { - Count uint64 `statsd:"count"` -} - -func (c *TmpCounter) Fill(count int) { - atomic.AddUint64(&c.Count, uint64(count)) -} - -type CleanerCounter struct { - *TmpCounter -} - -func newCleanerCounter() *CleanerCounter { - return &CleanerCounter{ - TmpCounter: &TmpCounter{}, - } -} - -func 
(c *CleanerCounter) GetCounter() interface{} { - counter := &TmpCounter{} - counter, c.TmpCounter = c.TmpCounter, counter - if counter.Count != 0 { - log.Infof("cleaner counter count: %d", counter.Count) - } - return counter -} - -func (c *CleanerCounter) Closed() bool { - return false -} - -func WhereFindPtr[T any](db *mysql.DB, query interface{}, args ...interface{}) ([]*T, error) { - var result []*T - err := db.Where(query, args...).Find(&result).Error - return result, err -} - -func formatLogDeleteABecauseBHasGone[MT constraint.MySQLModel](a, b string, items []*MT) string { - var str string - for _, item := range items { - str += fmt.Sprintf("%+v ", item) - } - return fmt.Sprintf("%s: %+v because %s has gone", common.LogDelete(a), str, b) -} - -func deleteExpired[MT constraint.MySQLSoftDeleteModel](db *mysql.DB, expiredAt time.Time) []*MT { - var dbItems []*MT - err := db.Unscoped().Where("deleted_at < ?", expiredAt).Find(&dbItems).Error - if err != nil { - log.Errorf("mysql delete resource failed: %s", err.Error(), db.LogPrefixORGID) - return nil - } - if len(dbItems) == 0 { - return nil - } - if err := db.Unscoped().Delete(&dbItems).Error; err != nil { - log.Errorf("mysql delete resource failed: %s", err.Error(), db.LogPrefixORGID) - return nil - } - return dbItems -} - -func getIDs[MT constraint.MySQLModel](db *mysql.DB, domainLcuuid string) (ids []int) { - var dbItems []*MT - db.Where("domain = ?", domainLcuuid).Select("id").Find(&dbItems) - for _, item := range dbItems { - ids = append(ids, (*item).GetID()) - } - return -} - -func deleteAndPublish[MT constraint.MySQLSoftDeleteModel](db *mysql.DB, expiredAt time.Time, resourceType string, toolData *toolData) { - dbItems := deleteExpired[MT](db, expiredAt) - publishTagrecorder(db, dbItems, resourceType, toolData) - log.Infof("clean %s completed: %d", resourceType, len(dbItems), db.LogPrefixORGID) -} - -func publishTagrecorder[MT constraint.MySQLSoftDeleteModel](db *mysql.DB, dbItems []*MT, resourceType 
string, toolData *toolData) { - msgMetadataToDBItems := make(map[*message.Metadata][]*MT) - for _, item := range dbItems { - var msgMetadata *message.Metadata - if (*item).GetSubDomainLcuuid() != "" { - msgMetadata = toolData.subDomainLcuuidToMsgMetadata[(*item).GetSubDomainLcuuid()] - } else { - msgMetadata = toolData.domainLcuuidToMsgMetadata[(*item).GetDomainLcuuid()] - } - if msgMetadata == nil { - log.Errorf("failed to get metadata for %s: %#v", resourceType, item, db.LogPrefixORGID) - continue - } - msgMetadataToDBItems[msgMetadata] = append(msgMetadataToDBItems[msgMetadata], item) - } - if len(msgMetadataToDBItems) == 0 { - return - } - for _, sub := range tagrecorder.GetSubscriberManager().GetSubscribers(resourceType) { - for msgMetadata, dbItems := range msgMetadataToDBItems { - sub.OnResourceBatchDeleted(msgMetadata, dbItems) - } - } -} - -type toolData struct { - mux sync.Mutex - - domainLcuuidToMsgMetadata map[string]*message.Metadata - subDomainLcuuidToMsgMetadata map[string]*message.Metadata -} - -func newToolData() *toolData { - return &toolData{ - domainLcuuidToMsgMetadata: make(map[string]*message.Metadata), - subDomainLcuuidToMsgMetadata: make(map[string]*message.Metadata), - } -} - -func (t *toolData) clean() { - t.domainLcuuidToMsgMetadata = make(map[string]*message.Metadata) - t.subDomainLcuuidToMsgMetadata = make(map[string]*message.Metadata) -} - -func (t *toolData) load(db *mysql.DB) error { - t.mux.Lock() - defer t.mux.Unlock() - - t.clean() - - var domains []*mysqlmodel.Domain - if err := db.Find(&domains).Error; err != nil { - log.Errorf("failed to get domain: %s", err.Error(), db.LogPrefixORGID) - return err - } - domainLcuuidToID := make(map[string]int) - for _, domain := range domains { - domainLcuuidToID[domain.Lcuuid] = domain.ID - t.domainLcuuidToMsgMetadata[domain.Lcuuid] = message.NewMetadata(db.ORGID, message.MetadataTeamID(domain.TeamID), message.MetadataDomainID(domain.ID)) - } - var subDomains []*mysqlmodel.SubDomain - if err 
:= db.Find(&subDomains).Error; err != nil { - log.Errorf("failed to get sub_domain: %s", err.Error(), db.LogPrefixORGID) - return err - } - for _, subDomain := range subDomains { - t.subDomainLcuuidToMsgMetadata[subDomain.Lcuuid] = message.NewMetadata( - db.ORGID, message.MetadataTeamID(subDomain.TeamID), message.MetadataDomainID(domainLcuuidToID[subDomain.Domain]), message.MetadataSubDomainID(subDomain.ID), - ) - } - return nil -} diff --git a/server/controller/recorder/cleaner/statsd.go b/server/controller/recorder/cleaner/statsd.go new file mode 100644 index 00000000000..4bd07bf2baf --- /dev/null +++ b/server/controller/recorder/cleaner/statsd.go @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package cleaner + +import ( + "fmt" + "sync/atomic" + + mysqlmodel "github.com/deepflowio/deepflow/server/controller/db/mysql/model" + "github.com/deepflowio/deepflow/server/controller/recorder/common" + "github.com/deepflowio/deepflow/server/libs/stats" +) + +const ( + tagTypeDeviceIPConn = "device_ip_connection" + tagTypeCHostPodNodeConn = "chost_pod_node_connection" +) + +type domainStatsd struct { + org *common.ORG + lcuuid string + name string + teamID int + + deviceIPConn *CleanerCounter + chostPodNodeConn *CleanerCounter +} + +func newDomainStatsd(org *common.ORG, domain *mysqlmodel.Domain) *domainStatsd { + return &domainStatsd{ + org: org, + lcuuid: domain.Lcuuid, + name: domain.Name, + teamID: domain.TeamID, + + deviceIPConn: newCleanerCounter(), + chostPodNodeConn: newCleanerCounter(), + } +} + +func (d *domainStatsd) close() { + log.Info("close cleaner statsd of domain (lcuuid: %s)", d.lcuuid, d.org.LogPrefix) + d.deviceIPConn.Closed() + d.chostPodNodeConn.Closed() +} + +func (d *domainStatsd) get(tagType string) *CleanerCounter { + switch tagType { + case tagTypeDeviceIPConn: + return d.deviceIPConn + case tagTypeCHostPodNodeConn: + return d.chostPodNodeConn + } + return nil +} + +func (d *domainStatsd) start() { + log.Infof("start cleaner statsd of domain (lcuuid: %s)", d.lcuuid, d.org.LogPrefix) + err := stats.RegisterCountableWithModulePrefix( + "controller_", + "resource_relation_exception", + d.deviceIPConn, + stats.OptionStatTags{ + "tenant_org_id": fmt.Sprintf("%d", d.org.ID), + "tenant_team_id": fmt.Sprintf("%d", d.teamID), + "domain": d.name, + "type": tagTypeDeviceIPConn, + }, + ) + if err != nil { + log.Errorf("failed to register cleaner statsd of domain (lcuuid: %s) device_ip_connection: %s", d.lcuuid, err.Error(), d.org.LogPrefix) + } + + err = stats.RegisterCountableWithModulePrefix( + "controller_", + "resource_relation_exception", + d.chostPodNodeConn, + stats.OptionStatTags{ + "tenant_org_id": fmt.Sprintf("%d", d.org.ID), + 
"tenant_team_id": fmt.Sprintf("%d", d.teamID), + "domain": d.name, + "type": tagTypeCHostPodNodeConn, + }, + ) + if err != nil { + log.Errorf("failed to register cleaner statsd of domain (lcuuid: %s) chost_pod_node_connection: %s", d.lcuuid, err.Error(), d.org.LogPrefix) + } +} + +type TmpCounter struct { + Count uint64 `statsd:"count"` +} + +func (c *TmpCounter) Fill(count int) { + atomic.AddUint64(&c.Count, uint64(count)) +} + +type CleanerCounter struct { + *TmpCounter +} + +func newCleanerCounter() *CleanerCounter { + return &CleanerCounter{ + TmpCounter: &TmpCounter{}, + } +} + +func (c *CleanerCounter) GetCounter() interface{} { + counter := &TmpCounter{} + counter, c.TmpCounter = c.TmpCounter, counter + if counter.Count != 0 { + log.Infof("cleaner counter count: %d", counter.Count) + } + return counter +} + +func (c *CleanerCounter) Closed() bool { + return false +} diff --git a/server/controller/recorder/cleaner/tools.go b/server/controller/recorder/cleaner/tools.go new file mode 100644 index 00000000000..4f0cd54728c --- /dev/null +++ b/server/controller/recorder/cleaner/tools.go @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package cleaner + +import ( + "fmt" + "sync" + "time" + + "github.com/deepflowio/deepflow/server/controller/db/mysql" + mysqlmodel "github.com/deepflowio/deepflow/server/controller/db/mysql/model" + "github.com/deepflowio/deepflow/server/controller/recorder/common" + "github.com/deepflowio/deepflow/server/controller/recorder/constraint" + "github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message" + "github.com/deepflowio/deepflow/server/controller/tagrecorder" +) + +func WhereFindPtr[T any](db *mysql.DB, query interface{}, args ...interface{}) ([]*T, error) { + var result []*T + err := db.Where(query, args...).Find(&result).Error + return result, err +} + +func formatLogDeleteABecauseBHasGone[MT constraint.MySQLModel](a, b string, items []*MT) string { + var str string + for _, item := range items { + str += fmt.Sprintf("%+v ", item) + } + return fmt.Sprintf("%s: %+v because %s has gone", common.LogDelete(a), str, b) +} + +func deleteExpired[MT constraint.MySQLSoftDeleteModel](db *mysql.DB, expiredAt time.Time) []*MT { + var dbItems []*MT + err := db.Unscoped().Where("deleted_at < ?", expiredAt).Find(&dbItems).Error + if err != nil { + log.Errorf("mysql delete resource failed: %s", err.Error(), db.LogPrefixORGID) + return nil + } + if len(dbItems) == 0 { + return nil + } + if err := db.Unscoped().Delete(&dbItems).Error; err != nil { + log.Errorf("mysql delete resource failed: %s", err.Error(), db.LogPrefixORGID) + return nil + } + return dbItems +} + +func getIDs[MT constraint.MySQLModel](db *mysql.DB, domainLcuuid string) (ids []int) { + var dbItems []*MT + db.Where("domain = ?", domainLcuuid).Select("id").Find(&dbItems) + for _, item := range dbItems { + ids = append(ids, (*item).GetID()) + } + return +} + +func deleteAndPublish[MT constraint.MySQLSoftDeleteModel](db *mysql.DB, expiredAt time.Time, resourceType string, toolData *toolData) { + dbItems := deleteExpired[MT](db, expiredAt) + publishTagrecorder(db, dbItems, resourceType, 
toolData) + log.Infof("clean %s completed: %d", resourceType, len(dbItems), db.LogPrefixORGID) +} + +func publishTagrecorder[MT constraint.MySQLSoftDeleteModel](db *mysql.DB, dbItems []*MT, resourceType string, toolData *toolData) { + msgMetadataToDBItems := make(map[*message.Metadata][]*MT) + for _, item := range dbItems { + var msgMetadata *message.Metadata + if (*item).GetSubDomainLcuuid() != "" { + msgMetadata = toolData.subDomainLcuuidToMsgMetadata[(*item).GetSubDomainLcuuid()] + } else { + msgMetadata = toolData.domainLcuuidToMsgMetadata[(*item).GetDomainLcuuid()] + } + if msgMetadata == nil { + log.Errorf("failed to get metadata for %s: %#v", resourceType, item, db.LogPrefixORGID) + continue + } + msgMetadataToDBItems[msgMetadata] = append(msgMetadataToDBItems[msgMetadata], item) + } + if len(msgMetadataToDBItems) == 0 { + return + } + for _, sub := range tagrecorder.GetSubscriberManager().GetSubscribers(resourceType) { + for msgMetadata, dbItems := range msgMetadataToDBItems { + sub.OnResourceBatchDeleted(msgMetadata, dbItems) + } + } +} + +type toolData struct { + mux sync.Mutex + + domainLcuuidToMsgMetadata map[string]*message.Metadata + subDomainLcuuidToMsgMetadata map[string]*message.Metadata +} + +func newToolData() *toolData { + return &toolData{ + domainLcuuidToMsgMetadata: make(map[string]*message.Metadata), + subDomainLcuuidToMsgMetadata: make(map[string]*message.Metadata), + } +} + +func (t *toolData) clean() { + t.domainLcuuidToMsgMetadata = make(map[string]*message.Metadata) + t.subDomainLcuuidToMsgMetadata = make(map[string]*message.Metadata) +} + +func (t *toolData) load(db *mysql.DB) error { + t.mux.Lock() + defer t.mux.Unlock() + + t.clean() + + var domains []*mysqlmodel.Domain + if err := db.Find(&domains).Error; err != nil { + log.Errorf("failed to get domain: %s", err.Error(), db.LogPrefixORGID) + return err + } + domainLcuuidToID := make(map[string]int) + for _, domain := range domains { + domainLcuuidToID[domain.Lcuuid] = domain.ID + 
t.domainLcuuidToMsgMetadata[domain.Lcuuid] = message.NewMetadata(db.ORGID, message.MetadataTeamID(domain.TeamID), message.MetadataDomainID(domain.ID)) + } + var subDomains []*mysqlmodel.SubDomain + if err := db.Find(&subDomains).Error; err != nil { + log.Errorf("failed to get sub_domain: %s", err.Error(), db.LogPrefixORGID) + return err + } + for _, subDomain := range subDomains { + t.subDomainLcuuidToMsgMetadata[subDomain.Lcuuid] = message.NewMetadata( + db.ORGID, message.MetadataTeamID(subDomain.TeamID), message.MetadataDomainID(domainLcuuidToID[subDomain.Domain]), message.MetadataSubDomainID(subDomain.ID), + ) + } + return nil +} diff --git a/server/controller/recorder/domain.go b/server/controller/recorder/domain.go index 80caf68bb1e..a9f6eae2270 100644 --- a/server/controller/recorder/domain.go +++ b/server/controller/recorder/domain.go @@ -32,6 +32,7 @@ import ( rcommon "github.com/deepflowio/deepflow/server/controller/recorder/common" "github.com/deepflowio/deepflow/server/controller/recorder/config" "github.com/deepflowio/deepflow/server/controller/recorder/listener" + "github.com/deepflowio/deepflow/server/controller/recorder/statsd" "github.com/deepflowio/deepflow/server/controller/recorder/updater" "github.com/deepflowio/deepflow/server/controller/trisolaris/refresh" "github.com/deepflowio/deepflow/server/libs/queue" @@ -45,6 +46,7 @@ const ( type domain struct { metadata *rcommon.Metadata + statsd *statsd.DomainStatsd eventQueue *queue.OverwriteQueue cache *cache.Cache @@ -55,6 +57,7 @@ func newDomain(ctx context.Context, cfg config.RecorderConfig, eventQueue *queue cacheMng := cache.NewCacheManager(ctx, cfg, md) return &domain{ metadata: md, + statsd: statsd.NewDomainStatsd(md), eventQueue: eventQueue, cache: cacheMng.DomainCache, @@ -62,6 +65,11 @@ func newDomain(ctx context.Context, cfg config.RecorderConfig, eventQueue *queue } } +func (d *domain) CloseStatsd() { + d.statsd.Close() + d.subDomains.CloseStatsd() +} + func (d *domain) Refresh(target 
string, cloudData cloudmodel.Resource) error { log.Infof("refresh target: %s", target, d.metadata.LogPrefixes) switch target { @@ -157,7 +165,7 @@ func (d *domain) getUpdatersInOrder(cloudData cloudmodel.Resource) []updater.Res updater.NewHost(d.cache, cloudData.Hosts).RegisterListener( listener.NewHost(d.cache, d.eventQueue)), updater.NewVM(d.cache, cloudData.VMs).RegisterListener( - listener.NewVM(d.cache, d.eventQueue)), + listener.NewVM(d.cache, d.eventQueue)).BuildStatsd(d.statsd), updater.NewPodCluster(d.cache, cloudData.PodClusters).RegisterListener( listener.NewPodCluster(d.cache)), updater.NewPodNode(d.cache, cloudData.PodNodes).RegisterListener( @@ -181,7 +189,7 @@ func (d *domain) getUpdatersInOrder(cloudData cloudmodel.Resource) []updater.Res updater.NewPodReplicaSet(d.cache, cloudData.PodReplicaSets).RegisterListener( listener.NewPodReplicaSet(d.cache)), updater.NewPod(d.cache, cloudData.Pods).RegisterListener( - listener.NewPod(d.cache, d.eventQueue)), + listener.NewPod(d.cache, d.eventQueue)).BuildStatsd(d.statsd), updater.NewNetwork(d.cache, cloudData.Networks).RegisterListener( listener.NewNetwork(d.cache)), updater.NewSubnet(d.cache, cloudData.Subnets).RegisterListener( @@ -260,6 +268,10 @@ func (d *domain) updateSyncedAt(syncAt time.Time) { if syncAt.IsZero() { return } + + log.Infof("update domain synced_at: %s", syncAt.Format(common.GO_BIRTHDAY), d.metadata.LogPrefixes) + d.fillStatsd(syncAt) + var domain mysqlmodel.Domain err := d.metadata.DB.Where("lcuuid = ?", d.metadata.Domain.Lcuuid).First(&domain).Error if err != nil { @@ -271,6 +283,12 @@ func (d *domain) updateSyncedAt(syncAt time.Time) { log.Debugf("update domain (%+v)", domain, d.metadata.LogPrefixes) } +func (d *domain) fillStatsd(syncAt time.Time) { + log.Infof("time now: %s", time.Now().Format(common.GO_BIRTHDAY), d.metadata.LogPrefixes) + cost := time.Since(syncAt).Seconds() + d.statsd.GetMonitor(statsd.TagTypeSyncCost).Fill(int(cost)) +} + func (d *domain) 
updateStateInfo(cloudData cloudmodel.Resource) { var domain mysqlmodel.Domain err := d.metadata.DB.Where("lcuuid = ?", d.metadata.Domain.Lcuuid).First(&domain).Error diff --git a/server/controller/recorder/recorder.go b/server/controller/recorder/recorder.go index 6269ede9881..0e0ba2c455c 100644 --- a/server/controller/recorder/recorder.go +++ b/server/controller/recorder/recorder.go @@ -61,3 +61,11 @@ func NewRecorder(ctx context.Context, cfg config.RecorderConfig, eventQueue *que func (r *Recorder) Refresh(target string, cloudData cloudmodel.Resource) error { return r.domainRefresher.Refresh(target, cloudData) } + +func (r *Recorder) Stop() { + r.CloseStatsd() +} + +func (r *Recorder) CloseStatsd() { + r.domainRefresher.CloseStatsd() +} diff --git a/server/controller/recorder/resource.go b/server/controller/recorder/resource.go index 76dc68f8797..4521794ac21 100644 --- a/server/controller/recorder/resource.go +++ b/server/controller/recorder/resource.go @@ -20,6 +20,7 @@ import ( "context" "sync" + "github.com/deepflowio/deepflow/server/controller/recorder/cleaner" "github.com/deepflowio/deepflow/server/controller/recorder/config" "github.com/deepflowio/deepflow/server/controller/recorder/db/idmng" ) @@ -30,14 +31,14 @@ var ( ) type Resource struct { - Cleaners *Cleaners + Cleaners *cleaner.Cleaners IDManagers *idmng.IDManagers } func GetResource() *Resource { resourceOnce.Do(func() { resource = &Resource{ - Cleaners: GetCleaners(), + Cleaners: cleaner.GetCleaners(), IDManagers: idmng.GetIDManagers(), } }) diff --git a/server/controller/recorder/statsd/measurement.go b/server/controller/recorder/statsd/measurement.go new file mode 100644 index 00000000000..c4f5d73cba8 --- /dev/null +++ b/server/controller/recorder/statsd/measurement.go @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
// Monitor is a measurement that can be fed integer samples.
type Monitor interface {
	Fill(int)
}

// LocalResourceSyncDelay is the snapshot reported for one collection
// interval: sample count, average delay and maximum delay.
type LocalResourceSyncDelay struct {
	Count    uint64 `statsd:"count"`
	AvgDelay uint64 `statsd:"avg_delay"`
	MaxDelay uint64 `statsd:"max_delay"`
	sumDelay uint64
}

// Fill records one delay sample. Negative delays are counted as zero so
// they cannot wrap around to a huge unsigned value.
func (d *LocalResourceSyncDelay) Fill(delay int) {
	if delay < 0 {
		delay = 0
	}
	sample := uint64(delay)
	atomic.AddUint64(&d.Count, 1)
	atomic.AddUint64(&d.sumDelay, sample)
	// Compare-and-swap loop: a plain load followed by a store could overwrite
	// a larger maximum written by a concurrent Fill.
	for {
		current := atomic.LoadUint64(&d.MaxDelay)
		if sample <= current || atomic.CompareAndSwapUint64(&d.MaxDelay, current, sample) {
			return
		}
	}
}

// ResourceSyncDelay accumulates delay samples between two collections.
type ResourceSyncDelay struct {
	*LocalResourceSyncDelay
}

func newResourceDalay() *ResourceSyncDelay {
	return &ResourceSyncDelay{
		LocalResourceSyncDelay: &LocalResourceSyncDelay{},
	}
}

// GetCounter returns a snapshot of the samples recorded since the previous
// call and resets the accumulator. Each field is drained with an atomic swap
// rather than by replacing the embedded pointer, so a concurrent Fill never
// races with the read. A Fill that overlaps the snapshot may have its count
// and its sum land in different intervals.
func (d *ResourceSyncDelay) GetCounter() interface{} {
	acc := d.LocalResourceSyncDelay
	local := &LocalResourceSyncDelay{
		Count:    atomic.SwapUint64(&acc.Count, 0),
		MaxDelay: atomic.SwapUint64(&acc.MaxDelay, 0),
		sumDelay: atomic.SwapUint64(&acc.sumDelay, 0),
	}
	if local.Count > 0 {
		local.AvgDelay = local.sumDelay / local.Count
	}
	return local
}

// Closed always reports false.
func (d *ResourceSyncDelay) Closed() bool {
	return false
}
} + return local +} + +func (c *SyncCost) Closed() bool { + return false +} diff --git a/server/controller/recorder/statsd/statsd.go b/server/controller/recorder/statsd/statsd.go new file mode 100644 index 00000000000..f2140b2809b --- /dev/null +++ b/server/controller/recorder/statsd/statsd.go @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package statsd + +import ( + "fmt" + + "github.com/deepflowio/deepflow/server/controller/recorder/common" + "github.com/deepflowio/deepflow/server/libs/logger" + "github.com/deepflowio/deepflow/server/libs/stats" +) + +var log = logger.MustGetLogger("recorder.statsd") + +const ( + TagTypeSyncCost = "sync_cost" + TagTypeVMSyncDelay = "vm_sync_delay" + TagTypePodSyncDelay = "pod_sync_delay" + + counterModulePrefix = "controller_" + counterModuleTypeCloudTaskCost = "cloud_task_cost" + counterModuleTypeResourceSyncDelay = "resource_sync_delay" + resourceSyncDelayTypeChost = "chost" + resourceSyncDelayTypePod = "pod" + + optStatTagORGID = "tenant_org_id" + optStatTagTeamID = "tenant_team_id" + optStatTagDomain = "domain" + optStatTagSubDomain = "sub_domain" + optStatTagType = "type" +) + +type Statsd interface { + GetMonitor(tagType string) Monitor + GetMetadata() *common.Metadata +} + +func NewDomainStatsd(md *common.Metadata) *DomainStatsd { + s := &DomainStatsd{ + md: md, + + syncCost: newSyncCost(), + vmSyncDelay: newResourceDalay(), + podSyncDelay: 
newResourceDalay(), + } + s.Start() + return s +} + +type DomainStatsd struct { + md *common.Metadata + + syncCost *SyncCost + vmSyncDelay *ResourceSyncDelay + podSyncDelay *ResourceSyncDelay +} + +func (r *DomainStatsd) GetMetadata() *common.Metadata { + return r.md +} + +func (r *DomainStatsd) Start() { + log.Infof("start statsd", r.md.Domain.Lcuuid, r.md.LogPrefixes) + err := stats.RegisterCountableWithModulePrefix( + counterModulePrefix, + counterModuleTypeCloudTaskCost, + r.syncCost, + stats.OptionStatTags{ + optStatTagORGID: fmt.Sprintf("%d", r.md.GetORGID()), + optStatTagTeamID: fmt.Sprintf("%d", r.md.GetTeamID()), + optStatTagDomain: r.md.Domain.Name, + }, + ) + if err != nil { + log.Errorf("failed to register statsd %s: %s", TagTypeSyncCost, err.Error(), r.md.LogPrefixes) + } + + err = stats.RegisterCountableWithModulePrefix( + counterModulePrefix, + counterModuleTypeResourceSyncDelay, + r.vmSyncDelay, + stats.OptionStatTags{ + optStatTagORGID: fmt.Sprintf("%d", r.md.GetORGID()), + optStatTagTeamID: fmt.Sprintf("%d", r.md.GetTeamID()), + optStatTagDomain: r.md.Domain.Name, + optStatTagType: resourceSyncDelayTypeChost, + }, + ) + if err != nil { + log.Errorf("failed to register statsd %s: %s", TagTypeVMSyncDelay, err.Error(), r.md.LogPrefixes) + } + + err = stats.RegisterCountableWithModulePrefix( + counterModulePrefix, + counterModuleTypeResourceSyncDelay, + r.podSyncDelay, + stats.OptionStatTags{ + optStatTagORGID: fmt.Sprintf("%d", r.md.GetORGID()), + optStatTagTeamID: fmt.Sprintf("%d", r.md.GetTeamID()), + optStatTagDomain: r.md.Domain.Name, + optStatTagType: resourceSyncDelayTypePod, + }, + ) + if err != nil { + log.Errorf("failed to register statsd %s: %s", TagTypePodSyncDelay, err.Error(), r.md.LogPrefixes) + } +} + +func (r *DomainStatsd) GetMonitor(tagType string) Monitor { + switch tagType { + case TagTypeSyncCost: + return r.syncCost + case TagTypeVMSyncDelay: + return r.vmSyncDelay + case TagTypePodSyncDelay: + return r.podSyncDelay + default: + 
return nil + } +} + +func (r *DomainStatsd) Close() { + r.syncCost.Closed() + r.vmSyncDelay.Closed() +} + +func NewSubDomainStatsd(md *common.Metadata) *SubDomainStatsd { + s := &SubDomainStatsd{ + md: md, + + podSyncDelay: newResourceDalay(), + } + s.Start() + return s +} + +type SubDomainStatsd struct { + md *common.Metadata + + podSyncDelay *ResourceSyncDelay +} + +func (r *SubDomainStatsd) GetMetadata() *common.Metadata { + return r.md +} + +func (r *SubDomainStatsd) Start() { + log.Infof("start statsd", r.md.LogPrefixes) + err := stats.RegisterCountableWithModulePrefix( + counterModulePrefix, + counterModuleTypeResourceSyncDelay, + r.podSyncDelay, + stats.OptionStatTags{ + optStatTagORGID: fmt.Sprintf("%d", r.md.GetORGID()), + optStatTagTeamID: fmt.Sprintf("%d", r.md.GetTeamID()), + optStatTagDomain: r.md.Domain.Name, + optStatTagSubDomain: r.md.SubDomain.Name, + optStatTagType: resourceSyncDelayTypePod, + }, + ) + if err != nil { + log.Errorf("failed to register statsd %s: %s", TagTypePodSyncDelay, err.Error(), r.md.LogPrefixes) + } +} + +func (r *SubDomainStatsd) GetMonitor(tagType string) Monitor { + switch tagType { + case TagTypePodSyncDelay: + return r.podSyncDelay + default: + return nil + } +} + +func (r *SubDomainStatsd) Close() { + r.podSyncDelay.Closed() +} diff --git a/server/controller/recorder/sub_domain.go b/server/controller/recorder/sub_domain.go index 0bd9c4cc269..a8fa094afc8 100644 --- a/server/controller/recorder/sub_domain.go +++ b/server/controller/recorder/sub_domain.go @@ -30,6 +30,7 @@ import ( rcommon "github.com/deepflowio/deepflow/server/controller/recorder/common" "github.com/deepflowio/deepflow/server/controller/recorder/config" "github.com/deepflowio/deepflow/server/controller/recorder/listener" + "github.com/deepflowio/deepflow/server/controller/recorder/statsd" "github.com/deepflowio/deepflow/server/controller/recorder/updater" "github.com/deepflowio/deepflow/server/controller/trisolaris/refresh" 
"github.com/deepflowio/deepflow/server/libs/queue" @@ -55,6 +56,12 @@ func newSubDomains(ctx context.Context, cfg config.RecorderConfig, eventQueue *q } } +func (s *subDomains) CloseStatsd() { + for _, refresher := range s.refreshers { + refresher.statsd.Close() + } +} + func (s *subDomains) RefreshAll(cloudData map[string]cloudmodel.SubDomainResource) error { // 遍历 cloud 中的 subdomain 资源,与缓存中的 subdomain 资源对比,根据对比结果增删改 var err error @@ -109,6 +116,7 @@ func (s *subDomains) newRefresher(lcuuid string) (*subDomain, error) { type subDomain struct { metadata *rcommon.Metadata + statsd *statsd.SubDomainStatsd domainToolDataSet *tool.DataSet cache *cache.Cache @@ -118,6 +126,7 @@ type subDomain struct { func newSubDomain(eventQueue *queue.OverwriteQueue, md *rcommon.Metadata, domainToolDataSet *tool.DataSet, cache *cache.Cache) *subDomain { return &subDomain{ metadata: md, + statsd: statsd.NewSubDomainStatsd(md), domainToolDataSet: domainToolDataSet, cache: cache, @@ -207,7 +216,7 @@ func (s *subDomain) getUpdatersInOrder(cloudData cloudmodel.SubDomainResource) [ updater.NewPodReplicaSet(s.cache, cloudData.PodReplicaSets).RegisterListener( listener.NewPodReplicaSet(s.cache)), updater.NewPod(s.cache, cloudData.Pods).RegisterListener( - listener.NewPod(s.cache, s.eventQueue)), + listener.NewPod(s.cache, s.eventQueue)).BuildStatsd(s.statsd), updater.NewNetwork(s.cache, cloudData.Networks).RegisterListener( listener.NewNetwork(s.cache)), updater.NewSubnet(s.cache, cloudData.Subnets).RegisterListener( @@ -252,6 +261,8 @@ func (s *subDomain) updateSyncedAt(lcuuid string, syncAt time.Time) { if syncAt.IsZero() { return } + log.Infof("update sub_domain synced_at: %s", syncAt.Format(common.GO_BIRTHDAY), s.metadata.LogPrefixes) + var subDomain mysqlmodel.SubDomain err := s.metadata.DB.Where("lcuuid = ?", lcuuid).First(&subDomain).Error if err != nil { diff --git a/server/controller/recorder/updater/ip.go b/server/controller/recorder/updater/ip.go index 2286c303f35..5145e24d328 
100644 --- a/server/controller/recorder/updater/ip.go +++ b/server/controller/recorder/updater/ip.go @@ -21,6 +21,7 @@ import ( ctrlrcommon "github.com/deepflowio/deepflow/server/controller/common" "github.com/deepflowio/deepflow/server/controller/recorder/cache" "github.com/deepflowio/deepflow/server/controller/recorder/cache/tool" + "github.com/deepflowio/deepflow/server/controller/recorder/statsd" ) type IP struct { @@ -70,6 +71,8 @@ func (i *IP) GetResourceType() string { return ctrlrcommon.RESOURCE_TYPE_IP_EN } +func (i *IP) BuildStatsd(statsd statsd.Statsd) ResourceUpdater { return nil } + func (i *IP) splitToWANAndLAN(cloudData []cloudmodel.IP) ([]cloudmodel.IP, []cloudmodel.IP) { wanCloudData := []cloudmodel.IP{} lanCloudData := []cloudmodel.IP{} diff --git a/server/controller/recorder/updater/pod.go b/server/controller/recorder/updater/pod.go index 276b2f85ff7..aeaa4bb310d 100644 --- a/server/controller/recorder/updater/pod.go +++ b/server/controller/recorder/updater/pod.go @@ -17,6 +17,8 @@ package updater import ( + "time" + cloudmodel "github.com/deepflowio/deepflow/server/controller/cloud/model" ctrlrcommon "github.com/deepflowio/deepflow/server/controller/common" mysqlmodel "github.com/deepflowio/deepflow/server/controller/db/mysql/model" @@ -24,6 +26,7 @@ import ( "github.com/deepflowio/deepflow/server/controller/recorder/cache/diffbase" "github.com/deepflowio/deepflow/server/controller/recorder/db" "github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message" + "github.com/deepflowio/deepflow/server/controller/recorder/statsd" ) type Pod struct { @@ -150,10 +153,16 @@ func (p *Pod) generateDBItemToAdd(cloudItem *cloudmodel.Pod) (*mysqlmodel.Pod, b dbItem.Lcuuid = cloudItem.Lcuuid if !cloudItem.CreatedAt.IsZero() { dbItem.CreatedAt = cloudItem.CreatedAt + p.recordStatsd(cloudItem) } return dbItem, true } +func (m *Pod) recordStatsd(cloudItem *cloudmodel.Pod) { + syncDelay := time.Since(cloudItem.CreatedAt).Seconds() + 
m.statsd.GetMonitor(statsd.TagTypePodSyncDelay).Fill(int(syncDelay)) +} + func (p *Pod) generateUpdateInfo(diffBase *diffbase.Pod, cloudItem *cloudmodel.Pod) (*message.PodFieldsUpdate, map[string]interface{}, bool) { structInfo := new(message.PodFieldsUpdate) mapInfo := make(map[string]interface{}) diff --git a/server/controller/recorder/updater/updater.go b/server/controller/recorder/updater/updater.go index 49ce4b393a6..88c52f41c47 100644 --- a/server/controller/recorder/updater/updater.go +++ b/server/controller/recorder/updater/updater.go @@ -26,6 +26,7 @@ import ( "github.com/deepflowio/deepflow/server/controller/recorder/pubsub" "github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message" msg "github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message/constraint" + "github.com/deepflowio/deepflow/server/controller/recorder/statsd" ) // ResourceUpdater 实现资源进行新旧数据比对,并根据比对结果增删改资源 @@ -39,6 +40,7 @@ type ResourceUpdater interface { HandleDelete() Publisher + StatsdBuilder } type Publisher interface { @@ -46,6 +48,10 @@ type Publisher interface { GetResourceType() string } +type StatsdBuilder interface { + BuildStatsd(statsd.Statsd) ResourceUpdater +} + type DataGenerator[CT constraint.CloudModel, MT constraint.MySQLModel, BT constraint.DiffBase, MFUPT msg.FieldsUpdatePtr[MFUT], MFUT msg.FieldsUpdate] interface { // 根据 cloud 数据获取对应的 diff base 数据 getDiffBaseByCloudItem(*CT) (BT, bool) @@ -72,6 +78,8 @@ type UpdaterBase[ metadata *common.Metadata msgMetadata *message.Metadata + statsd statsd.Statsd + resourceType string cache *cache.Cache // 基于 Domain 或者 SubDomain 范围构造 @@ -137,6 +145,11 @@ func (u *UpdaterBase[CT, BT, MPT, MT, MAPT, MAT, MUPT, MUT, MFUPT, MFUT, MDPT, M u.pubsub = ps.(pubsub.ResourcePubSub[MAPT, MAT, MUPT, MUT, MFUPT, MFUT, MDPT, MDT]) } +func (u *UpdaterBase[CT, BT, MPT, MT, MAPT, MAT, MUPT, MUT, MFUPT, MFUT, MDPT, MDT]) BuildStatsd(statsd statsd.Statsd) ResourceUpdater { + u.statsd = statsd + return u +} + func (u 
*UpdaterBase[CT, BT, MPT, MT, MAPT, MAT, MUPT, MUT, MFUPT, MFUT, MDPT, MDT]) setDataGenerator(dataGenerator DataGenerator[CT, MT, BT, MFUPT, MFUT]) { u.dataGenerator = dataGenerator } diff --git a/server/controller/recorder/updater/vm.go b/server/controller/recorder/updater/vm.go index dd89585e0c7..663e8e79fe7 100644 --- a/server/controller/recorder/updater/vm.go +++ b/server/controller/recorder/updater/vm.go @@ -18,6 +18,7 @@ package updater import ( "encoding/json" + "time" cloudcommon "github.com/deepflowio/deepflow/server/controller/cloud/common" cloudmodel "github.com/deepflowio/deepflow/server/controller/cloud/model" @@ -27,6 +28,7 @@ import ( "github.com/deepflowio/deepflow/server/controller/recorder/cache/diffbase" "github.com/deepflowio/deepflow/server/controller/recorder/db" "github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message" + "github.com/deepflowio/deepflow/server/controller/recorder/statsd" ) type VM struct { @@ -112,10 +114,16 @@ func (m *VM) generateDBItemToAdd(cloudItem *cloudmodel.VM) (*mysqlmodel.VM, bool dbItem.Lcuuid = cloudItem.Lcuuid if !cloudItem.CreatedAt.IsZero() { dbItem.CreatedAt = cloudItem.CreatedAt + m.recordStatsd(cloudItem) } return dbItem, true } +func (m *VM) recordStatsd(cloudItem *cloudmodel.VM) { + syncDelay := time.Since(cloudItem.CreatedAt).Seconds() + m.statsd.GetMonitor(statsd.TagTypeVMSyncDelay).Fill(int(syncDelay)) +} + func (m *VM) generateUpdateInfo(diffBase *diffbase.VM, cloudItem *cloudmodel.VM) (*message.VMFieldsUpdate, map[string]interface{}, bool) { structInfo := new(message.VMFieldsUpdate) mapInfo := make(map[string]interface{}) diff --git a/server/controller/trisolaris/refresh/refresh.go b/server/controller/trisolaris/refresh/refresh.go index ced2fb7cc7f..1d0b66d6456 100644 --- a/server/controller/trisolaris/refresh/refresh.go +++ b/server/controller/trisolaris/refresh/refresh.go @@ -65,7 +65,7 @@ func (r *RefreshOP) refreshCache(orgID int, dataTypes []common.DataChanged) { if len(dataTypes) 
== 0 || (len(localControllerIPs) == 0 && len(remoteControllerIPs) == 0) { return } - log.Infof("refresh cache for trisolaris(%v %v)", localControllerIPs, remoteControllerIPs) + log.Infof("refresh cache for trisolaris(%v %v), orgID(%d), dataTypes(%v)", localControllerIPs, remoteControllerIPs, orgID, dataTypes) params := url.Values{} params.Add("org_id", strconv.Itoa(orgID)) for _, dataType := range dataTypes {