Skip to content


Simplify main logic (#62)
Browse files Browse the repository at this point in the history
* Simplify main logic

* add main prefix
  • Loading branch information
pablomendezroyo authored Dec 5, 2024
1 parent 23a5b2c commit 39bdb1a
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 144 deletions.
191 changes: 73 additions & 118 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package main

import (

csfeedistributor "lido-events/internal/adapters/csFeeDistributor"
Expand All @@ -14,185 +20,134 @@ import (
proxyapi "lido-events/internal/adapters/proxyApi"


// Helper function to check if operator IDs and Telegram config are available
func waitForInitialConfig(ctx context.Context, storageAdapter *storage.Storage) error {
for {
select {
case <-ctx.Done(): // Exit if the context is canceled
return ctx.Err()
// Check for operator IDs
operatorIds, err := storageAdapter.GetOperatorIds()
if err != nil || len(operatorIds) == 0 {
logger.Info("Waiting for operator IDs to be set...")
} else {
// Operator IDs are set
logger.Info("Operator IDs are set. Proceeding with initialization.")
return nil
time.Sleep(2 * time.Second) // Poll every 2 seconds
var logPrefix = "MAIN"

func main() {
// Set up context with cancellation and a WaitGroup for graceful shutdown
// Set up context with cancellation and WaitGroup for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup

// Set up signal channel to handle OS interrupts
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

// Load configurations
networkConfig, err := config.LoadNetworkConfig()
if err != nil {
logger.Fatal("Failed to load network configuration: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to load network configuration: %v", err)
logger.Debug("Network config: %+v", networkConfig)
logger.DebugWithPrefix(logPrefix, "Network config: %+v", networkConfig)

// Initialize adapters
storageAdapter := storage.NewStorageAdapter()
// Initialize the notifier adapter (Telegram configuration optional)
notifierAdapter, err := notifier.NewNotifierAdapter(ctx, storageAdapter)
if err != nil {
logger.Warn("Telegram notifier not initialized: %v", err)
logger.WarnWithPrefix(logPrefix, "Telegram notifier not initialized: %v", err)

// Start HTTP server
apiAdapter := api.NewAPIAdapter(storageAdapter, networkConfig.CORS)
server := &http.Server{
Addr: ":" + strconv.FormatUint(networkConfig.ApiPort, 10),
Handler: apiAdapter.Router,
go func() {
defer wg.Done()
logger.Info("Server started on :%d", networkConfig.ApiPort)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
logger.Fatal("HTTP server ListenAndServe: %v", err)

// Start Proxy API server
proxyApiAdapter := proxyapi.NewProxyAPIAdapter(networkConfig.CORS, networkConfig.LidoKeysApiUrl)
proxyServer := &http.Server{
Addr: ":" + strconv.FormatUint(networkConfig.ProxyApiPort, 10),
Handler: proxyApiAdapter.Router,
go func() {
defer wg.Done()
logger.Info("Proxy API server started on :%d", networkConfig.ProxyApiPort)
if err := proxyServer.ListenAndServe(); err != http.ErrServerClosed {
logger.Fatal("Proxy API server ListenAndServe: %v", err)

// Wait for initial configuration in a separate goroutine
configReady := make(chan error, 1)
go func() {
configReady <- waitForInitialConfig(ctx, storageAdapter)
// Initialize API services
apiService := services.NewAPIServerService(apiAdapter, networkConfig.ApiPort)
proxyService := services.NewProxyAPIServerService(proxyApiAdapter, networkConfig.ProxyApiPort)

// Start listening for signals in a separate goroutine
go func() {
logger.Info("Received shutdown signal. Initiating graceful shutdown...")
cancel() // Cancel context to stop all services
// Start API services

// Wait for either the config to be ready or the context to be canceled
select {
case err := <-configReady:
if err != nil {
logger.Warn("Shutting down due to: %v", err)
logger.Info("Configuration is ready. Proceeding with initialization.")
case <-ctx.Done():
logger.Info("Context canceled before configuration was ready.")
// Wait for and validate initial configuration
if err := waitForConfig(ctx, storageAdapter); err != nil {
logger.FatalWithPrefix(logPrefix, "Application shutting down due to configuration validation failure: %v", err)

// Initialize domain adapters
ipfsAdapter := ipfs.NewIPFSAdapter(networkConfig.IpfsUrl)
beaconchainAdapter := beaconchain.NewBeaconchainAdapter(networkConfig.BeaconchainURL)
executionAdapter := execution.NewExecutionAdapter(networkConfig.RpcUrl)
exitValidatorAdapter := exitvalidator.NewExitValidatorAdapter(beaconchainAdapter, networkConfig.SignerUrl)

csFeeDistributorImplAdapter, err := csfeedistributorimpl.NewCsFeeDistributorImplAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress)
if err != nil {
logger.Fatal("Failed to initialize CsFeeDistributorImpl adapter: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to initialize CsFeeDistributorImplAdapter: %v", err)
veboAdapter, err := vebo.NewVeboAdapter(networkConfig.WsURL, networkConfig.VEBOAddress, storageAdapter)
if err != nil {
logger.Fatal("Failed to initialize Vebo adapter: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to initialize VeboAdapter: %v", err)
csModuleAdapter, err := csmodule.NewCsModuleAdapter(networkConfig.WsURL, networkConfig.CSModuleAddress, storageAdapter)
if err != nil {
logger.Fatal("Failed to initialize CsModule adapter: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to initialize CsModuleAdapter: %v", err)
csFeeDistributorAdapter, err := csfeedistributor.NewCsFeeDistributorAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress)
if err != nil {
logger.Fatal("Failed to initialize CsFeeDistributor adapter: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to initialize CsFeeDistributorAdapter: %v", err)

// Initialize services
// Initialize domain services
eventsWatcherService := services.NewEventsWatcherService(veboAdapter, csModuleAdapter, csFeeDistributorAdapter, notifierAdapter)
distributionLogUpdatedScannerService := services.NewDistributionLogUpdatedEventScanner(storageAdapter, notifierAdapter, executionAdapter, csFeeDistributorImplAdapter, networkConfig.CsFeeDistributorBlockDeployment)
validatorExitRequestScannerService := services.NewValidatorExitRequestEventScanner(storageAdapter, notifierAdapter, veboAdapter, executionAdapter, beaconchainAdapter, networkConfig.VeboBlockDeployment)
validatorEjectorService := services.NewValidatorEjectorService(storageAdapter, notifierAdapter, exitValidatorAdapter, beaconchainAdapter)
pendingHashesLoaderService := services.NewPendingHashesLoader(storageAdapter, ipfsAdapter)

// DistributionLogUpdated
// Start domain services
distributionLogUpdatedExecutionComplete := make(chan struct{})
go distributionLogUpdatedScannerService.ScanDistributionLogUpdatedEventsCron(ctx, 384*time.Second, &wg, distributionLogUpdatedExecutionComplete) // once every epoch
go distributionLogUpdatedScannerService.ScanDistributionLogUpdatedEventsCron(ctx, 384*time.Second, &wg, distributionLogUpdatedExecutionComplete)
go pendingHashesLoaderService.LoadPendingHashesCron(ctx, 3*time.Hour, &wg, distributionLogUpdatedExecutionComplete)

// ExitRequest
exitRequestExecutionComplete := make(chan struct{})
go validatorExitRequestScannerService.ScanValidatorExitRequestEventsCron(ctx, 384*time.Second, &wg, exitRequestExecutionComplete) // once every epoch
go validatorExitRequestScannerService.ScanValidatorExitRequestEventsCron(ctx, 384*time.Second, &wg, exitRequestExecutionComplete)
go validatorEjectorService.ValidatorEjectorCron(ctx, 64*time.Minute, &wg, exitRequestExecutionComplete)

// Events watcher
go eventsWatcherService.WatchAllEvents(ctx, &wg)

// Handle shutdown signals
// Handle OS signals for shutdown
handleShutdown(cancel, apiService, proxyService)

// Wait for all goroutines to finish
logger.InfoWithPrefix(logPrefix, "All services stopped. Shutting down application.")

// Helper function to check if operator IDs and Telegram config are available
func waitForConfig(ctx context.Context, storageAdapter *storage.Storage) error {
for {
select {
case <-ctx.Done(): // Exit if the context is canceled
logger.InfoWithPrefix(logPrefix, "Context canceled before configuration was ready.")
return ctx.Err()
// Check for operator IDs
operatorIds, err := storageAdapter.GetOperatorIds()
if err != nil || len(operatorIds) == 0 {
logger.InfoWithPrefix(logPrefix, "Waiting for operator IDs to be set...")
} else {
// Operator IDs are set
logger.InfoWithPrefix(logPrefix, "Operator IDs are set. Proceeding with initialization.")
return nil
time.Sleep(2 * time.Second) // Poll every 2 seconds

// handleShutdown manages graceful shutdown for services
func handleShutdown(cancel context.CancelFunc, apiService *services.APIServerService, proxyService *services.ProxyAPIServerService) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

go func() {
logger.Info("Received shutdown signal. Initiating graceful shutdown...")
cancel() // Cancel context to signal all services to stop

// Give the HTTP server time to finish ongoing requests
serverCtx, serverCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer serverCancel()
if err := server.Shutdown(serverCtx); err != nil {
logger.Info("HTTP server Shutdown: %v", err)
logger.InfoWithPrefix(logPrefix, "Received shutdown signal. Initiating graceful shutdown...")

// Give the Proxy API server time to finish ongoing requests
proxyServerCtx, proxyServerCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer proxyServerCancel()
if err := proxyServer.Shutdown(proxyServerCtx); err != nil {
logger.Info("Proxy API server Shutdown: %v", err)
// Shutdown API services with a timeout
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()

// Wait for all goroutines to finish
logger.Info("All services stopped. Shutting down application.")
14 changes: 11 additions & 3 deletions internal/adapters/api/api_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ type APIHandler struct {
adapterPrefix string

// Ensure APIHandler implements the ports.API interface
var _ ports.API = (*APIHandler)(nil)

// GetRouter implements the ports.API interface
func (h *APIHandler) GetRouter() http.Handler {
return h.Router

// NewAPIAdapter initializes the APIHandler and sets up routes with CORS enabled
func NewAPIAdapter(storagePort ports.StoragePort, allowedOrigins []string) *APIHandler {
h := &APIHandler{
Expand Down Expand Up @@ -135,7 +143,7 @@ func (h *APIHandler) DeleteOperator(w http.ResponseWriter, r *http.Request) {

// check it exists calling GetOperatorIds
// Check if operator ID exists
operatorIds, err := h.StoragePort.GetOperatorIds()
if err != nil {
logger.ErrorWithPrefix("API", "Failed to fetch operator IDs: %v", err)
Expand Down Expand Up @@ -191,7 +199,7 @@ func (h *APIHandler) AddOperator(w http.ResponseWriter, r *http.Request) {

// check if operator id already exists and if so return ok
// Check if operator ID already exists
operatorIds, err := h.StoragePort.GetOperatorIds()
if err != nil {
logger.ErrorWithPrefix("API", "Failed to fetch operator IDs: %v", err)
Expand All @@ -215,7 +223,7 @@ func (h *APIHandler) AddOperator(w http.ResponseWriter, r *http.Request) {

// Set last block processed to 0, this will trigger the events scanner to start from the beginning
// and look for events for the new operator ID
// TODO: this logic should be in the services layer
// TODO: Consider moving this logic to the services layer
if err := h.StoragePort.SaveDistributionLogLastProcessedBlock(0); err != nil {
logger.ErrorWithPrefix("API", "Failed to update DistributionLogLastProcessedBlock: %v", err)
writeErrorResponse(w, "Failed to reset DistributionLogLastProcessedBlock", http.StatusInternalServerError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ func (csfa *CsFeeDistributorAdapter) WatchCsFeeDistributorEvents(ctx context.Con
select {
case event := <-distributionDataUpdatedChan:
// case err := <-sub.Err():
// // Subscription error should be handled by returning it to the service layer.
// return
case <-ctx.Done():
Expand Down
15 changes: 0 additions & 15 deletions internal/adapters/csModule/csmodule_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,49 +203,34 @@ func (csma *CsModuleAdapter) WatchCsModuleEvents(ctx context.Context, handlers p
select {
case event := <-depositedSigningKeysChangedChan:
case event := <-elRewardsStealingPenaltyReportedChan:
case event := <-elRewardsStealingPenaltySettledChan:
case event := <-elRewardsStealingPenaltyCancelledChan:
case event := <-initialSlashingSubmittedChan:
case event := <-keyRemovalChargeAppliedChan:
case event := <-nodeOperatorManagerAddressChangeProposedChan:
case event := <-nodeOperatorManagerAddressChangedChan:
case event := <-nodeOperatorRewardAddressChangeProposedChan:
case event := <-nodeOperatorRewardAddressChangedChan:
case event := <-stuckSigningKeysCountChangedChan:
case event := <-vettedSigningKeysCountDecreasedChan:
case event := <-withdrawalSubmittedChan:
case event := <-totalSigningKeysCountChangedChan:
case event := <-publicReleaseChan:

case <-ctx.Done():
Expand Down
10 changes: 10 additions & 0 deletions internal/adapters/proxyApi/proxy_api_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (


Expand All @@ -17,6 +19,14 @@ type APIHandler struct {
adapterPrefix string

// Ensure APIHandler implements the ports.ProxyAPI interface
var _ ports.ProxyAPI = (*APIHandler)(nil)

// GetRouter implements the ports.ProxyAPI interface
func (h *APIHandler) GetRouter() http.Handler {
return h.Router

// NewProxyAPIAdapter initializes the APIHandler and sets up routes
func NewProxyAPIAdapter(allowedOrigins []string, proxyApiURL string) *APIHandler {
h := &APIHandler{
Expand Down

0 comments on commit 39bdb1a

Please sign in to comment.