Skip to content

Commit

Permalink
Signature is on network level
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-mashonskiy committed Sep 27, 2024
1 parent fa06b2b commit ed0bed0
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 71 deletions.
12 changes: 11 additions & 1 deletion src/main/scala/units/ConsensusClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import net.ceedubs.ficus.Ficus.*
import org.slf4j.LoggerFactory
import sttp.client3.HttpClientSyncBackend
import units.client.JwtAuthenticationBackend
import units.client.contract.{ChainContractClient, ChainContractStateClient}
import units.client.engine.{EngineApiClient, HttpEngineApiClient, LoggedEngineApiClient}
import units.network.*

Expand All @@ -27,6 +28,7 @@ class ConsensusClient(
config: ClientConfig,
context: ExtensionContext,
engineApiClient: EngineApiClient,
chainContractClient: ChainContractClient,
payloadObserver: PayloadObserver,
globalScheduler: Scheduler,
eluScheduler: Scheduler,
Expand All @@ -39,6 +41,7 @@ class ConsensusClient(
deps.config,
context,
deps.engineApiClient,
deps.chainContractClient,
deps.payloadObserver,
deps.globalScheduler,
deps.eluScheduler,
Expand All @@ -50,6 +53,7 @@ class ConsensusClient(
private[units] val elu =
new ELUpdater(
engineApiClient,
chainContractClient,
context.blockchain,
context.utx,
payloadObserver,
Expand Down Expand Up @@ -115,6 +119,9 @@ class ConsensusClientDependencies(context: ExtensionContext) extends AutoCloseab

val engineApiClient = new LoggedEngineApiClient(new HttpEngineApiClient(config, maybeAuthenticatedBackend))

private val contractAddress = config.chainContractAddress
val chainContractClient = new ChainContractStateClient(contractAddress, context.blockchain)

val allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
val peerDatabase = new PeerDatabaseImpl(config.network)
val messageObserver = new MessageObserver()
Expand All @@ -127,7 +134,10 @@ class ConsensusClientDependencies(context: ExtensionContext) extends AutoCloseab
new ConcurrentHashMap[Channel, PeerInfo]
)

val payloadObserver = new PayloadObserverImpl(allChannels, messageObserver.payloads, config.blockSyncRequestTimeout)(payloadObserverScheduler)
val payloadObserver =
new PayloadObserverImpl(allChannels, messageObserver.payloads, chainContractClient.getMinersPks, config.blockSyncRequestTimeout)(
payloadObserverScheduler
)

override def close(): Unit = {
log.info("Closing HTTP/Engine API")
Expand Down
49 changes: 18 additions & 31 deletions src/main/scala/units/ELUpdater.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ import com.wavesplatform.transaction.{Asset, Proofs, Transaction, TransactionSig
import com.wavesplatform.utils.{EthEncoding, Time, UnsupportedFeature, forceStopApplication}
import com.wavesplatform.utx.UtxPool
import com.wavesplatform.wallet.Wallet
import io.netty.channel.Channel
import monix.execution.cancelables.SerialCancelable
import monix.execution.{CancelableFuture, Scheduler}
import play.api.libs.json.*
import units.ELUpdater.State.*
import units.ELUpdater.State.ChainStatus.{FollowingChain, Mining, WaitForNewChain}
import units.client.CommonBlockData
Expand All @@ -33,7 +31,6 @@ import units.client.engine.model.*
import units.client.engine.model.Withdrawal.WithdrawalIndex
import units.eth.{EmptyPayload, EthAddress, EthereumConstants}
import units.network.PayloadObserver
import units.network.PayloadObserverImpl.PayloadInfoWithChannel
import units.util.HexBytesConverter
import units.util.HexBytesConverter.toHexNoPrefix

Expand All @@ -43,6 +40,7 @@ import scala.util.*

class ELUpdater(
engineApiClient: EngineApiClient,
chainContractClient: ChainContractClient,
blockchain: Blockchain,
utx: UtxPool,
payloadObserver: PayloadObserver,
Expand All @@ -55,9 +53,7 @@ class ELUpdater(
) extends StrictLogging {
import ELUpdater.*

private val handleNextUpdate = SerialCancelable()
private val contractAddress = config.chainContractAddress
private val chainContractClient = new ChainContractStateClient(contractAddress, blockchain)
private val handleNextUpdate = SerialCancelable()

private[units] var state: State = Starting

Expand Down Expand Up @@ -152,7 +148,7 @@ class ELUpdater(
// Removing here, because we have these transactions in PP after the onProcessBlock trigger
utx.getPriorityPool.foreach { pp =>
val staleTxs = pp.priorityTransactions.filter {
case tx: InvokeScriptTransaction => tx.dApp == contractAddress
case tx: InvokeScriptTransaction => tx.dApp == config.chainContractAddress
case _ => false
}

Expand All @@ -169,7 +165,7 @@ class ELUpdater(
val tx = InvokeScriptTransaction(
TxVersion.V2,
invoker.publicKey,
contractAddress,
config.chainContractAddress,
Some(fc),
Seq.empty,
TxPositiveAmount.unsafeFrom(FeeConstants(TransactionType.InvokeScript) * FeeUnit + extraFee),
Expand All @@ -178,7 +174,9 @@ class ELUpdater(
Proofs.empty,
blockchain.settings.addressSchemeCharacter.toByte
).signWith(invoker.privateKey)
logger.info(s"Invoking $contractAddress '${fc.function.funcName}' for block ${payload.hash}->${payload.parentHash}, txId=${tx.id()}")
logger.info(
s"Invoking ${config.chainContractAddress} '${fc.function.funcName}' for block ${payload.hash}->${payload.parentHash}, txId=${tx.id()}"
)
cleanPriorityPool()

broadcastTx(tx).resultE match {
Expand Down Expand Up @@ -477,14 +475,15 @@ class ELUpdater(
}

private def handleConsensusLayerChanged(): Unit = {
payloadObserver.updateMinerPublicKeys(chainContractClient.getMinersPks)
state match {
case Starting => updateStartingState()
case w: Working[ChainStatus] => updateWorkingState(w)
case other => logger.debug(s"Unprocessed state: $other")
}
}

private def findAltChain(prevChainId: Long, referenceBlock: BlockHash) = {
private def findAltChain(prevChainId: Long, referenceBlock: BlockHash): Option[ChainInfo] = {
logger.debug(s"Trying to find alternative chain referencing $referenceBlock")

val lastChainId = chainContractClient.getLastChainId
Expand Down Expand Up @@ -777,11 +776,11 @@ class ELUpdater(
}
}

private def requestAndProcessPayload(hash: BlockHash): CancelableFuture[(Channel, ExecutionPayloadInfo)] = {
private def requestAndProcessPayload(hash: BlockHash): CancelableFuture[ExecutionPayloadInfo] = {
payloadObserver
.loadPayload(hash)
.andThen {
case Success((ch, epi)) => executionPayloadReceived(epi)
case Success(epi) => executionPayloadReceived(epi)
case Failure(exception) => logger.error(s"Error loading block $hash payload", exception)
}(globalScheduler)
}
Expand Down Expand Up @@ -943,23 +942,11 @@ class ELUpdater(
val payload = epi.payload
epochInfo match {
case Some(epochMeta) =>
for {
_ <- Either.cond(
payload.feeRecipient == epochMeta.rewardAddress,
(),
ClientError(s"block miner ${payload.feeRecipient} doesn't equal to ${epochMeta.rewardAddress}")
)
signature <- Either.fromOption(epi.signature, ClientError(s"signature not found"))
publicKey <- Either.fromOption(
chainContractClient.getMinerPublicKey(payload.feeRecipient),
ClientError(s"public key for block miner ${payload.feeRecipient} not found")
)
_ <- Either.cond(
crypto.verify(signature, Json.toBytes(epi.payloadJson), publicKey, checkWeakPk = true),
(),
ClientError(s"invalid signature")
)
} yield ()
Either.cond(
payload.feeRecipient == epochMeta.rewardAddress,
(),
ClientError(s"block miner ${payload.feeRecipient} doesn't equal to ${epochMeta.rewardAddress}")
)
case _ => Either.unit
}
}
Expand Down Expand Up @@ -1556,8 +1543,8 @@ object ELUpdater {
}
}

case class WaitingForSyncHead(target: ContractBlock, task: CancelableFuture[PayloadInfoWithChannel]) extends State
case class SyncingToFinalizedBlock(target: BlockHash) extends State
case class WaitingForSyncHead(target: ContractBlock, task: CancelableFuture[ExecutionPayloadInfo]) extends State
case class SyncingToFinalizedBlock(target: BlockHash) extends State
}

private case class RollbackBlock(hash: BlockHash, parentPayload: ExecutionPayload)
Expand Down
3 changes: 1 addition & 2 deletions src/main/scala/units/ExecutionPayloadInfo.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package units

import com.wavesplatform.common.state.ByteStr
import play.api.libs.json.JsObject
import units.client.engine.model.ExecutionPayload

case class ExecutionPayloadInfo(payload: ExecutionPayload, payloadJson: JsObject, signature: Option[ByteStr])
case class ExecutionPayloadInfo(payload: ExecutionPayload, payloadJson: JsObject)
13 changes: 13 additions & 0 deletions src/main/scala/units/client/contract/ChainContractClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ trait ChainContractClient {
case BinaryDataEntry(_, v) => EthAddress.unsafeFrom(v.arr)
}

def getPublicKey(rewardAddress: EthAddress): Option[PublicKey] = {
getBinaryData(s"miner_${rewardAddress}_PK").map(PublicKey(_))
}

def getBlock(hash: BlockHash): Option[ContractBlock] =
getBinaryData(s"block_$hash").orElse(getBinaryData(s"blockMeta${clean(hash)}")).map { blockMeta =>
val bb = ByteBuffer.wrap(blockMeta.arr)
Expand Down Expand Up @@ -212,6 +216,15 @@ trait ChainContractClient {
def getNativeTransfers(fromIndex: Long, maxItems: Long): Vector[ContractTransfer] =
(fromIndex until math.min(fromIndex + maxItems, getNativeTransfersCount)).map(requireNativeTransfer).toVector

def getMinersPks: Map[EthAddress, PublicKey] = {
getAllActualMiners.flatMap { addr =>
for {
rewardAddress <- getElRewardAddress(addr)
publicKey <- getPublicKey(rewardAddress)
} yield rewardAddress -> publicKey
}.toMap
}

private def getNativeTransfersCount: Long = getLongData("nativeTransfersCount").getOrElse(0L)

private def requireNativeTransfer(atIndex: Long): ContractTransfer = {
Expand Down
3 changes: 1 addition & 2 deletions src/main/scala/units/network/MessageObserver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package units.network

import com.wavesplatform.utils.{Schedulers, ScorexLogging}
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.{Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
import monix.execution.schedulers.SchedulerService
import monix.reactive.subjects.{ConcurrentSubject, Subject}
import units.ExecutionPayloadInfo

@Sharable
class MessageObserver extends ChannelInboundHandlerAdapter with ScorexLogging {
Expand Down
3 changes: 1 addition & 2 deletions src/main/scala/units/network/PayloadMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ class PayloadMessage private (
prevRandao,
withdrawals
),
payloadJson,
signature
payloadJson
)
}).leftMap(err => s"Error creating payload info for block $hash: $err")
}
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/units/network/PayloadObserver.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package units.network

import com.wavesplatform.account.PrivateKey
import com.wavesplatform.account.{PrivateKey, PublicKey}
import monix.execution.CancelableFuture
import monix.reactive.Observable
import play.api.libs.json.JsObject
import units.eth.EthAddress
import units.{BlockHash, ExecutionPayloadInfo}

trait PayloadObserver {
Expand All @@ -14,4 +15,6 @@ trait PayloadObserver {
def broadcastSigned(payloadJson: JsObject, signer: PrivateKey): Either[String, PayloadMessage]

def broadcast(hash: BlockHash): Unit

def updateMinerPublicKeys(newKeys: Map[EthAddress, PublicKey]): Unit
}
24 changes: 16 additions & 8 deletions src/main/scala/units/network/PayloadObserverImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ class PayloadObserverImpl(
extends PayloadObserver
with ScorexLogging {

private var state: State = State.Idle(None)
private var lastPayloadMessage: Option[PayloadMessageWithChannel] = None
private var state: State = State.Idle(None)
private val lastPayloadMessages: ConcurrentHashMap[BlockHash, PayloadMessageWithChannel] =
new ConcurrentHashMap[BlockHash, PayloadMessageWithChannel]()

private val payloadsResult: ConcurrentSubject[ExecutionPayloadInfo, ExecutionPayloadInfo] =
ConcurrentSubject.publish[ExecutionPayloadInfo]
Expand All @@ -45,23 +46,26 @@ class PayloadObserverImpl(
.build[BlockHash, ExecutionPayloadInfo]()

payloads
.foreach { case PayloadMessageWithChannel(pm, ch) =>
.foreach { case v @ PayloadMessageWithChannel(pm, ch) =>
state = state match {
case State.LoadingPayload(expectedHash, nextAttempt, p) if expectedHash == pm.hash =>
pm.payloadInfo match {
case Right(epi) =>
nextAttempt.cancel()
p.complete(Success(epi))
lastPayloadMessages.put(pm.hash, v)
knownPayloadCache.put(pm.hash, epi)
payloadsResult.onNext(epi)
State.Idle(Some(ch))
case Left(err) => log.debug(err)
}
State.Idle(Some(ch))
case other =>
Option(addrToPK.get(pm.feeRecipient)) match {
case Some(pk) =>
if (pm.isSignatureValid(pk)) {
pm.payloadInfo match {
case Right(epi) =>
lastPayloadMessages.put(pm.hash, v)
knownPayloadCache.put(pm.hash, epi)
payloadsResult.onNext(epi)
case Left(err) => log.debug(err)
Expand Down Expand Up @@ -107,14 +111,21 @@ class PayloadObserverImpl(

override def broadcast(hash: BlockHash): Unit = {
(for {
payloadWithChannel <- lastPayloadMessage.toRight("No prepared for broadcast payload")
payloadWithChannel <- Option(lastPayloadMessages.get(hash)).toRight(s"No prepared for broadcast payload $hash")
_ <- Either.cond(hash == payloadWithChannel.pm.hash, (), s"Payload for block $hash is not last received")
_ = log.debug(s"Broadcasting block ${payloadWithChannel.pm.hash} payload")
_ <- Try(allChannels.broadcast(payloadWithChannel.pm, Some(payloadWithChannel.ch))).toEither.leftMap(_.getMessage)
} yield ()).fold(
err => log.error(s"Failed to broadcast last received payload: $err"),
identity
)

lastPayloadMessages.remove(hash)
}

def updateMinerPublicKeys(newKeys: Map[EthAddress, PublicKey]): Unit = {
addrToPK.clear()
addrToPK.putAll(newKeys.asJava)
}

// TODO: remove Task
Expand All @@ -135,9 +146,6 @@ class PayloadObserverImpl(
}

object PayloadObserverImpl {

type PayloadInfoWithChannel = (Channel, ExecutionPayloadInfo)

sealed trait State

object State {
Expand Down
Loading

0 comments on commit ed0bed0

Please sign in to comment.