Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(audit): Periodically Remove oldTask data from aggregator #1004

Merged
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
1e56af6
remove responded entries after they have been responded or failed to …
PatStiles Sep 18, 2024
2514d25
comment
PatStiles Sep 18, 2024
2c92e79
move logs outside of mutex
PatStiles Sep 23, 2024
ea592f4
move logs in failure to respond
PatStiles Sep 23, 2024
3bf3fc0
wrap deletion logic in go func
PatStiles Sep 30, 2024
5e23085
add sleep to go routine
PatStiles Sep 30, 2024
a32492c
Update aggregator/internal/pkg/aggregator.go
uri-99 Oct 3, 2024
c17a5ef
Merge branch 'staging' into 977-aggregator-memory-leak-could-lead-to-…
PatStiles Oct 3, 2024
1c51c9c
refactor: clearTaskFromMaps function
uri-99 Oct 3, 2024
2a031ce
refactor: clearTaskFromMaps is sequential and no longer a goroutine
uri-99 Oct 3, 2024
91fad8e
feat: refactor now using map to have a garbage collector
uri-99 Oct 4, 2024
da173bd
chore: comments
uri-99 Oct 4, 2024
34841b4
chore: cargo.lock
uri-99 Oct 7, 2024
56f9b70
feat: chan usage
uri-99 Oct 8, 2024
b50facc
Merge remote-tracking branch 'origin/staging' into 977-aggregator-mem…
uri-99 Oct 8, 2024
ce96d69
chore: cargo
uri-99 Oct 8, 2024
36cb8df
fix: agg retasking expired tasks
uri-99 Oct 8, 2024
56132e8
refactor: now delete old tasks
uri-99 Oct 9, 2024
9d8faeb
fix: wip
uri-99 Oct 14, 2024
6c709b7
Merge branch 'staging' into 977-aggregator-memory-leak-could-lead-to-…
uri-99 Oct 14, 2024
7391495
fix: cleaner
uri-99 Oct 14, 2024
ef8e5d6
refactor: GC has supervisor
uri-99 Oct 15, 2024
d9afb7b
feat: read GC vars from config file
uri-99 Oct 17, 2024
d41399e
chore: add comments
uri-99 Oct 17, 2024
5403e73
chore: simple PR comments
uri-99 Oct 17, 2024
c02265b
fix: nits
uri-99 Oct 17, 2024
cdeae19
Merge branch 'staging' into 977-aggregator-memory-leak-could-lead-to-…
uri-99 Oct 17, 2024
51c454d
Unneeded whitespace
Oppen Oct 18, 2024
79d38d8
Typo
Oppen Oct 18, 2024
74add84
Use `.Error()` rather than `.(string)`
Oppen Oct 18, 2024
6919a23
fix: print error
uri-99 Oct 18, 2024
45e49a0
chore: set numbers for prod
uri-99 Oct 18, 2024
7f9ddb0
chore: set better values for dev
uri-99 Oct 18, 2024
3c1898c
refcator: GarbageCollectorTasksInterval to config.yml and remove para…
uri-99 Oct 18, 2024
dfe5fd7
Merge branch 'staging' into 977-aggregator-memory-leak-could-lead-to-…
uri-99 Oct 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ func aggregatorMain(ctx *cli.Context) error {
return err
}

// Supervisor revives garbage collector
go func() {
for {
log.Println("Starting Garbage collector")
aggregator.ClearTasksFromMaps(aggregatorConfig.Aggregator.GarbageCollectorPeriod, aggregatorConfig.Aggregator.GarbageCollectorTasksAge)
log.Println("Garbage collector panicked, Supervisor restarting")
}
}()

// Listen for new task created in the ServiceManager contract in a separate goroutine, both V1 and V2 subscriptions:
go func() {
listenErr := aggregator.SubscribeToNewTasks()
Expand Down
60 changes: 58 additions & 2 deletions aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Aggregator struct {
// and can start from zero
batchesIdxByIdentifierHash map[[32]byte]uint32

// Stores the taskCreatedBlock for each batch bt batch index
// Stores the taskCreatedBlock for each batch by batch index
batchCreatedBlockByIdx map[uint32]uint64

// Stores the TaskResponse for each batch by batchIdentifierHash
Expand Down Expand Up @@ -215,6 +215,8 @@ func (agg *Aggregator) Start(ctx context.Context) error {

const MaxSentTxRetries = 5

const BLS_AGG_SERVICE_TIMEOUT = 100 * time.Second

func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching task data")
Expand Down Expand Up @@ -275,6 +277,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
}

Expand Down Expand Up @@ -361,12 +364,17 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: senderAddress,
}
agg.logger.Info(
"Task Info added in aggregator:",
"Task", batchIndex,
"batchIdentifierHash", batchIdentifierHash,
)
agg.nextBatchIndex += 1

quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}

err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, 100*time.Second)
err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, BLS_AGG_SERVICE_TIMEOUT)
// FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
if err != nil {
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
Expand All @@ -377,3 +385,51 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
agg.logger.Info("New task added", "batchIndex", batchIndex, "batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
}

// Long-lived goroutine that periodically checks and removes old Tasks from stored Maps
// It runs every period and removes all tasks older than blocksOld
// This was added because each task occupies memory in the maps, and we need to free it to avoid a memory leak
func (agg *Aggregator) ClearTasksFromMaps(period time.Duration, blocksOld uint64) {
defer func() {
JuArce marked this conversation as resolved.
Show resolved Hide resolved
err := recover() //stops panics
if err != nil {
agg.logger.Error(err.(string))
Oppen marked this conversation as resolved.
Show resolved Hide resolved
}
}()

agg.AggregatorConfig.BaseConfig.Logger.Info(fmt.Sprintf("- Removing finalized Task Infos from Maps every %v", period))
lastIdxDeleted := uint32(0)

for {
time.Sleep(period)
uri-99 marked this conversation as resolved.
Show resolved Hide resolved

agg.AggregatorConfig.BaseConfig.Logger.Info("Cleaning finalized tasks from maps")
oldTaskIdHash, err := agg.avsReader.GetOldTaskHash(blocksOld)
if err != nil {
agg.logger.Error("Error getting old task hash, skipping this garbage collect", "err", err)
continue // Retry in the next iteration
}
if oldTaskIdHash == nil {
agg.logger.Warn("No old tasks found")
continue // Retry in the next iteration
}

taskIdxToDelete := agg.batchesIdxByIdentifierHash[*oldTaskIdHash]
agg.logger.Info("Old task found", "taskIndex", taskIdxToDelete)
// delete from lastIdxDeleted to taskIdxToDelete
for i := lastIdxDeleted+1; i <= taskIdxToDelete; i++ {
batchIdentifierHash, exists := agg.batchesIdentifierHashByIdx[i]
if exists {
agg.logger.Info("Cleaning up finalized task", "taskIndex", i)
delete(agg.batchesIdxByIdentifierHash, batchIdentifierHash)
delete(agg.batchCreatedBlockByIdx, i)
delete(agg.batchesIdentifierHashByIdx, i)
delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
} else {
agg.logger.Warn("Task not found in maps", "taskIndex", i)
}
}
lastIdxDeleted = taskIdxToDelete
agg.AggregatorConfig.BaseConfig.Logger.Info("Done cleaning finalized tasks from maps")
}
}
Loading
Loading