Skip to content

Commit

Permalink
feat: adds resource synchronization delay alarms
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhengYa-0110 committed Dec 10, 2024
1 parent f05cd23 commit 49e6dce
Show file tree
Hide file tree
Showing 15 changed files with 650 additions and 239 deletions.
1 change: 1 addition & 0 deletions server/controller/manager/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func (t *Task) startSubDomainRefreshMonitor() {
}

func (t *Task) Stop() {
t.Recorder.Stop()
t.Cloud.Stop()
if t.tCancel != nil {
t.tCancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
* limitations under the License.
*/

// 永久删除MySQL中超过7天的软删除云平台资源数据
package recorder
package cleaner

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
Expand All @@ -31,12 +29,11 @@ import (
mysqlmodel "github.com/deepflowio/deepflow/server/controller/db/mysql/model"
"github.com/deepflowio/deepflow/server/controller/recorder/common"
"github.com/deepflowio/deepflow/server/controller/recorder/config"
"github.com/deepflowio/deepflow/server/controller/recorder/constraint"
"github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message"
"github.com/deepflowio/deepflow/server/controller/tagrecorder"
"github.com/deepflowio/deepflow/server/libs/stats"
"github.com/deepflowio/deepflow/server/libs/logger"
)

var log = logger.MustGetLogger("recorder.cleaner")

var (
cleanersOnce sync.Once
cleaners *Cleaners
Expand Down Expand Up @@ -594,229 +591,3 @@ func (c *Cleaner) cleanVInterfaceDirty(domainLcuuid string) {
}
}
}

const (
tagTypeDeviceIPConn = "device_ip_connection"
tagTypeCHostPodNodeConn = "chost_pod_node_connection"
)

type domainStatsd struct {
org *common.ORG
lcuuid string
name string
teamID int

deviceIPConn *CleanerCounter
chostPodNodeConn *CleanerCounter
}

func newDomainStatsd(org *common.ORG, domain *mysqlmodel.Domain) *domainStatsd {
return &domainStatsd{
org: org,
lcuuid: domain.Lcuuid,
name: domain.Name,
teamID: domain.TeamID,

deviceIPConn: newCleanerCounter(),
chostPodNodeConn: newCleanerCounter(),
}
}

func (d *domainStatsd) close() {
log.Info("close cleaner statsd of domain (lcuuid: %s)", d.lcuuid, d.org.LogPrefix)
d.deviceIPConn.Closed()
d.chostPodNodeConn.Closed()
}

func (d *domainStatsd) get(tagType string) *CleanerCounter {
switch tagType {
case tagTypeDeviceIPConn:
return d.deviceIPConn
case tagTypeCHostPodNodeConn:
return d.chostPodNodeConn
}
return nil
}

func (d *domainStatsd) start() {
log.Infof("start cleaner statsd of domain (lcuuid: %s)", d.lcuuid, d.org.LogPrefix)
err := stats.RegisterCountableWithModulePrefix(
"controller_",
"resource_relation_exception",
d.deviceIPConn,
stats.OptionStatTags{
"tenant_org_id": fmt.Sprintf("%d", d.org.ID),
"tenant_team_id": fmt.Sprintf("%d", d.teamID),
"domain": d.name,
"type": tagTypeDeviceIPConn,
},
)
if err != nil {
log.Errorf("failed to register cleaner statsd of domain (lcuuid: %s) device_ip_connection: %s", d.lcuuid, err.Error(), d.org.LogPrefix)
}

err = stats.RegisterCountableWithModulePrefix(
"controller_",
"resource_relation_exception",
d.chostPodNodeConn,
stats.OptionStatTags{
"tenant_org_id": fmt.Sprintf("%d", d.org.ID),
"tenant_team_id": fmt.Sprintf("%d", d.teamID),
"domain": d.name,
"type": tagTypeCHostPodNodeConn,
},
)
if err != nil {
log.Errorf("failed to register cleaner statsd of domain (lcuuid: %s) chost_pod_node_connection: %s", d.lcuuid, err.Error(), d.org.LogPrefix)
}
}

type TmpCounter struct {
Count uint64 `statsd:"count"`
}

func (c *TmpCounter) Fill(count int) {
atomic.AddUint64(&c.Count, uint64(count))
}

type CleanerCounter struct {
*TmpCounter
}

func newCleanerCounter() *CleanerCounter {
return &CleanerCounter{
TmpCounter: &TmpCounter{},
}
}

func (c *CleanerCounter) GetCounter() interface{} {
counter := &TmpCounter{}
counter, c.TmpCounter = c.TmpCounter, counter
if counter.Count != 0 {
log.Infof("cleaner counter count: %d", counter.Count)
}
return counter
}

func (c *CleanerCounter) Closed() bool {
return false
}

func WhereFindPtr[T any](db *mysql.DB, query interface{}, args ...interface{}) ([]*T, error) {
var result []*T
err := db.Where(query, args...).Find(&result).Error
return result, err
}

func formatLogDeleteABecauseBHasGone[MT constraint.MySQLModel](a, b string, items []*MT) string {
var str string
for _, item := range items {
str += fmt.Sprintf("%+v ", item)
}
return fmt.Sprintf("%s: %+v because %s has gone", common.LogDelete(a), str, b)
}

func deleteExpired[MT constraint.MySQLSoftDeleteModel](db *mysql.DB, expiredAt time.Time) []*MT {
var dbItems []*MT
err := db.Unscoped().Where("deleted_at < ?", expiredAt).Find(&dbItems).Error
if err != nil {
log.Errorf("mysql delete resource failed: %s", err.Error(), db.LogPrefixORGID)
return nil
}
if len(dbItems) == 0 {
return nil
}
if err := db.Unscoped().Delete(&dbItems).Error; err != nil {
log.Errorf("mysql delete resource failed: %s", err.Error(), db.LogPrefixORGID)
return nil
}
return dbItems
}

func getIDs[MT constraint.MySQLModel](db *mysql.DB, domainLcuuid string) (ids []int) {
var dbItems []*MT
db.Where("domain = ?", domainLcuuid).Select("id").Find(&dbItems)
for _, item := range dbItems {
ids = append(ids, (*item).GetID())
}
return
}

func deleteAndPublish[MT constraint.MySQLSoftDeleteModel](db *mysql.DB, expiredAt time.Time, resourceType string, toolData *toolData) {
dbItems := deleteExpired[MT](db, expiredAt)
publishTagrecorder(db, dbItems, resourceType, toolData)
log.Infof("clean %s completed: %d", resourceType, len(dbItems), db.LogPrefixORGID)
}

func publishTagrecorder[MT constraint.MySQLSoftDeleteModel](db *mysql.DB, dbItems []*MT, resourceType string, toolData *toolData) {
msgMetadataToDBItems := make(map[*message.Metadata][]*MT)
for _, item := range dbItems {
var msgMetadata *message.Metadata
if (*item).GetSubDomainLcuuid() != "" {
msgMetadata = toolData.subDomainLcuuidToMsgMetadata[(*item).GetSubDomainLcuuid()]
} else {
msgMetadata = toolData.domainLcuuidToMsgMetadata[(*item).GetDomainLcuuid()]
}
if msgMetadata == nil {
log.Errorf("failed to get metadata for %s: %#v", resourceType, item, db.LogPrefixORGID)
continue
}
msgMetadataToDBItems[msgMetadata] = append(msgMetadataToDBItems[msgMetadata], item)
}
if len(msgMetadataToDBItems) == 0 {
return
}
for _, sub := range tagrecorder.GetSubscriberManager().GetSubscribers(resourceType) {
for msgMetadata, dbItems := range msgMetadataToDBItems {
sub.OnResourceBatchDeleted(msgMetadata, dbItems)
}
}
}

type toolData struct {
mux sync.Mutex

domainLcuuidToMsgMetadata map[string]*message.Metadata
subDomainLcuuidToMsgMetadata map[string]*message.Metadata
}

func newToolData() *toolData {
return &toolData{
domainLcuuidToMsgMetadata: make(map[string]*message.Metadata),
subDomainLcuuidToMsgMetadata: make(map[string]*message.Metadata),
}
}

func (t *toolData) clean() {
t.domainLcuuidToMsgMetadata = make(map[string]*message.Metadata)
t.subDomainLcuuidToMsgMetadata = make(map[string]*message.Metadata)
}

func (t *toolData) load(db *mysql.DB) error {
t.mux.Lock()
defer t.mux.Unlock()

t.clean()

var domains []*mysqlmodel.Domain
if err := db.Find(&domains).Error; err != nil {
log.Errorf("failed to get domain: %s", err.Error(), db.LogPrefixORGID)
return err
}
domainLcuuidToID := make(map[string]int)
for _, domain := range domains {
domainLcuuidToID[domain.Lcuuid] = domain.ID
t.domainLcuuidToMsgMetadata[domain.Lcuuid] = message.NewMetadata(db.ORGID, message.MetadataTeamID(domain.TeamID), message.MetadataDomainID(domain.ID))
}
var subDomains []*mysqlmodel.SubDomain
if err := db.Find(&subDomains).Error; err != nil {
log.Errorf("failed to get sub_domain: %s", err.Error(), db.LogPrefixORGID)
return err
}
for _, subDomain := range subDomains {
t.subDomainLcuuidToMsgMetadata[subDomain.Lcuuid] = message.NewMetadata(
db.ORGID, message.MetadataTeamID(subDomain.TeamID), message.MetadataDomainID(domainLcuuidToID[subDomain.Domain]), message.MetadataSubDomainID(subDomain.ID),
)
}
return nil
}
Loading

0 comments on commit 49e6dce

Please sign in to comment.