Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov committed Dec 12, 2024
1 parent 7ff8182 commit 0a7fd76
Showing 1 changed file with 22 additions and 39 deletions.
61 changes: 22 additions & 39 deletions cmd/state/exec3/historical_trace_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,18 @@ type ExecArgs struct {
Workers int
}

func NewHistoricalTraceWorkers(consumer TraceConsumer, cfg *ExecArgs, ctx context.Context, toTxNum uint64, in *state.QueueWithRetry, workerCount int, outputTxNum *atomic.Uint64, logger log.Logger) (g *errgroup.Group, applyWorker *HistoricalTraceWorker, clearFunc func()) {
func NewHistoricalTraceWorkers(consumer TraceConsumer, cfg *ExecArgs, ctx context.Context, toTxNum uint64, in *state.QueueWithRetry, workerCount int, outputTxNum *atomic.Uint64, logger log.Logger) (g *errgroup.Group, clearFunc func()) {
workers := make([]*HistoricalTraceWorker, workerCount)

// can afford big limits - because historical execution doesn't need conflicts-resolution
resultChannelLimit := workerCount * 16
heapLimit := workerCount * 16
rws := state.NewResultsQueue(resultChannelLimit, heapLimit) // workerCount * 4

reducerGroup := &errgroup.Group{}
g, ctx = errgroup.WithContext(ctx)

//Reducer
reducerGroup.Go(func() (err error) {
g.Go(func() (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("%s, %s", rec, dbg.Stack())
Expand Down Expand Up @@ -295,26 +295,21 @@ func NewHistoricalTraceWorkers(consumer TraceConsumer, cfg *ExecArgs, ctx contex

// we all errors in background workers (except ctx.Cancel), because applyLoop will detect this error anyway.
// and in applyLoop all errors are critical
ctx, cancel := context.WithCancel(ctx)
{
g, ctx = errgroup.WithContext(ctx)
for i := 0; i < workerCount; i++ {
workers[i] = NewHistoricalTraceWorker(consumer, in, rws, true, ctx, cfg, logger)
}
applyWorker = NewHistoricalTraceWorker(consumer, in, rws, false, ctx, cfg, logger)
for i := 0; i < workerCount; i++ {
i := i
g.Go(func() (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("%s, %s", rec, dbg.Stack())
log.Warn("[dbg] 'worker' paniced", "i", i, "err", err)
}
}()
for i := 0; i < workerCount; i++ {
workers[i] = NewHistoricalTraceWorker(consumer, in, rws, true, ctx, cfg, logger)
}
for i := 0; i < workerCount; i++ {
i := i
g.Go(func() (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("%s, %s", rec, dbg.Stack())
log.Warn("[dbg] 'worker' paniced", "i", i, "err", err)
}
}()

return workers[i].Run()
})
}
return workers[i].Run()
})
}

var clearDone bool
Expand All @@ -323,16 +318,14 @@ func NewHistoricalTraceWorkers(consumer TraceConsumer, cfg *ExecArgs, ctx contex
return
}
clearDone = true
cancel()
//rws.Close()
g.Wait()
rws.Close()
reducerGroup.Wait()
for _, w := range workers {
w.ResetTx(nil)
}
}

return reducerGroup, applyWorker, clearFunc
return g, clearFunc
}

func processResultQueueHistorical(consumer TraceConsumer, rws *state.ResultsQueue, outputTxNumIn uint64, tx kv.TemporalTx, forceStopAtBlockEnd bool) (outputTxNum uint64, stopedAtBlockEnd bool, err error) {
Expand Down Expand Up @@ -408,10 +401,9 @@ func CustomTraceMapReduce(fromBlock, toBlock uint64, consumer TraceConsumer, ctx

outTxNum := &atomic.Uint64{}
outTxNum.Store(fromTxNum)
workers, applyWorker, cleanup := NewHistoricalTraceWorkers(consumer, cfg, ctx, toTxNum, in, WorkerCount, outTxNum, logger)
workers, cleanup := NewHistoricalTraceWorkers(consumer, cfg, ctx, toTxNum, in, WorkerCount, outTxNum, logger)
defer workers.Wait()
defer cleanup()
applyWorker.ResetTx(tx)

workersExited := &atomic.Bool{}
go func() {
Expand Down Expand Up @@ -494,17 +486,8 @@ func CustomTraceMapReduce(fromBlock, toBlock uint64, consumer TraceConsumer, ctx
if workersExited.Load() {
return workers.Wait()
}
if WorkerCount == 1 {
applyWorker.RunTxTask(txTask)
if txTask.TxIndex >= 0 && !txTask.Final {
txTask.CreateReceipt(tx)
}
if err := consumer.Reduce(txTask, tx); err != nil {
return err
}
} else {
in.Add(ctx, txTask)
}

in.Add(ctx, txTask)
inputTxNum++

//select {
Expand Down

0 comments on commit 0a7fd76

Please sign in to comment.