Skip to content

Commit

Permalink
Better blocks validation (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-mashonskiy authored Sep 17, 2024
1 parent 6d10b34 commit 954d308
Show file tree
Hide file tree
Showing 21 changed files with 709 additions and 756 deletions.
1 change: 0 additions & 1 deletion src/main/scala/units/ConsensusClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class ConsensusClient(

def shutdown(): Future[Unit] = Future {
blocksStreamCancelable.cancel()
elu.close()
ownedResources.close()
}(globalScheduler)

Expand Down
973 changes: 474 additions & 499 deletions src/main/scala/units/ELUpdater.scala

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions src/main/scala/units/NetworkL2Block.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ class NetworkL2Block private (
height = height,
timestamp = timestamp,
minerRewardL2Address = minerRewardL2Address,
baseFeePerGas = baseFeePerGas,
gasLimit = gasLimit,
gasUsed = gasUsed,
withdrawals = withdrawals,
baseFeePerGas = baseFeePerGas
prevRandao = prevRandao,
withdrawals = withdrawals
)

override def toString: String = s"NetworkL2Block($hash)"
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/units/client/contract/ChainContractClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ trait ChainContractClient {
val clGenerator = ByteStr(bb.getByteArray(Address.AddressLength))
val chainId = if (bb.remaining() >= 8) bb.getLong() else 0L

val e2CTransfersRootHash =
if (bb.remaining() >= ContractBlock.ElToClTransfersRootHashLength) bb.getByteArray(ContractBlock.ElToClTransfersRootHashLength)
val e2cTransfersRootHash =
if (bb.remaining() >= ContractBlock.E2CTransfersRootHashLength) bb.getByteArray(ContractBlock.E2CTransfersRootHashLength)
else Array.emptyByteArray

val lastC2ETransferIndex = if (bb.remaining() >= 8) bb.getLong() else -1L
Expand All @@ -76,7 +76,7 @@ trait ChainContractClient {
!bb.hasRemaining,
s"Not parsed ${bb.remaining()} bytes from ${blockMeta.base64}, read data: " +
s"chainHeight=$chainHeight, epoch=$epoch, parentHash=$parentHash, clGenerator=$clGenerator, chainId=$chainId, " +
s"e2CTransfersRootHash=${HexBytesConverter.toHex(e2CTransfersRootHash)}, lastC2ETransferIndex=$lastC2ETransferIndex"
s"e2cTransfersRootHash=${HexBytesConverter.toHex(e2cTransfersRootHash)}, lastC2ETransferIndex=$lastC2ETransferIndex"
)

val minerRewardElAddress =
Expand All @@ -91,7 +91,7 @@ trait ChainContractClient {
clGenerator,
minerRewardElAddress,
chainId,
e2CTransfersRootHash,
e2cTransfersRootHash,
lastC2ETransferIndex
)
} catch {
Expand Down Expand Up @@ -290,7 +290,7 @@ object ChainContractClient {
private val BlockHashBytesSize = 32
private val Sep = ","

val MaxClToElTransfers = 16
val MaxC2ETransfers = 16

private class InconsistentContractData(message: String, cause: Throwable = null)
extends IllegalStateException(s"Probably, your have to upgrade your client. $message", cause)
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/units/client/contract/ContractBlock.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ case class ContractBlock(
generator: ByteStr,
minerRewardL2Address: EthAddress,
chainId: Long,
elToClTransfersRootHash: Digest,
lastClToElTransferIndex: Long
e2cTransfersRootHash: Digest,
lastC2ETransferIndex: Long
) extends L2BlockLike {
override def toString: String =
s"ContractBlock($hash, p=$parentHash, e=$epoch, h=$height, m=$minerRewardL2Address ($generator), c=$chainId, " +
s"e2c=${if (elToClTransfersRootHash.isEmpty) "" else toHex(elToClTransfersRootHash)}, c2e=$lastClToElTransferIndex)"
s"e2c=${if (e2cTransfersRootHash.isEmpty) "" else toHex(e2cTransfersRootHash)}, c2e=$lastC2ETransferIndex)"
}

object ContractBlock {
val ElToClTransfersRootHashLength = 32 // bytes
val E2CTransfersRootHashLength = 32 // bytes
}
6 changes: 3 additions & 3 deletions src/main/scala/units/client/contract/ContractFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import com.wavesplatform.lang.v1.FunctionHeader
import com.wavesplatform.lang.v1.compiler.Terms.{CONST_BYTESTR, CONST_LONG, CONST_STRING, EVALUATED, FUNCTION_CALL}
import org.web3j.utils.Numeric.cleanHexPrefix
import units.util.HexBytesConverter.toHexNoPrefix
import units.{BlockHash, ClientError, Job}
import units.{BlockHash, ClientError, JobResult}

abstract class ContractFunction(name: String, reference: BlockHash, extraArgs: Either[CommonError, List[EVALUATED]]) {
def toFunctionCall(blockHash: BlockHash, transfersRootHash: Digest, lastClToElTransferIndex: Long): Job[FUNCTION_CALL] = (for {
def toFunctionCall(blockHash: BlockHash, transfersRootHash: Digest, lastC2ETransferIndex: Long): JobResult[FUNCTION_CALL] = (for {
hash <- CONST_STRING(cleanHexPrefix(blockHash))
ref <- CONST_STRING(cleanHexPrefix(reference))
trh <- CONST_STRING(toHexNoPrefix(transfersRootHash))
xtra <- extraArgs
} yield FUNCTION_CALL(
FunctionHeader.User(name),
List(hash, ref) ++ xtra ++ List(trh, CONST_LONG(lastClToElTransferIndex))
List(hash, ref) ++ xtra ++ List(trh, CONST_LONG(lastC2ETransferIndex))
)).leftMap(e => ClientError(s"Error building function call for $name: $e"))
}

Expand Down
24 changes: 12 additions & 12 deletions src/main/scala/units/client/engine/EngineApiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import play.api.libs.json.*
import units.client.engine.EngineApiClient.PayloadId
import units.client.engine.model.*
import units.eth.EthAddress
import units.{BlockHash, Job}
import units.{BlockHash, JobResult}

trait EngineApiClient {
def forkChoiceUpdate(blockHash: BlockHash, finalizedBlockHash: BlockHash): Job[String] // TODO Replace String with an appropriate type
def forkChoiceUpdate(blockHash: BlockHash, finalizedBlockHash: BlockHash): JobResult[String] // TODO Replace String with an appropriate type

def forkChoiceUpdateWithPayloadId(
lastBlockHash: BlockHash,
Expand All @@ -16,25 +16,25 @@ trait EngineApiClient {
suggestedFeeRecipient: EthAddress,
prevRandao: String,
withdrawals: Vector[Withdrawal] = Vector.empty
): Job[PayloadId]
): JobResult[PayloadId]

def getPayload(payloadId: PayloadId): Job[JsObject]
def getPayload(payloadId: PayloadId): JobResult[JsObject]

def applyNewPayload(payload: JsObject): Job[Option[BlockHash]]
def applyNewPayload(payload: JsObject): JobResult[Option[BlockHash]]

def getPayloadBodyByHash(hash: BlockHash): Job[Option[JsObject]]
def getPayloadBodyByHash(hash: BlockHash): JobResult[Option[JsObject]]

def getBlockByNumber(number: BlockNumber): Job[Option[EcBlock]]
def getBlockByNumber(number: BlockNumber): JobResult[Option[EcBlock]]

def getBlockByHash(hash: BlockHash): Job[Option[EcBlock]]
def getBlockByHash(hash: BlockHash): JobResult[Option[EcBlock]]

def getBlockByHashJson(hash: BlockHash): Job[Option[JsObject]]
def getBlockByHashJson(hash: BlockHash): JobResult[Option[JsObject]]

def getLastExecutionBlock: Job[EcBlock]
def getLastExecutionBlock: JobResult[EcBlock]

def blockExists(hash: BlockHash): Job[Boolean]
def blockExists(hash: BlockHash): JobResult[Boolean]

def getLogs(hash: BlockHash, address: EthAddress, topic: String): Job[List[GetLogsResponseEntry]]
def getLogs(hash: BlockHash, address: EthAddress, topic: String): JobResult[List[GetLogsResponseEntry]]
}

object EngineApiClient {
Expand Down
28 changes: 14 additions & 14 deletions src/main/scala/units/client/engine/HttpEngineApiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import units.client.engine.HttpEngineApiClient.*
import units.client.engine.model.*
import units.client.engine.model.ForkChoiceUpdatedRequest.ForkChoiceAttributes
import units.eth.EthAddress
import units.{BlockHash, ClientConfig, ClientError, Job}
import units.{BlockHash, ClientConfig, ClientError, JobResult}

import scala.concurrent.duration.{DurationInt, FiniteDuration}

class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Identity, ?]) extends EngineApiClient with JsonRpcClient {

val apiUrl: Uri = uri"${config.executionClientAddress}"

def forkChoiceUpdate(blockHash: BlockHash, finalizedBlockHash: BlockHash): Job[String] = {
def forkChoiceUpdate(blockHash: BlockHash, finalizedBlockHash: BlockHash): JobResult[String] = {
sendEngineRequest[ForkChoiceUpdatedRequest, ForkChoiceUpdatedResponse](
ForkChoiceUpdatedRequest(blockHash, finalizedBlockHash, None),
BlockExecutionTimeout
Expand All @@ -39,7 +39,7 @@ class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Ide
suggestedFeeRecipient: EthAddress,
prevRandao: String,
withdrawals: Vector[Withdrawal]
): Job[PayloadId] = {
): JobResult[PayloadId] = {
sendEngineRequest[ForkChoiceUpdatedRequest, ForkChoiceUpdatedResponse](
ForkChoiceUpdatedRequest(
lastBlockHash,
Expand All @@ -59,11 +59,11 @@ class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Ide
}
}

def getPayload(payloadId: PayloadId): Job[JsObject] = {
def getPayload(payloadId: PayloadId): JobResult[JsObject] = {
sendEngineRequest[GetPayloadRequest, GetPayloadResponse](GetPayloadRequest(payloadId), NonBlockExecutionTimeout).map(_.executionPayload)
}

def applyNewPayload(payload: JsObject): Job[Option[BlockHash]] = {
def applyNewPayload(payload: JsObject): JobResult[Option[BlockHash]] = {
sendEngineRequest[NewPayloadRequest, PayloadStatus](NewPayloadRequest(payload), BlockExecutionTimeout).flatMap {
case PayloadStatus(_, _, Some(validationError)) => Left(ClientError(s"Payload validation error: $validationError"))
case PayloadStatus(status, Some(latestValidHash), _) if status == "VALID" => Right(Some(latestValidHash))
Expand All @@ -73,47 +73,47 @@ class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Ide
}
}

def getPayloadBodyByHash(hash: BlockHash): Job[Option[JsObject]] = {
def getPayloadBodyByHash(hash: BlockHash): JobResult[Option[JsObject]] = {
sendEngineRequest[GetPayloadBodyByHash, JsArray](GetPayloadBodyByHash(hash), NonBlockExecutionTimeout)
.map(_.value.headOption.flatMap(_.asOpt[JsObject]))
}

def getBlockByNumber(number: BlockNumber): Job[Option[EcBlock]] = {
def getBlockByNumber(number: BlockNumber): JobResult[Option[EcBlock]] = {
for {
json <- getBlockByNumberJson(number.str)
blockMeta <- json.traverse(parseJson[EcBlock](_))
} yield blockMeta
}

def getBlockByHash(hash: BlockHash): Job[Option[EcBlock]] = {
def getBlockByHash(hash: BlockHash): JobResult[Option[EcBlock]] = {
sendRequest[GetBlockByHashRequest, EcBlock](GetBlockByHashRequest(hash))
.leftMap(err => ClientError(s"Error getting block by hash $hash: $err"))
}

def getBlockByHashJson(hash: BlockHash): Job[Option[JsObject]] = {
def getBlockByHashJson(hash: BlockHash): JobResult[Option[JsObject]] = {
sendRequest[GetBlockByHashRequest, JsObject](GetBlockByHashRequest(hash))
.leftMap(err => ClientError(s"Error getting block json by hash $hash: $err"))
}

def getLastExecutionBlock: Job[EcBlock] = for {
def getLastExecutionBlock: JobResult[EcBlock] = for {
lastEcBlockOpt <- getBlockByNumber(BlockNumber.Latest)
lastEcBlock <- Either.fromOption(lastEcBlockOpt, ClientError("Impossible: EC doesn't have blocks"))
} yield lastEcBlock

def blockExists(hash: BlockHash): Job[Boolean] =
def blockExists(hash: BlockHash): JobResult[Boolean] =
getBlockByHash(hash).map(_.isDefined)

private def getBlockByNumberJson(number: String): Job[Option[JsObject]] = {
private def getBlockByNumberJson(number: String): JobResult[Option[JsObject]] = {
sendRequest[GetBlockByNumberRequest, JsObject](GetBlockByNumberRequest(number))
.leftMap(err => ClientError(s"Error getting block by number $number: $err"))
}

override def getLogs(hash: BlockHash, address: EthAddress, topic: String): Job[List[GetLogsResponseEntry]] =
override def getLogs(hash: BlockHash, address: EthAddress, topic: String): JobResult[List[GetLogsResponseEntry]] =
sendRequest[GetLogsRequest, List[GetLogsResponseEntry]](GetLogsRequest(hash, address, List(topic)))
.leftMap(err => ClientError(s"Error getting block logs by hash $hash: $err"))
.map(_.getOrElse(List.empty))

private def sendEngineRequest[A: Writes, B: Reads](request: A, timeout: FiniteDuration): Job[B] = {
private def sendEngineRequest[A: Writes, B: Reads](request: A, timeout: FiniteDuration): JobResult[B] = {
sendRequest(request, timeout) match {
case Right(response) => response.toRight(ClientError(s"Unexpected engine API empty response"))
case Left(err) => Left(ClientError(s"Engine API request error: $err"))
Expand Down
26 changes: 13 additions & 13 deletions src/main/scala/units/client/engine/LoggedEngineApiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import units.client.engine.EngineApiClient.PayloadId
import units.client.engine.LoggedEngineApiClient.excludedJsonFields
import units.client.engine.model.*
import units.eth.EthAddress
import units.{BlockHash, Job}
import units.{BlockHash, JobResult}

import java.util.concurrent.ThreadLocalRandom
import scala.util.chaining.scalaUtilChainingOps

class LoggedEngineApiClient(underlying: EngineApiClient) extends EngineApiClient {
protected val log = LoggerFacade(LoggerFactory.getLogger(underlying.getClass))

override def forkChoiceUpdate(blockHash: BlockHash, finalizedBlockHash: BlockHash): Job[String] =
override def forkChoiceUpdate(blockHash: BlockHash, finalizedBlockHash: BlockHash): JobResult[String] =
wrap(s"forkChoiceUpdate($blockHash, f=$finalizedBlockHash)", underlying.forkChoiceUpdate(blockHash, finalizedBlockHash))

override def forkChoiceUpdateWithPayloadId(
Expand All @@ -25,40 +25,40 @@ class LoggedEngineApiClient(underlying: EngineApiClient) extends EngineApiClient
suggestedFeeRecipient: EthAddress,
prevRandao: String,
withdrawals: Vector[Withdrawal]
): Job[PayloadId] = wrap(
): JobResult[PayloadId] = wrap(
s"forkChoiceUpdateWithPayloadId(l=$lastBlockHash, f=$finalizedBlockHash, ts=$unixEpochSeconds, m=$suggestedFeeRecipient, " +
s"r=$prevRandao, w={${withdrawals.mkString(", ")}}",
underlying.forkChoiceUpdateWithPayloadId(lastBlockHash, finalizedBlockHash, unixEpochSeconds, suggestedFeeRecipient, prevRandao, withdrawals)
)

override def getPayload(payloadId: PayloadId): Job[JsObject] =
override def getPayload(payloadId: PayloadId): JobResult[JsObject] =
wrap(s"getPayload($payloadId)", underlying.getPayload(payloadId), filteredJson)

override def applyNewPayload(payload: JsObject): Job[Option[BlockHash]] =
override def applyNewPayload(payload: JsObject): JobResult[Option[BlockHash]] =
wrap(s"applyNewPayload(${filteredJson(payload)})", underlying.applyNewPayload(payload), _.fold("None")(_.toString))

override def getPayloadBodyByHash(hash: BlockHash): Job[Option[JsObject]] =
override def getPayloadBodyByHash(hash: BlockHash): JobResult[Option[JsObject]] =
wrap(s"getPayloadBodyByHash($hash)", underlying.getPayloadBodyByHash(hash), _.fold("None")(filteredJson))

override def getBlockByNumber(number: BlockNumber): Job[Option[EcBlock]] =
override def getBlockByNumber(number: BlockNumber): JobResult[Option[EcBlock]] =
wrap(s"getBlockByNumber($number)", underlying.getBlockByNumber(number), _.fold("None")(_.toString))

override def getBlockByHash(hash: BlockHash): Job[Option[EcBlock]] =
override def getBlockByHash(hash: BlockHash): JobResult[Option[EcBlock]] =
wrap(s"getBlockByHash($hash)", underlying.getBlockByHash(hash), _.fold("None")(_.toString))

override def getBlockByHashJson(hash: BlockHash): Job[Option[JsObject]] =
override def getBlockByHashJson(hash: BlockHash): JobResult[Option[JsObject]] =
wrap(s"getBlockByHashJson($hash)", underlying.getBlockByHashJson(hash), _.fold("None")(filteredJson))

override def getLastExecutionBlock: Job[EcBlock] =
override def getLastExecutionBlock: JobResult[EcBlock] =
wrap("getLastExecutionBlock", underlying.getLastExecutionBlock)

override def blockExists(hash: BlockHash): Job[Boolean] =
override def blockExists(hash: BlockHash): JobResult[Boolean] =
wrap(s"blockExists($hash)", underlying.blockExists(hash))

override def getLogs(hash: BlockHash, address: EthAddress, topic: String): Job[List[GetLogsResponseEntry]] =
override def getLogs(hash: BlockHash, address: EthAddress, topic: String): JobResult[List[GetLogsResponseEntry]] =
wrap(s"getLogs($hash, a=$address, t=$topic)", underlying.getLogs(hash, address, topic), _.view.map(_.data).mkString("{", ", ", "}"))

protected def wrap[R](method: String, f: => Job[R], toMsg: R => String = (_: R).toString): Job[R] = {
protected def wrap[R](method: String, f: => JobResult[R], toMsg: R => String = (_: R).toString): JobResult[R] = {
val currRequestId = ThreadLocalRandom.current().nextInt(10000, 100000).toString
log.debug(s"[$currRequestId] $method")

Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/units/client/engine/model/EcBlock.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ case class EcBlock(
baseFeePerGas: Uint256,
gasLimit: Long,
gasUsed: Long,
prevRandao: String,
withdrawals: Vector[Withdrawal]
) extends L2BlockLike {
override def toString: String =
Expand All @@ -34,13 +35,14 @@ object EcBlock {
implicit val reads: Reads[EcBlock] = (
(JsPath \ "hash").read[BlockHash] and
(JsPath \ "parentHash").read[BlockHash] and
(JsPath \ "stateRoot").read[BlockHash] and
(JsPath \ "stateRoot").read[String] and
(JsPath \ "number").read[String].map(toLong) and
(JsPath \ "timestamp").read[String].map(toLong) and
(JsPath \ "miner").read[EthAddress] and
(JsPath \ "baseFeePerGas").read[String].map(toUint256) and
(JsPath \ "gasLimit").read[String].map(toLong) and
(JsPath \ "gasUsed").read[String].map(toLong) and
(JsPath \ "mixHash").read[String] and
(JsPath \ "withdrawals").readWithDefault(Vector.empty[Withdrawal])
)(EcBlock.apply _)
}
12 changes: 6 additions & 6 deletions src/main/scala/units/network/NetworkServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import java.util.concurrent.ConcurrentHashMap
object NetworkServer {

def apply(
settings: ClientConfig,
historyReplier: HistoryReplier,
peerDatabase: PeerDatabase,
messageObserver: MessageObserver,
allChannels: ChannelGroup,
peerInfo: ConcurrentHashMap[Channel, PeerInfo]
settings: ClientConfig,
historyReplier: HistoryReplier,
peerDatabase: PeerDatabase,
messageObserver: MessageObserver,
allChannels: ChannelGroup,
peerInfo: ConcurrentHashMap[Channel, PeerInfo]
): NS = {
val applicationName = s"${Constants.ApplicationName}l2-${settings.chainContract.take(8)}"

Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/units/package.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package object units {
type BlockHash = BlockHash.Type
type Job[A] = Either[ClientError, A]
type BlockHash = BlockHash.Type
type JobResult[A] = Either[ClientError, A]
}
Loading

0 comments on commit 954d308

Please sign in to comment.