From 6f096be70797d1cd0ab8bd35817f3b12d1f9b9ec Mon Sep 17 00:00:00 2001 From: Ivan Mashonskii Date: Tue, 1 Oct 2024 17:17:26 +0300 Subject: [PATCH 1/3] Better logs for block applying --- src/main/scala/units/ELUpdater.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/scala/units/ELUpdater.scala b/src/main/scala/units/ELUpdater.scala index 1b08afd2..dd4a3151 100644 --- a/src/main/scala/units/ELUpdater.scala +++ b/src/main/scala/units/ELUpdater.scala @@ -88,6 +88,7 @@ class ELUpdater( case w: Working[ChainStatus] => w.returnToMainChainInfo match { case Some(rInfo) if rInfo.missedBlock.hash == payload.hash => + logger.debug(s"New block ${payload.hash} payload is for missed block from main chain ${rInfo.chainId}") chainContractClient.getChainInfo(rInfo.chainId) match { case Some(chainInfo) if chainInfo.isMain => validateAndApplyMissed(epi, w, rInfo.missedBlock, rInfo.missedBlockParentPayload, chainInfo) @@ -96,7 +97,15 @@ class ELUpdater( case _ => logger.error(s"Failed to get chain ${rInfo.chainId} info, ignoring ${payload.hash}") } - case _ => logger.debug(s"Expecting ${w.returnToMainChainInfo.fold("no block payload")(_.toString)}, ignoring unexpected ${payload.hash}") + case rInfo => + val returnToMainChainMsg = rInfo match { + case Some(rInfo) => s", main chain id: ${rInfo.chainId}, expected missed payload for main chain block: ${rInfo.missedBlock.hash}" + case _ => ", no missed blocks' payloads from main chain expecting" + } + + logger.debug( + s"Ignoring unexpected payload for block ${payload.hash}, expected parent for current chain: ${w.lastPayload.hash}$returnToMainChainMsg" + ) } case other => logger.debug(s"$other: ignoring ${payload.hash}") From ae872b5d791aa7a9b7ee17cce3981a7cb7fcc74d Mon Sep 17 00:00:00 2001 From: Ivan Mashonskii Date: Tue, 1 Oct 2024 18:29:56 +0300 Subject: [PATCH 2/3] Remove ClientError --- src/main/scala/units/ClientError.scala | 4 - src/main/scala/units/ELUpdater.scala | 146 +++++++++--------- .../scala/units/client/JsonRpcClient.scala | 6 +- .../client/contract/ContractFunction.scala | 6 +- .../units/client/engine/EngineApiClient.scala | 24 +-- .../client/engine/HttpEngineApiClient.scala | 60 +++---- .../client/engine/LoggedEngineApiClient.scala | 54 ++++--- .../scala/units/network/HistoryReplier.scala | 7 +- src/main/scala/units/package.scala | 3 +- src/test/scala/units/HasJobLogging.scala | 6 +- .../scala/units/client/TestEcClients.scala | 26 ++-- 11 files changed, 164 insertions(+), 178 deletions(-) delete mode 100644 src/main/scala/units/ClientError.scala diff --git a/src/main/scala/units/ClientError.scala b/src/main/scala/units/ClientError.scala deleted file mode 100644 index 706db9f6..00000000 --- a/src/main/scala/units/ClientError.scala +++ /dev/null @@ -1,4 +0,0 @@ -package units - -// TODO: maybe remove? -case class ClientError(message: String) diff --git a/src/main/scala/units/ELUpdater.scala b/src/main/scala/units/ELUpdater.scala index dd4a3151..f4733aa0 100644 --- a/src/main/scala/units/ELUpdater.scala +++ b/src/main/scala/units/ELUpdater.scala @@ -168,7 +168,7 @@ class ELUpdater( } } - private def callContract(fc: FUNCTION_CALL, payload: ExecutionPayload, invoker: KeyPair): JobResult[Unit] = { + private def callContract(fc: FUNCTION_CALL, payload: ExecutionPayload, invoker: KeyPair): Either[String, Unit] = { val extraFee = if (blockchain.hasPaidVerifier(invoker.toAddress)) ScriptExtraFee else 0 val tx = InvokeScriptTransaction( @@ -201,9 +201,9 @@ class ELUpdater( logger.error(fatalReasonMessage) forceStopApplication(UnsupportedFeature) } - ClientError(s"Failed tx=${tx.id()}: $message").asLeft + s"Failed tx=${tx.id()}: $message".asLeft - case Left(e) => ClientError(s"Failed tx=${tx.id()}: ${e.toString}").asLeft + case Left(e) => s"Failed tx=${tx.id()}: ${e.toString}".asLeft } } @@ -235,16 +235,16 @@ class ELUpdater( payloadJson <- engineApiClient.getPayload(payloadId) _ = logger.info(s"Forged payload $payloadId") latestValidHashOpt <- engineApiClient.applyNewPayload(payloadJson) - latestValidHash <- Either.fromOption(latestValidHashOpt, ClientError("Latest valid hash not defined")) + latestValidHash <- Either.fromOption(latestValidHashOpt, "Latest valid hash not defined") _ = logger.info(s"Applied payload $payloadId, block hash is $latestValidHash, timestamp = $timestamp") - newPm <- payloadObserver.broadcastSigned(payloadJson, m.keyPair.privateKey).leftMap(ClientError.apply) - payloadInfo <- newPm.payloadInfo.leftMap(ClientError.apply) + newPm <- payloadObserver.broadcastSigned(payloadJson, m.keyPair.privateKey) + payloadInfo <- newPm.payloadInfo payload = payloadInfo.payload transfersRootHash <- getE2CTransfersRootHash(payload.hash, chainContractOptions.elBridgeAddress) funcCall <- contractFunction.toFunctionCall(payload.hash, transfersRootHash, m.lastC2ETransferIndex) _ <- callContract(funcCall, payload, m.keyPair) } yield payload).fold( - err => logger.error(s"Failed to forge block for payloadId $payloadId at epoch ${epochInfo.number}: ${err.message}"), + err => logger.error(s"Failed to forge block for payloadId $payloadId at epoch ${epochInfo.number}: $err"), newPayload => scheduler.execute { () => tryToForgeNextBlock(epochInfo.number, newPayload, chainContractOptions) } ) } @@ -256,7 +256,11 @@ class ELUpdater( } } - private def rollbackTo(prevState: Working[ChainStatus], target: CommonBlockData, finalizedBlock: ContractBlock): JobResult[Working[ChainStatus]] = { + private def rollbackTo( + prevState: Working[ChainStatus], + target: CommonBlockData, + finalizedBlock: ContractBlock + ): Either[String, Working[ChainStatus]] = { val targetHash = target.hash for { rollbackBlock <- mkRollbackBlock(targetHash) @@ -268,7 +272,7 @@ class ELUpdater( _ <- Either.cond( targetHash == lastPayload.hash, (), - ClientError(s"Rollback to $targetHash error: last block hash ${lastPayload.hash} is not equal to target block hash") + s"Rollback to $targetHash error: last block hash ${lastPayload.hash} is not equal to target block hash" ) } yield { logger.info(s"Rollback to $targetHash finished successfully") @@ -296,7 +300,7 @@ class ELUpdater( lastElWithdrawalIndex: WithdrawalIndex, chainContractOptions: ChainContractOptions, prevEpochMinerRewardAddress: Option[EthAddress] - ): JobResult[MiningData] = { + ): Either[String, MiningData] = { val firstElWithdrawalIndex = lastElWithdrawalIndex + 1 val startC2ETransferIndex = lastC2ETransferIndex + 1 @@ -382,7 +386,7 @@ class ELUpdater( ) ) }).fold( - err => logger.error(s"Error starting payload build process: ${err.message}"), + err => logger.error(s"Error starting payload build process: $err"), _ => () ) case _ => @@ -410,7 +414,7 @@ class ELUpdater( None ).fold[Unit]( err => { - logger.error(s"Error starting payload build process: ${err.message}") + logger.error(s"Error starting payload build process: $err") scheduler.scheduleOnce(MiningRetryInterval) { tryToForgeNextBlock(epochNumber, parentPayload, chainContractOptions) } @@ -454,7 +458,7 @@ class ELUpdater( (for { newEpochInfo <- calculateEpochInfo mainChainInfo <- chainContractClient.getMainChainInfo.toRight("Can't get main chain info") - lastPayload <- engineApiClient.getLatestBlock.leftMap(_.message) + lastPayload <- engineApiClient.getLatestBlock } yield { logger.trace(s"Following main chain ${mainChainInfo.id}") val fullValidationStatus = FullValidationStatus( @@ -559,7 +563,7 @@ class ELUpdater( tryToStartMining(newState, Left(chainSwitchInfo)) } }).fold( - err => logger.error(err.message), + err => logger.error(err), _ => () ) case w: Working[ChainStatus] => @@ -751,7 +755,7 @@ class ELUpdater( requestAndProcessBlockPayload(contractBlock.hash) PayloadRequestResult.Requested(contractBlock) case Left(err) => - logger.warn(s"Failed to get block ${contractBlock.hash} payload by hash: ${err.message}") + logger.warn(s"Failed to get block ${contractBlock.hash} payload by hash: $err") requestAndProcessBlockPayload(contractBlock.hash) PayloadRequestResult.Requested(contractBlock) } @@ -775,7 +779,7 @@ class ELUpdater( logger.error(s"Failed to get chain ${returnToMainChainInfo.chainId} info: not found") } case Left(err) => - logger.debug(s"Missed block ${payload.hash} of main chain ${returnToMainChainInfo.chainId} validation error: ${err.message}") + logger.debug(s"Missed block ${payload.hash} of main chain ${returnToMainChainInfo.chainId} validation error: $err") } case PayloadRequestResult.Requested(_) => } @@ -814,7 +818,7 @@ class ELUpdater( finalizedBlockPayload } case Left(err) => - logger.warn(s"Failed to get block ${curBlock.hash} payload by hash: ${err.message}") + logger.warn(s"Failed to get block ${curBlock.hash} payload by hash: $err") finalizedBlockPayload } } @@ -850,7 +854,7 @@ class ELUpdater( case Right(updatedState) => Some(followChain(nodeChainInfo, updatedState.lastPayload, mainChainInfo, updatedState.fullValidationStatus, returnToMainChainInfo)) case Left(err) => - logger.error(s"Failed to rollback to ${target.hash}: ${err.message}") + logger.error(s"Failed to rollback to ${target.hash}: $err") None } } @@ -895,7 +899,7 @@ class ELUpdater( logger.debug(s"Checking if EL has synced to ${target.hash} on height ${target.height}") engineApiClient.getLatestBlock match { case Left(error) => - logger.error(s"Sync to ${target.hash} was not completed, error=${error.message}") + logger.error(s"Sync to ${target.hash} was not completed, error=$error") setState("23", Starting) case Right(lastPayload) if lastPayload.hash == target.hash => logger.debug(s"Finished synchronization to ${target.hash} successfully") @@ -935,40 +939,38 @@ class ELUpdater( logger.debug(s"Unexpected state on sync: $other") }) - private def validateRandao(payload: ExecutionPayload, epochNumber: Int): JobResult[Unit] = + private def validateRandao(payload: ExecutionPayload, epochNumber: Int): Either[String, Unit] = blockchain.vrf(epochNumber) match { - case None => ClientError(s"VRF of $epochNumber epoch is empty").asLeft + case None => s"VRF of $epochNumber epoch is empty".asLeft case Some(vrf) => val expectedPrevRandao = calculateRandao(vrf, payload.parentHash) Either.cond( expectedPrevRandao == payload.prevRandao, (), - ClientError(s"expected prevRandao $expectedPrevRandao, got ${payload.prevRandao}, VRF=$vrf of $epochNumber") + s"expected prevRandao $expectedPrevRandao, got ${payload.prevRandao}, VRF=$vrf of $epochNumber" ) } - private def validateMiner(epi: ExecutionPayloadInfo, epochInfo: Option[EpochInfo]): JobResult[Unit] = { + private def validateMiner(epi: ExecutionPayloadInfo, epochInfo: Option[EpochInfo]): Either[String, Unit] = { val payload = epi.payload epochInfo match { case Some(epochMeta) => Either.cond( payload.feeRecipient == epochMeta.rewardAddress, (), - ClientError(s"block miner ${payload.feeRecipient} doesn't equal to ${epochMeta.rewardAddress}") + s"block miner ${payload.feeRecipient} doesn't equal to ${epochMeta.rewardAddress}" ) case _ => Either.unit } } - private def validateTimestamp(payload: ExecutionPayload, parentPayload: ExecutionPayload): JobResult[Unit] = { + private def validateTimestamp(payload: ExecutionPayload, parentPayload: ExecutionPayload): Either[String, Unit] = { val minAppendTs = parentPayload.timestamp + config.blockDelay.toSeconds Either.cond( payload.timestamp >= minAppendTs, (), - ClientError( - s"timestamp (${payload.timestamp}) of appended block must be greater or equal $minAppendTs, " + - s"Δ${minAppendTs - payload.timestamp}s" - ) + s"timestamp (${payload.timestamp}) of appended block must be greater or equal $minAppendTs, " + + s"Δ${minAppendTs - payload.timestamp}s" ) } @@ -976,7 +978,7 @@ class ELUpdater( epi: ExecutionPayloadInfo, parentPayload: ExecutionPayload, epochInfo: Option[EpochInfo] - ): JobResult[Unit] = { + ): Either[String, Unit] = { for { _ <- validateTimestamp(epi.payload, parentPayload) _ <- validateMiner(epi, epochInfo) @@ -984,30 +986,28 @@ class ELUpdater( } yield () } - private def getAltChainReferenceBlock(nodeChainInfo: ChainInfo, lastContractBlock: ContractBlock): JobResult[ContractBlock] = { + private def getAltChainReferenceBlock(nodeChainInfo: ChainInfo, lastContractBlock: ContractBlock): Either[String, ContractBlock] = { if (nodeChainInfo.isMain) { for { lastEpoch <- chainContractClient .getEpochMeta(lastContractBlock.epoch) .toRight( - ClientError( - s"Can't find the epoch #${lastContractBlock.epoch} metadata of invalid block ${lastContractBlock.hash} on contract" - ) + s"Can't find the epoch #${lastContractBlock.epoch} metadata of invalid block ${lastContractBlock.hash} on contract" ) prevEpoch <- chainContractClient .getEpochMeta(lastEpoch.prevEpoch) - .toRight(ClientError(s"Can't find a previous epoch #${lastEpoch.prevEpoch} metadata on contract")) + .toRight(s"Can't find a previous epoch #${lastEpoch.prevEpoch} metadata on contract") referenceBlockHash = prevEpoch.lastBlockHash referenceBlock <- chainContractClient .getBlock(referenceBlockHash) - .toRight(ClientError(s"Can't find a last block $referenceBlockHash of epoch #${lastEpoch.prevEpoch} on contract")) + .toRight(s"Can't find a last block $referenceBlockHash of epoch #${lastEpoch.prevEpoch} on contract") } yield referenceBlock } else { val blockHash = nodeChainInfo.firstBlock.parentHash chainContractClient .getBlock(blockHash) .toRight( - ClientError(s"Parent block $blockHash for first block ${nodeChainInfo.firstBlock.hash} of chain ${nodeChainInfo.id} not found at contract") + s"Parent block $blockHash for first block ${nodeChainInfo.firstBlock.hash} of chain ${nodeChainInfo.id} not found at contract" ) } } @@ -1025,7 +1025,7 @@ class ELUpdater( logger.debug(s"Missed block ${payload.hash} of main chain ${nodeChainInfo.id} was successfully validated") broadcastAndConfirmBlock(epi, updatedState, nodeChainInfo, None) case Left(err) => - logger.debug(s"Missed block ${payload.hash} of main chain ${nodeChainInfo.id} validation error: ${err.message}, ignoring block") + logger.debug(s"Missed block ${payload.hash} of main chain ${nodeChainInfo.id} validation error: $err, ignoring block") } } @@ -1045,7 +1045,7 @@ class ELUpdater( logger.debug(s"Block ${payload.hash} was successfully validated") broadcastAndConfirmBlock(epi, updatedState, nodeChainInfo, returnToMainChainInfo) case Left(err) => - logger.debug(s"Block ${payload.hash} validation error: ${err.message}") + logger.debug(s"Block ${payload.hash} validation error: $err") processInvalidBlock(contractBlock, prevState, Some(nodeChainInfo)) } case contractBlock => @@ -1057,7 +1057,7 @@ class ELUpdater( logger.debug(s"Block ${payload.hash} was successfully partially validated") broadcastAndConfirmBlock(epi, prevState, nodeChainInfo, returnToMainChainInfo) case Left(err) => - logger.error(s"Block ${payload.hash} prevalidation error: ${err.message}, ignoring block") + logger.error(s"Block ${payload.hash} prevalidation error: $err, ignoring block") } } } @@ -1071,7 +1071,7 @@ class ELUpdater( val finalizedBlock = prevState.finalizedBlock confirmBlock(payload, finalizedBlock) .fold[Unit]( - err => logger.error(s"Can't confirm block ${payload.hash} of chain ${nodeChainInfo.id}: ${err.message}"), + err => logger.error(s"Can't confirm block ${payload.hash} of chain ${nodeChainInfo.id}: $err"), _ => { logger.info(s"Successfully confirmed block ${payload.hash} of chain ${nodeChainInfo.id}") followChainAndRequestNextBlockPayload( @@ -1132,7 +1132,7 @@ class ELUpdater( setState("7", newState) maybeRequestNextBlockPayload(newState, finalizedBlock) case Left(err) => - logger.error(s"Failed to confirm next block ${payload.hash}: ${err.message}") + logger.error(s"Failed to confirm next block ${payload.hash}: $err") prevState } case PayloadRequestResult.Requested(contractBlock) => @@ -1147,19 +1147,19 @@ class ELUpdater( } } - private def mkRollbackBlock(rollbackTargetBlockHash: BlockHash): JobResult[RollbackBlock] = for { + private def mkRollbackBlock(rollbackTargetBlockHash: BlockHash): Either[String, RollbackBlock] = for { targetBlockDataOpt <- chainContractClient.getBlock(rollbackTargetBlockHash) match { case None => engineApiClient.getBlockByHash(rollbackTargetBlockHash) case x => Right(x) } targetBlockData <- Either.fromOption( targetBlockDataOpt, - ClientError(s"Can't find block $rollbackTargetBlockHash neither on a contract, nor in EC") + s"Can't find block $rollbackTargetBlockHash neither on a contract, nor in EC" ) parentPayloadOpt <- engineApiClient.getBlockByHash(targetBlockData.parentHash) - parentPayload <- Either.fromOption(parentPayloadOpt, ClientError(s"Can't find block $rollbackTargetBlockHash parent payload in execution client")) + parentPayload <- Either.fromOption(parentPayloadOpt, s"Can't find block $rollbackTargetBlockHash parent payload in execution client") rollbackBlockOpt <- engineApiClient.applyNewPayload(EmptyPayload.mkExecutionPayloadJson(parentPayload)) - rollbackBlock <- Either.fromOption(rollbackBlockOpt, ClientError("Rollback block hash is not defined as latest valid hash")) + rollbackBlock <- Either.fromOption(rollbackBlockOpt, "Rollback block hash is not defined as latest valid hash") } yield RollbackBlock(rollbackBlock, parentPayload) private def toWithdrawals(transfers: Vector[ChainContractClient.ContractTransfer], firstWithdrawalIndex: Long): Vector[Withdrawal] = @@ -1168,9 +1168,9 @@ class ELUpdater( Withdrawal(index, x.destElAddress, Bridge.clToGweiNativeTokenAmount(x.amount)) } - private def getLastWithdrawalIndex(hash: BlockHash): JobResult[WithdrawalIndex] = + private def getLastWithdrawalIndex(hash: BlockHash): Either[String, WithdrawalIndex] = engineApiClient.getBlockByHash(hash).flatMap { - case None => Left(ClientError(s"Can't find block $hash payload on EC during withdrawal search")) + case None => Left(s"Can't find block $hash payload on EC during withdrawal search") case Some(payload) => payload.withdrawals.lastOption match { case Some(lastWithdrawal) => Right(lastWithdrawal.index) @@ -1180,14 +1180,13 @@ class ELUpdater( } } - private def getE2CTransfersRootHash(hash: BlockHash, elBridgeAddress: EthAddress): JobResult[Digest] = + private def getE2CTransfersRootHash(hash: BlockHash, elBridgeAddress: EthAddress): Either[String, Digest] = for { elRawLogs <- engineApiClient.getLogs(hash, elBridgeAddress, Bridge.ElSentNativeEventTopic) rootHash <- { val relatedElRawLogs = elRawLogs.filter(x => x.address == elBridgeAddress && x.topics.contains(Bridge.ElSentNativeEventTopic)) Bridge .mkTransfersHash(relatedElRawLogs) - .leftMap(e => ClientError(e)) .map { rootHash => if (rootHash.isEmpty) rootHash else { @@ -1211,9 +1210,9 @@ class ELUpdater( case w: Working[ChainStatus] => val startState = skipFinalizedBlocksValidation(w) getContractBlocksForValidation(startState).fold[Unit]( - err => logger.error(s"Validation of applied blocks error: ${err.message}"), + err => logger.error(s"Validation of applied blocks error: $err"), blocksToValidate => - blocksToValidate.foldLeft[JobResult[Working[ChainStatus]]](Right(startState)) { + blocksToValidate.foldLeft[Either[String, Working[ChainStatus]]](Right(startState)) { case (Right(curState), block) => logger.debug(s"Trying to validate applied block ${block.hash}") validateAppliedBlock(block.contractBlock, block.payload, curState) match { @@ -1221,7 +1220,7 @@ class ELUpdater( logger.debug(s"Block ${block.hash} was successfully validated") Right(updatedState) case Left(err) => - logger.debug(s"Validation of applied block ${block.hash} failed: ${err.message}") + logger.debug(s"Validation of applied block ${block.hash} failed: $err") processInvalidBlock(block.contractBlock, curState, None) Left(err) } @@ -1234,17 +1233,15 @@ class ELUpdater( } } - private def validateE2CTransfers(contractBlock: ContractBlock, elBridgeAddress: EthAddress): JobResult[Unit] = + private def validateE2CTransfers(contractBlock: ContractBlock, elBridgeAddress: EthAddress): Either[String, Unit] = getE2CTransfersRootHash(contractBlock.hash, elBridgeAddress).flatMap { elRootHash => // elRootHash is the source of true if (java.util.Arrays.equals(contractBlock.e2cTransfersRootHash, elRootHash)) Either.unit else Left( - ClientError( - s"EL to CL transfers hash of ${contractBlock.hash} are different: " + - s"EL=${toHexNoPrefix(elRootHash)}, " + - s"CL=${toHexNoPrefix(contractBlock.e2cTransfersRootHash)}" - ) + s"EL to CL transfers hash of ${contractBlock.hash} are different: " + + s"EL=${toHexNoPrefix(elRootHash)}, " + + s"CL=${toHexNoPrefix(contractBlock.e2cTransfersRootHash)}" ) } @@ -1253,7 +1250,7 @@ class ELUpdater( payload: ExecutionPayload, fullValidationStatus: FullValidationStatus, chainContractOptions: ChainContractOptions - ): JobResult[Option[WithdrawalIndex]] = { + ): Either[String, Option[WithdrawalIndex]] = { val blockEpoch = chainContractClient .getEpochMeta(contractBlock.epoch) .getOrElse(throw new RuntimeException(s"Can't find an epoch ${contractBlock.epoch} data of block ${contractBlock.hash} on chain contract")) @@ -1282,7 +1279,6 @@ class ELUpdater( chainContractOptions, elWithdrawalIndexBefore ) - .leftMap(ClientError.apply) } yield Some(lastElWithdrawalIndex) } @@ -1291,7 +1287,7 @@ class ELUpdater( contractBlock: ContractBlock, parentPayload: ExecutionPayload, prevState: Working[ChainStatus] - ): JobResult[Working[ChainStatus]] = { + ): Either[String, Working[ChainStatus]] = { val payload = epi.payload logger.debug(s"Trying to do full validation of block ${payload.hash}") for { @@ -1305,15 +1301,13 @@ class ELUpdater( contractBlock: ContractBlock, payload: ExecutionPayload, prevState: Working[ChainStatus] - ): JobResult[Working[ChainStatus]] = { + ): Either[String, Working[ChainStatus]] = { val validationResult = for { _ <- Either.cond( contractBlock.feeRecipient == payload.feeRecipient, (), - ClientError( - s"Miner in block payload (${payload.feeRecipient}) should be equal to miner on contract (${contractBlock.feeRecipient})" - ) + s"Miner in block payload (${payload.feeRecipient}) should be equal to miner on contract (${contractBlock.feeRecipient})" ) _ <- validateE2CTransfers(contractBlock, prevState.options.elBridgeAddress) updatedLastElWithdrawalIndex <- validateWithdrawals(contractBlock, payload, prevState.fullValidationStatus, prevState.options) @@ -1344,7 +1338,7 @@ class ELUpdater( updatedState <- rollbackTo(prevState, referenceBlock, prevState.finalizedBlock) lastValidBlock <- chainContractClient .getBlock(updatedState.lastPayload.hash) - .toRight(ClientError(s"Block ${updatedState.lastPayload.hash} not found at contract")) + .toRight(s"Block ${updatedState.lastPayload.hash} not found at contract") } yield { findAltChain(chainInfo.id, lastValidBlock.hash) match { case Some(altChainInfo) => @@ -1363,7 +1357,7 @@ class ELUpdater( newState } }).fold( - err => logger.error(err.message), + err => logger.error(err), _ => () ) case Some(_) => @@ -1373,9 +1367,9 @@ class ELUpdater( } } - private def getContractBlocksForValidation(curState: Working[ChainStatus]): JobResult[List[BlockForValidation]] = { + private def getContractBlocksForValidation(curState: Working[ChainStatus]): Either[String, List[BlockForValidation]] = { @tailrec - def loop(curBlock: ContractBlock, acc: List[BlockForValidation]): JobResult[List[BlockForValidation]] = { + def loop(curBlock: ContractBlock, acc: List[BlockForValidation]): Either[String, List[BlockForValidation]] = { if (curBlock.height <= curState.fullValidationStatus.lastValidatedBlock.height || curBlock.height <= curState.finalizedBlock.height) { Right(acc) } else { @@ -1388,13 +1382,13 @@ class ELUpdater( case Right(Some(payload)) => loop(parentBlock, BlockForValidation(curBlock, payload) :: acc) case Right(None) => - Left(ClientError(s"Block ${curBlock.hash} payload not found on EC client for full validation")) + Left(s"Block ${curBlock.hash} payload not found on EC client for full validation") case Left(err) => - Left(ClientError(s"Can't get block ${curBlock.hash} payload for full validation: ${err.message}")) + Left(s"Can't get block ${curBlock.hash} payload for full validation: $err") } } case _ => - Left(ClientError(s"Block ${curBlock.parentHash} not found at contract during full validation")) + Left(s"Block ${curBlock.parentHash} not found at contract during full validation") } } } @@ -1463,12 +1457,12 @@ class ELUpdater( } .map(_ => ()) - private def confirmBlock(blockData: CommonBlockData, finalizedBlockData: CommonBlockData): JobResult[PayloadStatus] = { + private def confirmBlock(blockData: CommonBlockData, finalizedBlockData: CommonBlockData): Either[String, PayloadStatus] = { val finalizedBlockHash = if (finalizedBlockData.height > blockData.height) blockData.hash else finalizedBlockData.hash engineApiClient.forkChoiceUpdated(blockData.hash, finalizedBlockHash) } - private def confirmBlock(hash: BlockHash, finalizedBlockHash: BlockHash): JobResult[PayloadStatus] = + private def confirmBlock(hash: BlockHash, finalizedBlockHash: BlockHash): Either[String, PayloadStatus] = engineApiClient.forkChoiceUpdated(hash, finalizedBlockHash) private def confirmBlockAndStartMining( @@ -1478,7 +1472,7 @@ class ELUpdater( suggestedFeeRecipient: EthAddress, prevRandao: String, withdrawals: Vector[Withdrawal] - ): JobResult[PayloadId] = { + ): Either[String, PayloadId] = { val finalizedBlockHash = if (finalizedBlock.height > lastPayload.height) lastPayload.hash else finalizedBlock.hash engineApiClient .forkChoiceUpdatedWithPayloadId( diff --git a/src/main/scala/units/client/JsonRpcClient.scala b/src/main/scala/units/client/JsonRpcClient.scala index dcc1f99c..738496c4 100644 --- a/src/main/scala/units/client/JsonRpcClient.scala +++ b/src/main/scala/units/client/JsonRpcClient.scala @@ -3,7 +3,7 @@ package units.client import cats.Id import cats.syntax.either.* import units.client.JsonRpcClient.DefaultTimeout -import units.{ClientConfig, ClientError} +import units.ClientConfig import play.api.libs.json.{JsError, JsValue, Reads, Writes} import sttp.client3.* import sttp.client3.playJson.* @@ -23,8 +23,8 @@ trait JsonRpcClient { protected def sendRequest[RQ: Writes, RP: Reads](requestBody: RQ, timeout: FiniteDuration = DefaultTimeout): Either[String, Option[RP]] = sendRequest(mkRequest(requestBody, timeout), config.apiRequestRetries) - protected def parseJson[A: Reads](jsValue: JsValue): Either[ClientError, A] = - Try(jsValue.as[A]).toEither.leftMap(err => ClientError(s"Response parse error: ${err.getMessage}")) + protected def parseJson[A: Reads](jsValue: JsValue): Either[String, A] = + Try(jsValue.as[A]).toEither.leftMap(err => s"Response parse error: ${err.getMessage}") private def mkRequest[A: Writes, B: Reads](requestBody: A, timeout: FiniteDuration): RpcRequest[B] = { val currRequestId = ThreadLocalRandom.current().nextInt(10000, 100000).toString diff --git a/src/main/scala/units/client/contract/ContractFunction.scala b/src/main/scala/units/client/contract/ContractFunction.scala index 9efe7a09..7d4fe4f1 100644 --- a/src/main/scala/units/client/contract/ContractFunction.scala +++ b/src/main/scala/units/client/contract/ContractFunction.scala @@ -8,10 +8,10 @@ import com.wavesplatform.lang.v1.FunctionHeader import com.wavesplatform.lang.v1.compiler.Terms.{CONST_BYTESTR, CONST_LONG, CONST_STRING, EVALUATED, FUNCTION_CALL} import org.web3j.utils.Numeric.cleanHexPrefix import units.util.HexBytesConverter.toHexNoPrefix -import units.{BlockHash, ClientError, JobResult} +import units.BlockHash abstract class ContractFunction(name: String, reference: BlockHash, extraArgs: Either[CommonError, List[EVALUATED]]) { - def toFunctionCall(blockHash: BlockHash, transfersRootHash: Digest, lastC2ETransferIndex: Long): JobResult[FUNCTION_CALL] = (for { + def toFunctionCall(blockHash: BlockHash, transfersRootHash: Digest, lastC2ETransferIndex: Long): Either[String, FUNCTION_CALL] = (for { hash <- CONST_STRING(cleanHexPrefix(blockHash)) ref <- CONST_STRING(cleanHexPrefix(reference)) trh <- CONST_STRING(toHexNoPrefix(transfersRootHash)) @@ -19,7 +19,7 @@ abstract class ContractFunction(name: String, reference: BlockHash, extraArgs: E } yield FUNCTION_CALL( FunctionHeader.User(name), List(hash, ref) ++ xtra ++ List(trh, CONST_LONG(lastC2ETransferIndex)) - )).leftMap(e => ClientError(s"Error building function call for $name: $e")) + )).leftMap(e => s"Error building function call for $name: $e") } object ContractFunction { diff --git a/src/main/scala/units/client/engine/EngineApiClient.scala b/src/main/scala/units/client/engine/EngineApiClient.scala index 04b97a96..cce20bce 100644 --- a/src/main/scala/units/client/engine/EngineApiClient.scala +++ b/src/main/scala/units/client/engine/EngineApiClient.scala @@ -4,10 +4,10 @@ import play.api.libs.json.* import units.client.engine.EngineApiClient.PayloadId import units.client.engine.model.* import units.eth.EthAddress -import units.{BlockHash, JobResult} +import units.BlockHash trait EngineApiClient { - def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): JobResult[PayloadStatus] + def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): Either[String, PayloadStatus] def forkChoiceUpdatedWithPayloadId( lastBlockHash: BlockHash, @@ -16,25 +16,25 @@ trait EngineApiClient { suggestedFeeRecipient: EthAddress, prevRandao: String, withdrawals: Vector[Withdrawal] = Vector.empty - ): JobResult[PayloadId] + ): Either[String, PayloadId] - def getPayload(payloadId: PayloadId): JobResult[JsObject] + def getPayload(payloadId: PayloadId): Either[String, JsObject] - def applyNewPayload(payloadJson: JsObject): JobResult[Option[BlockHash]] + def applyNewPayload(payloadJson: JsObject): Either[String, Option[BlockHash]] - def getPayloadBodyByHash(hash: BlockHash): JobResult[Option[JsObject]] + def getPayloadBodyByHash(hash: BlockHash): Either[String, Option[JsObject]] - def getBlockByNumber(number: BlockNumber): JobResult[Option[ExecutionPayload]] + def getBlockByNumber(number: BlockNumber): Either[String, Option[ExecutionPayload]] - def getBlockByHash(hash: BlockHash): JobResult[Option[ExecutionPayload]] + def getBlockByHash(hash: BlockHash): Either[String, Option[ExecutionPayload]] - def getLatestBlock: JobResult[ExecutionPayload] + def getLatestBlock: Either[String, ExecutionPayload] - def getBlockJsonByHash(hash: BlockHash): JobResult[Option[JsObject]] + def getBlockJsonByHash(hash: BlockHash): Either[String, Option[JsObject]] - def getPayloadJsonDataByHash(hash: BlockHash): JobResult[PayloadJsonData] + def getPayloadJsonDataByHash(hash: BlockHash): Either[String, PayloadJsonData] - def getLogs(hash: BlockHash, address: EthAddress, topic: String): JobResult[List[GetLogsResponseEntry]] + def getLogs(hash: BlockHash, address: EthAddress, topic: String): Either[String, List[GetLogsResponseEntry]] } object EngineApiClient { diff --git a/src/main/scala/units/client/engine/HttpEngineApiClient.scala b/src/main/scala/units/client/engine/HttpEngineApiClient.scala index 03937d99..1b4785ec 100644 --- a/src/main/scala/units/client/engine/HttpEngineApiClient.scala +++ b/src/main/scala/units/client/engine/HttpEngineApiClient.scala @@ -12,7 +12,7 @@ import units.client.engine.model.* import units.client.engine.model.ForkChoiceUpdatedRequest.ForkChoiceAttributes import units.client.engine.model.PayloadStatus.{Syncing, Valid} import units.eth.EthAddress -import units.{BlockHash, ClientConfig, ClientError, JobResult} +import units.{BlockHash, ClientConfig} import scala.concurrent.duration.{DurationInt, FiniteDuration} @@ -20,7 +20,7 @@ class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Ide val apiUrl: Uri = uri"${config.executionClientAddress}" - def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): JobResult[PayloadStatus] = { + def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): Either[String, PayloadStatus] = { sendEngineRequest[ForkChoiceUpdatedRequest, ForkChoiceUpdatedResponse]( ForkChoiceUpdatedRequest(blockHash, finalizedBlockHash, None), BlockExecutionTimeout @@ -28,8 +28,8 @@ class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Ide .flatMap { case ForkChoiceUpdatedResponse(ps @ PayloadState(Valid | Syncing, _, _), None) => Right(ps.status) case ForkChoiceUpdatedResponse(PayloadState(_, _, Some(validationError)), _) => - Left(ClientError(s"Payload validation error: $validationError")) - case ForkChoiceUpdatedResponse(payloadState, _) => Left(ClientError(s"Unexpected payload status ${payloadState.status}")) + Left(s"Payload validation error: $validationError") + case ForkChoiceUpdatedResponse(payloadState, _) => Left(s"Unexpected payload status ${payloadState.status}") } } @@ -40,7 +40,7 @@ class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Ide suggestedFeeRecipient: EthAddress, prevRandao: String, withdrawals: Vector[Withdrawal] - ): JobResult[PayloadId] = { + ): Either[String, PayloadId] = { sendEngineRequest[ForkChoiceUpdatedRequest, ForkChoiceUpdatedResponse]( ForkChoiceUpdatedRequest( lastBlockHash, @@ -52,74 +52,74 @@ class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Ide case ForkChoiceUpdatedResponse(PayloadState(Valid, _, _), Some(payloadId)) => Right(payloadId) case ForkChoiceUpdatedResponse(_, None) => - Left(ClientError(s"Payload id for $lastBlockHash is not defined")) + Left(s"Payload id for $lastBlockHash is not defined") case ForkChoiceUpdatedResponse(PayloadState(_, _, Some(validationError)), _) => - Left(ClientError(s"Payload validation error for $lastBlockHash: $validationError")) + Left(s"Payload validation error for $lastBlockHash: $validationError") case ForkChoiceUpdatedResponse(payloadState, _) => - Left(ClientError(s"Unexpected payload status for $lastBlockHash: ${payloadState.status}")) + Left(s"Unexpected payload status for $lastBlockHash: ${payloadState.status}") } } - def getPayload(payloadId: PayloadId): JobResult[JsObject] = { + def getPayload(payloadId: PayloadId): Either[String, JsObject] = { sendEngineRequest[GetPayloadRequest, GetPayloadResponse](GetPayloadRequest(payloadId), NonBlockExecutionTimeout).map(_.executionPayload) } - def applyNewPayload(payloadJson: JsObject): JobResult[Option[BlockHash]] = { + def applyNewPayload(payloadJson: JsObject): Either[String, Option[BlockHash]] = { sendEngineRequest[NewPayloadRequest, PayloadState](NewPayloadRequest(payloadJson), BlockExecutionTimeout).flatMap { - case PayloadState(_, _, Some(validationError)) => Left(ClientError(s"Payload validation error: $validationError")) + case PayloadState(_, _, Some(validationError)) => Left(s"Payload validation error: $validationError") case PayloadState(Valid, Some(latestValidHash), _) => Right(Some(latestValidHash)) case PayloadState(Syncing, latestValidHash, _) => Right(latestValidHash) - case PayloadState(status, None, _) => Left(ClientError(s"Latest valid hash is not defined at status $status")) - case PayloadState(status, _, _) => Left(ClientError(s"Unexpected payload status: $status")) + case PayloadState(status, None, _) => Left(s"Latest valid hash is not defined at status $status") + case PayloadState(status, _, _) => Left(s"Unexpected payload status: $status") } } - def getPayloadBodyByHash(hash: BlockHash): JobResult[Option[JsObject]] = { + def getPayloadBodyByHash(hash: BlockHash): Either[String, Option[JsObject]] = { sendEngineRequest[GetPayloadBodyByHash, JsArray](GetPayloadBodyByHash(hash), NonBlockExecutionTimeout) .map(_.value.headOption.flatMap(_.asOpt[JsObject])) } - def getBlockByNumber(number: BlockNumber): JobResult[Option[ExecutionPayload]] = { + def getBlockByNumber(number: BlockNumber): Either[String, Option[ExecutionPayload]] = { for { json <- sendRequest[GetBlockByNumberRequest, JsObject](GetBlockByNumberRequest(number.str)) - .leftMap(err => ClientError(s"Error getting payload by number $number: $err")) + .leftMap(err => s"Error getting payload by number $number: $err") blockMeta <- json.traverse(parseJson[ExecutionPayload](_)) } yield blockMeta } - def getBlockByHash(hash: BlockHash): JobResult[Option[ExecutionPayload]] = { + def getBlockByHash(hash: BlockHash): Either[String, Option[ExecutionPayload]] = { sendRequest[GetBlockByHashRequest, ExecutionPayload](GetBlockByHashRequest(hash)) - .leftMap(err => ClientError(s"Error getting payload by hash $hash: $err")) + .leftMap(err => s"Error getting payload by hash $hash: $err") } - def getLatestBlock: JobResult[ExecutionPayload] = for { + def getLatestBlock: Either[String, ExecutionPayload] = for { lastPayloadOpt <- getBlockByNumber(BlockNumber.Latest) - lastPayload <- Either.fromOption(lastPayloadOpt, ClientError("Impossible: EC doesn't have payloads")) + lastPayload <- Either.fromOption(lastPayloadOpt, "Impossible: EC doesn't have payloads") } yield lastPayload - def getBlockJsonByHash(hash: BlockHash): JobResult[Option[JsObject]] = { + def getBlockJsonByHash(hash: BlockHash): Either[String, Option[JsObject]] = { sendRequest[GetBlockByHashRequest, JsObject](GetBlockByHashRequest(hash)) - .leftMap(err => ClientError(s"Error getting block json by hash $hash: $err")) + .leftMap(err => s"Error getting block json by hash $hash: $err") } - def getPayloadJsonDataByHash(hash: BlockHash): JobResult[PayloadJsonData] = { + def getPayloadJsonDataByHash(hash: BlockHash): Either[String, PayloadJsonData] = { for { blockJsonOpt <- getBlockJsonByHash(hash) - blockJson <- Either.fromOption(blockJsonOpt, ClientError("block not found")) + blockJson <- Either.fromOption(blockJsonOpt, "block not found") payloadBodyJsonOpt <- getPayloadBodyByHash(hash) - payloadBodyJson <- Either.fromOption(payloadBodyJsonOpt, ClientError("payload body not found")) + payloadBodyJson <- Either.fromOption(payloadBodyJsonOpt, "payload body not found") } yield PayloadJsonData(blockJson, payloadBodyJson) } - def getLogs(hash: BlockHash, address: EthAddress, topic: String): JobResult[List[GetLogsResponseEntry]] = + def getLogs(hash: BlockHash, address: EthAddress, topic: String): Either[String, List[GetLogsResponseEntry]] = sendRequest[GetLogsRequest, List[GetLogsResponseEntry]](GetLogsRequest(hash, address, List(topic))) - .leftMap(err => ClientError(s"Error getting block logs by hash $hash: $err")) + .leftMap(err => s"Error getting block logs by hash $hash: $err") .map(_.getOrElse(List.empty)) - private def sendEngineRequest[A: Writes, B: Reads](request: A, timeout: FiniteDuration): JobResult[B] = { + private def sendEngineRequest[A: Writes, B: Reads](request: A, timeout: FiniteDuration): Either[String, B] = { sendRequest(request, timeout) match { - case Right(response) => response.toRight(ClientError(s"Unexpected engine API empty response")) - case Left(err) => Left(ClientError(s"Engine API request error: $err")) + case Right(response) => response.toRight(s"Unexpected engine API empty response") + case Left(err) => Left(s"Engine API request error: $err") } } } diff --git a/src/main/scala/units/client/engine/LoggedEngineApiClient.scala b/src/main/scala/units/client/engine/LoggedEngineApiClient.scala index e16dc796..f04b4b10 100644 --- a/src/main/scala/units/client/engine/LoggedEngineApiClient.scala +++ b/src/main/scala/units/client/engine/LoggedEngineApiClient.scala @@ -7,7 +7,7 @@ import units.client.engine.EngineApiClient.PayloadId import units.client.engine.LoggedEngineApiClient.excludedJsonFields import units.client.engine.model.* import units.eth.EthAddress -import units.{BlockHash, JobResult} +import units.BlockHash import java.util.concurrent.ThreadLocalRandom import scala.util.chaining.scalaUtilChainingOps @@ -15,8 +15,8 @@ import scala.util.chaining.scalaUtilChainingOps class LoggedEngineApiClient(underlying: EngineApiClient) extends EngineApiClient { protected val log: LoggerFacade = LoggerFacade(LoggerFactory.getLogger(underlying.getClass)) - override def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): JobResult[PayloadStatus] = - wrap(s"forkChoiceUpdated($blockHash, f=$finalizedBlockHash)", underlying.forkChoiceUpdated(blockHash, finalizedBlockHash)) + override def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): Either[String, PayloadStatus] = + wrap(s"forkChoiceUpdated($blockHash, f=$finalizedBlockHash)", underlying.forkChoiceUpdated(blockHash, finalizedBlockHash))() override def forkChoiceUpdatedWithPayloadId( lastBlockHash: BlockHash, @@ -25,50 +25,48 @@ class LoggedEngineApiClient(underlying: EngineApiClient) extends EngineApiClient suggestedFeeRecipient: EthAddress, prevRandao: String, withdrawals: Vector[Withdrawal] - ): JobResult[PayloadId] = wrap( + ): Either[String, PayloadId] = wrap( s"forkChoiceUpdatedWithPayloadId(l=$lastBlockHash, f=$finalizedBlockHash, ts=$unixEpochSeconds, m=$suggestedFeeRecipient, " + s"r=$prevRandao, w={${withdrawals.mkString(", ")}}", underlying.forkChoiceUpdatedWithPayloadId(lastBlockHash, finalizedBlockHash, unixEpochSeconds, suggestedFeeRecipient, prevRandao, withdrawals) - ) + )() - override def getPayload(payloadId: PayloadId): JobResult[JsObject] = - wrap(s"getPayload($payloadId)", underlying.getPayload(payloadId), filteredJsonStr) + override def getPayload(payloadId: PayloadId): Either[String, JsObject] = + wrap(s"getPayload($payloadId)", underlying.getPayload(payloadId))(filteredJsonStr) - override def applyNewPayload(payloadJson: JsObject): JobResult[Option[BlockHash]] = - wrap(s"applyNewPayload(${filteredJsonStr(payloadJson)})", underlying.applyNewPayload(payloadJson), _.fold("None")(identity)) + override def applyNewPayload(payloadJson: JsObject): Either[String, Option[BlockHash]] = + wrap(s"applyNewPayload(${filteredJsonStr(payloadJson)})", underlying.applyNewPayload(payloadJson))(_.fold("None")(identity)) - override def getPayloadBodyByHash(hash: BlockHash): JobResult[Option[JsObject]] = - wrap(s"getPayloadBodyByHash($hash)", underlying.getPayloadBodyByHash(hash), _.fold("None")(filteredJsonStr)) + override def getPayloadBodyByHash(hash: BlockHash): Either[String, Option[JsObject]] = + wrap(s"getPayloadBodyByHash($hash)", underlying.getPayloadBodyByHash(hash))(_.fold("None")(filteredJsonStr)) - override def getBlockByNumber(number: BlockNumber): JobResult[Option[ExecutionPayload]] = - wrap(s"getBlockByNumber($number)", underlying.getBlockByNumber(number), _.fold("None")(_.toString)) + override def getBlockByNumber(number: BlockNumber): Either[String, Option[ExecutionPayload]] = + wrap(s"getBlockByNumber($number)", underlying.getBlockByNumber(number))(_.fold("None")(_.toString)) - override def getBlockByHash(hash: BlockHash): JobResult[Option[ExecutionPayload]] = - wrap(s"getBlockByHash($hash)", underlying.getBlockByHash(hash), _.fold("None")(_.toString)) + override def getBlockByHash(hash: BlockHash): Either[String, Option[ExecutionPayload]] = + wrap(s"getBlockByHash($hash)", underlying.getBlockByHash(hash))(_.fold("None")(_.toString)) - override def getLatestBlock: JobResult[ExecutionPayload] = - wrap("getLatestBlock", underlying.getLatestBlock) + override def getLatestBlock: Either[String, ExecutionPayload] = + wrap("getLatestBlock", underlying.getLatestBlock)() - override def getBlockJsonByHash(hash: BlockHash): JobResult[Option[JsObject]] = - wrap(s"getBlockJsonByHash($hash)", underlying.getBlockJsonByHash(hash), _.fold("None")(filteredJsonStr)) + override def getBlockJsonByHash(hash: BlockHash): Either[String, Option[JsObject]] = + wrap(s"getBlockJsonByHash($hash)", underlying.getBlockJsonByHash(hash))(_.fold("None")(filteredJsonStr)) - override def getPayloadJsonDataByHash(hash: BlockHash): JobResult[PayloadJsonData] = { - wrap( - s"getPayloadJsonDataByHash($hash)", - underlying.getPayloadJsonDataByHash(hash), - pjd => PayloadJsonData(filteredJson(pjd.blockJson), filteredJson(pjd.bodyJson)).toString + override def getPayloadJsonDataByHash(hash: BlockHash): Either[String, PayloadJsonData] = { + wrap(s"getPayloadJsonDataByHash($hash)", underlying.getPayloadJsonDataByHash(hash))(pjd => + PayloadJsonData(filteredJson(pjd.blockJson), filteredJson(pjd.bodyJson)).toString ) } - override def getLogs(hash: BlockHash, address: EthAddress, topic: String): JobResult[List[GetLogsResponseEntry]] = - wrap(s"getLogs($hash, a=$address, t=$topic)", underlying.getLogs(hash, address, topic), _.view.map(_.data).mkString("{", ", ", "}")) + override def getLogs(hash: BlockHash, address: EthAddress, topic: String): Either[String, List[GetLogsResponseEntry]] = + wrap(s"getLogs($hash, a=$address, t=$topic)", underlying.getLogs(hash, address, topic))(_.view.map(_.data).mkString("{", ", ", "}")) - protected def wrap[R](method: String, f: => JobResult[R], toMsg: R => String = (_: R).toString): JobResult[R] = { + protected def wrap[R](method: String, f: => Either[String, R])(toMsg: R => String = (_: R).toString): Either[String, R] = { val currRequestId = ThreadLocalRandom.current().nextInt(10000, 100000).toString log.debug(s"[$currRequestId] $method") f.tap { - case Left(e) => log.debug(s"[$currRequestId] Error: ${e.message}") + case Left(e) => log.debug(s"[$currRequestId] Error: $e") case Right(r) => log.debug(s"[$currRequestId] Success: ${toMsg(r)}") } } diff --git a/src/main/scala/units/network/HistoryReplier.scala b/src/main/scala/units/network/HistoryReplier.scala index b7083d46..03057668 100644 --- a/src/main/scala/units/network/HistoryReplier.scala +++ b/src/main/scala/units/network/HistoryReplier.scala @@ -1,13 +1,12 @@ package units.network -import cats.syntax.either.* import com.wavesplatform.network.id import com.wavesplatform.utils.ScorexLogging import io.netty.channel.ChannelHandler.Sharable import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter} import monix.execution.Scheduler import units.client.engine.EngineApiClient -import units.{BlockHash, ClientError} +import units.BlockHash import scala.concurrent.Future import scala.util.{Failure, Success} @@ -39,9 +38,9 @@ class HistoryReplier(engineApiClient: EngineApiClient)(implicit sc: Scheduler) e case _ => super.channelRead(ctx, msg) } - private def loadPayload(hash: BlockHash): Future[Either[ClientError, PayloadMessage]] = Future { + private def loadPayload(hash: BlockHash): Future[Either[String, PayloadMessage]] = Future { engineApiClient.getPayloadJsonDataByHash(hash).flatMap { payloadJsonData => - PayloadMessage(payloadJsonData.toPayloadJson).leftMap(ClientError.apply) + PayloadMessage(payloadJsonData.toPayloadJson) } } } diff --git a/src/main/scala/units/package.scala b/src/main/scala/units/package.scala index b8b13ff3..c20934cd 100644 --- a/src/main/scala/units/package.scala +++ b/src/main/scala/units/package.scala @@ -1,4 +1,3 @@ package object units { - type BlockHash = BlockHash.Type - type JobResult[A] = Either[ClientError, A] + type BlockHash = BlockHash.Type } diff --git a/src/test/scala/units/HasJobLogging.scala b/src/test/scala/units/HasJobLogging.scala index c3fcc229..a337af7d 100644 --- a/src/test/scala/units/HasJobLogging.scala +++ b/src/test/scala/units/HasJobLogging.scala @@ -7,15 +7,15 @@ import java.util.concurrent.ThreadLocalRandom import scala.util.chaining.scalaUtilChainingOps trait HasJobLogging extends ScorexLogging { - protected def wrap[A](method: String, f: => JobResult[A], toMsg: A => String = (_: A).toString): JobResult[A] = { + protected def wrap[A](method: String, f: => Either[String, A], toMsg: A => String = (_: A).toString): Either[String, A] = { val currRequestId = ThreadLocalRandom.current().nextInt(10000, 100000).toString log.debug(s"[$currRequestId] $method") f.tap { - case Left(e) => log.debug(s"[$currRequestId] Error: ${e.message}") + case Left(e) => log.debug(s"[$currRequestId] Error: $e") case Right(r) => log.debug(s"[$currRequestId] Result: ${toMsg(r)}") } } - protected def l(text: String): JobResult[Unit] = log.debug(text).asRight + protected def l(text: String): Either[String, Unit] = log.debug(text).asRight } diff --git a/src/test/scala/units/client/TestEcClients.scala b/src/test/scala/units/client/TestEcClients.scala index 81adcae0..9ab2209d 100644 --- a/src/test/scala/units/client/TestEcClients.scala +++ b/src/test/scala/units/client/TestEcClients.scala @@ -13,7 +13,7 @@ import units.client.engine.{EngineApiClient, LoggedEngineApiClient} import units.collections.ListOps.* import units.eth.EthAddress import units.network.PayloadMessage -import units.{BlockHash, JobResult} +import units.BlockHash class TestEcClients private ( knownBlocks: Atomic[Map[BlockHash, ChainId]], @@ -65,7 +65,7 @@ class TestEcClients private ( val engineApi = new LoggedEngineApiClient( new EngineApiClient { - override def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): JobResult[PayloadStatus] = { + override def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): Either[String, PayloadStatus] = { knownBlocks.get().get(blockHash) match { case Some(cid) => currChainIdValue.set(cid) @@ -87,7 +87,7 @@ class TestEcClients private ( suggestedFeeRecipient: EthAddress, prevRandao: String, withdrawals: Vector[Withdrawal] - ): JobResult[PayloadId] = + ): Either[String, PayloadId] = forgingBlocks .get() .collectFirst { case fb if fb.testPayload.parentHash == lastBlockHash => fb } match { @@ -99,7 +99,7 @@ class TestEcClients private ( fb.payloadId.asRight } - override def getPayload(payloadId: PayloadId): JobResult[JsObject] = + override def getPayload(payloadId: PayloadId): Either[String, JsObject] = forgingBlocks.transformAndExtract(_.withoutFirst { fb => fb.payloadId == payloadId }) match { case Some(fb) => TestPayloads.toPayloadJson(fb.testPayload, fb.testPayload.prevRandao).asRight case None => @@ -108,7 +108,7 @@ class TestEcClients private ( ) } - override def applyNewPayload(payloadJson: JsObject): JobResult[Option[BlockHash]] = { + override def applyNewPayload(payloadJson: JsObject): Either[String, Option[BlockHash]] = { val newPayload = PayloadMessage(payloadJson).flatMap(_.payloadInfo).explicitGet().payload knownBlocks.get().get(newPayload.parentHash) match { case Some(cid) => @@ -129,16 +129,16 @@ class TestEcClients private ( Some(newPayload.hash) }.asRight - override def getPayloadBodyByHash(hash: BlockHash): JobResult[Option[JsObject]] = + override def getPayloadBodyByHash(hash: BlockHash): Either[String, Option[JsObject]] = notImplementedMethodJob("getPayloadBodyJsonByHash") - override def getBlockByNumber(number: BlockNumber): JobResult[Option[ExecutionPayload]] = + override def getBlockByNumber(number: BlockNumber): Either[String, Option[ExecutionPayload]] = number match { case BlockNumber.Latest => currChain.headOption.asRight case BlockNumber.Number(n) => currChain.find(_.height == n).asRight } - override def getBlockByHash(hash: BlockHash): JobResult[Option[ExecutionPayload]] = { + override def getBlockByHash(hash: BlockHash): Either[String, Option[ExecutionPayload]] = { for { cid <- knownBlocks.get().get(hash) c <- chains.get().get(cid) @@ -146,23 +146,23 @@ class TestEcClients private ( } yield b }.asRight - override def getBlockJsonByHash(hash: BlockHash): JobResult[Option[JsObject]] = + override def getBlockJsonByHash(hash: BlockHash): Either[String, Option[JsObject]] = notImplementedMethodJob("getBlockJsonByHash") - override def getLatestBlock: JobResult[ExecutionPayload] = currChain.head.asRight + override def getLatestBlock: Either[String, ExecutionPayload] = currChain.head.asRight - override def getLogs(hash: BlockHash, address: EthAddress, topic: String): JobResult[List[GetLogsResponseEntry]] = { + override def getLogs(hash: BlockHash, address: EthAddress, topic: String): Either[String, List[GetLogsResponseEntry]] = { val request = GetLogsRequest(hash, address, List(topic)) getLogsCalls.transform(_ + hash) logs.get().getOrElse(request, throw notImplementedCase("call setBlockLogs")) }.asRight - override def getPayloadJsonDataByHash(hash: BlockHash): JobResult[PayloadJsonData] = + override def getPayloadJsonDataByHash(hash: BlockHash): Either[String, PayloadJsonData] = notImplementedMethodJob("getPayloadJsonDataByHash") } ) - protected def notImplementedMethodJob[A](text: String): JobResult[A] = throw new NotImplementedMethod(text) + protected def notImplementedMethodJob[A](text: String): Either[String, A] = throw new NotImplementedMethod(text) protected def notImplementedCase(text: String): Throwable = new NotImplementedCase(text) } From da82aa74fc992fb4b649e6d1f22d2fb5c7e91c1b Mon Sep 17 00:00:00 2001 From: Ivan Mashonskii Date: Wed, 2 Oct 2024 18:15:21 +0300 Subject: [PATCH 3/3] Better logs --- src/main/scala/units/ELUpdater.scala | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/main/scala/units/ELUpdater.scala b/src/main/scala/units/ELUpdater.scala index f4733aa0..75c80beb 100644 --- a/src/main/scala/units/ELUpdater.scala +++ b/src/main/scala/units/ELUpdater.scala @@ -68,6 +68,7 @@ class ELUpdater( if (payload.timestamp - now <= MaxTimeDrift) { state match { case WaitingForSyncHead(target, _) if payload.hash == target.hash => + logger.debug(s"New block ${payload.hash} payload is for sync target block") val syncStarted = for { _ <- engineApiClient.applyNewPayload(epi.payloadJson) fcuStatus <- confirmBlock(target, target) @@ -84,6 +85,7 @@ class ELUpdater( } case w @ Working(_, lastPayload, _, _, _, FollowingChain(nodeChainInfo, _), _, returnToMainChainInfo) if payload.parentHash == lastPayload.hash => + logger.debug(s"New block ${payload.hash} payload refers to the last applied block") validateAndApply(epi, w, lastPayload, nodeChainInfo, returnToMainChainInfo) case w: Working[ChainStatus] => w.returnToMainChainInfo match { @@ -183,10 +185,12 @@ class ELUpdater( Proofs.empty, blockchain.settings.addressSchemeCharacter.toByte ).signWith(invoker.privateKey) + + cleanPriorityPool() + logger.info( s"Invoking ${config.chainContractAddress} '${fc.function.funcName}' for block ${payload.hash}->${payload.parentHash}, txId=${tx.id()}" ) - cleanPriorityPool() broadcastTx(tx).resultE match { case Right(true) => Either.unit @@ -865,9 +869,11 @@ class ELUpdater( (chainContractClient.getMainChainInfo, chainContractClient.getChainInfo(prevChainId)) match { case (Some(mainChainInfo), Some(prevChainInfo)) => if (mainChainInfo.id != prevState.mainChainInfo.id) { + logger.debug(s"Main chain was changed: switching to the chain ${mainChainInfo.id} (previous is ${prevState.mainChainInfo.id})") val updatedLastPayload = findLastPayload(mainChainInfo.lastBlock) rollbackAndFollowMainChain(updatedLastPayload, mainChainInfo) } else if (prevChainInfo.firstBlock.height < finalizedContractBlock.height && !prevChainInfo.isMain) { + logger.debug(s"Current chain ${prevChainInfo.id} became inactive, switching to main chain ${mainChainInfo.id}") val targetBlockHash = prevChainInfo.firstBlock.parentHash chainContractClient.getBlock(targetBlockHash) match { case Some(targetBlock) => rollbackAndFollowMainChain(targetBlock, mainChainInfo) @@ -876,6 +882,7 @@ class ELUpdater( None } } else if (isLastBlockOnFork(prevChainInfo, prevState.lastPayload)) { + logger.debug(s"Last applied block ${prevState.lastPayload.hash} is on fork, trying to rollback to the most recent block from contract") val updatedLastPayload = findLastPayload(prevChainInfo.lastBlock) rollbackAndFollowChain(updatedLastPayload, prevChainInfo, mainChainInfo, prevState.returnToMainChainInfo) } else { @@ -1052,9 +1059,10 @@ class ELUpdater( // we should check block miner based on epochInfo if block is not at contract yet val epochInfo = if (contractBlock.isEmpty) Some(prevState.epochInfo) else None + logger.debug(s"Trying to do prevalidation of block ${payload.hash}") preValidateBlock(epi, parentPayload, epochInfo) match { case Right(_) => - logger.debug(s"Block ${payload.hash} was successfully partially validated") + logger.debug(s"Block ${payload.hash} was successfully prevalidated") broadcastAndConfirmBlock(epi, prevState, nodeChainInfo, returnToMainChainInfo) case Left(err) => logger.error(s"Block ${payload.hash} prevalidation error: $err, ignoring block") @@ -1099,7 +1107,7 @@ class ELUpdater( confirmBlockAndFollowChain(epi.payload, prevState, nodeChainInfo, returnToMainChainInfo) } - private def findBlockChild(parent: BlockHash, lastBlockHash: BlockHash): Either[String, ContractBlock] = { + private def findBlockChild(parent: BlockHash, lastBlockHash: BlockHash): Option[ContractBlock] = { @tailrec def loop(b: BlockHash): Option[ContractBlock] = chainContractClient.getBlock(b) match { case None => None @@ -1108,7 +1116,7 @@ class ELUpdater( else loop(cb.parentHash) } - loop(lastBlockHash).toRight(s"Could not find child of $parent") + loop(lastBlockHash) } @tailrec @@ -1116,10 +1124,7 @@ class ELUpdater( if (prevState.lastPayload.height < prevState.chainStatus.nodeChainInfo.lastBlock.height) { logger.debug(s"EC chain is not synced, trying to find next block to request payload") findBlockChild(prevState.lastPayload.hash, prevState.chainStatus.nodeChainInfo.lastBlock.hash) match { - case Left(error) => - logger.error(s"Could not find child of ${prevState.lastPayload.hash} on contract: $error") - prevState - case Right(contractBlock) => + case Some(contractBlock) => requestBlockPayload(contractBlock) match { case PayloadRequestResult.Exists(payload) => logger.debug(s"Block ${contractBlock.hash} payload exists at EC chain, trying to confirm") @@ -1140,6 +1145,9 @@ class ELUpdater( setState("8", newState) newState } + case _ => + logger.error(s"Could not find child of ${prevState.lastPayload.hash} on contract") + prevState } } else { logger.trace(s"EC chain ${prevState.chainStatus.nodeChainInfo.id} is synced, no need to request block payloads")