diff --git a/cmd/state/exec3/historical_trace_worker.go b/cmd/state/exec3/historical_trace_worker.go index 7e9ff54106d..b8dbdecd22b 100644 --- a/cmd/state/exec3/historical_trace_worker.go +++ b/cmd/state/exec3/historical_trace_worker.go @@ -240,7 +240,7 @@ type ExecArgs struct { Workers int } -func NewHistoricalTraceWorkers(consumer TraceConsumer, cfg *ExecArgs, ctx context.Context, toTxNum uint64, in *state.QueueWithRetry, workerCount int, outputTxNum *atomic.Uint64, logger log.Logger) (g *errgroup.Group, applyWorker *HistoricalTraceWorker, clearFunc func()) { +func NewHistoricalTraceWorkers(consumer TraceConsumer, cfg *ExecArgs, ctx context.Context, toTxNum uint64, in *state.QueueWithRetry, workerCount int, outputTxNum *atomic.Uint64, logger log.Logger) (g *errgroup.Group, clearFunc func()) { workers := make([]*HistoricalTraceWorker, workerCount) // can afford big limits - because historical execution doesn't need conflicts-resolution @@ -248,10 +248,10 @@ func NewHistoricalTraceWorkers(consumer TraceConsumer, cfg *ExecArgs, ctx contex heapLimit := workerCount * 16 rws := state.NewResultsQueue(resultChannelLimit, heapLimit) // workerCount * 4 - reducerGroup := &errgroup.Group{} + g, ctx = errgroup.WithContext(ctx) //Reducer - reducerGroup.Go(func() (err error) { + g.Go(func() (err error) { defer func() { if rec := recover(); rec != nil { err = fmt.Errorf("%s, %s", rec, dbg.Stack()) @@ -295,26 +295,21 @@ func NewHistoricalTraceWorkers(consumer TraceConsumer, cfg *ExecArgs, ctx contex // we all errors in background workers (except ctx.Cancel), because applyLoop will detect this error anyway. // and in applyLoop all errors are critical - ctx, cancel := context.WithCancel(ctx) - { - g, ctx = errgroup.WithContext(ctx) - for i := 0; i < workerCount; i++ { - workers[i] = NewHistoricalTraceWorker(consumer, in, rws, true, ctx, cfg, logger) - } - applyWorker = NewHistoricalTraceWorker(consumer, in, rws, false, ctx, cfg, logger) - for i := 0; i < workerCount; i++ { - i := i - g.Go(func() (err error) { - defer func() { - if rec := recover(); rec != nil { - err = fmt.Errorf("%s, %s", rec, dbg.Stack()) - log.Warn("[dbg] 'worker' paniced", "i", i, "err", err) - } - }() + for i := 0; i < workerCount; i++ { + workers[i] = NewHistoricalTraceWorker(consumer, in, rws, true, ctx, cfg, logger) + } + for i := 0; i < workerCount; i++ { + i := i + g.Go(func() (err error) { + defer func() { + if rec := recover(); rec != nil { + err = fmt.Errorf("%s, %s", rec, dbg.Stack()) + log.Warn("[dbg] 'worker' paniced", "i", i, "err", err) + } + }() - return workers[i].Run() - }) - } + return workers[i].Run() + }) } var clearDone bool @@ -323,16 +318,14 @@ func NewHistoricalTraceWorkers(consumer TraceConsumer, cfg *ExecArgs, ctx contex return } clearDone = true - cancel() + //rws.Close() g.Wait() - rws.Close() - reducerGroup.Wait() for _, w := range workers { w.ResetTx(nil) } } - return reducerGroup, applyWorker, clearFunc + return g, clearFunc } func processResultQueueHistorical(consumer TraceConsumer, rws *state.ResultsQueue, outputTxNumIn uint64, tx kv.TemporalTx, forceStopAtBlockEnd bool) (outputTxNum uint64, stopedAtBlockEnd bool, err error) { @@ -408,10 +401,9 @@ func CustomTraceMapReduce(fromBlock, toBlock uint64, consumer TraceConsumer, ctx outTxNum := &atomic.Uint64{} outTxNum.Store(fromTxNum) - workers, applyWorker, cleanup := NewHistoricalTraceWorkers(consumer, cfg, ctx, toTxNum, in, WorkerCount, outTxNum, logger) + workers, cleanup := NewHistoricalTraceWorkers(consumer, cfg, ctx, toTxNum, in, WorkerCount, outTxNum, logger) defer workers.Wait() defer cleanup() - applyWorker.ResetTx(tx) workersExited := &atomic.Bool{} go func() { @@ -494,17 +486,8 @@ func CustomTraceMapReduce(fromBlock, toBlock uint64, consumer TraceConsumer, ctx if workersExited.Load() { return workers.Wait() } - if WorkerCount == 1 { - applyWorker.RunTxTask(txTask) - if txTask.TxIndex >= 0 && !txTask.Final { - txTask.CreateReceipt(tx) - } - if err := consumer.Reduce(txTask, tx); err != nil { - return err - } - } else { - in.Add(ctx, txTask) - } + + in.Add(ctx, txTask) inputTxNum++ //select {