From b1c60ad5c4d6aabc1c56c6c450554bdde4f27546 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sat, 13 Jul 2024 15:24:29 +0700 Subject: [PATCH 1/9] diag: thread-safety step1 - json marshal under mutex (#11134) --- diagnostics/bodies_info.go | 3 +-- diagnostics/headers.go | 3 +-- diagnostics/snapshot_sync.go | 13 ++++++------- erigon-lib/diagnostics/bodies.go | 8 ++++++-- erigon-lib/diagnostics/headers.go | 10 ++++++++-- erigon-lib/diagnostics/resources_usage.go | 9 +++++++-- erigon-lib/diagnostics/snapshots.go | 19 +++++++++++++++++-- erigon-lib/diagnostics/speedtest.go | 11 +++++++++-- erigon-lib/diagnostics/stages.go | 16 ++++++++++++++++ erigon-lib/diagnostics/sys_info.go | 11 +++++++++-- 10 files changed, 80 insertions(+), 23 deletions(-) diff --git a/diagnostics/bodies_info.go b/diagnostics/bodies_info.go index c04c077b466..6a656a0c339 100644 --- a/diagnostics/bodies_info.go +++ b/diagnostics/bodies_info.go @@ -17,7 +17,6 @@ package diagnostics import ( - "encoding/json" "net/http" diaglib "github.com/ledgerwatch/erigon-lib/diagnostics" @@ -37,5 +36,5 @@ func SetupBodiesAccess(metricsMux *http.ServeMux, diag *diaglib.DiagnosticClient } func writeBodies(w http.ResponseWriter, diag *diaglib.DiagnosticClient) { - json.NewEncoder(w).Encode(diag.GetBodiesInfo()) + diag.BodiesInfoJson(w) } diff --git a/diagnostics/headers.go b/diagnostics/headers.go index 7c770f76461..da861ed9902 100644 --- a/diagnostics/headers.go +++ b/diagnostics/headers.go @@ -17,7 +17,6 @@ package diagnostics import ( - "encoding/json" "net/http" diaglib "github.com/ledgerwatch/erigon-lib/diagnostics" @@ -36,5 +35,5 @@ func SetupHeadersAccess(metricsMux *http.ServeMux, diag *diaglib.DiagnosticClien } func writeHeaders(w http.ResponseWriter, diag *diaglib.DiagnosticClient) { - json.NewEncoder(w).Encode(diag.GetHeaders()) + diag.HeadersJson(w) } diff --git a/diagnostics/snapshot_sync.go b/diagnostics/snapshot_sync.go index bbe543dc3fd..a65ca373085 100644 --- a/diagnostics/snapshot_sync.go +++ 
b/diagnostics/snapshot_sync.go @@ -17,7 +17,6 @@ package diagnostics import ( - "encoding/json" "net/http" diaglib "github.com/ledgerwatch/erigon-lib/diagnostics" @@ -66,25 +65,25 @@ func SetupStagesAccess(metricsMux *http.ServeMux, diag *diaglib.DiagnosticClient } func writeNetworkSpeed(w http.ResponseWriter, diag *diaglib.DiagnosticClient) { - json.NewEncoder(w).Encode(diag.GetNetworkSpeed()) + diag.NetworkSpeedJson(w) } func writeResourcesUsage(w http.ResponseWriter, diag *diaglib.DiagnosticClient) { - json.NewEncoder(w).Encode(diag.GetResourcesUsage()) + diag.ResourcesUsageJson(w) } func writeStages(w http.ResponseWriter, diag *diaglib.DiagnosticClient) { - json.NewEncoder(w).Encode(diag.SyncStatistics()) + diag.SyncStatsJson(w) } func writeFilesList(w http.ResponseWriter, diag *diaglib.DiagnosticClient) { - json.NewEncoder(w).Encode(diag.SnapshotFilesList()) + diag.SnapshotFilesListJson(w) } func writeHardwareInfo(w http.ResponseWriter, diag *diaglib.DiagnosticClient) { - json.NewEncoder(w).Encode(diag.HardwareInfo()) + diag.HardwareInfoJson(w) } func writeSyncStages(w http.ResponseWriter, diag *diaglib.DiagnosticClient) { - json.NewEncoder(w).Encode(diag.GetSyncStages()) + diag.SyncStagesJson(w) } diff --git a/erigon-lib/diagnostics/bodies.go b/erigon-lib/diagnostics/bodies.go index 9bf2ac82ded..d52ebcac4d7 100644 --- a/erigon-lib/diagnostics/bodies.go +++ b/erigon-lib/diagnostics/bodies.go @@ -18,6 +18,8 @@ package diagnostics import ( "context" + "encoding/json" + "io" "github.com/ledgerwatch/erigon-lib/log/v3" ) @@ -109,8 +111,10 @@ func (d *DiagnosticClient) runBodiesProcessingListener(rootCtx context.Context) }() } -func (d *DiagnosticClient) GetBodiesInfo() BodiesInfo { +func (d *DiagnosticClient) BodiesInfoJson(w io.Writer) { d.bodiesMutex.Lock() defer d.bodiesMutex.Unlock() - return d.bodies + if err := json.NewEncoder(w).Encode(d.bodies); err != nil { + log.Debug("[diagnostics] BodiesInfoJson", "err", err) + } } diff --git 
a/erigon-lib/diagnostics/headers.go b/erigon-lib/diagnostics/headers.go index ed0afb0cac0..c03b3bee735 100644 --- a/erigon-lib/diagnostics/headers.go +++ b/erigon-lib/diagnostics/headers.go @@ -18,6 +18,8 @@ package diagnostics import ( "context" + "encoding/json" + "io" "github.com/ledgerwatch/erigon-lib/log/v3" ) @@ -29,8 +31,12 @@ func (d *DiagnosticClient) setupHeadersDiagnostics(rootCtx context.Context) { d.runProcessedListener(rootCtx) } -func (d *DiagnosticClient) GetHeaders() Headers { - return d.headers +func (d *DiagnosticClient) HeadersJson(w io.Writer) { + d.headerMutex.Lock() + defer d.headerMutex.Unlock() + if err := json.NewEncoder(w).Encode(d.headers); err != nil { + log.Debug("[diagnostics] HeadersJson", "err", err) + } } func (d *DiagnosticClient) runHeadersWaitingListener(rootCtx context.Context) { diff --git a/erigon-lib/diagnostics/resources_usage.go b/erigon-lib/diagnostics/resources_usage.go index e5d4ad7510f..6f02d102aa6 100644 --- a/erigon-lib/diagnostics/resources_usage.go +++ b/erigon-lib/diagnostics/resources_usage.go @@ -18,6 +18,8 @@ package diagnostics import ( "context" + "encoding/json" + "io" "github.com/ledgerwatch/erigon-lib/log/v3" ) @@ -26,13 +28,16 @@ func (d *DiagnosticClient) setupResourcesUsageDiagnostics(rootCtx context.Contex d.runMemoryStatsListener(rootCtx) } -func (d *DiagnosticClient) GetResourcesUsage() ResourcesUsage { +func (d *DiagnosticClient) ResourcesUsageJson(w io.Writer) { d.resourcesUsageMutex.Lock() defer d.resourcesUsageMutex.Unlock() returnObj := d.resourcesUsage d.resourcesUsage = ResourcesUsage{} - return returnObj + + if err := json.NewEncoder(w).Encode(returnObj); err != nil { + log.Debug("[diagnostics] ResourcesUsageJson", "err", err) + } } func (d *DiagnosticClient) runMemoryStatsListener(rootCtx context.Context) { diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index b83439bee72..4e453092afe 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ 
b/erigon-lib/diagnostics/snapshots.go @@ -18,7 +18,9 @@ package diagnostics import ( "context" + "encoding/json" "fmt" + "io" "time" "github.com/ledgerwatch/erigon-lib/common" @@ -424,12 +426,25 @@ func (d *DiagnosticClient) SetFillDBInfo(info SnapshotFillDBStage) { } } +// Deprecated - it's not thread-safe and used only in tests. Need introduce another method or add special methods for Tests. func (d *DiagnosticClient) SyncStatistics() SyncStatistics { return d.syncStats } -func (d *DiagnosticClient) SnapshotFilesList() SnapshoFilesList { - return d.snapshotFileList +func (d *DiagnosticClient) SyncStatsJson(w io.Writer) { + d.mu.Lock() + defer d.mu.Unlock() + if err := json.NewEncoder(w).Encode(d.syncStats); err != nil { + log.Debug("[diagnostics] SyncStatsJson", "err", err) + } +} + +func (d *DiagnosticClient) SnapshotFilesListJson(w io.Writer) { + d.mu.Lock() + defer d.mu.Unlock() + if err := json.NewEncoder(w).Encode(d.snapshotFileList); err != nil { + log.Debug("[diagnostics] SnapshotFilesListJson", "err", err) + } } func SnapshotDownloadInfoFromTx(tx kv.Tx) ([]byte, error) { diff --git a/erigon-lib/diagnostics/speedtest.go b/erigon-lib/diagnostics/speedtest.go index 8f2c1fad21f..522f5132fd5 100644 --- a/erigon-lib/diagnostics/speedtest.go +++ b/erigon-lib/diagnostics/speedtest.go @@ -18,8 +18,11 @@ package diagnostics import ( "context" + "encoding/json" + "io" "time" + "github.com/ledgerwatch/erigon-lib/log/v3" "github.com/showwin/speedtest-go/speedtest" "github.com/showwin/speedtest-go/speedtest/transport" ) @@ -86,6 +89,10 @@ func (d *DiagnosticClient) runSpeedTest(rootCtx context.Context) NetworkSpeedTes } } -func (d *DiagnosticClient) GetNetworkSpeed() NetworkSpeedTestResult { - return d.networkSpeed +func (d *DiagnosticClient) NetworkSpeedJson(w io.Writer) { + d.networkSpeedMutex.Lock() + defer d.networkSpeedMutex.Unlock() + if err := json.NewEncoder(w).Encode(d.networkSpeed); err != nil { + log.Debug("[diagnostics] ResourcesUsageJson", "err", err) + } 
} diff --git a/erigon-lib/diagnostics/stages.go b/erigon-lib/diagnostics/stages.go index f7c821ded7c..9d80f18e283 100644 --- a/erigon-lib/diagnostics/stages.go +++ b/erigon-lib/diagnostics/stages.go @@ -18,7 +18,9 @@ package diagnostics import ( "context" + "encoding/json" "fmt" + "io" "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" @@ -283,7 +285,12 @@ func (d *DiagnosticClient) SetCurrentSyncSubStage(css CurrentSyncSubStage) { } } +// Deprecated - used only in tests. Non-thread-safe. func (d *DiagnosticClient) GetStageState(stageId string) (StageState, error) { + return d.getStageState(stageId) +} + +func (d *DiagnosticClient) getStageState(stageId string) (StageState, error) { for _, stage := range d.syncStages { if stage.ID == stageId { return stage.State, nil @@ -311,6 +318,15 @@ func StagesListUpdater(info []SyncStage) func(tx kv.RwTx) error { return PutDataToTable(kv.DiagSyncStages, StagesListKey, info) } +// Deprecated - not thread-safe method. Used only in tests. Need introduce more thread-safe method or something special for tests. 
func (d *DiagnosticClient) GetSyncStages() []SyncStage { return d.syncStages } + +func (d *DiagnosticClient) SyncStagesJson(w io.Writer) { + d.mu.Lock() + defer d.mu.Unlock() + if err := json.NewEncoder(w).Encode(d.syncStages); err != nil { + log.Debug("[diagnostics] HardwareInfoJson", "err", err) + } +} diff --git a/erigon-lib/diagnostics/sys_info.go b/erigon-lib/diagnostics/sys_info.go index 25fef40facb..95a94db884f 100644 --- a/erigon-lib/diagnostics/sys_info.go +++ b/erigon-lib/diagnostics/sys_info.go @@ -17,6 +17,9 @@ package diagnostics import ( + "encoding/json" + "io" + "github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/disk" "github.com/shirou/gopsutil/v3/mem" @@ -59,8 +62,12 @@ func (d *DiagnosticClient) setupSysInfoDiagnostics() { d.mu.Unlock() } -func (d *DiagnosticClient) HardwareInfo() HardwareInfo { - return d.hardwareInfo +func (d *DiagnosticClient) HardwareInfoJson(w io.Writer) { + d.mu.Lock() + defer d.mu.Unlock() + if err := json.NewEncoder(w).Encode(d.hardwareInfo); err != nil { + log.Debug("[diagnostics] HardwareInfoJson", "err", err) + } } func findNodeDisk(dirPath string) string { From 2d96dbb3a7329660df68674314d42995b375d6f1 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sat, 13 Jul 2024 15:25:34 +0700 Subject: [PATCH 2/9] diag: thread-safety step2 - unlock mutex in defer (#11135) --- erigon-lib/diagnostics/snapshots.go | 41 +++++++++++++++-------------- erigon-lib/diagnostics/speedtest.go | 2 +- erigon-lib/diagnostics/sys_info.go | 7 +++-- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index 4e453092afe..0f1d56b97f0 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ b/erigon-lib/diagnostics/snapshots.go @@ -116,6 +116,26 @@ func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subSta d.syncStages[idxs.Stage].SubStages[idxs.SubStage].Stats = stats } +func (d *DiagnosticClient) saveSnapshotStageStatsToDB() { + 
d.mu.Lock() + defer d.mu.Unlock() + err := d.db.Update(d.ctx, func(tx kv.RwTx) error { + err := SnapshotFillDBUpdater(d.syncStats.SnapshotFillDB)(tx) + if err != nil { + return err + } + + err = StagesListUpdater(d.syncStages)(tx) + if err != nil { + return err + } + + return nil + }) + if err != nil { + log.Debug("[Diagnostics] Failed to update snapshot download info", "err", err) + } +} func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context) { go func() { @@ -384,26 +404,7 @@ func (d *DiagnosticClient) runFillDBListener(rootCtx context.Context) { TimeLeft: "unknown", Progress: fmt.Sprintf("%d%%", (info.Stage.Current*100)/info.Stage.Total), }, "Fill DB from snapshots") - - d.mu.Lock() - err := d.db.Update(d.ctx, func(tx kv.RwTx) error { - err := SnapshotFillDBUpdater(d.syncStats.SnapshotFillDB)(tx) - if err != nil { - return err - } - - err = StagesListUpdater(d.syncStages)(tx) - if err != nil { - return err - } - - return nil - }) - - if err != nil { - log.Warn("[Diagnostics] Failed to update snapshot download info", "err", err) - } - d.mu.Unlock() + d.saveSnapshotStageStatsToDB() } } }() diff --git a/erigon-lib/diagnostics/speedtest.go b/erigon-lib/diagnostics/speedtest.go index 522f5132fd5..9be466eae18 100644 --- a/erigon-lib/diagnostics/speedtest.go +++ b/erigon-lib/diagnostics/speedtest.go @@ -31,8 +31,8 @@ func (d *DiagnosticClient) setupSpeedtestDiagnostics(rootCtx context.Context) { go func() { if d.speedTest { d.networkSpeedMutex.Lock() + defer d.networkSpeedMutex.Unlock() d.networkSpeed = d.runSpeedTest(rootCtx) - d.networkSpeedMutex.Unlock() } }() } diff --git a/erigon-lib/diagnostics/sys_info.go b/erigon-lib/diagnostics/sys_info.go index 95a94db884f..08cf6145395 100644 --- a/erigon-lib/diagnostics/sys_info.go +++ b/erigon-lib/diagnostics/sys_info.go @@ -37,6 +37,9 @@ var ( ) func (d *DiagnosticClient) setupSysInfoDiagnostics() { + d.mu.Lock() + defer d.mu.Unlock() + sysInfo := GetSysInfo(d.dataDirPath) var funcs []func(tx 
kv.RwTx) error @@ -52,14 +55,10 @@ func (d *DiagnosticClient) setupSysInfoDiagnostics() { return nil }) - if err != nil { log.Warn("[Diagnostics] Failed to update system info", "err", err) } - - d.mu.Lock() d.hardwareInfo = sysInfo - d.mu.Unlock() } func (d *DiagnosticClient) HardwareInfoJson(w io.Writer) { From 12c2732ad92733a6a6aae9db7259062182799674 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sat, 13 Jul 2024 15:26:49 +0700 Subject: [PATCH 3/9] diag: thread-safety step4 - remove dedicated shutdown listener goroutine (#11137) reason: - we already have 1 goroutine for saving data: ``` func (d *DiagnosticClient) runSaveProcess(rootCtx context.Context) { ticker := time.NewTicker(5 * time.Minute) go func() { for { select { case <-ticker.C: d.SaveData() case <-rootCtx.Done(): ticker.Stop() return } } }() } ``` no reason to save it from one more goroutine. just save it right here - in `case <-rootCtx.Done()` section. less concurrency - better. rootContext already subscribed to sigterm --- erigon-lib/diagnostics/client.go | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/erigon-lib/diagnostics/client.go b/erigon-lib/diagnostics/client.go index 7edf07bc5a0..0f653b589a1 100644 --- a/erigon-lib/diagnostics/client.go +++ b/erigon-lib/diagnostics/client.go @@ -19,11 +19,8 @@ package diagnostics import ( "context" "net/http" - "os" - "os/signal" "path/filepath" "sync" - "syscall" "time" "github.com/c2h5oh/datasize" @@ -120,33 +117,21 @@ func (d *DiagnosticClient) Setup() { d.setupResourcesUsageDiagnostics(rootCtx) d.setupSpeedtestDiagnostics(rootCtx) d.runSaveProcess(rootCtx) - d.runStopNodeListener(rootCtx) //d.logDiagMsgs() } -func (d *DiagnosticClient) runStopNodeListener(rootCtx context.Context) { - go func() { - ch := make(chan os.Signal, 1) - signal.Notify(ch, os.Interrupt, syscall.SIGTERM) - select { - case <-ch: - d.SaveData() - case <-rootCtx.Done(): - } - }() -} - // Save diagnostic data by time interval to reduce save 
events func (d *DiagnosticClient) runSaveProcess(rootCtx context.Context) { ticker := time.NewTicker(5 * time.Minute) go func() { + defer ticker.Stop() for { select { case <-ticker.C: d.SaveData() case <-rootCtx.Done(): - ticker.Stop() + d.SaveData() return } } From d52ab87e30ac3404f66eb552b3b5f1d04a77d358 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sat, 13 Jul 2024 17:04:36 +0700 Subject: [PATCH 4/9] diag: thread-safety step3 - `PeerStatistics.Clone()` and `PeerStats.mutex` (#11136) Co-authored-by: dvovk --- erigon-lib/diagnostics/entities.go | 21 +++++++ erigon-lib/diagnostics/network.go | 82 ++++++++++++++++++++------ erigon-lib/diagnostics/network_test.go | 3 +- 3 files changed, 87 insertions(+), 19 deletions(-) diff --git a/erigon-lib/diagnostics/entities.go b/erigon-lib/diagnostics/entities.go index 6033978f14f..ea83615c163 100644 --- a/erigon-lib/diagnostics/entities.go +++ b/erigon-lib/diagnostics/entities.go @@ -18,6 +18,8 @@ package diagnostics import ( "time" + + "golang.org/x/exp/maps" ) type SyncStageType string @@ -47,6 +49,25 @@ type PeerStatistics struct { TypeBytesOut map[string]uint64 } +func (p PeerStatistics) Clone() PeerStatistics { + p1 := p + p1.CapBytesIn = maps.Clone(p.CapBytesIn) + p1.CapBytesOut = maps.Clone(p.CapBytesOut) + p1.TypeBytesIn = maps.Clone(p.TypeBytesIn) + p1.TypeBytesOut = maps.Clone(p.TypeBytesOut) + return p1 +} + +func (p PeerStatistics) Equal(p2 PeerStatistics) bool { + return p.PeerType == p2.PeerType && + p.BytesIn == p2.BytesIn && + p.BytesOut == p2.BytesOut && + maps.Equal(p.CapBytesIn, p2.CapBytesIn) && + maps.Equal(p.CapBytesOut, p2.CapBytesOut) && + maps.Equal(p.TypeBytesIn, p2.TypeBytesIn) && + maps.Equal(p.TypeBytesOut, p2.TypeBytesOut) +} + type PeerDataUpdate struct { PeerID string ENR string diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index fe53667c961..e69e8e3c9ec 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -30,6 +30,7 
@@ type PeerStats struct { recordsCount int lastUpdateMap map[string]time.Time limit int + mu sync.Mutex } func NewPeerStats(peerLimit int) *PeerStats { @@ -43,16 +44,23 @@ func NewPeerStats(peerLimit int) *PeerStats { func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate) { if value, ok := p.peersInfo.Load(peerID); ok { - p.UpdatePeer(peerID, peerInfo, value) + p.updatePeer(peerID, peerInfo, value) } else { - p.AddPeer(peerID, peerInfo) - if p.GetPeersCount() > p.limit { - p.RemovePeersWhichExceedLimit(p.limit) + p.addPeer(peerID, peerInfo) + if p.getPeersCount() > p.limit { + p.removePeersWhichExceedLimit(p.limit) } } } +// Deprecated - used in tests. non-thread-safe func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { + p.mu.Lock() + defer p.mu.Unlock() + p.addPeer(peerID, peerInfo) +} + +func (p *PeerStats) addPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { pv := PeerStatisticsFromMsgUpdate(peerInfo, nil) p.peersInfo.Store(peerID, pv) p.recordsCount++ @@ -60,6 +68,12 @@ func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { } func (p *PeerStats) UpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) { + p.mu.Lock() + defer p.mu.Unlock() + p.updatePeer(peerID, peerInfo, prevValue) +} + +func (p *PeerStats) updatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) { pv := PeerStatisticsFromMsgUpdate(peerInfo, prevValue) p.peersInfo.Store(peerID, pv) @@ -104,23 +118,34 @@ func PeerStatisticsFromMsgUpdate(msg PeerStatisticMsgUpdate, prevValue any) Peer } func (p *PeerStats) GetPeersCount() int { + p.mu.Lock() + defer p.mu.Unlock() + return p.getPeersCount() +} + +func (p *PeerStats) getPeersCount() int { return p.recordsCount } -func (p *PeerStats) GetPeers() map[string]*PeerStatistics { - stats := make(map[string]*PeerStatistics) +func (p *PeerStats) GetPeers() map[string]PeerStatistics { + p.mu.Lock() + defer p.mu.Unlock() + stats := 
make(map[string]PeerStatistics) p.peersInfo.Range(func(key, value interface{}) bool { - if loadedKey, ok := key.(string); ok { - if loadedValue, ok := value.(PeerStatistics); ok { - stats[loadedKey] = &loadedValue - } else { - log.Debug("Failed to cast value to PeerStatistics struct", value) - } - } else { + loadedKey, ok := key.(string) + if !ok { log.Debug("Failed to cast key to string", key) + return true } + loadedValue, ok := value.(PeerStatistics) + if !ok { + log.Debug("Failed to cast value to PeerStatistics struct", value) + return true + } + + stats[loadedKey] = loadedValue.Clone() return true }) @@ -128,9 +153,12 @@ func (p *PeerStats) GetPeers() map[string]*PeerStatistics { } func (p *PeerStats) GetPeerStatistics(peerID string) PeerStatistics { + p.mu.Lock() + defer p.mu.Unlock() + if value, ok := p.peersInfo.Load(peerID); ok { if peerStats, ok := value.(PeerStatistics); ok { - return peerStats + return peerStats.Clone() } } @@ -138,6 +166,9 @@ func (p *PeerStats) GetPeerStatistics(peerID string) PeerStatistics { } func (p *PeerStats) GetLastUpdate(peerID string) time.Time { + p.mu.Lock() + defer p.mu.Unlock() + if lastUpdate, ok := p.lastUpdateMap[peerID]; ok { return lastUpdate } @@ -146,6 +177,9 @@ func (p *PeerStats) GetLastUpdate(peerID string) time.Time { } func (p *PeerStats) RemovePeer(peerID string) { + p.mu.Lock() + defer p.mu.Unlock() + p.peersInfo.Delete(peerID) p.recordsCount-- delete(p.lastUpdateMap, peerID) @@ -157,7 +191,13 @@ type PeerUpdTime struct { } func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime { - timeArray := make([]PeerUpdTime, 0, p.GetPeersCount()) + p.mu.Lock() + defer p.mu.Unlock() + return p.getOldestUpdatedPeersWithSize(size) +} + +func (p *PeerStats) getOldestUpdatedPeersWithSize(size int) []PeerUpdTime { + timeArray := make([]PeerUpdTime, 0, p.getPeersCount()) for k, v := range p.lastUpdateMap { timeArray = append(timeArray, PeerUpdTime{k, v}) } @@ -174,9 +214,15 @@ func (p *PeerStats) 
GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime { } func (p *PeerStats) RemovePeersWhichExceedLimit(limit int) { - peersToRemove := p.GetPeersCount() - limit + p.mu.Lock() + defer p.mu.Unlock() + p.removePeersWhichExceedLimit(limit) +} + +func (p *PeerStats) removePeersWhichExceedLimit(limit int) { + peersToRemove := p.getPeersCount() - limit if peersToRemove > 0 { - peers := p.GetOldestUpdatedPeersWithSize(peersToRemove) + peers := p.getOldestUpdatedPeersWithSize(peersToRemove) for _, peer := range peers { p.RemovePeer(peer.PeerID) } @@ -204,6 +250,6 @@ func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) { }() } -func (d *DiagnosticClient) Peers() map[string]*PeerStatistics { +func (d *DiagnosticClient) Peers() map[string]PeerStatistics { return d.peersStats.GetPeers() } diff --git a/erigon-lib/diagnostics/network_test.go b/erigon-lib/diagnostics/network_test.go index ec9525281b8..cce3c25bfbb 100644 --- a/erigon-lib/diagnostics/network_test.go +++ b/erigon-lib/diagnostics/network_test.go @@ -159,7 +159,7 @@ func TestGetPeers(t *testing.T) { peers := peerStats.GetPeers() require.Equal(t, 3, len(peers)) - require.Equal(t, &mockInboundPeerStats, peers["test1"]) + require.True(t, peers["test1"].Equal(mockInboundPeerStats)) } func TestLastUpdated(t *testing.T) { @@ -200,6 +200,7 @@ func TestRemovePeersWhichExceedLimit(t *testing.T) { pid := "test" + strconv.Itoa(i) peerStats.AddOrUpdatePeer(pid, mockInboundUpdMsg) } + require.Equal(t, 100, peerStats.GetPeersCount()) peerStats.RemovePeersWhichExceedLimit(limit) From f5743dd2b5ff50d9b8a835783a6c512b1c89cc67 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sun, 14 Jul 2024 00:16:26 +0700 Subject: [PATCH 5/9] dl: manifest-verify green CI (#11142) - skip `erigon2-v2` buckets - until https://github.com/ledgerwatch/erigon/issues/10967 --- cmd/downloader/main.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cmd/downloader/main.go b/cmd/downloader/main.go index 
10785436e28..3793ffd55e0 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/main.go @@ -429,7 +429,13 @@ func manifestVerify(ctx context.Context, logger log.Logger) error { webseedsList := common.CliString2Array(webseeds) if len(webseedsList) == 0 { // fallback to default if exact list not passed if known, ok := snapcfg.KnownWebseeds[chain]; ok { - webseedsList = append(webseedsList, known...) + for _, s := range known { + //TODO: enable validation of this buckets also. skipping to make CI useful.k + if strings.Contains(s, "erigon2-v2") { + continue + } + webseedsList = append(webseedsList, s) + } } } From b8c9187a8d809aaac5d81a6e72c16888a59bc2af Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sun, 14 Jul 2024 00:17:02 +0700 Subject: [PATCH 6/9] bor: fix race in `LockedMilestoneIDs` access (#11139) for https://github.com/ledgerwatch/erigon/issues/11129 --- polygon/bor/finality/whitelist/milestone.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/polygon/bor/finality/whitelist/milestone.go b/polygon/bor/finality/whitelist/milestone.go index 8c75f4e061c..05f135ee63c 100644 --- a/polygon/bor/finality/whitelist/milestone.go +++ b/polygon/bor/finality/whitelist/milestone.go @@ -17,9 +17,8 @@ package whitelist import ( - "github.com/ledgerwatch/erigon-lib/log/v3" - "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/log/v3" "github.com/ledgerwatch/erigon-lib/metrics" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/polygon/bor/finality/flags" @@ -172,8 +171,8 @@ func (m *milestone) UnlockSprint(endBlockNum uint64) { m.Locked = false m.purgeMilestoneIDsList() - - err := rawdb.WriteLockField(m.db, m.Locked, m.LockedMilestoneNumber, m.LockedMilestoneHash, m.LockedMilestoneIDs) + purgedMilestoneIDs := map[string]struct{}{} + err := rawdb.WriteLockField(m.db, m.Locked, m.LockedMilestoneNumber, m.LockedMilestoneHash, purgedMilestoneIDs) if err != nil { log.Error("[bor] Error in writing 
lock data of milestone to db", "err", err) @@ -274,8 +273,8 @@ func (m *milestone) ProcessFutureMilestone(num uint64, hash common.Hash) { m.Locked = false m.purgeMilestoneIDsList() - - err := rawdb.WriteLockField(m.db, m.Locked, m.LockedMilestoneNumber, m.LockedMilestoneHash, m.LockedMilestoneIDs) + purgedMilestoneIDs := map[string]struct{}{} + err := rawdb.WriteLockField(m.db, m.Locked, m.LockedMilestoneNumber, m.LockedMilestoneHash, purgedMilestoneIDs) if err != nil { log.Error("[bor] Error in writing lock data of milestone to db", "err", err) From 6bb5d8b4f9923860c74d3b4add6a1ead7bdc9153 Mon Sep 17 00:00:00 2001 From: Dmytro Vovk Date: Sat, 13 Jul 2024 19:53:27 +0200 Subject: [PATCH 7/9] Diagnostics: refactor bulk execution thread safety (#11143) --- erigon-lib/diagnostics/block_execution.go | 46 +++++++++++++++++++++-- erigon-lib/diagnostics/client.go | 1 + erigon-lib/diagnostics/entities.go | 14 ------- 3 files changed, 43 insertions(+), 18 deletions(-) diff --git a/erigon-lib/diagnostics/block_execution.go b/erigon-lib/diagnostics/block_execution.go index eb1c3dd4949..54cf9de70d8 100644 --- a/erigon-lib/diagnostics/block_execution.go +++ b/erigon-lib/diagnostics/block_execution.go @@ -18,10 +18,45 @@ package diagnostics import ( "context" + "encoding/json" + "io" + "sync" "github.com/ledgerwatch/erigon-lib/log/v3" ) +type BlockEexcStatsData struct { + data BlockExecutionStatistics + mu sync.Mutex +} + +type BlockExecutionStatistics struct { + From uint64 `json:"from"` + To uint64 `json:"to"` + BlockNumber uint64 `json:"blockNumber"` + BlkPerSec float64 `json:"blkPerSec"` + TxPerSec float64 `json:"txPerSec"` + MgasPerSec float64 `json:"mgasPerSec"` + GasState float64 `json:"gasState"` + Batch uint64 `json:"batch"` + Alloc uint64 `json:"alloc"` + Sys uint64 `json:"sys"` + TimeElapsed float64 `json:"timeElapsed"` +} + +func (b *BlockEexcStatsData) SetData(d BlockExecutionStatistics) { + b.mu.Lock() + defer b.mu.Unlock() + b.data = d +} + +func (b 
*BlockEexcStatsData) Data() (d BlockExecutionStatistics) { + b.mu.Lock() + d = b.data + b.mu.Unlock() + return +} + func (d *DiagnosticClient) setupBlockExecutionDiagnostics(rootCtx context.Context) { d.runBlockExecutionListener(rootCtx) } @@ -37,10 +72,7 @@ func (d *DiagnosticClient) runBlockExecutionListener(rootCtx context.Context) { case <-rootCtx.Done(): return case info := <-ch: - d.mu.Lock() - d.syncStats.BlockExecution = info - d.mu.Unlock() - + d.BlockExecution.SetData(info) if d.syncStats.SyncFinished { return } @@ -48,3 +80,9 @@ func (d *DiagnosticClient) runBlockExecutionListener(rootCtx context.Context) { } }() } + +func (d *DiagnosticClient) BlockExecutionInfoJson(w io.Writer) { + if err := json.NewEncoder(w).Encode(d.BlockExecution.Data()); err != nil { + log.Debug("[diagnostics] BlockExecutionInfoJson", "err", err) + } +} diff --git a/erigon-lib/diagnostics/client.go b/erigon-lib/diagnostics/client.go index 0f653b589a1..983a63459ee 100644 --- a/erigon-lib/diagnostics/client.go +++ b/erigon-lib/diagnostics/client.go @@ -41,6 +41,7 @@ type DiagnosticClient struct { syncStages []SyncStage syncStats SyncStatistics + BlockExecution BlockEexcStatsData snapshotFileList SnapshoFilesList mu sync.Mutex headerMutex sync.Mutex diff --git a/erigon-lib/diagnostics/entities.go b/erigon-lib/diagnostics/entities.go index ea83615c163..08bbeca5f20 100644 --- a/erigon-lib/diagnostics/entities.go +++ b/erigon-lib/diagnostics/entities.go @@ -91,7 +91,6 @@ type SyncStatistics struct { SnapshotDownload SnapshotDownloadStatistics `json:"snapshotDownload"` SnapshotIndexing SnapshotIndexingStatistics `json:"snapshotIndexing"` SnapshotFillDB SnapshotFillDBStatistics `json:"snapshotFillDB"` - BlockExecution BlockExecutionStatistics `json:"blockExecution"` SyncFinished bool `json:"syncFinished"` } @@ -167,19 +166,6 @@ type SnapshotFillDBStageUpdate struct { Stage SnapshotFillDBStage `json:"stage"` TimeElapsed float64 `json:"timeElapsed"` } -type BlockExecutionStatistics struct { 
- From uint64 `json:"from"` - To uint64 `json:"to"` - BlockNumber uint64 `json:"blockNumber"` - BlkPerSec float64 `json:"blkPerSec"` - TxPerSec float64 `json:"txPerSec"` - MgasPerSec float64 `json:"mgasPerSec"` - GasState float64 `json:"gasState"` - Batch uint64 `json:"batch"` - Alloc uint64 `json:"alloc"` - Sys uint64 `json:"sys"` - TimeElapsed float64 `json:"timeElapsed"` -} type SnapshoFilesList struct { Files []string `json:"files"` From 9890a94707305f496ee3af3278953811553e641e Mon Sep 17 00:00:00 2001 From: antonis19 Date: Sat, 13 Jul 2024 21:00:15 +0300 Subject: [PATCH 8/9] Call UnwindTo with tx instead of nil in sync_test.go (#11133) Co-authored-by: antonis19 --- eth/stagedsync/sync_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/eth/stagedsync/sync_test.go b/eth/stagedsync/sync_test.go index 4f08624dd2d..e8353d33314 100644 --- a/eth/stagedsync/sync_test.go +++ b/eth/stagedsync/sync_test.go @@ -196,7 +196,7 @@ func TestUnwindSomeStagesBehindUnwindPoint(t *testing.T) { flow = append(flow, stages.Senders) if !unwound { unwound = true - _ = u.UnwindTo(1500, UnwindReason{}, nil) + _ = u.UnwindTo(1500, UnwindReason{}, txc.Tx) return nil } return nil @@ -289,7 +289,7 @@ func TestUnwind(t *testing.T) { flow = append(flow, stages.Senders) if !unwound { unwound = true - _ = u.UnwindTo(500, UnwindReason{}, nil) + _ = u.UnwindTo(500, UnwindReason{}, txc.Tx) return s.Update(txc.Tx, 3000) } return nil @@ -343,7 +343,7 @@ func TestUnwind(t *testing.T) { //check that at unwind disabled stage not appear flow = flow[:0] state.unwindOrder = []*Stage{s[3], s[2], s[1], s[0]} - _ = state.UnwindTo(100, UnwindReason{}, nil) + _ = state.UnwindTo(100, UnwindReason{}, tx) _, err = state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -393,7 +393,7 @@ func TestUnwindEmptyUnwinder(t *testing.T) { flow = append(flow, stages.Senders) if !unwound { unwound = true - _ = u.UnwindTo(500, UnwindReason{}, nil) + _ 
= u.UnwindTo(500, UnwindReason{}, txc.Tx) return s.Update(txc.Tx, 3000) } return nil @@ -581,7 +581,7 @@ func TestSyncInterruptLongUnwind(t *testing.T) { flow = append(flow, stages.Senders) if !unwound { unwound = true - _ = u.UnwindTo(500, UnwindReason{}, nil) + _ = u.UnwindTo(500, UnwindReason{}, txc.Tx) return s.Update(txc.Tx, 3000) } return nil From 4a02c66af2da0b76e6d8b3fa03f79a5d078343f0 Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Sun, 14 Jul 2024 08:09:32 +0200 Subject: [PATCH 9/9] Caplin: Tweaked CGO_Flags (#11144) --- Makefile | 17 ++++++++++++++--- cl/antiquary/antiquary.go | 4 +++- cmd/capcli/cli.go | 4 ++-- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index 50e746c6f47..30751ab1ff1 100644 --- a/Makefile +++ b/Makefile @@ -23,10 +23,21 @@ CGO_CFLAGS += -DMDBX_DISABLE_VALIDATION=0 # Can disable it on CI by separated PR #CGO_CFLAGS += -DMDBX_ENABLE_PROFGC=0 # Disabled by default, but may be useful for performance debugging #CGO_CFLAGS += -DMDBX_ENABLE_PGOP_STAT=0 # Disabled by default, but may be useful for performance debugging CGO_CFLAGS += -DMDBX_ENV_CHECKPID=0 # Erigon doesn't do fork() syscall -CGO_CFLAGS += -D__BLST_PORTABLE__ -CGO_CFLAGS += -Wno-unknown-warning-option -Wno-enum-int-mismatch -Wno-strict-prototypes -Wno-unused-but-set-variable + +# If it is arm64 or aarch64, then we need to use portable version of blst. 
use or with stringw "arm64" and "aarch64" to support both +ifeq ($(shell uname -m), arm64) + CGO_CFLAGS += -D__BLST_PORTABLE__ +endif +ifeq ($(shell uname -m), aarch64) + CGO_CFLAGS += -D__BLST_PORTABLE__ +endif + + +CGO_CFLAGS += -Wno-unknown-warning-option -Wno-enum-int-mismatch -Wno-strict-prototypes -Wno-unused-but-set-variable -O3 CGO_LDFLAGS := $(shell $(GO) env CGO_LDFLAGS 2> /dev/null) +CGO_LDFLAGS += -O3 -g + ifeq ($(shell uname -s), Darwin) ifeq ($(filter-out 13.%,$(shell sw_vers --productVersion)),) CGO_LDFLAGS += -mmacosx-version-min=13.3 @@ -44,7 +55,7 @@ GOPRIVATE = github.com/erigontech/silkworm-go PACKAGE = github.com/ledgerwatch/erigon -GO_FLAGS += -trimpath -tags $(BUILD_TAGS) -buildvcs=false +GO_FLAGS += -trimpath -tags $(BUILD_TAGS) -buildvcs=false GO_FLAGS += -ldflags "-X ${PACKAGE}/params.GitCommit=${GIT_COMMIT} -X ${PACKAGE}/params.GitBranch=${GIT_BRANCH} -X ${PACKAGE}/params.GitTag=${GIT_TAG}" GOBUILD = CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" GOPRIVATE="$(GOPRIVATE)" $(GO) build $(GO_FLAGS) diff --git a/cl/antiquary/antiquary.go b/cl/antiquary/antiquary.go index 45a46d03174..7fc345834d0 100644 --- a/cl/antiquary/antiquary.go +++ b/cl/antiquary/antiquary.go @@ -133,7 +133,9 @@ func (a *Antiquary) Loop() error { return err } defer logInterval.Stop() - log.Info("[Antiquary] Stopping Caplin to process historical indicies", "from", from, "to", a.sn.BlocksAvailable()) + if from != a.sn.BlocksAvailable() { + log.Info("[Antiquary] Stopping Caplin to process historical indicies", "from", from, "to", a.sn.BlocksAvailable()) + } // Now write the snapshots as indicies for i := from; i < a.sn.BlocksAvailable(); i++ { diff --git a/cmd/capcli/cli.go b/cmd/capcli/cli.go index 91460e897fd..29ba3794048 100644 --- a/cmd/capcli/cli.go +++ b/cmd/capcli/cli.go @@ -210,7 +210,7 @@ func (c *ChainEndpoint) Run(ctx *Context) error { // Let's fetch the head first currentBlock, err := core.RetrieveBlock(ctx, beaconConfig, fmt.Sprintf("%s/head", 
baseUri), nil) if err != nil { - return err + return fmt.Errorf("failed to retrieve head: %w, uri: %s", err, fmt.Sprintf("%s/head", baseUri)) } currentRoot, err := currentBlock.Block.HashSSZ() if err != nil { @@ -246,7 +246,7 @@ func (c *ChainEndpoint) Run(ctx *Context) error { // Let's fetch the head first currentBlock, err := core.RetrieveBlock(ctx, beaconConfig, fmt.Sprintf("%s/0x%s", baseUri, stringifiedRoot), (*libcommon.Hash)(&currentRoot)) if err != nil { - return false, err + return false, fmt.Errorf("failed to retrieve block: %w, uri: %s", err, fmt.Sprintf("%s/0x%s", baseUri, stringifiedRoot)) } currentRoot, err = currentBlock.Block.HashSSZ() if err != nil {