Skip to content

Commit

Permalink
Merge branch 'main' into pablo/check-mev-relays
Browse files Browse the repository at this point in the history
  • Loading branch information
pablomendezroyo authored Dec 5, 2024
2 parents 2d99e7c + 2bd942f commit 9752232
Show file tree
Hide file tree
Showing 15 changed files with 325 additions and 167 deletions.
193 changes: 74 additions & 119 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package main

import (
"context"
"os"
"os/signal"
"sync"
"syscall"
"time"

"lido-events/internal/adapters/api"
"lido-events/internal/adapters/beaconchain"
csfeedistributor "lido-events/internal/adapters/csFeeDistributor"
Expand All @@ -16,149 +22,77 @@ import (
relaysused "lido-events/internal/adapters/relaysUsed"
"lido-events/internal/adapters/storage"
"lido-events/internal/adapters/vebo"
"lido-events/internal/logger"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

"lido-events/internal/application/services"
"lido-events/internal/config"
"net/http"
"lido-events/internal/logger"
)

// Helper function to check if operator IDs and Telegram config are available
func waitForInitialConfig(ctx context.Context, storageAdapter *storage.Storage) error {
for {
select {
case <-ctx.Done(): // Exit if the context is canceled
return ctx.Err()
default:
// Check for operator IDs
operatorIds, err := storageAdapter.GetOperatorIds()
if err != nil || len(operatorIds) == 0 {
logger.Info("Waiting for operator IDs to be set...")
} else {
// Operator IDs are set
logger.Info("Operator IDs are set. Proceeding with initialization.")
return nil
}
time.Sleep(2 * time.Second) // Poll every 2 seconds
}
}
}
var logPrefix = "MAIN"

func main() {
// Set up context with cancellation and a WaitGroup for graceful shutdown
// Set up context with cancellation and WaitGroup for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup

// Set up signal channel to handle OS interrupts
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

// Load configurations
networkConfig, err := config.LoadNetworkConfig()
if err != nil {
logger.Fatal("Failed to load network configuration: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to load network configuration: %v", err)
}
logger.Debug("Network config: %+v", networkConfig)
logger.DebugWithPrefix(logPrefix, "Network config: %+v", networkConfig)

// Initialize adapters
storageAdapter := storage.NewStorageAdapter()
// Initialize the notifier adapter (Telegram configuration optional)
notifierAdapter, err := notifier.NewNotifierAdapter(ctx, storageAdapter)
if err != nil {
logger.Warn("Telegram notifier not initialized: %v", err)
logger.WarnWithPrefix(logPrefix, "Telegram notifier not initialized: %v", err)
}
relaysUsedAdapter := relaysused.NewRelaysUsedAdapter(networkConfig.DappmanagerUrl, networkConfig.MevBoostDnpName)
relaysAllowedAdapter, err := relaysallowed.NewRelaysAllowedAdapter(networkConfig.WsURL, networkConfig.MEVBoostRelaysAllowListAddres, networkConfig.DappmanagerUrl, networkConfig.MevBoostDnpName)
if err != nil {
logger.Fatal("Failed to initialize relaysAllowedAdapter: %v", err)
}

// Start HTTP server
apiAdapter := api.NewAPIAdapter(ctx, storageAdapter, notifierAdapter, relaysUsedAdapter, relaysAllowedAdapter, networkConfig.CORS)
server := &http.Server{
Addr: ":" + strconv.FormatUint(networkConfig.ApiPort, 10),
Handler: apiAdapter.Router,
}
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("Server started on :%d", networkConfig.ApiPort)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
logger.Fatal("HTTP server ListenAndServe: %v", err)
}
}()

// Start Proxy API server
apiAdapter := api.NewAPIAdapter(storageAdapter, relaysUsedAdapter, relaysAllowedAdapter, networkConfig.CORS)
proxyApiAdapter := proxyapi.NewProxyAPIAdapter(networkConfig.CORS, networkConfig.LidoKeysApiUrl)
proxyServer := &http.Server{
Addr: ":" + strconv.FormatUint(networkConfig.ProxyApiPort, 10),
Handler: proxyApiAdapter.Router,
}
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("Proxy API server started on :%d", networkConfig.ProxyApiPort)
if err := proxyServer.ListenAndServe(); err != http.ErrServerClosed {
logger.Fatal("Proxy API server ListenAndServe: %v", err)
}
}()

// Wait for initial configuration in a separate goroutine
configReady := make(chan error, 1)
go func() {
configReady <- waitForInitialConfig(ctx, storageAdapter)
}()
// Initialize API services
apiService := services.NewAPIServerService(apiAdapter, networkConfig.ApiPort)
proxyService := services.NewProxyAPIServerService(proxyApiAdapter, networkConfig.ProxyApiPort)

// Start listening for signals in a separate goroutine
go func() {
<-signalChan
logger.Info("Received shutdown signal. Initiating graceful shutdown...")
cancel() // Cancel context to stop all services
}()
// Start API services
apiService.Start(&wg)
proxyService.Start(&wg)

// Wait for either the config to be ready or the context to be canceled
select {
case err := <-configReady:
if err != nil {
logger.Warn("Shutting down due to: %v", err)
return
}
logger.Info("Configuration is ready. Proceeding with initialization.")
case <-ctx.Done():
logger.Info("Context canceled before configuration was ready.")
return
// Wait for and validate initial configuration
if err := waitForConfig(ctx, storageAdapter); err != nil {
logger.FatalWithPrefix(logPrefix, "Application shutting down due to configuration validation failure: %v", err)
}

// Initialize domain adapters
ipfsAdapter := ipfs.NewIPFSAdapter(networkConfig.IpfsUrl)
beaconchainAdapter := beaconchain.NewBeaconchainAdapter(networkConfig.BeaconchainURL)
executionAdapter := execution.NewExecutionAdapter(networkConfig.RpcUrl)
exitValidatorAdapter := exitvalidator.NewExitValidatorAdapter(beaconchainAdapter, networkConfig.SignerUrl)

csFeeDistributorImplAdapter, err := csfeedistributorimpl.NewCsFeeDistributorImplAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress)
if err != nil {
logger.Fatal("Failed to initialize CsFeeDistributorImpl adapter: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to initialize CsFeeDistributorImplAdapter: %v", err)
}
veboAdapter, err := vebo.NewVeboAdapter(networkConfig.WsURL, networkConfig.VEBOAddress, storageAdapter)
if err != nil {
logger.Fatal("Failed to initialize Vebo adapter: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to initialize VeboAdapter: %v", err)
}
csModuleAdapter, err := csmodule.NewCsModuleAdapter(networkConfig.WsURL, networkConfig.CSModuleAddress, storageAdapter)
if err != nil {
logger.Fatal("Failed to initialize CsModule adapter: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to initialize CsModuleAdapter: %v", err)
}
csFeeDistributorAdapter, err := csfeedistributor.NewCsFeeDistributorAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress)
if err != nil {
logger.Fatal("Failed to initialize CsFeeDistributor adapter: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to initialize CsFeeDistributorAdapter: %v", err)
}

// Initialize services
// Initialize domain services
eventsWatcherService := services.NewEventsWatcherService(veboAdapter, csModuleAdapter, csFeeDistributorAdapter, notifierAdapter)
distributionLogUpdatedScannerService := services.NewDistributionLogUpdatedEventScanner(storageAdapter, notifierAdapter, executionAdapter, csFeeDistributorImplAdapter, networkConfig.CsFeeDistributorBlockDeployment)
validatorExitRequestScannerService := services.NewValidatorExitRequestEventScanner(storageAdapter, notifierAdapter, veboAdapter, executionAdapter, beaconchainAdapter, networkConfig.VeboBlockDeployment)
Expand All @@ -169,41 +103,62 @@ func main() {
// Relays
go relaysCheckerService.StartRelayMonitoringCron(ctx, 5*time.Minute, &wg)

// DistributionLogUpdated
// Start domain services
distributionLogUpdatedExecutionComplete := make(chan struct{})
go distributionLogUpdatedScannerService.ScanDistributionLogUpdatedEventsCron(ctx, 384*time.Second, &wg, distributionLogUpdatedExecutionComplete) // once every epoch
go distributionLogUpdatedScannerService.ScanDistributionLogUpdatedEventsCron(ctx, 384*time.Second, &wg, distributionLogUpdatedExecutionComplete)
go pendingHashesLoaderService.LoadPendingHashesCron(ctx, 3*time.Hour, &wg, distributionLogUpdatedExecutionComplete)

// ExitRequest
exitRequestExecutionComplete := make(chan struct{})
go validatorExitRequestScannerService.ScanValidatorExitRequestEventsCron(ctx, 384*time.Second, &wg, exitRequestExecutionComplete) // once every epoch
go validatorExitRequestScannerService.ScanValidatorExitRequestEventsCron(ctx, 384*time.Second, &wg, exitRequestExecutionComplete)
go validatorEjectorService.ValidatorEjectorCron(ctx, 64*time.Minute, &wg, exitRequestExecutionComplete)

// Events watcher
go eventsWatcherService.WatchAllEvents(ctx, &wg)

// Handle shutdown signals
// Handle OS signals for shutdown
handleShutdown(cancel, apiService, proxyService)

// Wait for all goroutines to finish
wg.Wait()
logger.InfoWithPrefix(logPrefix, "All services stopped. Shutting down application.")
}

// Helper function to check if operator IDs and Telegram config are available
func waitForConfig(ctx context.Context, storageAdapter *storage.Storage) error {
for {
select {
case <-ctx.Done(): // Exit if the context is canceled
logger.InfoWithPrefix(logPrefix, "Context canceled before configuration was ready.")
return ctx.Err()
default:
// Check for operator IDs
operatorIds, err := storageAdapter.GetOperatorIds()
if err != nil || len(operatorIds) == 0 {
logger.InfoWithPrefix(logPrefix, "Waiting for operator IDs to be set...")
} else {
// Operator IDs are set
logger.InfoWithPrefix(logPrefix, "Operator IDs are set. Proceeding with initialization.")
return nil
}
time.Sleep(2 * time.Second) // Poll every 2 seconds
}
}
}

// handleShutdown manages graceful shutdown for services
func handleShutdown(cancel context.CancelFunc, apiService *services.APIServerService, proxyService *services.ProxyAPIServerService) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

go func() {
<-signalChan
logger.Info("Received shutdown signal. Initiating graceful shutdown...")
cancel() // Cancel context to signal all services to stop

// Give the HTTP server time to finish ongoing requests
serverCtx, serverCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer serverCancel()
if err := server.Shutdown(serverCtx); err != nil {
logger.Info("HTTP server Shutdown: %v", err)
}
logger.InfoWithPrefix(logPrefix, "Received shutdown signal. Initiating graceful shutdown...")
cancel()

// Give the Proxy API server time to finish ongoing requests
proxyServerCtx, proxyServerCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer proxyServerCancel()
if err := proxyServer.Shutdown(proxyServerCtx); err != nil {
logger.Info("Proxy API server Shutdown: %v", err)
}
}()
// Shutdown API services with a timeout
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()

// Wait for all goroutines to finish
wg.Wait()
logger.Info("All services stopped. Shutting down application.")
apiService.Shutdown(shutdownCtx)
proxyService.Shutdown(shutdownCtx)
}()
}
14 changes: 11 additions & 3 deletions internal/adapters/api/api_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ type APIHandler struct {
adapterPrefix string
}

// Ensure APIHandler implements the ports.API interface
var _ ports.API = (*APIHandler)(nil)

// GetRouter implements the ports.API interface
func (h *APIHandler) GetRouter() http.Handler {
return h.Router
}

// NewAPIAdapter initializes the APIHandler and sets up routes with CORS enabled
func NewAPIAdapter(ctx context.Context, storagePort ports.StoragePort, notifierPort ports.NotifierPort, relaysUsedPort ports.RelaysUsedPort, relaysAllowedPort ports.RelaysAllowedPort, allowedOrigins []string) *APIHandler {
h := &APIHandler{
Expand Down Expand Up @@ -188,7 +196,7 @@ func (h *APIHandler) DeleteOperator(w http.ResponseWriter, r *http.Request) {
return
}

// check it exists calling GetOperatorIds
// Check if operator ID exists
operatorIds, err := h.StoragePort.GetOperatorIds()
if err != nil {
logger.ErrorWithPrefix("API", "Failed to fetch operator IDs: %v", err)
Expand Down Expand Up @@ -244,7 +252,7 @@ func (h *APIHandler) AddOperator(w http.ResponseWriter, r *http.Request) {
return
}

// check if operator id already exists and if so return ok
// Check if operator ID already exists
operatorIds, err := h.StoragePort.GetOperatorIds()
if err != nil {
logger.ErrorWithPrefix("API", "Failed to fetch operator IDs: %v", err)
Expand All @@ -268,7 +276,7 @@ func (h *APIHandler) AddOperator(w http.ResponseWriter, r *http.Request) {

// Set last block processed to 0, this will trigger the events scanner to start from the beginning
// and look for events for the new operator ID
// TODO: this logic should be in the services layer
// TODO: Consider moving this logic to the services layer
if err := h.StoragePort.SaveDistributionLogLastProcessedBlock(0); err != nil {
logger.ErrorWithPrefix("API", "Failed to update DistributionLogLastProcessedBlock: %v", err)
writeErrorResponse(w, "Failed to reset DistributionLogLastProcessedBlock", http.StatusInternalServerError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ func (csfa *CsFeeDistributorAdapter) WatchCsFeeDistributorEvents(ctx context.Con
select {
case event := <-distributionDataUpdatedChan:
handleDistributionDataUpdated(event)
return
// case err := <-sub.Err():
// // Subscription error should be handled by returning it to the service layer.
// return
case <-ctx.Done():
return
}
Expand Down
15 changes: 0 additions & 15 deletions internal/adapters/csModule/csmodule_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,49 +203,34 @@ func (csma *CsModuleAdapter) WatchCsModuleEvents(ctx context.Context, handlers p
select {
case event := <-depositedSigningKeysChangedChan:
handlers.HandleDepositedSigningKeysCountChanged(event)
return
case event := <-elRewardsStealingPenaltyReportedChan:
handlers.HandleElRewardsStealingPenaltyReported(event)
return
case event := <-elRewardsStealingPenaltySettledChan:
handlers.HandleElRewardsStealingPenaltySettled(event)
return
case event := <-elRewardsStealingPenaltyCancelledChan:
handlers.HandleElRewardsStealingPenaltyCancelled(event)
return
case event := <-initialSlashingSubmittedChan:
handlers.HandleInitialSlashingSubmitted(event)
return
case event := <-keyRemovalChargeAppliedChan:
handlers.HandleKeyRemovalChargeApplied(event)
return
case event := <-nodeOperatorManagerAddressChangeProposedChan:
handlers.HandleNodeOperatorManagerAddressChangeProposed(event)
return
case event := <-nodeOperatorManagerAddressChangedChan:
handlers.HandleNodeOperatorManagerAddressChanged(event)
return
case event := <-nodeOperatorRewardAddressChangeProposedChan:
handlers.HandleNodeOperatorRewardAddressChangeProposed(event)
return
case event := <-nodeOperatorRewardAddressChangedChan:
handlers.HandleNodeOperatorRewardAddressChanged(event)
return
case event := <-stuckSigningKeysCountChangedChan:
handlers.HandleStuckSigningKeysCountChanged(event)
return
case event := <-vettedSigningKeysCountDecreasedChan:
handlers.HandleVettedSigningKeysCountDecreased(event)
return
case event := <-withdrawalSubmittedChan:
handlers.HandleWithdrawalSubmitted(event)
return
case event := <-totalSigningKeysCountChangedChan:
handlers.HandleTotalSigningKeysCountChanged(event)
return
case event := <-publicReleaseChan:
handlers.HandlePublicRelease(event)
return

case <-ctx.Done():
return
Expand Down
Loading

0 comments on commit 9752232

Please sign in to comment.