Skip to content

Commit

Permalink
Signature is on network level
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-mashonskiy committed Sep 26, 2024
1 parent 9208c16 commit fa06b2b
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 85 deletions.
7 changes: 2 additions & 5 deletions src/main/scala/units/ConsensusClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class ConsensusClient(
context: ExtensionContext,
engineApiClient: EngineApiClient,
payloadObserver: PayloadObserver,
allChannels: DefaultChannelGroup,
globalScheduler: Scheduler,
eluScheduler: Scheduler,
ownedResources: AutoCloseable
Expand All @@ -41,7 +40,6 @@ class ConsensusClient(
context,
deps.engineApiClient,
deps.payloadObserver,
deps.allChannels,
deps.globalScheduler,
deps.eluScheduler,
deps
Expand All @@ -54,18 +52,17 @@ class ConsensusClient(
engineApiClient,
context.blockchain,
context.utx,
allChannels,
payloadObserver,
config,
context.time,
context.wallet,
payloadObserver.loadPayload,
context.broadcastTransaction,
eluScheduler,
globalScheduler
)

private val payloadsStreamCancelable: CancelableFuture[Unit] =
payloadObserver.getPayloadStream.foreach { case (ch, ep) => elu.executionPayloadReceived(ep, ch) }(globalScheduler)
payloadObserver.getPayloadStream.foreach(elu.executionPayloadReceived)(globalScheduler)

override def start(): Unit = {}

Expand Down
50 changes: 17 additions & 33 deletions src/main/scala/units/ELUpdater.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.crypto
import com.wavesplatform.lang.ValidationError
import com.wavesplatform.lang.v1.compiler.Terms.FUNCTION_CALL
import com.wavesplatform.network.ChannelGroupExt
import com.wavesplatform.state.Blockchain
import com.wavesplatform.state.diffs.FeeValidation.{FeeConstants, FeeUnit, ScriptExtraFee}
import com.wavesplatform.state.diffs.TransactionDiffer.TransactionValidationError
Expand All @@ -21,7 +20,6 @@ import com.wavesplatform.utils.{EthEncoding, Time, UnsupportedFeature, forceStop
import com.wavesplatform.utx.UtxPool
import com.wavesplatform.wallet.Wallet
import io.netty.channel.Channel
import io.netty.channel.group.DefaultChannelGroup
import monix.execution.cancelables.SerialCancelable
import monix.execution.{CancelableFuture, Scheduler}
import play.api.libs.json.*
Expand All @@ -34,7 +32,7 @@ import units.client.engine.EngineApiClient.PayloadId
import units.client.engine.model.*
import units.client.engine.model.Withdrawal.WithdrawalIndex
import units.eth.{EmptyPayload, EthAddress, EthereumConstants}
import units.network.PayloadMessage
import units.network.PayloadObserver
import units.network.PayloadObserverImpl.PayloadInfoWithChannel
import units.util.HexBytesConverter
import units.util.HexBytesConverter.toHexNoPrefix
Expand All @@ -47,11 +45,10 @@ class ELUpdater(
engineApiClient: EngineApiClient,
blockchain: Blockchain,
utx: UtxPool,
allChannels: DefaultChannelGroup,
payloadObserver: PayloadObserver,
config: ClientConfig,
time: Time,
wallet: Wallet,
requestPayloadFromPeers: BlockHash => CancelableFuture[PayloadInfoWithChannel],
broadcastTx: Transaction => TracedResult[ValidationError, Boolean],
scheduler: Scheduler,
globalScheduler: Scheduler
Expand All @@ -67,7 +64,7 @@ class ELUpdater(
def consensusLayerChanged(): Unit =
handleNextUpdate := scheduler.scheduleOnce(ClChangedProcessingDelay)(handleConsensusLayerChanged())

def executionPayloadReceived(epi: ExecutionPayloadInfo, ch: Channel): Unit = scheduler.execute { () =>
def executionPayloadReceived(epi: ExecutionPayloadInfo): Unit = scheduler.execute { () =>
val payload = epi.payload
logger.debug(s"New block ${payload.hash}->${payload.parentHash} (timestamp=${payload.timestamp}, height=${payload.height}) appeared")

Expand All @@ -91,13 +88,13 @@ class ELUpdater(
}
case w @ Working(_, lastPayload, _, _, _, FollowingChain(nodeChainInfo, _), _, returnToMainChainInfo)
if payload.parentHash == lastPayload.hash =>
validateAndApply(epi, ch, w, lastPayload, nodeChainInfo, returnToMainChainInfo)
validateAndApply(epi, w, lastPayload, nodeChainInfo, returnToMainChainInfo)
case w: Working[ChainStatus] =>
w.returnToMainChainInfo match {
case Some(rInfo) if rInfo.missedBlock.hash == payload.hash =>
chainContractClient.getChainInfo(rInfo.chainId) match {
case Some(chainInfo) if chainInfo.isMain =>
validateAndApplyMissedBlock(epi, ch, w, rInfo.missedBlock, rInfo.missedBlockParentPayload, chainInfo)
validateAndApplyMissedBlock(epi, w, rInfo.missedBlock, rInfo.missedBlockParentPayload, chainInfo)
case Some(_) =>
logger.debug(s"Chain ${rInfo.chainId} is not main anymore, ignoring ${payload.hash}")
case _ =>
Expand Down Expand Up @@ -233,11 +230,7 @@ class ELUpdater(
latestValidHashOpt <- engineApiClient.applyNewPayload(payloadJson)
latestValidHash <- Either.fromOption(latestValidHashOpt, ClientError("Latest valid hash not defined"))
_ = logger.info(s"Applied payload $payloadId, block hash is $latestValidHash, timestamp = $timestamp")
newPm <- PayloadMessage.signed(payloadJson, m.keyPair.privateKey).leftMap(ClientError.apply)
_ = logger.debug(s"Broadcasting block ${newPm.hash} payload")
_ <- Try(allChannels.broadcast(newPm)).toEither.leftMap(err =>
ClientError(s"Failed to broadcast block ${newPm.hash} payload: ${err.toString}")
)
newPm <- payloadObserver.broadcastSigned(payloadJson, m.keyPair.privateKey).leftMap(ClientError.apply)
payloadInfo <- newPm.payloadInfo.leftMap(ClientError.apply)
payload = payloadInfo.payload
transfersRootHash <- getE2CTransfersRootHash(payload.hash, chainContractOptions.elBridgeAddress)
Expand Down Expand Up @@ -785,10 +778,12 @@ class ELUpdater(
}

private def requestAndProcessPayload(hash: BlockHash): CancelableFuture[(Channel, ExecutionPayloadInfo)] = {
requestPayloadFromPeers(hash).andThen {
case Success((ch, epi)) => executionPayloadReceived(epi, ch)
case Failure(exception) => logger.error(s"Error loading block $hash payload", exception)
}(globalScheduler)
payloadObserver
.loadPayload(hash)
.andThen {
case Success((ch, epi)) => executionPayloadReceived(epi)
case Failure(exception) => logger.error(s"Error loading block $hash payload", exception)
}(globalScheduler)
}

private def updateToFollowChain(
Expand Down Expand Up @@ -1023,7 +1018,6 @@ class ELUpdater(

private def validateAndApplyMissedBlock(
epi: ExecutionPayloadInfo,
ch: Channel,
prevState: Working[ChainStatus],
contractBlock: ContractBlock,
parentPayload: ExecutionPayload,
Expand All @@ -1033,15 +1027,14 @@ class ELUpdater(
validateBlockFull(epi, contractBlock, parentPayload, prevState) match {
case Right(updatedState) =>
logger.debug(s"Missed block ${payload.hash} of main chain ${nodeChainInfo.id} was successfully validated")
broadcastAndConfirmBlock(epi, ch, updatedState, nodeChainInfo, None)
broadcastAndConfirmBlock(epi, updatedState, nodeChainInfo, None)
case Left(err) =>
logger.debug(s"Missed block ${payload.hash} of main chain ${nodeChainInfo.id} validation error: ${err.message}, ignoring block")
}
}

private def validateAndApply(
epi: ExecutionPayloadInfo,
ch: Channel,
prevState: Working[ChainStatus],
parentPayload: ExecutionPayload,
nodeChainInfo: ChainInfo,
Expand All @@ -1054,7 +1047,7 @@ class ELUpdater(
validateBlockFull(epi, contractBlock, parentPayload, prevState) match {
case Right(updatedState) =>
logger.debug(s"Block ${payload.hash} was successfully validated")
broadcastAndConfirmBlock(epi, ch, updatedState, nodeChainInfo, returnToMainChainInfo)
broadcastAndConfirmBlock(epi, updatedState, nodeChainInfo, returnToMainChainInfo)
case Left(err) =>
logger.debug(s"Block ${payload.hash} validation error: ${err.message}")
processInvalidBlock(contractBlock, prevState, Some(nodeChainInfo))
Expand All @@ -1066,7 +1059,7 @@ class ELUpdater(
preValidateBlock(epi, parentPayload, epochInfo) match {
case Right(_) =>
logger.debug(s"Block ${payload.hash} was successfully partially validated")
broadcastAndConfirmBlock(epi, ch, prevState, nodeChainInfo, returnToMainChainInfo)
broadcastAndConfirmBlock(epi, prevState, nodeChainInfo, returnToMainChainInfo)
case Left(err) =>
logger.error(s"Block ${payload.hash} prevalidation error: ${err.message}, ignoring block")
}
Expand Down Expand Up @@ -1102,21 +1095,12 @@ class ELUpdater(

private def broadcastAndConfirmBlock(
epi: ExecutionPayloadInfo,
ch: Channel,
prevState: Working[ChainStatus],
nodeChainInfo: ChainInfo,
returnToMainChainInfo: Option[ReturnToMainChainInfo]
): Unit = {
val payload = epi.payload

(for {
pm <- PayloadMessage(epi.payloadJson, epi.signature)
_ <- Try(allChannels.broadcast(pm, Some(ch))).toEither.leftMap(_.getMessage)
} yield ()).recover { err =>
logger.error(s"Failed to broadcast block ${payload.hash} payload: $err")
}

confirmBlockAndFollowChain(payload, prevState, nodeChainInfo, returnToMainChainInfo)
payloadObserver.broadcast(epi.payload.hash)
confirmBlockAndFollowChain(epi.payload, prevState, nodeChainInfo, returnToMainChainInfo)
}

private def findBlockChild(parent: BlockHash, lastBlockHash: BlockHash): Either[String, ContractBlock] = {
Expand Down
10 changes: 3 additions & 7 deletions src/main/scala/units/network/MessageObserver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@ class MessageObserver extends ChannelInboundHandlerAdapter with ScorexLogging {

private implicit val scheduler: SchedulerService = Schedulers.fixedPool(2, "message-observer-l2")

val payloads: Subject[(Channel, ExecutionPayloadInfo), (Channel, ExecutionPayloadInfo)] = ConcurrentSubject.publish[(Channel, ExecutionPayloadInfo)]
val payloads: Subject[PayloadMessageWithChannel, PayloadMessageWithChannel] = ConcurrentSubject.publish[PayloadMessageWithChannel]

override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match {
case pm: PayloadMessage =>
pm.payloadInfo match {
case Right(epi) => payloads.onNext((ctx.channel(), epi))
case Left(err) => log.warn(err)
}
case _ => super.channelRead(ctx, msg)
case pm: PayloadMessage => payloads.onNext(PayloadMessageWithChannel(pm, ctx.channel()))
case _ => super.channelRead(ctx, msg)
}

def shutdown(): Unit = {
Expand Down
23 changes: 15 additions & 8 deletions src/main/scala/units/network/PayloadMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package units.network

import cats.syntax.either.*
import com.google.common.primitives.Bytes
import com.wavesplatform.account.PrivateKey
import com.wavesplatform.account.{PrivateKey, PublicKey}
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.crypto
import com.wavesplatform.crypto.SignatureLength
Expand All @@ -17,8 +17,11 @@ import scala.util.Try
class PayloadMessage private (
payloadJson: JsObject,
val hash: BlockHash,
val feeRecipient: EthAddress,
val signature: Option[ByteStr]
) {
lazy val jsonBytes: Array[Byte] = Json.toBytes(payloadJson)

lazy val payloadInfo: Either[String, ExecutionPayloadInfo] = {
(for {
timestamp <- (payloadJson \ "timestamp").asOpt[String].map(toLong).toRight("timestamp not defined")
Expand Down Expand Up @@ -54,22 +57,26 @@ class PayloadMessage private (

def toBytes: Array[Byte] = {
val signatureBytes = signature.map(sig => Bytes.concat(Array(1.toByte), sig.arr)).getOrElse(Array(0.toByte))
Bytes.concat(signatureBytes, Json.toBytes(payloadJson))
Bytes.concat(signatureBytes, jsonBytes)
}

def isSignatureValid(pk: PublicKey): Boolean =
signature.exists(crypto.verify(_, jsonBytes, pk, checkWeakPk = true))
}

object PayloadMessage {
def apply(payloadJson: JsObject): Either[String, PayloadMessage] =
apply(payloadJson, None)

def apply(payloadJson: JsObject, hash: BlockHash, signature: Option[ByteStr]): PayloadMessage =
new PayloadMessage(payloadJson, hash, signature)
def apply(payloadJson: JsObject, hash: BlockHash, feeRecipient: EthAddress, signature: Option[ByteStr]): PayloadMessage =
new PayloadMessage(payloadJson, hash, feeRecipient, signature)

def apply(payloadJson: JsObject, signature: Option[ByteStr]): Either[String, PayloadMessage] =
(payloadJson \ "blockHash")
.asOpt[BlockHash]
.toRight("Error creating payload message: block hash not defined")
.map(PayloadMessage(payloadJson, _, signature))
(for {
hash <- (payloadJson \ "blockHash").asOpt[BlockHash].toRight("block hash not defined")
feeRecipient <- (payloadJson \ "feeRecipient").asOpt[EthAddress].toRight("fee recipient not defined")
} yield PayloadMessage(payloadJson, hash, feeRecipient, signature))
.leftMap(err => s"Error creating payload message: $err")

def apply(payloadBytes: Array[Byte], signature: Option[ByteStr]): Either[String, PayloadMessage] = for {
payload <- Try(Json.parse(payloadBytes).as[JsObject]).toEither.leftMap(err => s"Payload bytes are not a valid JSON object: ${err.getMessage}")
Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/units/network/PayloadMessageWithChannel.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package units.network

import io.netty.channel.Channel

case class PayloadMessageWithChannel(pm: PayloadMessage, ch: Channel)
13 changes: 9 additions & 4 deletions src/main/scala/units/network/PayloadObserver.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package units.network

import units.network.PayloadObserverImpl.PayloadInfoWithChannel
import com.wavesplatform.network.ChannelObservable
import com.wavesplatform.account.PrivateKey
import monix.execution.CancelableFuture
import monix.reactive.Observable
import play.api.libs.json.JsObject
import units.{BlockHash, ExecutionPayloadInfo}

trait PayloadObserver {
def getPayloadStream: ChannelObservable[ExecutionPayloadInfo]
def getPayloadStream: Observable[ExecutionPayloadInfo]

def loadPayload(req: BlockHash): CancelableFuture[PayloadInfoWithChannel]
def loadPayload(req: BlockHash): CancelableFuture[ExecutionPayloadInfo]

def broadcastSigned(payloadJson: JsObject, signer: PrivateKey): Either[String, PayloadMessage]

def broadcast(hash: BlockHash): Unit
}
Loading

0 comments on commit fa06b2b

Please sign in to comment.