Skip to content

Commit

Permalink
Simplify blocks full validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Mashonskii committed Sep 16, 2024
1 parent 0278e98 commit 3828447
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 178 deletions.
164 changes: 66 additions & 98 deletions src/main/scala/units/ELUpdater.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import io.netty.channel.group.DefaultChannelGroup
import monix.execution.cancelables.SerialCancelable
import monix.execution.{CancelableFuture, Scheduler}
import play.api.libs.json.*
import units.ELUpdater.FullValidationState.{Continue, Stop}
import units.ELUpdater.State.*
import units.ELUpdater.State.ChainStatus.{FollowingChain, Mining, WaitForNewChain}
import units.client.L2BlockLike
Expand Down Expand Up @@ -165,7 +164,7 @@ class ELUpdater(
}
}

private def callContract(fc: FUNCTION_CALL, blockData: EcBlock, invoker: KeyPair): Job[Unit] = {
private def callContract(fc: FUNCTION_CALL, blockData: EcBlock, invoker: KeyPair): JobResult[Unit] = {
val extraFee = if (blockchain.hasPaidVerifier(invoker.toAddress)) ScriptExtraFee else 0

val tx = InvokeScriptTransaction(
Expand Down Expand Up @@ -258,7 +257,7 @@ class ELUpdater(
}
}

private def rollbackTo(prevState: Working[ChainStatus], target: L2BlockLike, finalizedBlock: ContractBlock): Job[Working[ChainStatus]] = {
private def rollbackTo(prevState: Working[ChainStatus], target: L2BlockLike, finalizedBlock: ContractBlock): JobResult[Working[ChainStatus]] = {
val targetHash = target.hash
for {
rollbackBlock <- mkRollbackBlock(targetHash)
Expand Down Expand Up @@ -298,7 +297,7 @@ class ELUpdater(
lastElWithdrawalIndex: WithdrawalIndex,
chainContractOptions: ChainContractOptions,
prevEpochMinerRewardAddress: Option[EthAddress]
): Job[MiningData] = {
): JobResult[MiningData] = {
val firstElWithdrawalIndex = lastElWithdrawalIndex + 1
val startC2ETransferIndex = lastC2ETransferIndex + 1

Expand Down Expand Up @@ -933,7 +932,7 @@ class ELUpdater(
logger.debug(s"Unexpected state on sync: $other")
})

private def validateRandao(block: EcBlock, epochNumber: Int): Job[Unit] =
private def validateRandao(block: EcBlock, epochNumber: Int): JobResult[Unit] =
blockchain.vrf(epochNumber) match {
case None => ClientError(s"VRF of $epochNumber epoch is empty").asLeft
case Some(vrf) =>
Expand All @@ -945,7 +944,7 @@ class ELUpdater(
)
}

private def validateMiner(block: NetworkL2Block, epochInfo: Option[EpochInfo]): Job[Unit] = {
private def validateMiner(block: NetworkL2Block, epochInfo: Option[EpochInfo]): JobResult[Unit] = {
epochInfo match {
case Some(epochMeta) =>
for {
Expand All @@ -969,7 +968,7 @@ class ELUpdater(
}
}

private def validateTimestamp(newNetworkBlock: NetworkL2Block, parentEcBlock: EcBlock): Job[Unit] = {
private def validateTimestamp(newNetworkBlock: NetworkL2Block, parentEcBlock: EcBlock): JobResult[Unit] = {
val minAppendTs = parentEcBlock.timestamp + config.blockDelay.toSeconds
Either.cond(
newNetworkBlock.timestamp >= minAppendTs,
Expand All @@ -985,15 +984,15 @@ class ELUpdater(
networkBlock: NetworkL2Block,
parentBlock: EcBlock,
epochInfo: Option[EpochInfo]
): Job[Unit] = {
): JobResult[Unit] = {
for {
_ <- validateTimestamp(networkBlock, parentBlock)
_ <- validateMiner(networkBlock, epochInfo)
_ <- engineApiClient.applyNewPayload(networkBlock.payload)
} yield ()
}

private def getAltChainReferenceBlock(nodeChainInfo: ChainInfo, lastContractBlock: ContractBlock): Job[ContractBlock] = {
private def getAltChainReferenceBlock(nodeChainInfo: ChainInfo, lastContractBlock: ContractBlock): JobResult[ContractBlock] = {
if (nodeChainInfo.isMain) {
for {
lastEpoch <- chainContractClient
Expand Down Expand Up @@ -1160,7 +1159,7 @@ class ELUpdater(
}
}

private def mkRollbackBlock(rollbackTargetBlockId: BlockHash): Job[RollbackBlock] = for {
private def mkRollbackBlock(rollbackTargetBlockId: BlockHash): JobResult[RollbackBlock] = for {
targetBlockFromContract <- Right(chainContractClient.getBlock(rollbackTargetBlockId))
targetBlockOpt <- targetBlockFromContract match {
case None => engineApiClient.getBlockByHash(rollbackTargetBlockId)
Expand All @@ -1179,7 +1178,7 @@ class ELUpdater(
Withdrawal(index, x.destElAddress, Bridge.clToGweiNativeTokenAmount(x.amount))
}

private def getLastWithdrawalIndex(hash: BlockHash): Job[WithdrawalIndex] =
private def getLastWithdrawalIndex(hash: BlockHash): JobResult[WithdrawalIndex] =
engineApiClient.getBlockByHash(hash).flatMap {
case None => Left(ClientError(s"Can't find $hash block on EC during withdrawal search"))
case Some(ecBlock) =>
Expand All @@ -1191,7 +1190,7 @@ class ELUpdater(
}
}

private def getE2CTransfersRootHash(hash: BlockHash, elBridgeAddress: EthAddress): Job[Digest] =
private def getE2CTransfersRootHash(hash: BlockHash, elBridgeAddress: EthAddress): JobResult[Digest] =
for {
elRawLogs <- engineApiClient.getLogs(hash, elBridgeAddress, Bridge.ElSentNativeEventTopic)
rootHash <- {
Expand All @@ -1209,49 +1208,43 @@ class ELUpdater(
}
} yield rootHash

private def skipFinalizedBlocksValidation(curState: Working[ChainStatus]) = {
if (curState.finalizedBlock.height > curState.fullValidationStatus.lastValidatedBlock.height) {
val newState = curState.copy(fullValidationStatus = FullValidationStatus(curState.finalizedBlock, None))
setState("4", newState)
newState
} else curState
}

private def validateAppliedBlocks(): Unit = {
state match {
case w: Working[ChainStatus] =>
val blocksToValidate = getContractBlocksForValidation(w.lastContractBlock, w.fullValidationStatus.lastValidatedBlock)
blocksToValidate.foldLeft[FullValidationState](Continue(w)) {
case (Continue(curState), contractBlock) =>
getBlockForValidation(contractBlock, curState.lastEcBlock, curState.finalizedBlock)
.map {
case BlockForValidation.Found(contractBlock, ecBlock) =>
logger.debug(s"Trying to validate applied block ${contractBlock.hash}")
validateAppliedBlock(contractBlock, ecBlock, curState) match {
case Right(updatedState) =>
logger.debug(s"Block ${contractBlock.hash} was successfully validated")
Continue(updatedState)
case Left(err) =>
logger.debug(s"Validation of applied block ${contractBlock.hash} failed: ${err.message}")
processInvalidBlock(contractBlock, curState, None)
Stop
}
case BlockForValidation.SkippedFinalized(block) =>
logger.debug(s"Validation of applied block ${block.hash} skipped, because of finalization")
val newState =
curState.copy(fullValidationStatus = curState.fullValidationStatus.copy(lastValidatedBlock = block, lastElWithdrawalIndex = None))
setState("4", newState)
Continue(newState)
case BlockForValidation.NotFound => Stop
}
.fold[FullValidationState](
err => {
logger.warn(s"Validation of applied blocks error: ${err.message}")
Stop
},
identity
)
case _ => Stop
}
val startState = skipFinalizedBlocksValidation(w)
getContractBlocksForValidation(startState).fold[Unit](
err => logger.error(s"Validation of applied blocks error: ${err.message}"),
blocksToValidate =>
blocksToValidate.foldLeft[JobResult[Working[ChainStatus]]](Right(startState)) {
case (Right(curState), block) =>
logger.debug(s"Trying to validate applied block ${block.hash}")
validateAppliedBlock(block.contractBlock, block.ecBlock, curState) match {
case Right(updatedState) =>
logger.debug(s"Block ${block.hash} was successfully validated")
Right(updatedState)
case Left(err) =>
logger.debug(s"Validation of applied block ${block.hash} failed: ${err.message}")
processInvalidBlock(block.contractBlock, curState, None)
Left(err)
}
case (err, _) => err
}
)
case other =>
logger.debug(s"Skipping validation of applied blocks: $other")
Either.unit
}
}

private def validateE2CTransfers(contractBlock: ContractBlock, elBridgeAddress: EthAddress): Job[Unit] =
private def validateE2CTransfers(contractBlock: ContractBlock, elBridgeAddress: EthAddress): JobResult[Unit] =
getE2CTransfersRootHash(contractBlock.hash, elBridgeAddress).flatMap { elRootHash =>
// elRootHash is the source of true
if (java.util.Arrays.equals(contractBlock.e2cTransfersRootHash, elRootHash)) Either.unit
Expand All @@ -1270,7 +1263,7 @@ class ELUpdater(
ecBlock: EcBlock,
fullValidationStatus: FullValidationStatus,
chainContractOptions: ChainContractOptions
): Job[Option[WithdrawalIndex]] = {
): JobResult[Option[WithdrawalIndex]] = {
val blockEpoch = chainContractClient
.getEpochMeta(contractBlock.epoch)
.getOrElse(throw new RuntimeException(s"Can't find an epoch ${contractBlock.epoch} data of block ${contractBlock.hash} on chain contract"))
Expand Down Expand Up @@ -1308,7 +1301,7 @@ class ELUpdater(
contractBlock: ContractBlock,
parentBlock: EcBlock,
prevState: Working[ChainStatus]
): Job[Working[ChainStatus]] = {
): JobResult[Working[ChainStatus]] = {
logger.debug(s"Trying to do full validation of block ${networkBlock.hash}")
for {
_ <- preValidateBlock(networkBlock, parentBlock, None)
Expand All @@ -1322,7 +1315,7 @@ class ELUpdater(
contractBlock: ContractBlock,
ecBlock: EcBlock,
prevState: Working[ChainStatus]
): Job[Working[ChainStatus]] = {
): JobResult[Working[ChainStatus]] = {
val validationResult =
for {
_ <- Either.cond(
Expand Down Expand Up @@ -1388,49 +1381,33 @@ class ELUpdater(
}
}

private def getContractBlocksForValidation(
lastContractBlock: ContractBlock,
lastValidatedBlock: ContractBlock
): List[ContractBlock] = {
private def getContractBlocksForValidation(curState: Working[ChainStatus]): JobResult[List[BlockForValidation]] = {
@tailrec
def loop(curBlock: ContractBlock, acc: List[ContractBlock]): List[ContractBlock] = {
if (curBlock.parentHash == lastValidatedBlock.hash) {
curBlock :: acc
def loop(curBlock: ContractBlock, acc: List[BlockForValidation]): JobResult[List[BlockForValidation]] = {
if (curBlock.height <= curState.fullValidationStatus.lastValidatedBlock.height || curBlock.height <= curState.finalizedBlock.height) {
Right(acc)
} else {
chainContractClient.getBlock(curBlock.parentHash) match {
case Some(block) =>
loop(block, curBlock :: acc)
case Some(parentBlock) =>
if (curBlock.height > curState.lastEcBlock.height) {
loop(parentBlock, acc)
} else {
engineApiClient.getBlockByHash(curBlock.hash) match {
case Right(Some(ecBlock)) =>
loop(parentBlock, BlockForValidation(curBlock, ecBlock) :: acc)
case Right(None) =>
Left(ClientError(s"Block ${curBlock.hash} not found on EC client for full validation"))
case Left(err) =>
Left(ClientError(s"Can't get EC block ${curBlock.hash} for full validation: ${err.message}"))
}
}
case _ =>
logger.error(s"Block ${curBlock.parentHash} not found at contract during full validation")
List.empty
Left(ClientError(s"Block ${curBlock.parentHash} not found at contract during full validation"))
}
}
}

if (lastContractBlock.height <= lastValidatedBlock.height) {
List.empty
} else {
loop(lastContractBlock, List.empty)
}
}

private def getBlockForValidation(
contractBlock: ContractBlock,
lastEcBlock: EcBlock,
finalizedBlock: ContractBlock
): Job[BlockForValidation] = {
if (contractBlock.height <= lastEcBlock.height) {
if (contractBlock.height <= finalizedBlock.height) Right(BlockForValidation.SkippedFinalized(contractBlock))
else
engineApiClient.getBlockByHash(contractBlock.hash).map {
case Some(ecBlock) => BlockForValidation.Found(contractBlock, ecBlock)
case None =>
logger.debug(s"Can't find a block ${contractBlock.hash} on EC client for a full validation")
BlockForValidation.NotFound
}
} else {
Right(BlockForValidation.NotFound)
}
loop(curState.lastContractBlock, List.empty)
}

private def validateC2ETransfers(
Expand Down Expand Up @@ -1494,12 +1471,12 @@ class ELUpdater(
}
.map(_ => ())

private def confirmBlock(block: L2BlockLike, finalizedBlock: L2BlockLike): Job[String] = {
private def confirmBlock(block: L2BlockLike, finalizedBlock: L2BlockLike): JobResult[String] = {
val finalizedBlockHash = if (finalizedBlock.height > block.height) block.hash else finalizedBlock.hash
engineApiClient.forkChoiceUpdate(block.hash, finalizedBlockHash)
}

private def confirmBlock(hash: BlockHash, finalizedBlockHash: BlockHash): Job[String] =
private def confirmBlock(hash: BlockHash, finalizedBlockHash: BlockHash): JobResult[String] =
engineApiClient.forkChoiceUpdate(hash, finalizedBlockHash)

private def confirmBlockAndStartMining(
Expand All @@ -1509,7 +1486,7 @@ class ELUpdater(
suggestedFeeRecipient: EthAddress,
prevRandao: String,
withdrawals: Vector[Withdrawal]
): Job[String] = {
): JobResult[String] = {
val finalizedBlockHash = if (finalizedBlock.height > lastBlock.height) lastBlock.hash else finalizedBlock.hash
engineApiClient
.forkChoiceUpdateWithPayloadId(
Expand Down Expand Up @@ -1607,17 +1584,8 @@ object ELUpdater {

private case class MiningData(payloadId: PayloadId, nextBlockUnixTs: Long, lastC2ETransferIndex: Long, lastElWithdrawalIndex: WithdrawalIndex)

private sealed trait BlockForValidation
private object BlockForValidation {
case class Found(contractBlock: ContractBlock, ecBlock: EcBlock) extends BlockForValidation
case class SkippedFinalized(block: ContractBlock) extends BlockForValidation
case object NotFound extends BlockForValidation
}

sealed trait FullValidationState
object FullValidationState {
case class Continue(state: Working[ChainStatus]) extends FullValidationState
case object Stop extends FullValidationState
private case class BlockForValidation(contractBlock: ContractBlock, ecBlock: EcBlock) {
val hash: BlockHash = contractBlock.hash
}

case class FullValidationStatus(lastValidatedBlock: ContractBlock, lastElWithdrawalIndex: Option[WithdrawalIndex]) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/units/client/contract/ContractFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import com.wavesplatform.lang.v1.FunctionHeader
import com.wavesplatform.lang.v1.compiler.Terms.{CONST_BYTESTR, CONST_LONG, CONST_STRING, EVALUATED, FUNCTION_CALL}
import org.web3j.utils.Numeric.cleanHexPrefix
import units.util.HexBytesConverter.toHexNoPrefix
import units.{BlockHash, ClientError, Job}
import units.{BlockHash, ClientError, JobResult}

abstract class ContractFunction(name: String, reference: BlockHash, extraArgs: Either[CommonError, List[EVALUATED]]) {
def toFunctionCall(blockHash: BlockHash, transfersRootHash: Digest, lastC2ETransferIndex: Long): Job[FUNCTION_CALL] = (for {
def toFunctionCall(blockHash: BlockHash, transfersRootHash: Digest, lastC2ETransferIndex: Long): JobResult[FUNCTION_CALL] = (for {
hash <- CONST_STRING(cleanHexPrefix(blockHash))
ref <- CONST_STRING(cleanHexPrefix(reference))
trh <- CONST_STRING(toHexNoPrefix(transfersRootHash))
Expand Down
24 changes: 12 additions & 12 deletions src/main/scala/units/client/engine/EngineApiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import play.api.libs.json.*
import units.client.engine.EngineApiClient.PayloadId
import units.client.engine.model.*
import units.eth.EthAddress
import units.{BlockHash, Job}
import units.{BlockHash, JobResult}

trait EngineApiClient {
def forkChoiceUpdate(blockHash: BlockHash, finalizedBlockHash: BlockHash): Job[String] // TODO Replace String with an appropriate type
def forkChoiceUpdate(blockHash: BlockHash, finalizedBlockHash: BlockHash): JobResult[String] // TODO Replace String with an appropriate type

def forkChoiceUpdateWithPayloadId(
lastBlockHash: BlockHash,
Expand All @@ -16,25 +16,25 @@ trait EngineApiClient {
suggestedFeeRecipient: EthAddress,
prevRandao: String,
withdrawals: Vector[Withdrawal] = Vector.empty
): Job[PayloadId]
): JobResult[PayloadId]

def getPayload(payloadId: PayloadId): Job[JsObject]
def getPayload(payloadId: PayloadId): JobResult[JsObject]

def applyNewPayload(payload: JsObject): Job[Option[BlockHash]]
def applyNewPayload(payload: JsObject): JobResult[Option[BlockHash]]

def getPayloadBodyByHash(hash: BlockHash): Job[Option[JsObject]]
def getPayloadBodyByHash(hash: BlockHash): JobResult[Option[JsObject]]

def getBlockByNumber(number: BlockNumber): Job[Option[EcBlock]]
def getBlockByNumber(number: BlockNumber): JobResult[Option[EcBlock]]

def getBlockByHash(hash: BlockHash): Job[Option[EcBlock]]
def getBlockByHash(hash: BlockHash): JobResult[Option[EcBlock]]

def getBlockByHashJson(hash: BlockHash): Job[Option[JsObject]]
def getBlockByHashJson(hash: BlockHash): JobResult[Option[JsObject]]

def getLastExecutionBlock: Job[EcBlock]
def getLastExecutionBlock: JobResult[EcBlock]

def blockExists(hash: BlockHash): Job[Boolean]
def blockExists(hash: BlockHash): JobResult[Boolean]

def getLogs(hash: BlockHash, address: EthAddress, topic: String): Job[List[GetLogsResponseEntry]]
def getLogs(hash: BlockHash, address: EthAddress, topic: String): JobResult[List[GetLogsResponseEntry]]
}

object EngineApiClient {
Expand Down
Loading

0 comments on commit 3828447

Please sign in to comment.