Skip to content

Commit

Permalink
Merge 192a535 into 95c2db3
Browse files Browse the repository at this point in the history
  • Loading branch information
JuArce authored Oct 21, 2024
2 parents 95c2db3 + 192a535 commit 3116f55
Show file tree
Hide file tree
Showing 116 changed files with 9,132 additions and 4,463 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ jobs:
run: go build operator/cmd/main.go
- name: Build aggregator
run: go build aggregator/cmd/main.go

2 changes: 1 addition & 1 deletion .github/workflows/build-and-test-rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
cargo build --all
test:
runs-on: ubuntu-latest
runs-on: aligned-runner
needs: build
steps:
- name: Checkout code
Expand Down
71 changes: 68 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ OS := $(shell uname -s)
CONFIG_FILE?=config-files/config.yaml
AGG_CONFIG_FILE?=config-files/config-aggregator.yaml

OPERATOR_VERSION=v0.9.2
OPERATOR_VERSION=v0.10.0

ifeq ($(OS),Linux)
BUILD_ALL_FFI = $(MAKE) build_all_ffi_linux
Expand All @@ -17,7 +17,15 @@ ifeq ($(OS),Darwin)
endif

ifeq ($(OS),Linux)
export LD_LIBRARY_PATH := $(CURDIR)/operator/risc_zero/lib
LD_LIBRARY_PATH += $(CURDIR)/operator/risc_zero/lib
endif

ifeq ($(OS),Linux)
BUILD_OPERATOR = $(MAKE) build_operator_linux
endif

ifeq ($(OS),Darwin)
BUILD_OPERATOR = $(MAKE) build_operator_macos
endif


Expand Down Expand Up @@ -85,6 +93,10 @@ anvil_upgrade_add_aggregator:
@echo "Adding Aggregator to Aligned Contracts..."
. contracts/scripts/anvil/upgrade_add_aggregator_to_service_manager.sh

# Local devnet (Anvil) variant: runs the upgrade script that initializes the
# disabled-verifiers storage in the Aligned Service Manager contract.
anvil_upgrade_initialize_disable_verifiers:
@echo "Initializing disabled verifiers..."
. contracts/scripts/anvil/upgrade_disabled_verifiers_in_service_manager.sh

lint_contracts:
@cd contracts && npm run lint:sol

Expand Down Expand Up @@ -120,6 +132,14 @@ operator_full_registration: operator_get_eth operator_register_with_eigen_layer
operator_register_and_start: operator_full_registration operator_start

build_operator: deps
$(BUILD_OPERATOR)

build_operator_macos:
@echo "Building Operator..."
@go build -ldflags "-X main.Version=$(OPERATOR_VERSION)" -o ./operator/build/aligned-operator ./operator/cmd/main.go
@echo "Operator built into /operator/build/aligned-operator"

build_operator_linux:
@echo "Building Operator..."
@go build -ldflags "-X main.Version=$(OPERATOR_VERSION) -r $(LD_LIBRARY_PATH)" -o ./operator/build/aligned-operator ./operator/cmd/main.go
@echo "Operator built into /operator/build/aligned-operator"
Expand Down Expand Up @@ -214,6 +234,23 @@ operator_register_with_aligned_layer:
operator_deposit_and_register: operator_deposit_into_strategy operator_register_with_aligned_layer


# The verifier ID to enable or disable corresponds to the index of the verifier in the `ProvingSystemID` enum.
#
# *_devnet targets talk to a local Anvil node: the PRIVATE_KEY below is Anvil's
# well-known default dev account #0 key (not a secret), RPC_URL is the local
# node, and OUTPUT_PATH points at the devnet deployment output.
verifier_enable_devnet:
@echo "Enabling verifier with id: $(VERIFIER_ID)"
PRIVATE_KEY=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 RPC_URL=http://localhost:8545 OUTPUT_PATH=./script/output/devnet/alignedlayer_deployment_output.json ./contracts/scripts/enable_verifier.sh $(VERIFIER_ID)

verifier_disable_devnet:
@echo "Disabling verifier with id: $(VERIFIER_ID)"
PRIVATE_KEY=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 RPC_URL=http://localhost:8545 OUTPUT_PATH=./script/output/devnet/alignedlayer_deployment_output.json ./contracts/scripts/disable_verifier.sh $(VERIFIER_ID)

# Non-devnet variants source credentials/endpoints from contracts/scripts/.env
# instead of hard-coding them.
verifier_enable:
@echo "Enabling verifier with ID: $(VERIFIER_ID)"
@. contracts/scripts/.env && . contracts/scripts/enable_verifier.sh $(VERIFIER_ID)

verifier_disable:
@echo "Disabling verifier with ID: $(VERIFIER_ID)"
@. contracts/scripts/.env && . contracts/scripts/disable_verifier.sh $(VERIFIER_ID)

__BATCHER__:

BURST_SIZE=5
Expand Down Expand Up @@ -291,6 +328,16 @@ batcher_send_risc0_task:
--rpc_url $(RPC_URL) \
--network $(NETWORK)

# Submits a Risc0 proof that has no public inputs (empty journal) to the Batcher.
# NOTE(review): the echo says "fibonacci" like its siblings — presumably the
# no_public_inputs test proof is also a fibonacci program; confirm against
# scripts/test_files/risc_zero/no_public_inputs.
batcher_send_risc0_task_no_pub_input:
@echo "Sending Risc0 fibonacci task to Batcher..."
@cd batcher/aligned/ && cargo run --release -- submit \
--proving_system Risc0 \
--proof ../../scripts/test_files/risc_zero/no_public_inputs/risc_zero_no_pub_input.proof \
--vm_program ../../scripts/test_files/risc_zero/no_public_inputs/no_pub_input_id.bin \
--proof_generator_addr 0x66f9664f97F2b50F62D13eA064982f936dE76657 \
--rpc_url $(RPC_URL) \
--payment_service_addr $(BATCHER_PAYMENTS_CONTRACT_ADDRESS)

batcher_send_risc0_burst:
@echo "Sending Risc0 fibonacci task to Batcher..."
@cd batcher/aligned/ && cargo run --release -- submit \
Expand Down Expand Up @@ -390,6 +437,7 @@ generate_groth16_ineq_proof: ## Run the gnark_plonk_bn254_script
@go run scripts/test_files/gnark_groth16_bn254_infinite_script/cmd/main.go 1

__METRICS__:
# Prometheus and Grafana
run_metrics: ## Run metrics using metrics-docker-compose.yaml
@echo "Running metrics..."
@docker compose -f metrics-docker-compose.yaml up
Expand Down Expand Up @@ -428,6 +476,10 @@ upgrade_add_aggregator: ## Add Aggregator to Aligned Contracts
@echo "Adding Aggregator to Aligned Contracts..."
@. contracts/scripts/.env && . contracts/scripts/upgrade_add_aggregator_to_service_manager.sh

# Runs the upgrade script that initializes the disabled-verifiers storage in the
# Aligned Service Manager, using credentials from contracts/scripts/.env.
upgrade_initialize_disabled_verifiers:
@echo "Adding disabled verifiers to Aligned Service Manager..."
@. contracts/scripts/.env && . contracts/scripts/upgrade_disabled_verifiers_in_service_manager.sh

deploy_verify_batch_inclusion_caller:
@echo "Deploying VerifyBatchInclusionCaller contract..."
@. examples/verify/.env && . examples/verify/scripts/deploy_verify_batch_inclusion_caller.sh
Expand Down Expand Up @@ -486,6 +538,11 @@ generate_sp1_fibonacci_proof:
@mv scripts/test_files/sp1/fibonacci_proof_generator/script/sp1_fibonacci.proof scripts/test_files/sp1/
@echo "Fibonacci proof and ELF generated in scripts/test_files/sp1 folder"

# Generates a Risc0 proof with an empty journal (no public inputs) for testing,
# by running the cargo project under scripts/test_files/risc_zero/no_public_inputs.
generate_risc_zero_empty_journal_proof:
@cd scripts/test_files/risc_zero/no_public_inputs && RUST_LOG=info cargo run --release
@echo "Fibonacci proof and ELF with empty journal generated in scripts/test_files/risc_zero/no_public_inputs folder"


__RISC_ZERO_FFI__: ##
build_risc_zero_macos:
@cd operator/risc_zero/lib && cargo build $(RELEASE_FLAG)
Expand Down Expand Up @@ -642,11 +699,19 @@ tracker_dump_db:
@echo "Dumped database successfully to /operator_tracker"

__TELEMETRY__:
# Collector, Jaeger and Elixir API
telemetry_full_start: open_telemetry_start telemetry_start

# Collector and Jaeger
open_telemetry_start: ## Run open telemetry services using telemetry-docker-compose.yaml
## TODO(juarce) ADD DOCKER COMPOSE
## NOTE(review): the TODO above looks stale — the compose file is invoked on
## the next line; confirm with the author and remove it.
@echo "Running telemetry..."
@docker compose -f telemetry-docker-compose.yaml up -d

# Same as open_telemetry_start but uses the prod compose file, which also
# brings up Cassandra as the Jaeger backing store.
open_telemetry_prod_start: ## Run open telemetry services with Cassandra using telemetry-prod-docker-compose.yaml
@echo "Running telemetry for Prod..."
@docker compose -f telemetry-prod-docker-compose.yaml up -d

# Elixir API
telemetry_start: telemetry_run_db telemetry_ecto_migrate ## Run Telemetry API
@cd telemetry_api && \
./start.sh
Expand Down
9 changes: 9 additions & 0 deletions aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ func aggregatorMain(ctx *cli.Context) error {
return err
}

// Supervisor revives garbage collector
go func() {
for {
log.Println("Starting Garbage collector")
aggregator.ClearTasksFromMaps()
log.Println("Garbage collector panicked, Supervisor restarting")
}
}()

// Listen for new task created in the ServiceManager contract in a separate goroutine, both V1 and V2 subscriptions:
go func() {
listenErr := aggregator.SubscribeToNewTasks()
Expand Down
60 changes: 58 additions & 2 deletions aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Aggregator struct {
// and can start from zero
batchesIdxByIdentifierHash map[[32]byte]uint32

// Stores the taskCreatedBlock for each batch bt batch index
// Stores the taskCreatedBlock for each batch by batch index
batchCreatedBlockByIdx map[uint32]uint64

// Stores the TaskResponse for each batch by batchIdentifierHash
Expand Down Expand Up @@ -215,6 +215,8 @@ func (agg *Aggregator) Start(ctx context.Context) error {

const MaxSentTxRetries = 5

const BLS_AGG_SERVICE_TIMEOUT = 100 * time.Second

func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching task data")
Expand Down Expand Up @@ -275,6 +277,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
}

Expand Down Expand Up @@ -361,12 +364,17 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: senderAddress,
}
agg.logger.Info(
"Task Info added in aggregator:",
"Task", batchIndex,
"batchIdentifierHash", batchIdentifierHash,
)
agg.nextBatchIndex += 1

quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}

err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, 100*time.Second)
err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, BLS_AGG_SERVICE_TIMEOUT)
// FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
if err != nil {
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
Expand All @@ -377,3 +385,51 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
agg.logger.Info("New task added", "batchIndex", batchIndex, "batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
}

// ClearTasksFromMaps is a long-lived goroutine that periodically removes old
// Tasks from the Aggregator's in-memory maps.
// It runs every GarbageCollectorPeriod and removes all tasks older than
// GarbageCollectorTasksAge. This was added because each task occupies memory in
// the maps, and we need to free it to avoid a memory leak.
//
// It is (re)started by a supervisor loop in main, so on panic we recover, log,
// and return — the supervisor spawns it again.
func (agg *Aggregator) ClearTasksFromMaps() {
	defer func() {
		// recover() stops panics raised inside this goroutine from killing the
		// process; the supervisor in main restarts the collector after return.
		if err := recover(); err != nil {
			agg.logger.Error("Recovered from panic", "err", err)
		}
	}()

	agg.AggregatorConfig.BaseConfig.Logger.Info(fmt.Sprintf("- Removing finalized Task Infos from Maps every %v", agg.AggregatorConfig.Aggregator.GarbageCollectorPeriod))
	// Index of the last task already evicted; deletion resumes from the next index.
	lastIdxDeleted := uint32(0)

	for {
		time.Sleep(agg.AggregatorConfig.Aggregator.GarbageCollectorPeriod)

		agg.AggregatorConfig.BaseConfig.Logger.Info("Cleaning finalized tasks from maps")
		// Ask the chain for the identifier hash of a task old enough to be evicted.
		oldTaskIdHash, err := agg.avsReader.GetOldTaskHash(agg.AggregatorConfig.Aggregator.GarbageCollectorTasksAge, agg.AggregatorConfig.Aggregator.GarbageCollectorTasksInterval)
		if err != nil {
			agg.logger.Error("Error getting old task hash, skipping this garbage collect", "err", err)
			continue // Retry in the next iteration
		}
		if oldTaskIdHash == nil {
			agg.logger.Warn("No old tasks found")
			continue // Retry in the next iteration
		}

		// The task maps are written by AddNewTask and read by
		// handleBlsAggServiceResponse under taskMutex. Reading/deleting here
		// without the lock is a data race: concurrent map read/write is a fatal
		// runtime error in Go that recover() cannot catch, so the supervisor
		// would not save the process. Take the lock around all map access.
		agg.taskMutex.Lock()
		agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Garbage collecting old tasks")

		taskIdxToDelete := agg.batchesIdxByIdentifierHash[*oldTaskIdHash]
		agg.logger.Info("Old task found", "taskIndex", taskIdxToDelete)
		// Delete every entry from the one after lastIdxDeleted up to and
		// including taskIdxToDelete.
		for i := lastIdxDeleted + 1; i <= taskIdxToDelete; i++ {
			batchIdentifierHash, exists := agg.batchesIdentifierHashByIdx[i]
			if exists {
				agg.logger.Info("Cleaning up finalized task", "taskIndex", i)
				delete(agg.batchesIdxByIdentifierHash, batchIdentifierHash)
				delete(agg.batchCreatedBlockByIdx, i)
				delete(agg.batchesIdentifierHashByIdx, i)
				delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
			} else {
				agg.logger.Warn("Task not found in maps", "taskIndex", i)
			}
		}
		lastIdxDeleted = taskIdxToDelete

		agg.taskMutex.Unlock()
		agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Garbage collecting old tasks")
		agg.AggregatorConfig.BaseConfig.Logger.Info("Done cleaning finalized tasks from maps")
	}
}
Loading

0 comments on commit 3116f55

Please sign in to comment.