From fa06b2b5806c4dbb5ba7c0f87f9b38e4db3533a0 Mon Sep 17 00:00:00 2001 From: Ivan Mashonskii Date: Thu, 26 Sep 2024 18:15:04 +0300 Subject: [PATCH] Signature is on network level --- src/main/scala/units/ConsensusClient.scala | 7 +- src/main/scala/units/ELUpdater.scala | 50 +++------ .../scala/units/network/MessageObserver.scala | 10 +- .../scala/units/network/PayloadMessage.scala | 23 ++-- .../network/PayloadMessageWithChannel.scala | 5 + .../scala/units/network/PayloadObserver.scala | 13 ++- .../units/network/PayloadObserverImpl.scala | 105 +++++++++++++----- .../units/network/TestPayloadObserver.scala | 11 ++ 8 files changed, 139 insertions(+), 85 deletions(-) create mode 100644 src/main/scala/units/network/PayloadMessageWithChannel.scala diff --git a/src/main/scala/units/ConsensusClient.scala b/src/main/scala/units/ConsensusClient.scala index b210efce..551f5f72 100644 --- a/src/main/scala/units/ConsensusClient.scala +++ b/src/main/scala/units/ConsensusClient.scala @@ -28,7 +28,6 @@ class ConsensusClient( context: ExtensionContext, engineApiClient: EngineApiClient, payloadObserver: PayloadObserver, - allChannels: DefaultChannelGroup, globalScheduler: Scheduler, eluScheduler: Scheduler, ownedResources: AutoCloseable @@ -41,7 +40,6 @@ class ConsensusClient( context, deps.engineApiClient, deps.payloadObserver, - deps.allChannels, deps.globalScheduler, deps.eluScheduler, deps @@ -54,18 +52,17 @@ class ConsensusClient( engineApiClient, context.blockchain, context.utx, - allChannels, + payloadObserver, config, context.time, context.wallet, - payloadObserver.loadPayload, context.broadcastTransaction, eluScheduler, globalScheduler ) private val payloadsStreamCancelable: CancelableFuture[Unit] = - payloadObserver.getPayloadStream.foreach { case (ch, ep) => elu.executionPayloadReceived(ep, ch) }(globalScheduler) + payloadObserver.getPayloadStream.foreach(elu.executionPayloadReceived)(globalScheduler) override def start(): Unit = {} diff --git a/src/main/scala/units/ELUpdater.scala b/src/main/scala/units/ELUpdater.scala index e147c994..7190132d 100644 --- a/src/main/scala/units/ELUpdater.scala +++ b/src/main/scala/units/ELUpdater.scala @@ -9,7 +9,6 @@ import com.wavesplatform.common.state.ByteStr import com.wavesplatform.crypto import com.wavesplatform.lang.ValidationError import com.wavesplatform.lang.v1.compiler.Terms.FUNCTION_CALL -import com.wavesplatform.network.ChannelGroupExt import com.wavesplatform.state.Blockchain import com.wavesplatform.state.diffs.FeeValidation.{FeeConstants, FeeUnit, ScriptExtraFee} import com.wavesplatform.state.diffs.TransactionDiffer.TransactionValidationError @@ -21,7 +20,6 @@ import com.wavesplatform.utils.{EthEncoding, Time, UnsupportedFeature, forceStop import com.wavesplatform.utx.UtxPool import com.wavesplatform.wallet.Wallet import io.netty.channel.Channel -import io.netty.channel.group.DefaultChannelGroup import monix.execution.cancelables.SerialCancelable import monix.execution.{CancelableFuture, Scheduler} import play.api.libs.json.* @@ -34,7 +32,7 @@ import units.client.engine.EngineApiClient.PayloadId import units.client.engine.model.* import units.client.engine.model.Withdrawal.WithdrawalIndex import units.eth.{EmptyPayload, EthAddress, EthereumConstants} -import units.network.PayloadMessage +import units.network.PayloadObserver import units.network.PayloadObserverImpl.PayloadInfoWithChannel import units.util.HexBytesConverter import units.util.HexBytesConverter.toHexNoPrefix @@ -47,11 +45,10 @@ class ELUpdater( engineApiClient: EngineApiClient, blockchain: Blockchain, utx: UtxPool, - allChannels: DefaultChannelGroup, + payloadObserver: PayloadObserver, config: ClientConfig, time: Time, wallet: Wallet, - requestPayloadFromPeers: BlockHash => CancelableFuture[PayloadInfoWithChannel], broadcastTx: Transaction => TracedResult[ValidationError, Boolean], scheduler: Scheduler, globalScheduler: Scheduler @@ -67,7 +64,7 @@ class ELUpdater( def consensusLayerChanged(): Unit = handleNextUpdate := scheduler.scheduleOnce(ClChangedProcessingDelay)(handleConsensusLayerChanged()) - def executionPayloadReceived(epi: ExecutionPayloadInfo, ch: Channel): Unit = scheduler.execute { () => + def executionPayloadReceived(epi: ExecutionPayloadInfo): Unit = scheduler.execute { () => val payload = epi.payload logger.debug(s"New block ${payload.hash}->${payload.parentHash} (timestamp=${payload.timestamp}, height=${payload.height}) appeared") @@ -91,13 +88,13 @@ class ELUpdater( } case w @ Working(_, lastPayload, _, _, _, FollowingChain(nodeChainInfo, _), _, returnToMainChainInfo) if payload.parentHash == lastPayload.hash => - validateAndApply(epi, ch, w, lastPayload, nodeChainInfo, returnToMainChainInfo) + validateAndApply(epi, w, lastPayload, nodeChainInfo, returnToMainChainInfo) case w: Working[ChainStatus] => w.returnToMainChainInfo match { case Some(rInfo) if rInfo.missedBlock.hash == payload.hash => chainContractClient.getChainInfo(rInfo.chainId) match { case Some(chainInfo) if chainInfo.isMain => - validateAndApplyMissedBlock(epi, ch, w, rInfo.missedBlock, rInfo.missedBlockParentPayload, chainInfo) + validateAndApplyMissedBlock(epi, w, rInfo.missedBlock, rInfo.missedBlockParentPayload, chainInfo) case Some(_) => logger.debug(s"Chain ${rInfo.chainId} is not main anymore, ignoring ${payload.hash}") case _ => @@ -233,11 +230,7 @@ class ELUpdater( latestValidHashOpt <- engineApiClient.applyNewPayload(payloadJson) latestValidHash <- Either.fromOption(latestValidHashOpt, ClientError("Latest valid hash not defined")) _ = logger.info(s"Applied payload $payloadId, block hash is $latestValidHash, timestamp = $timestamp") - newPm <- PayloadMessage.signed(payloadJson, m.keyPair.privateKey).leftMap(ClientError.apply) - _ = logger.debug(s"Broadcasting block ${newPm.hash} payload") - _ <- Try(allChannels.broadcast(newPm)).toEither.leftMap(err => - ClientError(s"Failed to broadcast block ${newPm.hash} payload: ${err.toString}") - ) + newPm <- payloadObserver.broadcastSigned(payloadJson, m.keyPair.privateKey).leftMap(ClientError.apply) payloadInfo <- newPm.payloadInfo.leftMap(ClientError.apply) payload = payloadInfo.payload transfersRootHash <- getE2CTransfersRootHash(payload.hash, chainContractOptions.elBridgeAddress) @@ -785,10 +778,12 @@ class ELUpdater( } private def requestAndProcessPayload(hash: BlockHash): CancelableFuture[(Channel, ExecutionPayloadInfo)] = { - requestPayloadFromPeers(hash).andThen { - case Success((ch, epi)) => executionPayloadReceived(epi, ch) - case Failure(exception) => logger.error(s"Error loading block $hash payload", exception) - }(globalScheduler) + payloadObserver + .loadPayload(hash) + .andThen { + case Success((ch, epi)) => executionPayloadReceived(epi) + case Failure(exception) => logger.error(s"Error loading block $hash payload", exception) + }(globalScheduler) } private def updateToFollowChain( @@ -1023,7 +1018,6 @@ class ELUpdater( private def validateAndApplyMissedBlock( epi: ExecutionPayloadInfo, - ch: Channel, prevState: Working[ChainStatus], contractBlock: ContractBlock, parentPayload: ExecutionPayload, @@ -1033,7 +1027,7 @@ class ELUpdater( validateBlockFull(epi, contractBlock, parentPayload, prevState) match { case Right(updatedState) => logger.debug(s"Missed block ${payload.hash} of main chain ${nodeChainInfo.id} was successfully validated") - broadcastAndConfirmBlock(epi, ch, updatedState, nodeChainInfo, None) + broadcastAndConfirmBlock(epi, updatedState, nodeChainInfo, None) case Left(err) => logger.debug(s"Missed block ${payload.hash} of main chain ${nodeChainInfo.id} validation error: ${err.message}, ignoring block") } @@ -1041,7 +1035,6 @@ class ELUpdater( private def validateAndApply( epi: ExecutionPayloadInfo, - ch: Channel, prevState: Working[ChainStatus], parentPayload: ExecutionPayload, nodeChainInfo: ChainInfo, @@ -1054,7 +1047,7 @@ class ELUpdater( validateBlockFull(epi, contractBlock, parentPayload, prevState) match { case Right(updatedState) => logger.debug(s"Block ${payload.hash} was successfully validated") - broadcastAndConfirmBlock(epi, ch, updatedState, nodeChainInfo, returnToMainChainInfo) + broadcastAndConfirmBlock(epi, updatedState, nodeChainInfo, returnToMainChainInfo) case Left(err) => logger.debug(s"Block ${payload.hash} validation error: ${err.message}") processInvalidBlock(contractBlock, prevState, Some(nodeChainInfo)) @@ -1066,7 +1059,7 @@ class ELUpdater( preValidateBlock(epi, parentPayload, epochInfo) match { case Right(_) => logger.debug(s"Block ${payload.hash} was successfully partially validated") - broadcastAndConfirmBlock(epi, ch, prevState, nodeChainInfo, returnToMainChainInfo) + broadcastAndConfirmBlock(epi, prevState, nodeChainInfo, returnToMainChainInfo) case Left(err) => logger.error(s"Block ${payload.hash} prevalidation error: ${err.message}, ignoring block") } @@ -1102,21 +1095,12 @@ class ELUpdater( private def broadcastAndConfirmBlock( epi: ExecutionPayloadInfo, - ch: Channel, prevState: Working[ChainStatus], nodeChainInfo: ChainInfo, returnToMainChainInfo: Option[ReturnToMainChainInfo] ): Unit = { - val payload = epi.payload - - (for { - pm <- PayloadMessage(epi.payloadJson, epi.signature) - _ <- Try(allChannels.broadcast(pm, Some(ch))).toEither.leftMap(_.getMessage) - } yield ()).recover { err => - logger.error(s"Failed to broadcast block ${payload.hash} payload: $err") - } - - confirmBlockAndFollowChain(payload, prevState, nodeChainInfo, returnToMainChainInfo) + payloadObserver.broadcast(epi.payload.hash) + confirmBlockAndFollowChain(epi.payload, prevState, nodeChainInfo, returnToMainChainInfo) } private def findBlockChild(parent: BlockHash, lastBlockHash: BlockHash): Either[String, ContractBlock] = { diff --git a/src/main/scala/units/network/MessageObserver.scala b/src/main/scala/units/network/MessageObserver.scala index 9597680c..8d2dda82 100644 --- a/src/main/scala/units/network/MessageObserver.scala +++ b/src/main/scala/units/network/MessageObserver.scala @@ -12,15 +12,11 @@ class MessageObserver extends ChannelInboundHandlerAdapter with ScorexLogging { private implicit val scheduler: SchedulerService = Schedulers.fixedPool(2, "message-observer-l2") - val payloads: Subject[(Channel, ExecutionPayloadInfo), (Channel, ExecutionPayloadInfo)] = ConcurrentSubject.publish[(Channel, ExecutionPayloadInfo)] + val payloads: Subject[PayloadMessageWithChannel, PayloadMessageWithChannel] = ConcurrentSubject.publish[PayloadMessageWithChannel] override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match { - case pm: PayloadMessage => - pm.payloadInfo match { - case Right(epi) => payloads.onNext((ctx.channel(), epi)) - case Left(err) => log.warn(err) - } - case _ => super.channelRead(ctx, msg) + case pm: PayloadMessage => payloads.onNext(PayloadMessageWithChannel(pm, ctx.channel())) + case _ => super.channelRead(ctx, msg) } def shutdown(): Unit = { diff --git a/src/main/scala/units/network/PayloadMessage.scala b/src/main/scala/units/network/PayloadMessage.scala index 0667bb83..48387dad 100644 --- a/src/main/scala/units/network/PayloadMessage.scala +++ b/src/main/scala/units/network/PayloadMessage.scala @@ -2,7 +2,7 @@ package units.network import cats.syntax.either.* import com.google.common.primitives.Bytes -import com.wavesplatform.account.PrivateKey +import com.wavesplatform.account.{PrivateKey, PublicKey} import com.wavesplatform.common.state.ByteStr import com.wavesplatform.crypto import com.wavesplatform.crypto.SignatureLength @@ -17,8 +17,11 @@ import scala.util.Try class PayloadMessage private ( payloadJson: JsObject, val hash: BlockHash, + val feeRecipient: EthAddress, val signature: Option[ByteStr] ) { + lazy val jsonBytes: Array[Byte] = Json.toBytes(payloadJson) + lazy val payloadInfo: Either[String, ExecutionPayloadInfo] = { (for { timestamp <- (payloadJson \ "timestamp").asOpt[String].map(toLong).toRight("timestamp not defined") @@ -54,22 +57,26 @@ class PayloadMessage private ( def toBytes: Array[Byte] = { val signatureBytes = signature.map(sig => Bytes.concat(Array(1.toByte), sig.arr)).getOrElse(Array(0.toByte)) - Bytes.concat(signatureBytes, Json.toBytes(payloadJson)) + Bytes.concat(signatureBytes, jsonBytes) } + + def isSignatureValid(pk: PublicKey): Boolean = + signature.exists(crypto.verify(_, jsonBytes, pk, checkWeakPk = true)) } object PayloadMessage { def apply(payloadJson: JsObject): Either[String, PayloadMessage] = apply(payloadJson, None) - def apply(payloadJson: JsObject, hash: BlockHash, signature: Option[ByteStr]): PayloadMessage = - new PayloadMessage(payloadJson, hash, signature) + def apply(payloadJson: JsObject, hash: BlockHash, feeRecipient: EthAddress, signature: Option[ByteStr]): PayloadMessage = + new PayloadMessage(payloadJson, hash, feeRecipient, signature) def apply(payloadJson: JsObject, signature: Option[ByteStr]): Either[String, PayloadMessage] = - (payloadJson \ "blockHash") - .asOpt[BlockHash] - .toRight("Error creating payload message: block hash not defined") - .map(PayloadMessage(payloadJson, _, signature)) + (for { + hash <- (payloadJson \ "blockHash").asOpt[BlockHash].toRight("block hash not defined") + feeRecipient <- (payloadJson \ "feeRecipient").asOpt[EthAddress].toRight("fee recipient not defined") + } yield PayloadMessage(payloadJson, hash, feeRecipient, signature)) + .leftMap(err => s"Error creating payload message: $err") def apply(payloadBytes: Array[Byte], signature: Option[ByteStr]): Either[String, PayloadMessage] = for { payload <- Try(Json.parse(payloadBytes).as[JsObject]).toEither.leftMap(err => s"Payload bytes are not a valid JSON object: ${err.getMessage}") diff --git a/src/main/scala/units/network/PayloadMessageWithChannel.scala b/src/main/scala/units/network/PayloadMessageWithChannel.scala new file mode 100644 index 00000000..c96f7e05 --- /dev/null +++ b/src/main/scala/units/network/PayloadMessageWithChannel.scala @@ -0,0 +1,5 @@ +package units.network + +import io.netty.channel.Channel + +case class PayloadMessageWithChannel(pm: PayloadMessage, ch: Channel) diff --git a/src/main/scala/units/network/PayloadObserver.scala b/src/main/scala/units/network/PayloadObserver.scala index bc037920..cf9b56ee 100644 --- a/src/main/scala/units/network/PayloadObserver.scala +++ b/src/main/scala/units/network/PayloadObserver.scala @@ -1,12 +1,17 @@ package units.network -import units.network.PayloadObserverImpl.PayloadInfoWithChannel -import com.wavesplatform.network.ChannelObservable +import com.wavesplatform.account.PrivateKey import monix.execution.CancelableFuture +import monix.reactive.Observable +import play.api.libs.json.JsObject import units.{BlockHash, ExecutionPayloadInfo} trait PayloadObserver { - def getPayloadStream: ChannelObservable[ExecutionPayloadInfo] + def getPayloadStream: Observable[ExecutionPayloadInfo] - def loadPayload(req: BlockHash): CancelableFuture[PayloadInfoWithChannel] + def loadPayload(req: BlockHash): CancelableFuture[ExecutionPayloadInfo] + + def broadcastSigned(payloadJson: JsObject, signer: PrivateKey): Either[String, PayloadMessage] + + def broadcast(hash: BlockHash): Unit } diff --git a/src/main/scala/units/network/PayloadObserverImpl.scala b/src/main/scala/units/network/PayloadObserverImpl.scala index 7de42060..4310aba2 100644 --- a/src/main/scala/units/network/PayloadObserverImpl.scala +++ b/src/main/scala/units/network/PayloadObserverImpl.scala @@ -1,55 +1,86 @@ package units.network +import cats.syntax.either.* import com.google.common.cache.CacheBuilder -import com.wavesplatform.network.ChannelObservable +import com.wavesplatform.account.{PrivateKey, PublicKey} +import com.wavesplatform.network.ChannelGroupExt import com.wavesplatform.utils.ScorexLogging import io.netty.channel.Channel import io.netty.channel.group.DefaultChannelGroup import monix.eval.Task import monix.execution.{Cancelable, CancelableFuture, CancelablePromise, Scheduler} +import monix.reactive.Observable import monix.reactive.subjects.ConcurrentSubject -import units.network.PayloadObserverImpl.{PayloadInfoWithChannel, State} +import play.api.libs.json.JsObject +import units.eth.EthAddress +import units.network.PayloadObserverImpl.State import units.{BlockHash, ExecutionPayloadInfo} import java.time.Duration +import java.util.concurrent.ConcurrentHashMap import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters.* -import scala.util.{Failure, Success} - -class PayloadObserverImpl(allChannels: DefaultChannelGroup, payloads: ChannelObservable[ExecutionPayloadInfo], syncTimeout: FiniteDuration)(implicit - sc: Scheduler -) extends PayloadObserver +import scala.util.{Failure, Success, Try} + +class PayloadObserverImpl( + allChannels: DefaultChannelGroup, + payloads: Observable[PayloadMessageWithChannel], + initialMinersKeys: Map[EthAddress, PublicKey], + syncTimeout: FiniteDuration +)(implicit sc: Scheduler) + extends PayloadObserver with ScorexLogging { - private var state: State = State.Idle(None) - private val payloadsResult: ConcurrentSubject[PayloadInfoWithChannel, PayloadInfoWithChannel] = - ConcurrentSubject.publish[PayloadInfoWithChannel] + private var state: State = State.Idle(None) + private var lastPayloadMessage: Option[PayloadMessageWithChannel] = None + + private val payloadsResult: ConcurrentSubject[ExecutionPayloadInfo, ExecutionPayloadInfo] = + ConcurrentSubject.publish[ExecutionPayloadInfo] + private val addrToPK: ConcurrentHashMap[EthAddress, PublicKey] = new ConcurrentHashMap[EthAddress, PublicKey](initialMinersKeys.asJava) private val knownPayloadCache = CacheBuilder .newBuilder() .expireAfterWrite(Duration.ofMinutes(10)) .maximumSize(100) - .build[BlockHash, PayloadInfoWithChannel]() + .build[BlockHash, ExecutionPayloadInfo]() payloads - .foreach { case v @ (ch, epi) => + .foreach { case PayloadMessageWithChannel(pm, ch) => state = state match { - case State.LoadingPayload(expectedHash, nextAttempt, p) if expectedHash == epi.payload.hash => - nextAttempt.cancel() - p.complete(Success(ch -> epi)) - State.Idle(Some(ch)) - case other => other + case State.LoadingPayload(expectedHash, nextAttempt, p) if expectedHash == pm.hash => + pm.payloadInfo match { + case Right(epi) => + nextAttempt.cancel() + p.complete(Success(epi)) + knownPayloadCache.put(pm.hash, epi) + payloadsResult.onNext(epi) + State.Idle(Some(ch)) + } + case other => + Option(addrToPK.get(pm.feeRecipient)) match { + case Some(pk) => + if (pm.isSignatureValid(pk)) { + pm.payloadInfo match { + case Right(epi) => + knownPayloadCache.put(pm.hash, epi) + payloadsResult.onNext(epi) + case Left(err) => log.debug(err) + } + } else { + log.debug(s"Invalid signature for payload ${pm.hash}") + } + case None => + log.debug(s"Payload ${pm.hash} fee recipient ${pm.feeRecipient} is unknown miner") + } + other } - - knownPayloadCache.put(epi.payload.hash, v) - payloadsResult.onNext(v) } - def loadPayload(req: BlockHash): CancelableFuture[PayloadInfoWithChannel] = { + def loadPayload(req: BlockHash): CancelableFuture[ExecutionPayloadInfo] = { log.info(s"Loading payload $req") - knownPayloadCache.getIfPresent(req) match { - case null => - val p = CancelablePromise[PayloadInfoWithChannel]() + Option(knownPayloadCache.getIfPresent(req)) match { + case None => + val p = CancelablePromise[ExecutionPayloadInfo]() sc.execute { () => val candidate = state match { case State.LoadingPayload(_, nextAttempt, promise) => @@ -61,12 +92,30 @@ class PayloadObserverImpl(allChannels: DefaultChannelGroup, payloads: ChannelObs state = State.LoadingPayload(req, requestFromNextChannel(req, candidate, Set.empty).runToFuture, p) } p.future - case (ch, pm) => - CancelablePromise.successful(ch -> pm).future + case Some(epi) => + CancelablePromise.successful(epi).future } } - def getPayloadStream: ChannelObservable[ExecutionPayloadInfo] = payloadsResult + def getPayloadStream: Observable[ExecutionPayloadInfo] = payloadsResult + + def broadcastSigned(payloadJson: JsObject, signer: PrivateKey): Either[String, PayloadMessage] = for { + pm <- PayloadMessage.signed(payloadJson, signer) + _ = log.debug(s"Broadcasting block ${pm.hash} payload") + _ <- Try(allChannels.broadcast(pm)).toEither.leftMap(err => s"Failed to broadcast block ${pm.hash} payload: ${err.toString}") + } yield pm + + override def broadcast(hash: BlockHash): Unit = { + (for { + payloadWithChannel <- lastPayloadMessage.toRight("No prepared for broadcast payload") + _ <- Either.cond(hash == payloadWithChannel.pm.hash, (), s"Payload for block $hash is not last received") + _ = log.debug(s"Broadcasting block ${payloadWithChannel.pm.hash} payload") + _ <- Try(allChannels.broadcast(payloadWithChannel.pm, Some(payloadWithChannel.ch))).toEither.leftMap(_.getMessage) + } yield ()).fold( + err => log.error(s"Failed to broadcast last received payload: $err"), + identity + ) + } // TODO: remove Task private def requestFromNextChannel(req: BlockHash, candidate: Option[Channel], excluded: Set[Channel]): Task[Unit] = Task { @@ -92,7 +141,7 @@ object PayloadObserverImpl { sealed trait State object State { - case class LoadingPayload(blockHash: BlockHash, nextAttempt: Cancelable, promise: CancelablePromise[PayloadInfoWithChannel]) extends State + case class LoadingPayload(blockHash: BlockHash, nextAttempt: Cancelable, promise: CancelablePromise[ExecutionPayloadInfo]) extends State case class Idle(pinnedChannel: Option[Channel]) extends State } diff --git a/src/test/scala/units/network/TestPayloadObserver.scala b/src/test/scala/units/network/TestPayloadObserver.scala index 06fd1e7d..11ed702f 100644 --- a/src/test/scala/units/network/TestPayloadObserver.scala +++ b/src/test/scala/units/network/TestPayloadObserver.scala @@ -1,10 +1,12 @@ package units.network +import com.wavesplatform.account.PrivateKey import com.wavesplatform.network.ChannelObservable import com.wavesplatform.utils.ScorexLogging import io.netty.channel.Channel import monix.eval.Task import monix.execution.CancelableFuture +import play.api.libs.json.JsObject import units.network.PayloadObserverImpl.PayloadInfoWithChannel import units.{BlockHash, ExecutionPayloadInfo} @@ -18,4 +20,13 @@ class TestPayloadObserver(override val getPayloadStream: ChannelObservable[Execu log.debug(s"requestBlock($req)") Task.never } + + override def broadcastSigned(payloadJson: JsObject, signer: PrivateKey): Either[String, PayloadMessage] = { + log.debug(s"broadcastSigned($payloadJson, $signer)") + PayloadMessage.signed(payloadJson, signer) + } + + override def broadcast(hash: BlockHash): Unit = { + log.debug(s"broadcast($hash)") + } }