diff --git a/cmd/main.go b/cmd/main.go
index f5f9b4e..c86f30a 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -2,6 +2,12 @@ package main
 
 import (
 	"context"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+
 	"lido-events/internal/adapters/api"
 	"lido-events/internal/adapters/beaconchain"
 	csfeedistributor "lido-events/internal/adapters/csFeeDistributor"
@@ -16,63 +22,31 @@ import (
 	relaysused "lido-events/internal/adapters/relaysUsed"
 	"lido-events/internal/adapters/storage"
 	"lido-events/internal/adapters/vebo"
-	"lido-events/internal/logger"
-	"os"
-	"os/signal"
-	"strconv"
-	"sync"
-	"syscall"
-	"time"
-
 	"lido-events/internal/application/services"
 	"lido-events/internal/config"
-	"net/http"
+	"lido-events/internal/logger"
 )
 
-// Helper function to check if operator IDs and Telegram config are available
-func waitForInitialConfig(ctx context.Context, storageAdapter *storage.Storage) error {
-	for {
-		select {
-		case <-ctx.Done(): // Exit if the context is canceled
-			return ctx.Err()
-		default:
-			// Check for operator IDs
-			operatorIds, err := storageAdapter.GetOperatorIds()
-			if err != nil || len(operatorIds) == 0 {
-				logger.Info("Waiting for operator IDs to be set...")
-			} else {
-				// Operator IDs are set
-				logger.Info("Operator IDs are set. Proceeding with initialization.")
-				return nil
-			}
-			time.Sleep(2 * time.Second) // Poll every 2 seconds
-		}
-	}
-}
+var logPrefix = "MAIN"
 
 func main() {
-	// Set up context with cancellation and a WaitGroup for graceful shutdown
+	// Set up context with cancellation and WaitGroup for graceful shutdown
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	var wg sync.WaitGroup
 
-	// Set up signal channel to handle OS interrupts
-	signalChan := make(chan os.Signal, 1)
-	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
-
 	// Load configurations
 	networkConfig, err := config.LoadNetworkConfig()
 	if err != nil {
-		logger.Fatal("Failed to load network configuration: %v", err)
+		logger.FatalWithPrefix(logPrefix, "Failed to load network configuration: %v", err)
 	}
-	logger.Debug("Network config: %+v", networkConfig)
+	logger.DebugWithPrefix(logPrefix, "Network config: %+v", networkConfig)
 
 	// Initialize adapters
 	storageAdapter := storage.NewStorageAdapter()
-	// Initialize the notifier adapter (Telegram configuration optional)
 	notifierAdapter, err := notifier.NewNotifierAdapter(ctx, storageAdapter)
 	if err != nil {
-		logger.Warn("Telegram notifier not initialized: %v", err)
+		logger.WarnWithPrefix(logPrefix, "Telegram notifier not initialized: %v", err)
 	}
 	relaysUsedAdapter := relaysused.NewRelaysUsedAdapter(networkConfig.DappmanagerUrl, networkConfig.MevBoostDnpName)
 	relaysAllowedAdapter, err := relaysallowed.NewRelaysAllowedAdapter(networkConfig.WsURL, networkConfig.MEVBoostRelaysAllowListAddres, networkConfig.DappmanagerUrl, networkConfig.MevBoostDnpName)
@@ -80,85 +54,45 @@ func main() {
 		logger.Fatal("Failed to initialize relaysAllowedAdapter: %v", err)
 	}
 
-	// Start HTTP server
 	apiAdapter := api.NewAPIAdapter(ctx, storageAdapter, notifierAdapter, relaysUsedAdapter, relaysAllowedAdapter, networkConfig.CORS)
-	server := &http.Server{
-		Addr:    ":" + strconv.FormatUint(networkConfig.ApiPort, 10),
-		Handler: apiAdapter.Router,
-	}
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		logger.Info("Server started on :%d", networkConfig.ApiPort)
-		if err := server.ListenAndServe(); err != http.ErrServerClosed {
-			logger.Fatal("HTTP server ListenAndServe: %v", err)
-		}
-	}()
-
-	// Start Proxy API server
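+	// The API adapters now only build routers; listening and graceful shutdown
+	// are handled by the server services started below.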
 	proxyApiAdapter := proxyapi.NewProxyAPIAdapter(networkConfig.CORS, networkConfig.LidoKeysApiUrl)
-	proxyServer := &http.Server{
-		Addr:    ":" + strconv.FormatUint(networkConfig.ProxyApiPort, 10),
-		Handler: proxyApiAdapter.Router,
-	}
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		logger.Info("Proxy API server started on :%d", networkConfig.ProxyApiPort)
-		if err := proxyServer.ListenAndServe(); err != http.ErrServerClosed {
-			logger.Fatal("Proxy API server ListenAndServe: %v", err)
-		}
-	}()
 
-	// Wait for initial configuration in a separate goroutine
-	configReady := make(chan error, 1)
-	go func() {
-		configReady <- waitForInitialConfig(ctx, storageAdapter)
-	}()
+	// Initialize API services
+	apiService := services.NewAPIServerService(apiAdapter, networkConfig.ApiPort)
+	proxyService := services.NewProxyAPIServerService(proxyApiAdapter, networkConfig.ProxyApiPort)
 
-	// Start listening for signals in a separate goroutine
-	go func() {
-		<-signalChan
-		logger.Info("Received shutdown signal. Initiating graceful shutdown...")
-		cancel() // Cancel context to stop all services
-	}()
+	// Start API services
+	apiService.Start(&wg)
+	proxyService.Start(&wg)
 
-	// Wait for either the config to be ready or the context to be canceled
-	select {
-	case err := <-configReady:
-		if err != nil {
-			logger.Warn("Shutting down due to: %v", err)
-			return
-		}
-		logger.Info("Configuration is ready. Proceeding with initialization.")
-	case <-ctx.Done():
-		logger.Info("Context canceled before configuration was ready.")
-		return
+	// Wait for and validate initial configuration
+	if err := waitForConfig(ctx, storageAdapter); err != nil {
+		logger.FatalWithPrefix(logPrefix, "Application shutting down due to configuration validation failure: %v", err)
 	}
 
+	// Initialize domain adapters
 	ipfsAdapter := ipfs.NewIPFSAdapter(networkConfig.IpfsUrl)
 	beaconchainAdapter := beaconchain.NewBeaconchainAdapter(networkConfig.BeaconchainURL)
 	executionAdapter := execution.NewExecutionAdapter(networkConfig.RpcUrl)
 	exitValidatorAdapter := exitvalidator.NewExitValidatorAdapter(beaconchainAdapter, networkConfig.SignerUrl)
-
 	csFeeDistributorImplAdapter, err := csfeedistributorimpl.NewCsFeeDistributorImplAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress)
 	if err != nil {
-		logger.Fatal("Failed to initialize CsFeeDistributorImpl adapter: %v", err)
+		logger.FatalWithPrefix(logPrefix, "Failed to initialize CsFeeDistributorImplAdapter: %v", err)
 	}
 	veboAdapter, err := vebo.NewVeboAdapter(networkConfig.WsURL, networkConfig.VEBOAddress, storageAdapter)
 	if err != nil {
-		logger.Fatal("Failed to initialize Vebo adapter: %v", err)
+		logger.FatalWithPrefix(logPrefix, "Failed to initialize VeboAdapter: %v", err)
 	}
 	csModuleAdapter, err := csmodule.NewCsModuleAdapter(networkConfig.WsURL, networkConfig.CSModuleAddress, storageAdapter)
 	if err != nil {
-		logger.Fatal("Failed to initialize CsModule adapter: %v", err)
+		logger.FatalWithPrefix(logPrefix, "Failed to initialize CsModuleAdapter: %v", err)
 	}
 	csFeeDistributorAdapter, err := csfeedistributor.NewCsFeeDistributorAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress)
 	if err != nil {
-		logger.Fatal("Failed to initialize CsFeeDistributor adapter: %v", err)
+		logger.FatalWithPrefix(logPrefix, "Failed to initialize CsFeeDistributorAdapter: %v", err)
 	}
 
-	// Initialize services
+	// Initialize domain services
 	eventsWatcherService := services.NewEventsWatcherService(veboAdapter, csModuleAdapter, csFeeDistributorAdapter, notifierAdapter)
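+	// The scanner services below replay past events starting from each contract's
+	// deployment block, while the events watcher subscribes to newly emitted ones.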
 	distributionLogUpdatedScannerService := services.NewDistributionLogUpdatedEventScanner(storageAdapter, notifierAdapter, executionAdapter, csFeeDistributorImplAdapter, networkConfig.CsFeeDistributorBlockDeployment)
 	validatorExitRequestScannerService := services.NewValidatorExitRequestEventScanner(storageAdapter, notifierAdapter, veboAdapter, executionAdapter, beaconchainAdapter, networkConfig.VeboBlockDeployment)
@@ -169,41 +103,62 @@ func main() {
 	// Relays
 	go relaysCheckerService.StartRelayMonitoringCron(ctx, 5*time.Minute, &wg)
 
-	// DistributionLogUpdated
+	// Start domain services
 	distributionLogUpdatedExecutionComplete := make(chan struct{})
-	go distributionLogUpdatedScannerService.ScanDistributionLogUpdatedEventsCron(ctx, 384*time.Second, &wg, distributionLogUpdatedExecutionComplete) // once every epoch
+	go distributionLogUpdatedScannerService.ScanDistributionLogUpdatedEventsCron(ctx, 384*time.Second, &wg, distributionLogUpdatedExecutionComplete) // once every epoch (32 slots x 12 s = 384 s)
 	go pendingHashesLoaderService.LoadPendingHashesCron(ctx, 3*time.Hour, &wg, distributionLogUpdatedExecutionComplete)
 
-	// ExitRequest
 	exitRequestExecutionComplete := make(chan struct{})
-	go validatorExitRequestScannerService.ScanValidatorExitRequestEventsCron(ctx, 384*time.Second, &wg, exitRequestExecutionComplete) // once every epoch
+	go validatorExitRequestScannerService.ScanValidatorExitRequestEventsCron(ctx, 384*time.Second, &wg, exitRequestExecutionComplete) // once every epoch (32 slots x 12 s = 384 s)
 	go validatorEjectorService.ValidatorEjectorCron(ctx, 64*time.Minute, &wg, exitRequestExecutionComplete)
 
-	// Events watcher
 	go eventsWatcherService.WatchAllEvents(ctx, &wg)
 
-	// Handle shutdown signals
+	// Handle OS signals for shutdown
+	handleShutdown(cancel, apiService, proxyService)
+
+	// Wait for all goroutines to finish
+	wg.Wait()
+	logger.InfoWithPrefix(logPrefix, "All services stopped. Shutting down application.")
+}
+
+// waitForConfig blocks until the operator IDs are set or the context is canceled
+func waitForConfig(ctx context.Context, storageAdapter *storage.Storage) error {
+	for {
+		select {
+		case <-ctx.Done(): // Exit if the context is canceled
+			logger.InfoWithPrefix(logPrefix, "Context canceled before configuration was ready.")
+			return ctx.Err()
+		default:
+			// Check for operator IDs
+			operatorIds, err := storageAdapter.GetOperatorIds()
+			if err != nil || len(operatorIds) == 0 {
+				logger.InfoWithPrefix(logPrefix, "Waiting for operator IDs to be set...")
+			} else {
+				// Operator IDs are set
+				logger.InfoWithPrefix(logPrefix, "Operator IDs are set. Proceeding with initialization.")
+				return nil
+			}
+			time.Sleep(2 * time.Second) // Poll every 2 seconds
+		}
+	}
+}
+
+// handleShutdown manages graceful shutdown for services
+func handleShutdown(cancel context.CancelFunc, apiService *services.APIServerService, proxyService *services.ProxyAPIServerService) {
+	signalChan := make(chan os.Signal, 1)
+	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+
+	go func() {
 		<-signalChan
-		logger.Info("Received shutdown signal. Initiating graceful shutdown...")
-		cancel() // Cancel context to signal all services to stop
-
-		// Give the HTTP server time to finish ongoing requests
-		serverCtx, serverCancel := context.WithTimeout(context.Background(), 5*time.Second)
-		defer serverCancel()
-		if err := server.Shutdown(serverCtx); err != nil {
-			logger.Info("HTTP server Shutdown: %v", err)
-		}
+		logger.InfoWithPrefix(logPrefix, "Received shutdown signal. Initiating graceful shutdown...")
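+		// Canceling the context stops the cron jobs and event watchers; the HTTP
+		// servers are shut down separately below with a bounded timeout so that
+		// in-flight requests can finish.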
Initiating graceful shutdown...") + cancel() - // Give the Proxy API server time to finish ongoing requests - proxyServerCtx, proxyServerCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer proxyServerCancel() - if err := proxyServer.Shutdown(proxyServerCtx); err != nil { - logger.Info("Proxy API server Shutdown: %v", err) - } - }() + // Shutdown API services with a timeout + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() - // Wait for all goroutines to finish - wg.Wait() - logger.Info("All services stopped. Shutting down application.") + apiService.Shutdown(shutdownCtx) + proxyService.Shutdown(shutdownCtx) + }() } diff --git a/internal/adapters/api/api_adapter.go b/internal/adapters/api/api_adapter.go index 14881a3..2a0adc4 100644 --- a/internal/adapters/api/api_adapter.go +++ b/internal/adapters/api/api_adapter.go @@ -25,6 +25,14 @@ type APIHandler struct { adapterPrefix string } +// Ensure APIHandler implements the ports.API interface +var _ ports.API = (*APIHandler)(nil) + +// GetRouter implements the ports.API interface +func (h *APIHandler) GetRouter() http.Handler { + return h.Router +} + // NewAPIAdapter initializes the APIHandler and sets up routes with CORS enabled func NewAPIAdapter(ctx context.Context, storagePort ports.StoragePort, notifierPort ports.NotifierPort, relaysUsedPort ports.RelaysUsedPort, relaysAllowedPort ports.RelaysAllowedPort, allowedOrigins []string) *APIHandler { h := &APIHandler{ @@ -188,7 +196,7 @@ func (h *APIHandler) DeleteOperator(w http.ResponseWriter, r *http.Request) { return } - // check it exists calling GetOperatorIds + // Check if operator ID exists operatorIds, err := h.StoragePort.GetOperatorIds() if err != nil { logger.ErrorWithPrefix("API", "Failed to fetch operator IDs: %v", err) @@ -244,7 +252,7 @@ func (h *APIHandler) AddOperator(w http.ResponseWriter, r *http.Request) { return } - // check if operator id already exists and if so return ok + // Check if operator ID already exists operatorIds, err := h.StoragePort.GetOperatorIds() if err != nil { logger.ErrorWithPrefix("API", "Failed to fetch operator IDs: %v", err) @@ -268,7 +276,7 @@ func (h *APIHandler) AddOperator(w http.ResponseWriter, r *http.Request) { // Set last block processed to 0, this will trigger the events scanner to start from the beginning // and look for events for the new operator ID - // TODO: this logic should be in the services layer + // TODO: Consider moving this logic to the services layer if err := h.StoragePort.SaveDistributionLogLastProcessedBlock(0); err != nil { logger.ErrorWithPrefix("API", "Failed to update DistributionLogLastProcessedBlock: %v", err) writeErrorResponse(w, "Failed to reset DistributionLogLastProcessedBlock", http.StatusInternalServerError) diff --git a/internal/adapters/csFeeDistributor/csfeedistributor_adapter.go b/internal/adapters/csFeeDistributor/csfeedistributor_adapter.go index 8795675..a8aa032 100644 --- a/internal/adapters/csFeeDistributor/csfeedistributor_adapter.go +++ b/internal/adapters/csFeeDistributor/csfeedistributor_adapter.go @@ -50,10 +50,6 @@ func (csfa *CsFeeDistributorAdapter) WatchCsFeeDistributorEvents(ctx context.Con select { case event := <-distributionDataUpdatedChan: handleDistributionDataUpdated(event) - return - // case err := <-sub.Err(): - // // Subscription error should be handled by returning it to the service layer. 
-			return
-			// case err := <-sub.Err():
-			// // Subscription error should be handled by returning it to the service layer.
-			// return
 		case <-ctx.Done():
 			return
 		}
diff --git a/internal/adapters/csModule/csmodule_adapter.go b/internal/adapters/csModule/csmodule_adapter.go
index 57a965f..471eb61 100644
--- a/internal/adapters/csModule/csmodule_adapter.go
+++ b/internal/adapters/csModule/csmodule_adapter.go
@@ -203,49 +203,34 @@ func (csma *CsModuleAdapter) WatchCsModuleEvents(ctx context.Context, handlers p
 		select {
 		case event := <-depositedSigningKeysChangedChan:
 			handlers.HandleDepositedSigningKeysCountChanged(event)
-			return
 		case event := <-elRewardsStealingPenaltyReportedChan:
 			handlers.HandleElRewardsStealingPenaltyReported(event)
-			return
 		case event := <-elRewardsStealingPenaltySettledChan:
 			handlers.HandleElRewardsStealingPenaltySettled(event)
-			return
 		case event := <-elRewardsStealingPenaltyCancelledChan:
 			handlers.HandleElRewardsStealingPenaltyCancelled(event)
-			return
 		case event := <-initialSlashingSubmittedChan:
 			handlers.HandleInitialSlashingSubmitted(event)
-			return
 		case event := <-keyRemovalChargeAppliedChan:
 			handlers.HandleKeyRemovalChargeApplied(event)
-			return
 		case event := <-nodeOperatorManagerAddressChangeProposedChan:
 			handlers.HandleNodeOperatorManagerAddressChangeProposed(event)
-			return
 		case event := <-nodeOperatorManagerAddressChangedChan:
 			handlers.HandleNodeOperatorManagerAddressChanged(event)
-			return
 		case event := <-nodeOperatorRewardAddressChangeProposedChan:
 			handlers.HandleNodeOperatorRewardAddressChangeProposed(event)
-			return
 		case event := <-nodeOperatorRewardAddressChangedChan:
 			handlers.HandleNodeOperatorRewardAddressChanged(event)
-			return
 		case event := <-stuckSigningKeysCountChangedChan:
 			handlers.HandleStuckSigningKeysCountChanged(event)
-			return
 		case event := <-vettedSigningKeysCountDecreasedChan:
 			handlers.HandleVettedSigningKeysCountDecreased(event)
-			return
 		case event := <-withdrawalSubmittedChan:
 			handlers.HandleWithdrawalSubmitted(event)
-			return
 		case event := <-totalSigningKeysCountChangedChan:
 			handlers.HandleTotalSigningKeysCountChanged(event)
-			return
 		case event := <-publicReleaseChan:
 			handlers.HandlePublicRelease(event)
-			return
 		case <-ctx.Done():
 			return
diff --git a/internal/adapters/execution/execution_adapter.go b/internal/adapters/execution/execution_adapter.go
index b814b2c..189c1a4 100644
--- a/internal/adapters/execution/execution_adapter.go
+++ b/internal/adapters/execution/execution_adapter.go
@@ -63,3 +63,53 @@ func (e *ExecutionAdapter) GetMostRecentBlockNumber() (uint64, error) {
 
 	return blockNumber, nil
 }
+
+// GetBlockTimestampByNumber retrieves the timestamp of the block with the specified number from the Ethereum execution client.
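+// The lookup issues a standard eth_getBlockByNumber JSON-RPC call with the block
+// number hex-encoded; for example, block 2876079 (0x2be2af) returns
+// "timestamp": "0x675183f0", i.e. 1733395440 (see the integration test below).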
+func (e *ExecutionAdapter) GetBlockTimestampByNumber(blockNumber uint64) (uint64, error) {
+	// Convert block number to hexadecimal
+	blockNumberHex := fmt.Sprintf("0x%x", blockNumber)
+
+	// Create the request payload for eth_getBlockByNumber
+	payload := map[string]interface{}{
+		"jsonrpc": "2.0",
+		"method":  "eth_getBlockByNumber",
+		"params":  []interface{}{blockNumberHex, false}, // `false` indicates we don't need full transaction objects
+		"id":      1,
+	}
+
+	// Marshal the payload to JSON
+	jsonPayload, err := json.Marshal(payload)
+	if err != nil {
+		return 0, fmt.Errorf("failed to marshal request payload for eth_getBlockByNumber: %w", err)
+	}
+
+	// Send the request to the execution client
+	resp, err := http.Post(e.rpcURL, "application/json", bytes.NewBuffer(jsonPayload))
+	if err != nil {
+		return 0, fmt.Errorf("failed to send request to execution client at %s: %w", e.rpcURL, err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return 0, fmt.Errorf("unexpected status code %d received from execution client", resp.StatusCode)
+	}
+
+	// Parse the response
+	var result struct {
+		Result struct {
+			Timestamp string `json:"timestamp"`
+		} `json:"result"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
+		return 0, fmt.Errorf("failed to decode response from execution client: %w", err)
+	}
+
+	// Convert the hexadecimal timestamp to uint64
+	var timestamp uint64
+	_, err = fmt.Sscanf(result.Result.Timestamp, "0x%x", &timestamp)
+	if err != nil {
+		return 0, fmt.Errorf("failed to parse timestamp from result %s: %w", result.Result.Timestamp, err)
+	}
+
+	return timestamp, nil
+}
diff --git a/internal/adapters/execution/execution_adapter_integration_test.go b/internal/adapters/execution/execution_adapter_integration_test.go
index f61f032..aa41776 100644
--- a/internal/adapters/execution/execution_adapter_integration_test.go
+++ b/internal/adapters/execution/execution_adapter_integration_test.go
@@ -38,3 +38,25 @@ func TestGetMostRecentBlockNumberIntegration(t *testing.T) {
 	// Log the block number for debugging
 	t.Logf("Most recent block number: %d", blockNumber)
 }
+
+// TestGetBlockTimestampByNumberIntegration tests fetching the timestamp of a specific block
+func TestGetBlockTimestampByNumberIntegration(t *testing.T) {
+	adapter, err := setupExecutionAdapter(t)
+	assert.NoError(t, err)
+
+	// Specify the block number to test
+	blockNumber := uint64(2876079)
+
+	// Call the GetBlockTimestampByNumber method
+	timestamp, err := adapter.GetBlockTimestampByNumber(blockNumber)
+	assert.NoError(t, err)
+
+	// Ensure timestamp is greater than 0, indicating a valid response
+	assert.Greater(t, timestamp, uint64(0), "Expected a non-zero timestamp")
+
+	// Ensure the timestamp matches the expected value for this block
+	assert.Equal(t, uint64(1733395440), timestamp, "Expected timestamp to be 1733395440")
+
+	// Log the timestamp for debugging
+	t.Logf("Timestamp for block %d: %d (Unix)", blockNumber, timestamp)
+}
diff --git a/internal/adapters/proxyApi/proxy_api_adapter.go b/internal/adapters/proxyApi/proxy_api_adapter.go
index 8797c02..9e83cbc 100644
--- a/internal/adapters/proxyApi/proxy_api_adapter.go
+++ b/internal/adapters/proxyApi/proxy_api_adapter.go
@@ -6,6 +6,8 @@ import (
 	"net/http"
 	"net/url"
 
+	"lido-events/internal/application/ports"
+
 	"github.com/gorilla/handlers"
 	"github.com/gorilla/mux"
 )
@@ -17,6 +19,14 @@ type APIHandler struct {
 	adapterPrefix string
 }
 
+// Ensure APIHandler implements the ports.ProxyAPI interface
+var _ ports.ProxyAPI = (*APIHandler)(nil)
+
+// GetRouter implements the ports.ProxyAPI interface
+func (h *APIHandler) GetRouter() http.Handler {
+	return h.Router
+}
+
 // NewProxyAPIAdapter initializes the APIHandler and sets up routes
 func NewProxyAPIAdapter(allowedOrigins []string, proxyApiURL string) *APIHandler {
 	h := &APIHandler{
diff --git a/internal/adapters/vebo/vebo_adapter.go b/internal/adapters/vebo/vebo_adapter.go
index c6e07ab..8e6be03 100644
--- a/internal/adapters/vebo/vebo_adapter.go
+++ b/internal/adapters/vebo/vebo_adapter.go
@@ -88,10 +88,6 @@ func (va *VeboAdapter) WatchReportSubmittedEvents(ctx context.Context, handleRep
 		select {
 		case event := <-reportSubmittedChan:
 			handleReportSubmittedEvent(event)
-			return
-			// case err := <-subReport.Err():
-			// // Exit on subscription error
-			// return
 		case <-ctx.Done():
 			return
 		}
diff --git a/internal/application/ports/api_port.go b/internal/application/ports/api_port.go
new file mode 100644
index 0000000..a7c9301
--- /dev/null
+++ b/internal/application/ports/api_port.go
@@ -0,0 +1,8 @@
+package ports
+
+import "net/http"
+
+// API is the interface for the API adapter.
+type API interface {
+	GetRouter() http.Handler
+}
diff --git a/internal/application/ports/execution_port.go b/internal/application/ports/execution_port.go
index e1f68a4..eedf4f7 100644
--- a/internal/application/ports/execution_port.go
+++ b/internal/application/ports/execution_port.go
@@ -2,4 +2,5 @@ package ports
 
 type ExecutionPort interface {
 	GetMostRecentBlockNumber() (uint64, error)
+	GetBlockTimestampByNumber(blockNumber uint64) (uint64, error)
 }
diff --git a/internal/application/ports/proxy_api_port.go b/internal/application/ports/proxy_api_port.go
new file mode 100644
index 0000000..f6b5d27
--- /dev/null
+++ b/internal/application/ports/proxy_api_port.go
@@ -0,0 +1,8 @@
+package ports
+
+import "net/http"
+
+// ProxyAPI is the interface for the Proxy API adapter.
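+// Like ports.API, it exposes only the router, so the HTTP server lifecycle is
+// owned by the services layer rather than by the adapters.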
+type ProxyAPI interface {
+	GetRouter() http.Handler
+}
diff --git a/internal/application/services/api_server.go b/internal/application/services/api_server.go
new file mode 100644
index 0000000..1fe12f8
--- /dev/null
+++ b/internal/application/services/api_server.go
@@ -0,0 +1,43 @@
+// internal/application/services/api_server.go
+package services
+
+import (
+	"context"
+	"lido-events/internal/application/ports"
+	"lido-events/internal/logger"
+	"net/http"
+	"strconv"
+	"sync"
+)
+
+type APIServerService struct {
+	server        *http.Server
+	servicePrefix string
+}
+
+func NewAPIServerService(apiAdapter ports.API, port uint64) *APIServerService {
+	return &APIServerService{
+		server: &http.Server{
+			Addr:    ":" + strconv.FormatUint(port, 10),
+			Handler: apiAdapter.GetRouter(),
+		},
+		servicePrefix: "API",
+	}
+}
+
+func (s *APIServerService) Start(wg *sync.WaitGroup) {
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		logger.InfoWithPrefix(s.servicePrefix, "server started on %s", s.server.Addr)
+		if err := s.server.ListenAndServe(); err != http.ErrServerClosed {
+			logger.FatalWithPrefix(s.servicePrefix, "server ListenAndServe: %v", err)
+		}
+	}()
+}
+
+func (s *APIServerService) Shutdown(ctx context.Context) {
+	if err := s.server.Shutdown(ctx); err != nil {
+		logger.WarnWithPrefix(s.servicePrefix, "server Shutdown: %v", err)
+	}
+}
diff --git a/internal/application/services/distributionLogUpdatedEventScanner.go b/internal/application/services/distributionLogUpdatedEventScanner.go
index b397123..8d277c3 100644
--- a/internal/application/services/distributionLogUpdatedEventScanner.go
+++ b/internal/application/services/distributionLogUpdatedEventScanner.go
@@ -99,11 +99,21 @@ func (ds *DistributionLogUpdatedEventScanner) HandleDistributionLogUpdatedEvent(
 		return err
 	}
 
-	message := fmt.Sprintf("- 📦 New distribution log updated: %s", distributionLogUpdated.LogCid)
-	if err := ds.notifierPort.SendNotification(message); err != nil {
-		logger.ErrorWithPrefix(ds.servicePrefix, "Error sending distributionLogUpdated notification: %v", err)
+	blockTimestamp, err := ds.executionPort.GetBlockTimestampByNumber(distributionLogUpdated.Raw.BlockNumber)
+	if err != nil {
+		logger.ErrorWithPrefix(ds.servicePrefix, "Error getting block timestamp for block number %d: %v", distributionLogUpdated.Raw.BlockNumber, err)
 		return err
 	}
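+	// NOTE: comparing the block timestamp against the wall clock limits
+	// notifications to events from the last 24 hours, so a rescan from block 0
+	// (e.g. after a new operator ID is added) indexes historical logs without
+	// re-notifying for each of them.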
+	// If the block timestamp is within the last 24 hours, send a notification
+	if time.Now().Unix()-int64(blockTimestamp) < 24*60*60 {
+		message := fmt.Sprintf("- 📦 New distribution log updated: %s", distributionLogUpdated.LogCid)
+		if err := ds.notifierPort.SendNotification(message); err != nil {
+			logger.ErrorWithPrefix(ds.servicePrefix, "Error sending distributionLogUpdated notification: %v", err)
+			return err
+		}
+	}
+
 	return nil
 }
diff --git a/internal/application/services/proxy_api_server.go b/internal/application/services/proxy_api_server.go
new file mode 100644
index 0000000..42ecfe9
--- /dev/null
+++ b/internal/application/services/proxy_api_server.go
@@ -0,0 +1,43 @@
+// internal/application/services/proxy_api_server.go
+package services
+
+import (
+	"context"
+	"lido-events/internal/application/ports"
+	"lido-events/internal/logger"
+	"net/http"
+	"strconv"
+	"sync"
+)
+
+type ProxyAPIServerService struct {
+	server        *http.Server
+	servicePrefix string
+}
+
+func NewProxyAPIServerService(proxyApiAdapter ports.ProxyAPI, port uint64) *ProxyAPIServerService {
+	return &ProxyAPIServerService{
+		server: &http.Server{
+			Addr:    ":" + strconv.FormatUint(port, 10),
+			Handler: proxyApiAdapter.GetRouter(),
+		},
+		servicePrefix: "Proxy API",
+	}
+}
+
+func (s *ProxyAPIServerService) Start(wg *sync.WaitGroup) {
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		logger.InfoWithPrefix(s.servicePrefix, "server started on %s", s.server.Addr)
+		if err := s.server.ListenAndServe(); err != http.ErrServerClosed {
+			logger.FatalWithPrefix(s.servicePrefix, "server ListenAndServe: %v", err)
+		}
+	}()
+}
+
+func (s *ProxyAPIServerService) Shutdown(ctx context.Context) {
+	if err := s.server.Shutdown(ctx); err != nil {
+		logger.WarnWithPrefix(s.servicePrefix, "server Shutdown: %v", err)
+	}
+}
diff --git a/internal/application/services/validatorEjector.go b/internal/application/services/validatorEjector.go
index ba008bb..4a10c7d 100644
--- a/internal/application/services/validatorEjector.go
+++ b/internal/application/services/validatorEjector.go
@@ -60,7 +60,7 @@ func (ve *ValidatorEjector) ValidatorEjectorCron(ctx context.Context, interval t
 	}
 }
 
-// ejectValidator orchestrates the voluntary exit process for a validator
+// EjectValidator orchestrates the voluntary exit process for a validator
 func (ve *ValidatorEjector) EjectValidator() error {
 	logger.DebugWithPrefix(ve.servicePrefix, "Validator Ejector cron started")
 
@@ -69,21 +69,48 @@ func (ve *ValidatorEjector) EjectValidator() error {
 		return err
 	}
 
-	for _, operatorID := range operatorIDs {
+	concurrencyLimit := 10 // could be part of configuration parameters
+	allExitRequests := []struct {
+		exitRequest domain.ExitRequest
+		operatorID  string
+	}{}
 
-		// get exit requests
+	// Collect all exit requests across operators, pairing each request with its operator ID
+	for _, operatorID := range operatorIDs {
 		exitRequests, err := ve.storagePort.GetExitRequests(operatorID.String())
 		if err != nil {
+			logger.ErrorWithPrefix(ve.servicePrefix, "Error getting exit requests for operator %s: %v", operatorID.String(), err)
 			continue
 		}
+		// Append all exit requests with their related operatorID
+		for _, er := range exitRequests {
+			allExitRequests = append(allExitRequests, struct {
+				exitRequest domain.ExitRequest
+				operatorID  string
+			}{
+				exitRequest: er,
+				operatorID:  operatorID.String(),
+			})
+		}
+	}
+
+	// If we have more exit requests than concurrencyLimit, just process the first concurrencyLimit of them
+	if len(allExitRequests) > concurrencyLimit {
+		allExitRequests = allExitRequests[:concurrencyLimit] // Keep the first "concurrencyLimit" elements
+	}
 
-		for _, exitRequest := range exitRequests {
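+	// Each remaining request is processed in its own goroutine, so at most
+	// concurrencyLimit exits run concurrently per cron run; requests beyond the
+	// limit remain in storage and are picked up on the next iteration.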
+	var wg sync.WaitGroup
+	wg.Add(len(allExitRequests)) // Add the count of requests we are going to process
+
+	for _, req := range allExitRequests {
+		go func(exitRequest domain.ExitRequest, operatorID string) {
+			defer wg.Done()
 
 			// First thing we do is to check the onchain status of the validator. This way we make sure we don't try to exit a validator that is already exiting
 			onchainStatus, err := ve.beaconchainPort.GetValidatorStatus(exitRequest.Event.ValidatorIndex.String())
 			if err != nil {
 				logger.ErrorWithPrefix(ve.servicePrefix, "Error getting validator status from beaconchain, skipping.", err)
-				continue
+				return
 			}
 
 			// TODO: simplify this logic
@@ -91,25 +118,23 @@ func (ve *ValidatorEjector) EjectValidator() error {
 			if onchainStatus != domain.StatusActiveOngoing && onchainStatus != domain.StatusActiveSlashed {
 				if onchainStatus != domain.StatusPendingInitialized && onchainStatus != domain.StatusPendingQueued {
 					logger.InfoWithPrefix(ve.servicePrefix, "Validator %s is %s so no exit request is required, deleting the exit request from db", exitRequest.Event.ValidatorIndex, exitRequest.Status)
-					// TODO: send notiifcation validator exited if timestamp of the event is within an hour
-					//Since the validator is already exiting, we remove the exit request from the db
-					if err := ve.storagePort.DeleteExitRequest(operatorID.String(), exitRequest.Event.ValidatorIndex.String()); err != nil {
+					// TODO: send notification validator exited if timestamp of the event is within an hour
+					// Since the validator is already exiting, we remove the exit request from the db
+					if err := ve.storagePort.DeleteExitRequest(operatorID, exitRequest.Event.ValidatorIndex.String()); err != nil {
 						// An error here is no big deal, we will retry to delete this in the next iteration of the cron
 						logger.ErrorWithPrefix(ve.servicePrefix, "Error deleting exit request from db", err)
 					}
 				} else {
 					logger.DebugWithPrefix(ve.servicePrefix, "Validator %s is requested to exit but is in a pending status (%s), waiting for it to become active", exitRequest.Event.ValidatorIndex, exitRequest.Status)
 				}
-				continue
+				return
 			}
 
-			// send notification and skip on error
 			message := fmt.Sprintf("- 🚨 Your validator %s is requested to exit. Executing automatic exit.", exitRequest.Event.ValidatorIndex)
 			if err := ve.notifierPort.SendNotification(message); err != nil {
 				logger.ErrorWithPrefix(ve.servicePrefix, "Error sending exit notification", err)
 			}
 
-			// exit the validator
 			logger.InfoWithPrefix(ve.servicePrefix, "Exiting validator %s with status %s", exitRequest.Event.ValidatorIndex, exitRequest.Status)
 			if err := ve.exitValidatorPort.ExitValidator(exitRequest.ValidatorPubkeyHex, exitRequest.Event.ValidatorIndex.String()); err != nil {
 				logger.WarnWithPrefix(ve.servicePrefix, "Failed to exit validator %s, a manual exit is required: %v", exitRequest.Event.ValidatorIndex, err)
@@ -119,11 +144,10 @@ func (ve *ValidatorEjector) EjectValidator() error {
 				if err := ve.notifierPort.SendNotification(message); err != nil {
 					logger.ErrorWithPrefix(ve.servicePrefix, "Error sending manual exit notification", err)
 				}
-				continue
+				return
 			}
 
 			// TODO: send notification "exit submitted. Your validator will exit within X minutes. Wait for confirmation; if no confirmation is received, please check manually"
-			// wait for the transaction to be included
 			// call ve.beaconchainPort.GetValidatorStatus(string(validator.Event.ValidatorPubkey)) in a loop until the status is domain.StatusActiveExiting
 			// a maximum of 64 times with a 30 second sleep between each call (checks for 32 minutes, twice per minute)
@@ -135,21 +159,18 @@ func (ve *ValidatorEjector) EjectValidator() error {
 				validatorStatus, err := ve.beaconchainPort.GetValidatorStatus(exitRequest.ValidatorPubkeyHex)
 				if err != nil {
 					logger.ErrorWithPrefix(ve.servicePrefix, "Error getting validator status", err)
+					time.Sleep(10 * time.Second) // Wait a bit before retrying to avoid spamming the beaconchain and give it time to resync/recover.
 					continue
 				}
 
 				if validatorStatus == domain.StatusActiveExiting || validatorStatus == domain.StatusExitedUnslashed || validatorStatus == domain.StatusExitedSlashed {
 					logger.InfoWithPrefix(ve.servicePrefix, "Validator %s has entered the exit queue", exitRequest.Event.ValidatorIndex)
-
-					// send notification and skip on error
 					message = fmt.Sprintf("- 🚪 Validator %s has entered the exit queue automatically, no manual action required", exitRequest.Event.ValidatorIndex)
 					if err := ve.notifierPort.SendNotification(message); err != nil {
 						logger.ErrorWithPrefix(ve.servicePrefix, "Error sending exit notification", err)
 					}
-
-					// remove the exit request from the db
 					logger.DebugWithPrefix(ve.servicePrefix, "Deleting exit request for validator %s from db", exitRequest.Event.ValidatorIndex)
-					if err := ve.storagePort.DeleteExitRequest(operatorID.String(), exitRequest.Event.ValidatorIndex.String()); err != nil {
+					if err := ve.storagePort.DeleteExitRequest(operatorID, exitRequest.Event.ValidatorIndex.String()); err != nil {
 						logger.ErrorWithPrefix(ve.servicePrefix, "Error deleting exit request from db", err)
 					}
 					break
@@ -158,9 +179,11 @@ func (ve *ValidatorEjector) EjectValidator() error {
 				time.Sleep(30 * time.Second)
 			}
 
-		}
+		}(req.exitRequest, req.operatorID)
 	}
 
+	// Wait for all goroutines to complete (either success or failure)
+	wg.Wait()
 	logger.DebugWithPrefix(ve.servicePrefix, "Validator Ejector cron finished")
 	return nil
 }