Skip to content

Commit

Permalink
Merge branch 'main' into dl_speedtest_race
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov committed Jul 14, 2024
2 parents f0228b1 + 4a02c66 commit 9fd42ab
Show file tree
Hide file tree
Showing 21 changed files with 273 additions and 120 deletions.
17 changes: 14 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,21 @@ CGO_CFLAGS += -DMDBX_DISABLE_VALIDATION=0 # Can disable it on CI by separated PR
#CGO_CFLAGS += -DMDBX_ENABLE_PROFGC=0 # Disabled by default, but may be useful for performance debugging
#CGO_CFLAGS += -DMDBX_ENABLE_PGOP_STAT=0 # Disabled by default, but may be useful for performance debugging
CGO_CFLAGS += -DMDBX_ENV_CHECKPID=0 # Erigon doesn't do fork() syscall
CGO_CFLAGS += -D__BLST_PORTABLE__
CGO_CFLAGS += -Wno-unknown-warning-option -Wno-enum-int-mismatch -Wno-strict-prototypes -Wno-unused-but-set-variable

# If it is arm64 or aarch64, then we need to use portable version of blst. use or with stringw "arm64" and "aarch64" to support both
ifeq ($(shell uname -m), arm64)
CGO_CFLAGS += -D__BLST_PORTABLE__
endif
ifeq ($(shell uname -m), aarch64)
CGO_CFLAGS += -D__BLST_PORTABLE__
endif


CGO_CFLAGS += -Wno-unknown-warning-option -Wno-enum-int-mismatch -Wno-strict-prototypes -Wno-unused-but-set-variable -O3

CGO_LDFLAGS := $(shell $(GO) env CGO_LDFLAGS 2> /dev/null)
CGO_LDFLAGS += -O3 -g

ifeq ($(shell uname -s), Darwin)
ifeq ($(filter-out 13.%,$(shell sw_vers --productVersion)),)
CGO_LDFLAGS += -mmacosx-version-min=13.3
Expand All @@ -44,7 +55,7 @@ GOPRIVATE = github.com/erigontech/silkworm-go

PACKAGE = github.com/ledgerwatch/erigon

GO_FLAGS += -trimpath -tags $(BUILD_TAGS) -buildvcs=false
GO_FLAGS += -trimpath -tags $(BUILD_TAGS) -buildvcs=false
GO_FLAGS += -ldflags "-X ${PACKAGE}/params.GitCommit=${GIT_COMMIT} -X ${PACKAGE}/params.GitBranch=${GIT_BRANCH} -X ${PACKAGE}/params.GitTag=${GIT_TAG}"

GOBUILD = CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" GOPRIVATE="$(GOPRIVATE)" $(GO) build $(GO_FLAGS)
Expand Down
4 changes: 3 additions & 1 deletion cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ func (a *Antiquary) Loop() error {
return err
}
defer logInterval.Stop()
log.Info("[Antiquary] Stopping Caplin to process historical indicies", "from", from, "to", a.sn.BlocksAvailable())
if from != a.sn.BlocksAvailable() {
log.Info("[Antiquary] Stopping Caplin to process historical indicies", "from", from, "to", a.sn.BlocksAvailable())
}

// Now write the snapshots as indicies
for i := from; i < a.sn.BlocksAvailable(); i++ {
Expand Down
4 changes: 2 additions & 2 deletions cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (c *ChainEndpoint) Run(ctx *Context) error {
// Let's fetch the head first
currentBlock, err := core.RetrieveBlock(ctx, beaconConfig, fmt.Sprintf("%s/head", baseUri), nil)
if err != nil {
return err
return fmt.Errorf("failed to retrieve head: %w, uri: %s", err, fmt.Sprintf("%s/head", baseUri))
}
currentRoot, err := currentBlock.Block.HashSSZ()
if err != nil {
Expand Down Expand Up @@ -246,7 +246,7 @@ func (c *ChainEndpoint) Run(ctx *Context) error {
// Let's fetch the head first
currentBlock, err := core.RetrieveBlock(ctx, beaconConfig, fmt.Sprintf("%s/0x%s", baseUri, stringifiedRoot), (*libcommon.Hash)(&currentRoot))
if err != nil {
return false, err
return false, fmt.Errorf("failed to retrieve block: %w, uri: %s", err, fmt.Sprintf("%s/0x%s", baseUri, stringifiedRoot))
}
currentRoot, err = currentBlock.Block.HashSSZ()
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,13 @@ func manifestVerify(ctx context.Context, logger log.Logger) error {
webseedsList := common.CliString2Array(webseeds)
if len(webseedsList) == 0 { // fallback to default if exact list not passed
if known, ok := snapcfg.KnownWebseeds[chain]; ok {
webseedsList = append(webseedsList, known...)
for _, s := range known {
//TODO: enable validation of this buckets also. skipping to make CI useful.k
if strings.Contains(s, "erigon2-v2") {
continue
}
webseedsList = append(webseedsList, s)
}
}
}

Expand Down
3 changes: 1 addition & 2 deletions diagnostics/bodies_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package diagnostics

import (
"encoding/json"
"net/http"

diaglib "github.com/ledgerwatch/erigon-lib/diagnostics"
Expand All @@ -37,5 +36,5 @@ func SetupBodiesAccess(metricsMux *http.ServeMux, diag *diaglib.DiagnosticClient
}

func writeBodies(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.GetBodiesInfo())
diag.BodiesInfoJson(w)
}
3 changes: 1 addition & 2 deletions diagnostics/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package diagnostics

import (
"encoding/json"
"net/http"

diaglib "github.com/ledgerwatch/erigon-lib/diagnostics"
Expand All @@ -36,5 +35,5 @@ func SetupHeadersAccess(metricsMux *http.ServeMux, diag *diaglib.DiagnosticClien
}

func writeHeaders(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.GetHeaders())
diag.HeadersJson(w)
}
13 changes: 6 additions & 7 deletions diagnostics/snapshot_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package diagnostics

import (
"encoding/json"
"net/http"

diaglib "github.com/ledgerwatch/erigon-lib/diagnostics"
Expand Down Expand Up @@ -66,25 +65,25 @@ func SetupStagesAccess(metricsMux *http.ServeMux, diag *diaglib.DiagnosticClient
}

func writeNetworkSpeed(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.GetNetworkSpeed())
diag.NetworkSpeedJson(w)
}

func writeResourcesUsage(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.GetResourcesUsage())
diag.ResourcesUsageJson(w)
}

func writeStages(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.SyncStatistics())
diag.SyncStatsJson(w)
}

func writeFilesList(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.SnapshotFilesList())
diag.SnapshotFilesListJson(w)
}

func writeHardwareInfo(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.HardwareInfo())
diag.HardwareInfoJson(w)
}

func writeSyncStages(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.GetSyncStages())
diag.SyncStagesJson(w)
}
46 changes: 42 additions & 4 deletions erigon-lib/diagnostics/block_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,45 @@ package diagnostics

import (
"context"
"encoding/json"
"io"
"sync"

"github.com/ledgerwatch/erigon-lib/log/v3"
)

type BlockEexcStatsData struct {
data BlockExecutionStatistics
mu sync.Mutex
}

type BlockExecutionStatistics struct {
From uint64 `json:"from"`
To uint64 `json:"to"`
BlockNumber uint64 `json:"blockNumber"`
BlkPerSec float64 `json:"blkPerSec"`
TxPerSec float64 `json:"txPerSec"`
MgasPerSec float64 `json:"mgasPerSec"`
GasState float64 `json:"gasState"`
Batch uint64 `json:"batch"`
Alloc uint64 `json:"alloc"`
Sys uint64 `json:"sys"`
TimeElapsed float64 `json:"timeElapsed"`
}

func (b *BlockEexcStatsData) SetData(d BlockExecutionStatistics) {
b.mu.Lock()
defer b.mu.Unlock()
b.data = d
}

func (b *BlockEexcStatsData) Data() (d BlockExecutionStatistics) {
b.mu.Lock()
d = b.data
b.mu.Unlock()
return
}

func (d *DiagnosticClient) setupBlockExecutionDiagnostics(rootCtx context.Context) {
d.runBlockExecutionListener(rootCtx)
}
Expand All @@ -37,14 +72,17 @@ func (d *DiagnosticClient) runBlockExecutionListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:
d.mu.Lock()
d.syncStats.BlockExecution = info
d.mu.Unlock()

d.BlockExecution.SetData(info)
if d.syncStats.SyncFinished {
return
}
}
}
}()
}

func (d *DiagnosticClient) BlockExecutionInfoJson(w io.Writer) {
if err := json.NewEncoder(w).Encode(d.BlockExecution.Data()); err != nil {
log.Debug("[diagnostics] BlockExecutionInfoJson", "err", err)
}
}
8 changes: 6 additions & 2 deletions erigon-lib/diagnostics/bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package diagnostics

import (
"context"
"encoding/json"
"io"

"github.com/ledgerwatch/erigon-lib/log/v3"
)
Expand Down Expand Up @@ -109,8 +111,10 @@ func (d *DiagnosticClient) runBodiesProcessingListener(rootCtx context.Context)
}()
}

func (d *DiagnosticClient) GetBodiesInfo() BodiesInfo {
func (d *DiagnosticClient) BodiesInfoJson(w io.Writer) {
d.bodiesMutex.Lock()
defer d.bodiesMutex.Unlock()
return d.bodies
if err := json.NewEncoder(w).Encode(d.bodies); err != nil {
log.Debug("[diagnostics] BodiesInfoJson", "err", err)
}
}
20 changes: 3 additions & 17 deletions erigon-lib/diagnostics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ package diagnostics
import (
"context"
"net/http"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/c2h5oh/datasize"
Expand All @@ -44,6 +41,7 @@ type DiagnosticClient struct {

syncStages []SyncStage
syncStats SyncStatistics
BlockExecution BlockEexcStatsData
snapshotFileList SnapshoFilesList
mu sync.Mutex
headerMutex sync.Mutex
Expand Down Expand Up @@ -120,33 +118,21 @@ func (d *DiagnosticClient) Setup() {
d.setupResourcesUsageDiagnostics(rootCtx)
d.setupSpeedtestDiagnostics(rootCtx)
d.runSaveProcess(rootCtx)
d.runStopNodeListener(rootCtx)

//d.logDiagMsgs()
}

func (d *DiagnosticClient) runStopNodeListener(rootCtx context.Context) {
go func() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
select {
case <-ch:
d.SaveData()
case <-rootCtx.Done():
}
}()
}

// Save diagnostic data by time interval to reduce save events
func (d *DiagnosticClient) runSaveProcess(rootCtx context.Context) {
ticker := time.NewTicker(5 * time.Minute)
go func() {
defer ticker.Stop()
for {
select {
case <-ticker.C:
d.SaveData()
case <-rootCtx.Done():
ticker.Stop()
d.SaveData()
return
}
}
Expand Down
35 changes: 21 additions & 14 deletions erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package diagnostics

import (
"time"

"golang.org/x/exp/maps"
)

type SyncStageType string
Expand Down Expand Up @@ -47,6 +49,25 @@ type PeerStatistics struct {
TypeBytesOut map[string]uint64
}

func (p PeerStatistics) Clone() PeerStatistics {
p1 := p
p1.CapBytesIn = maps.Clone(p.CapBytesIn)
p1.CapBytesOut = maps.Clone(p.CapBytesOut)
p1.TypeBytesIn = maps.Clone(p.TypeBytesIn)
p1.TypeBytesOut = maps.Clone(p.TypeBytesOut)
return p1
}

func (p PeerStatistics) Equal(p2 PeerStatistics) bool {
return p.PeerType == p2.PeerType &&
p.BytesIn == p2.BytesIn &&
p.BytesOut == p2.BytesOut &&
maps.Equal(p.CapBytesIn, p2.CapBytesIn) &&
maps.Equal(p.CapBytesOut, p2.CapBytesOut) &&
maps.Equal(p.TypeBytesIn, p2.TypeBytesIn) &&
maps.Equal(p.TypeBytesOut, p2.TypeBytesOut)
}

type PeerDataUpdate struct {
PeerID string
ENR string
Expand All @@ -70,7 +91,6 @@ type SyncStatistics struct {
SnapshotDownload SnapshotDownloadStatistics `json:"snapshotDownload"`
SnapshotIndexing SnapshotIndexingStatistics `json:"snapshotIndexing"`
SnapshotFillDB SnapshotFillDBStatistics `json:"snapshotFillDB"`
BlockExecution BlockExecutionStatistics `json:"blockExecution"`
SyncFinished bool `json:"syncFinished"`
}

Expand Down Expand Up @@ -146,19 +166,6 @@ type SnapshotFillDBStageUpdate struct {
Stage SnapshotFillDBStage `json:"stage"`
TimeElapsed float64 `json:"timeElapsed"`
}
type BlockExecutionStatistics struct {
From uint64 `json:"from"`
To uint64 `json:"to"`
BlockNumber uint64 `json:"blockNumber"`
BlkPerSec float64 `json:"blkPerSec"`
TxPerSec float64 `json:"txPerSec"`
MgasPerSec float64 `json:"mgasPerSec"`
GasState float64 `json:"gasState"`
Batch uint64 `json:"batch"`
Alloc uint64 `json:"alloc"`
Sys uint64 `json:"sys"`
TimeElapsed float64 `json:"timeElapsed"`
}

type SnapshoFilesList struct {
Files []string `json:"files"`
Expand Down
10 changes: 8 additions & 2 deletions erigon-lib/diagnostics/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package diagnostics

import (
"context"
"encoding/json"
"io"

"github.com/ledgerwatch/erigon-lib/log/v3"
)
Expand All @@ -29,8 +31,12 @@ func (d *DiagnosticClient) setupHeadersDiagnostics(rootCtx context.Context) {
d.runProcessedListener(rootCtx)
}

func (d *DiagnosticClient) GetHeaders() Headers {
return d.headers
func (d *DiagnosticClient) HeadersJson(w io.Writer) {
d.headerMutex.Lock()
defer d.headerMutex.Unlock()
if err := json.NewEncoder(w).Encode(d.headers); err != nil {
log.Debug("[diagnostics] HeadersJson", "err", err)
}
}

func (d *DiagnosticClient) runHeadersWaitingListener(rootCtx context.Context) {
Expand Down
Loading

0 comments on commit 9fd42ab

Please sign in to comment.