Skip to content

Commit

Permalink
Rename block mentions
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-mashonskiy committed Sep 30, 2024
1 parent 2c26c3d commit 2ff6c0f
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 131 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/units/ConsensusClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class ConsensusClientDependencies(context: ExtensionContext) extends AutoCloseab

val config: ClientConfig = context.settings.config.as[ClientConfig]("waves.l2")

private val payloadObserverScheduler = Schedulers.singleThread("block-observer-l2", reporter = { e => log.warn("Error in BlockObserver", e) })
private val payloadObserverScheduler = Schedulers.singleThread("payload-observer-l2", reporter = { e => log.warn("Error in PayloadObserver", e) })
val globalScheduler: Scheduler = monix.execution.Scheduler.global
val eluScheduler: SchedulerService = Scheduler.singleThread("el-updater", reporter = { e => log.warn("Exception in ELUpdater", e) })

Expand Down
93 changes: 48 additions & 45 deletions src/main/scala/units/ELUpdater.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ELUpdater(

def executionPayloadReceived(epi: ExecutionPayloadInfo): Unit = scheduler.execute { () =>
val payload = epi.payload
logger.debug(s"New block ${payload.hash}->${payload.parentHash} (timestamp=${payload.timestamp}, height=${payload.height}) appeared")
logger.debug(s"New payload for block ${payload.hash}->${payload.parentHash} (timestamp=${payload.timestamp}, height=${payload.height}) appeared")

val now = time.correctedTime() / 1000
if (payload.timestamp - now <= MaxTimeDrift) {
Expand Down Expand Up @@ -90,19 +90,19 @@ class ELUpdater(
case Some(rInfo) if rInfo.missedBlock.hash == payload.hash =>
chainContractClient.getChainInfo(rInfo.chainId) match {
case Some(chainInfo) if chainInfo.isMain =>
validateAndApplyMissedBlock(epi, w, rInfo.missedBlock, rInfo.missedBlockParentPayload, chainInfo)
validateAndApplyMissed(epi, w, rInfo.missedBlock, rInfo.missedBlockParentPayload, chainInfo)
case Some(_) =>
logger.debug(s"Chain ${rInfo.chainId} is not main anymore, ignoring ${payload.hash}")
case _ =>
logger.error(s"Failed to get chain ${rInfo.chainId} info, ignoring ${payload.hash}")
}
case _ => logger.debug(s"Expecting ${w.returnToMainChainInfo.fold("no block")(_.toString)}, ignoring unexpected ${payload.hash}")
case _ => logger.debug(s"Expecting ${w.returnToMainChainInfo.fold("no block payload")(_.toString)}, ignoring unexpected ${payload.hash}")
}
case other =>
logger.debug(s"$other: ignoring ${payload.hash}")
}
} else {
logger.debug(s"Block ${payload.hash} is from future: timestamp=${payload.timestamp}, now=$now, Δ${payload.timestamp - now}s")
logger.debug(s"Payload for block ${payload.hash} is from future: timestamp=${payload.timestamp}, now=$now, Δ${payload.timestamp - now}s")
}
}

Expand Down Expand Up @@ -453,7 +453,7 @@ class ELUpdater(
lastElWithdrawalIndex = None
)
val options = chainContractClient.getOptions
followChainAndRequestNextBlock(
followChainAndRequestNextBlockPayload(
newEpochInfo,
mainChainInfo,
lastPayload,
Expand All @@ -468,8 +468,8 @@ class ELUpdater(
_ => ()
)
case Right(None) =>
logger.trace(s"Finalized block ${finalizedBlock.hash} payload is not in EC, requesting block from peers")
setState("15", WaitingForSyncHead(finalizedBlock, requestAndProcessPayload(finalizedBlock.hash)))
logger.trace(s"Finalized block ${finalizedBlock.hash} payload is not in EC, requesting from peers")
setState("15", WaitingForSyncHead(finalizedBlock, requestAndProcessBlockPayload(finalizedBlock.hash)))
}
}
}
Expand Down Expand Up @@ -509,12 +509,12 @@ class ELUpdater(
}
}

private def requestBlocksAndStartMining(prevState: Working[FollowingChain]): Unit = {
private def requestPayloadsAndStartMining(prevState: Working[FollowingChain]): Unit = {
def check(missedBlock: ContractBlock): Unit = {
state match {
case w @ Working(epochInfo, lastPayload, finalizedBlock, mainChainInfo, _, fc: FollowingChain, _, returnToMainChainInfo)
if fc.nextExpectedBlock.map(_.hash).contains(missedBlock.hash) && canSupportAnotherAltChain(fc.nodeChainInfo) =>
logger.debug(s"Block ${missedBlock.hash} wasn't received for $WaitRequestedBlockTimeout, need to switch to alternative chain")
logger.debug(s"Block ${missedBlock.hash} payload wasn't received for $WaitRequestedPayloadTimeout, need to switch to alternative chain")
(for {
lastValidBlock <- getAltChainReferenceBlock(fc.nodeChainInfo, missedBlock)
updatedState <- rollbackTo(w, lastValidBlock, finalizedBlock)
Expand Down Expand Up @@ -556,8 +556,8 @@ class ELUpdater(
case w: Working[ChainStatus] =>
w.chainStatus match {
case FollowingChain(_, Some(nextExpectedBlock)) =>
logger.debug(s"Waiting for block $nextExpectedBlock from peers")
scheduler.scheduleOnce(WaitRequestedBlockTimeout) {
logger.debug(s"Waiting for block $nextExpectedBlock payload from peers")
scheduler.scheduleOnce(WaitRequestedPayloadTimeout) {
if (blockchain.height == prevState.epochInfo.number) {
check(missedBlock)
}
Expand All @@ -574,7 +574,7 @@ class ELUpdater(

prevState.chainStatus.nextExpectedBlock match {
case Some(missedBlock) =>
scheduler.scheduleOnce(WaitRequestedBlockTimeout) {
scheduler.scheduleOnce(WaitRequestedPayloadTimeout) {
if (blockchain.height == prevState.epochInfo.number) {
check(missedBlock)
}
Expand All @@ -600,7 +600,7 @@ class ELUpdater(
finalizedBlock,
options
).foreach { newState =>
requestBlocksAndStartMining(newState)
requestPayloadsAndStartMining(newState)
}
}

Expand Down Expand Up @@ -703,14 +703,14 @@ class ELUpdater(
}
}
validateAppliedBlocks()
requestMainChainBlock()
requestMainChainBlockPayload()
case Right(None) =>
logger.trace(s"Finalized block ${finalizedBlock.hash} payload is not in EC, requesting block from peers")
setState("19", WaitingForSyncHead(finalizedBlock, requestAndProcessPayload(finalizedBlock.hash)))
logger.trace(s"Finalized block ${finalizedBlock.hash} payload is not in EC, requesting from peers")
setState("19", WaitingForSyncHead(finalizedBlock, requestAndProcessBlockPayload(finalizedBlock.hash)))
}
}

private def followChainAndRequestNextBlock(
private def followChainAndRequestNextBlockPayload(
epochInfo: EpochInfo,
nodeChainInfo: ChainInfo,
lastPayload: ExecutionPayload,
Expand All @@ -731,29 +731,29 @@ class ELUpdater(
returnToMainChainInfo
)
setState("3", newState)
maybeRequestNextBlock(newState, finalizedBlock)
maybeRequestNextBlockPayload(newState, finalizedBlock)
}

private def requestPayload(contractBlock: ContractBlock): PayloadRequestResult = {
private def requestBlockPayload(contractBlock: ContractBlock): PayloadRequestResult = {
logger.debug(s"Requesting payload for block ${contractBlock.hash}")
engineApiClient.getBlockByHash(contractBlock.hash) match {
case Right(Some(payload)) => PayloadRequestResult.Exists(payload)
case Right(None) =>
requestAndProcessPayload(contractBlock.hash)
requestAndProcessBlockPayload(contractBlock.hash)
PayloadRequestResult.Requested(contractBlock)
case Left(err) =>
logger.warn(s"Failed to get block ${contractBlock.hash} payload by hash: ${err.message}")
requestAndProcessPayload(contractBlock.hash)
requestAndProcessBlockPayload(contractBlock.hash)
PayloadRequestResult.Requested(contractBlock)
}
}

private def requestMainChainBlock(): Unit = {
private def requestMainChainBlockPayload(): Unit = {
state match {
case w: Working[ChainStatus] =>
w.returnToMainChainInfo.foreach { returnToMainChainInfo =>
if (w.mainChainInfo.id == returnToMainChainInfo.chainId) {
requestPayload(returnToMainChainInfo.missedBlock) match {
requestBlockPayload(returnToMainChainInfo.missedBlock) match {
case PayloadRequestResult.Exists(payload) =>
logger.debug(s"Block ${returnToMainChainInfo.missedBlock.hash} payload exists at execution chain, trying to validate")
validateAppliedBlock(returnToMainChainInfo.missedBlock, payload, w) match {
Expand All @@ -776,7 +776,7 @@ class ELUpdater(
}
}

private def requestAndProcessPayload(hash: BlockHash): CancelableFuture[ExecutionPayloadInfo] = {
private def requestAndProcessBlockPayload(hash: BlockHash): CancelableFuture[ExecutionPayloadInfo] = {
payloadObserver
.loadPayload(hash)
.andThen {
Expand Down Expand Up @@ -828,7 +828,7 @@ class ELUpdater(
returnToMainChainInfo.filter(rInfo => rInfo.chainId != prevChainId && mainChainInfo.id == rInfo.chainId)
)
setState("16", newState)
maybeRequestNextBlock(newState, finalizedContractBlock)
maybeRequestNextBlockPayload(newState, finalizedContractBlock)
}

def rollbackAndFollowChain(
Expand Down Expand Up @@ -903,7 +903,7 @@ class ELUpdater(
lastValidatedBlock = target,
lastElWithdrawalIndex = None
)
followChainAndRequestNextBlock(
followChainAndRequestNextBlockPayload(
newEpochInfo,
mainChainInfo,
lastPayload,
Expand Down Expand Up @@ -994,16 +994,16 @@ class ELUpdater(
.toRight(ClientError(s"Can't find a last block $referenceBlockHash of epoch #${lastEpoch.prevEpoch} on contract"))
} yield referenceBlock
} else {
val blockId = nodeChainInfo.firstBlock.parentHash
val blockHash = nodeChainInfo.firstBlock.parentHash
chainContractClient
.getBlock(blockId)
.getBlock(blockHash)
.toRight(
ClientError(s"Parent block $blockId for first block ${nodeChainInfo.firstBlock.hash} of chain ${nodeChainInfo.id} not found at contract")
ClientError(s"Parent block $blockHash for first block ${nodeChainInfo.firstBlock.hash} of chain ${nodeChainInfo.id} not found at contract")
)
}
}

private def validateAndApplyMissedBlock(
private def validateAndApplyMissed(
epi: ExecutionPayloadInfo,
prevState: Working[ChainStatus],
contractBlock: ContractBlock,
Expand Down Expand Up @@ -1065,7 +1065,7 @@ class ELUpdater(
err => logger.error(s"Can't confirm block ${payload.hash} of chain ${nodeChainInfo.id}: ${err.message}"),
_ => {
logger.info(s"Successfully confirmed block ${payload.hash} of chain ${nodeChainInfo.id}")
followChainAndRequestNextBlock(
followChainAndRequestNextBlockPayload(
prevState.epochInfo,
nodeChainInfo,
payload,
Expand Down Expand Up @@ -1103,15 +1103,15 @@ class ELUpdater(
}

@tailrec
private def maybeRequestNextBlock(prevState: Working[FollowingChain], finalizedBlock: ContractBlock): Working[FollowingChain] = {
private def maybeRequestNextBlockPayload(prevState: Working[FollowingChain], finalizedBlock: ContractBlock): Working[FollowingChain] = {
if (prevState.lastPayload.height < prevState.chainStatus.nodeChainInfo.lastBlock.height) {
logger.debug(s"EC chain is not synced, trying to find next block to request")
logger.debug(s"EC chain is not synced, trying to find next block to request payload")
findBlockChild(prevState.lastPayload.hash, prevState.chainStatus.nodeChainInfo.lastBlock.hash) match {
case Left(error) =>
logger.error(s"Could not find child of ${prevState.lastPayload.hash} on contract: $error")
prevState
case Right(contractBlock) =>
requestPayload(contractBlock) match {
requestBlockPayload(contractBlock) match {
case PayloadRequestResult.Exists(payload) =>
logger.debug(s"Block ${contractBlock.hash} payload exists at EC chain, trying to confirm")
confirmBlock(payload, finalizedBlock) match {
Expand All @@ -1121,7 +1121,7 @@ class ELUpdater(
chainStatus = FollowingChain(prevState.chainStatus.nodeChainInfo, None)
)
setState("7", newState)
maybeRequestNextBlock(newState, finalizedBlock)
maybeRequestNextBlockPayload(newState, finalizedBlock)
case Left(err) =>
logger.error(s"Failed to confirm next block ${payload.hash}: ${err.message}")
prevState
Expand All @@ -1133,19 +1133,22 @@ class ELUpdater(
}
}
} else {
logger.trace(s"EC chain ${prevState.chainStatus.nodeChainInfo.id} is synced, no need to request blocks")
logger.trace(s"EC chain ${prevState.chainStatus.nodeChainInfo.id} is synced, no need to request block payloads")
prevState
}
}

private def mkRollbackBlock(rollbackTargetBlockId: BlockHash): JobResult[RollbackBlock] = for {
targetBlockDataOpt <- chainContractClient.getBlock(rollbackTargetBlockId) match {
case None => engineApiClient.getBlockByHash(rollbackTargetBlockId)
private def mkRollbackBlock(rollbackTargetBlockHash: BlockHash): JobResult[RollbackBlock] = for {
targetBlockDataOpt <- chainContractClient.getBlock(rollbackTargetBlockHash) match {
case None => engineApiClient.getBlockByHash(rollbackTargetBlockHash)
case x => Right(x)
}
targetBlockData <- Either.fromOption(targetBlockDataOpt, ClientError(s"Can't find block $rollbackTargetBlockId neither on a contract, nor in EC"))
targetBlockData <- Either.fromOption(
targetBlockDataOpt,
ClientError(s"Can't find block $rollbackTargetBlockHash neither on a contract, nor in EC")
)
parentPayloadOpt <- engineApiClient.getBlockByHash(targetBlockData.parentHash)
parentPayload <- Either.fromOption(parentPayloadOpt, ClientError(s"Can't find block $rollbackTargetBlockId parent payload in execution client"))
parentPayload <- Either.fromOption(parentPayloadOpt, ClientError(s"Can't find block $rollbackTargetBlockHash parent payload in execution client"))
rollbackBlockOpt <- engineApiClient.applyNewPayload(EmptyPayload.mkExecutionPayloadJson(parentPayload))
rollbackBlock <- Either.fromOption(rollbackBlockOpt, ClientError("Rollback block hash is not defined as latest valid hash"))
} yield RollbackBlock(rollbackBlock, parentPayload)
Expand Down Expand Up @@ -1186,7 +1189,7 @@ class ELUpdater(
}
} yield rootHash

private def skipFinalizedBlocksValidation(curState: Working[ChainStatus]) = {
private def skipFinalizedBlocksValidation(curState: Working[ChainStatus]): Working[ChainStatus] = {
if (curState.finalizedBlock.height > curState.fullValidationStatus.lastValidatedBlock.height) {
val newState = curState.copy(fullValidationStatus = FullValidationStatus(curState.finalizedBlock, None))
setState("4", newState)
Expand Down Expand Up @@ -1497,7 +1500,7 @@ object ELUpdater {
val WaitForReferenceConfirmInterval: FiniteDuration = 500.millis
val ClChangedProcessingDelay: FiniteDuration = 50.millis
val MiningRetryInterval: FiniteDuration = 5.seconds
val WaitRequestedBlockTimeout: FiniteDuration = 2.seconds
val WaitRequestedPayloadTimeout: FiniteDuration = 2.seconds

case class EpochInfo(number: Int, miner: Address, rewardAddress: EthAddress, hitSource: ByteStr, prevEpochLastBlockHash: Option[BlockHash])

Expand Down Expand Up @@ -1551,8 +1554,8 @@ object ELUpdater {

case class ChainSwitchInfo(prevChainId: Long, referenceBlock: ContractBlock)

/** We haven't received a EC-block {@link missedBlock} of a previous epoch when started a mining on a new epoch. We can return to the main chain, if
* get a missed EC-block.
/** We haven't received block payload of a previous epoch when started a mining on a new epoch. We can return to the main chain, if get a missed
* block payload.
*/
case class ReturnToMainChainInfo(missedBlock: ContractBlock, missedBlockParentPayload: ExecutionPayload, chainId: Long)

Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/units/network/HistoryReplier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ class HistoryReplier(engineApiClient: EngineApiClient)(implicit sc: Scheduler) e
ctx,
loadPayload(hash)
.map {
case Right(block) =>
RawBytes(PayloadSpec.messageCode, PayloadSpec.serializeData(block))
case Left(err) => throw new NoSuchElementException(s"Error loading block $hash: $err")
case Right(payloadMsg) =>
RawBytes(PayloadSpec.messageCode, PayloadSpec.serializeData(payloadMsg))
case Left(err) => throw new NoSuchElementException(s"Error loading block $hash payload: $err")
}
)
case _ => super.channelRead(ctx, msg)
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/units/network/PayloadMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ object PayloadMessage {
.leftMap(err => s"Error creating payload message: $err")

def apply(payloadBytes: Array[Byte], signature: Option[ByteStr]): Either[String, PayloadMessage] = for {
payload <- Try(Json.parse(payloadBytes).as[JsObject]).toEither.leftMap(err => s"Payload bytes are not a valid JSON object: ${err.getMessage}")
block <- apply(payload, signature)
} yield block
payload <- Try(Json.parse(payloadBytes).as[JsObject]).toEither.leftMap(err => s"Payload bytes are not a valid JSON object: ${err.getMessage}")
payloadMsg <- apply(payload, signature)
} yield payloadMsg

def signed(payloadJson: JsObject, signer: PrivateKey): Either[String, PayloadMessage] = {
val signature = crypto.sign(signer, Json.toBytes(payloadJson))
Expand All @@ -99,5 +99,5 @@ object PayloadMessage {
}

private def validateSignatureLength(signature: Option[ByteStr]): Either[String, Unit] =
Either.cond(signature.forall(_.size == SignatureLength), (), "Invalid block signature size")
Either.cond(signature.forall(_.size == SignatureLength), (), "Invalid payload signature size")
}
4 changes: 0 additions & 4 deletions src/main/scala/units/util/HexBytesConverter.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package units.util

import com.wavesplatform.common.state.ByteStr
import units.BlockHash
import org.web3j.abi.datatypes.generated.Uint256
import org.web3j.utils.Numeric

import java.math.BigInteger

object HexBytesConverter {

def toByteStr(hash: BlockHash): ByteStr =
ByteStr(toBytes(hash))

def toInt(intHex: String): Int =
Numeric.toBigInt(intHex).intValueExact()

Expand Down
Loading

0 comments on commit 2ff6c0f

Please sign in to comment.