diff --git a/src/main/scala/units/ELUpdater.scala b/src/main/scala/units/ELUpdater.scala index 0fde81a3..cacc73fe 100644 --- a/src/main/scala/units/ELUpdater.scala +++ b/src/main/scala/units/ELUpdater.scala @@ -34,7 +34,7 @@ import units.client.engine.EngineApiClient.PayloadId import units.client.engine.model.* import units.client.engine.model.Withdrawal.WithdrawalIndex import units.eth.{EmptyPayload, EthAddress, EthereumConstants} -import units.network.PayloadObserverImpl.PayloadWithChannel +import units.network.PayloadObserverImpl.PayloadInfoWithChannel import units.util.HexBytesConverter import units.util.HexBytesConverter.toHexNoPrefix @@ -43,17 +43,17 @@ import scala.concurrent.duration.* import scala.util.* class ELUpdater( - engineApiClient: EngineApiClient, - blockchain: Blockchain, - utx: UtxPool, - allChannels: DefaultChannelGroup, - config: ClientConfig, - time: Time, - wallet: Wallet, - requestPayloadFromPeers: BlockHash => CancelableFuture[PayloadWithChannel], - broadcastTx: Transaction => TracedResult[ValidationError, Boolean], - scheduler: Scheduler, - globalScheduler: Scheduler + engineApiClient: EngineApiClient, + blockchain: Blockchain, + utx: UtxPool, + allChannels: DefaultChannelGroup, + config: ClientConfig, + time: Time, + wallet: Wallet, + requestPayloadFromPeers: BlockHash => CancelableFuture[PayloadInfoWithChannel], + broadcastTx: Transaction => TracedResult[ValidationError, Boolean], + scheduler: Scheduler, + globalScheduler: Scheduler ) extends StrictLogging { import ELUpdater.* @@ -1561,8 +1561,8 @@ object ELUpdater { } } - case class WaitingForSyncHead(target: ContractBlock, task: CancelableFuture[PayloadWithChannel]) extends State - case class SyncingToFinalizedBlock(target: BlockHash) extends State + case class WaitingForSyncHead(target: ContractBlock, task: CancelableFuture[PayloadInfoWithChannel]) extends State + case class SyncingToFinalizedBlock(target: BlockHash) extends State } private case class RollbackBlock(hash: BlockHash, parentPayload: ExecutionPayload) diff --git a/src/main/scala/units/network/MessageObserver.scala b/src/main/scala/units/network/MessageObserver.scala index cbe03c94..9597680c 100644 --- a/src/main/scala/units/network/MessageObserver.scala +++ b/src/main/scala/units/network/MessageObserver.scala @@ -1,21 +1,26 @@ package units.network -import com.wavesplatform.utils.Schedulers +import com.wavesplatform.utils.{Schedulers, ScorexLogging} import io.netty.channel.ChannelHandler.Sharable import io.netty.channel.{Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter} import monix.execution.schedulers.SchedulerService import monix.reactive.subjects.{ConcurrentSubject, Subject} +import units.ExecutionPayloadInfo @Sharable -class MessageObserver extends ChannelInboundHandlerAdapter { +class MessageObserver extends ChannelInboundHandlerAdapter with ScorexLogging { private implicit val scheduler: SchedulerService = Schedulers.fixedPool(2, "message-observer-l2") - val payloads: Subject[(Channel, PayloadMessage), (Channel, PayloadMessage)] = ConcurrentSubject.publish[(Channel, PayloadMessage)] + val payloads: Subject[(Channel, ExecutionPayloadInfo), (Channel, ExecutionPayloadInfo)] = ConcurrentSubject.publish[(Channel, ExecutionPayloadInfo)] override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match { - case b: PayloadMessage => payloads.onNext((ctx.channel(), b)) - case _ => super.channelRead(ctx, msg) + case pm: PayloadMessage => + pm.payloadInfo match { + case Right(epi) => payloads.onNext((ctx.channel(), epi)) + case Left(err) => log.warn(err) + } + case _ => super.channelRead(ctx, msg) } def shutdown(): Unit = { diff --git a/src/main/scala/units/network/PayloadMessage.scala b/src/main/scala/units/network/PayloadMessage.scala index 4b36718a..d7cdd097 100644 --- a/src/main/scala/units/network/PayloadMessage.scala +++ b/src/main/scala/units/network/PayloadMessage.scala @@ -17,7 +17,7 @@ class PayloadMessage private ( val hash: BlockHash, val signature: Option[ByteStr] ) { - lazy val payload: Either[String, ExecutionPayloadInfo] = { + lazy val payloadInfo: Either[String, ExecutionPayloadInfo] = { (for { timestamp <- (payloadJson \ "timestamp").asOpt[String].map(toLong).toRight("timestamp not defined") height <- (payloadJson \ "blockNumber").asOpt[String].map(toLong).toRight("height not defined") @@ -46,7 +46,7 @@ class PayloadMessage private ( ), payloadJson ) - }).leftMap(err => s"Error creating payload for block $hash: $err") + }).leftMap(err => s"Error creating payload info for block $hash: $err") } def toBytes: Array[Byte] = { @@ -65,7 +65,7 @@ object PayloadMessage { def apply(payloadJson: JsObject, signature: Option[ByteStr]): Either[String, PayloadMessage] = (payloadJson \ "blockHash") .asOpt[BlockHash] - .toRight("Error creating payload: block hash not defined") + .toRight("Error creating payload message: block hash not defined") .map(PayloadMessage(payloadJson, _, signature)) def apply(payloadBytes: Array[Byte], signature: Option[ByteStr]): Either[String, PayloadMessage] = for { diff --git a/src/main/scala/units/network/PayloadObserver.scala b/src/main/scala/units/network/PayloadObserver.scala index 9b00eeca..1640975c 100644 --- a/src/main/scala/units/network/PayloadObserver.scala +++ b/src/main/scala/units/network/PayloadObserver.scala @@ -1,15 +1,13 @@ package units.network -import units.network.PayloadObserverImpl.PayloadWithChannel +import units.network.PayloadObserverImpl.PayloadInfoWithChannel import com.wavesplatform.network.ChannelObservable import monix.eval.Task import monix.execution.CancelableFuture -import units.BlockHash +import units.{BlockHash, ExecutionPayloadInfo} trait PayloadObserver { - def getPayloadStream: ChannelObservable[PayloadMessage] + def getPayloadStream: ChannelObservable[ExecutionPayloadInfo] - def requestPayload(req: BlockHash): Task[PayloadWithChannel] - - def loadPayload(req: BlockHash): CancelableFuture[PayloadWithChannel] + def loadPayload(req: BlockHash): CancelableFuture[PayloadInfoWithChannel] } diff --git a/src/main/scala/units/network/PayloadObserverImpl.scala b/src/main/scala/units/network/PayloadObserverImpl.scala index efcd49a2..7de42060 100644 --- a/src/main/scala/units/network/PayloadObserverImpl.scala +++ b/src/main/scala/units/network/PayloadObserverImpl.scala @@ -8,93 +8,67 @@ import io.netty.channel.group.DefaultChannelGroup import monix.eval.Task import monix.execution.{Cancelable, CancelableFuture, CancelablePromise, Scheduler} import monix.reactive.subjects.ConcurrentSubject -import units.network.PayloadObserverImpl.{PayloadWithChannel, State} -import units.BlockHash +import units.network.PayloadObserverImpl.{PayloadInfoWithChannel, State} +import units.{BlockHash, ExecutionPayloadInfo} import java.time.Duration import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters.* import scala.util.{Failure, Success} -class PayloadObserverImpl(allChannels: DefaultChannelGroup, payloads: ChannelObservable[PayloadMessage], syncTimeout: FiniteDuration)(implicit +class PayloadObserverImpl(allChannels: DefaultChannelGroup, payloads: ChannelObservable[ExecutionPayloadInfo], syncTimeout: FiniteDuration)(implicit sc: Scheduler ) extends PayloadObserver with ScorexLogging { private var state: State = State.Idle(None) - private val payloadsResult: ConcurrentSubject[PayloadWithChannel, PayloadWithChannel] = - ConcurrentSubject.publish[PayloadWithChannel] + private val payloadsResult: ConcurrentSubject[PayloadInfoWithChannel, PayloadInfoWithChannel] = + ConcurrentSubject.publish[PayloadInfoWithChannel] private val knownPayloadCache = CacheBuilder .newBuilder() .expireAfterWrite(Duration.ofMinutes(10)) .maximumSize(100) - .build[BlockHash, PayloadWithChannel]() + .build[BlockHash, PayloadInfoWithChannel]() payloads - .foreach { case v @ (ch, pm) => + .foreach { case v @ (ch, epi) => state = state match { - case State.LoadingPayload(expectedHash, nextAttempt, p) if expectedHash == pm.hash => + case State.LoadingPayload(expectedHash, nextAttempt, p) if expectedHash == epi.payload.hash => nextAttempt.cancel() - p.complete(Success(ch -> pm)) + p.complete(Success(ch -> epi)) State.Idle(Some(ch)) case other => other } - knownPayloadCache.put(pm.hash, v) + knownPayloadCache.put(epi.payload.hash, v) payloadsResult.onNext(v) } - def loadPayload(req: BlockHash): CancelableFuture[PayloadWithChannel] = knownPayloadCache.getIfPresent(req) match { - case null => - val p = CancelablePromise[PayloadWithChannel]() - sc.execute { () => - val candidate = state match { - case State.LoadingPayload(_, nextAttempt, promise) => - nextAttempt.cancel() - promise.complete(Failure(new NoSuchElementException("Loading was canceled"))) - None - case State.Idle(candidate) => candidate - } - state = State.LoadingPayload(req, requestFromNextChannel(req, candidate, Set.empty).runToFuture, p) - } - p.future - case (ch, pm) => - CancelablePromise.successful(ch -> pm).future - } - - def getPayloadStream: ChannelObservable[PayloadMessage] = payloadsResult - - def requestPayload(req: BlockHash): Task[PayloadWithChannel] = Task - .defer { - log.info(s"Loading payload $req") - knownPayloadCache.getIfPresent(req) match { - case null => - val p = CancelablePromise[PayloadWithChannel]() - + def loadPayload(req: BlockHash): CancelableFuture[PayloadInfoWithChannel] = { + log.info(s"Loading payload $req") + knownPayloadCache.getIfPresent(req) match { + case null => + val p = CancelablePromise[PayloadInfoWithChannel]() + sc.execute { () => val candidate = state match { - case l: State.LoadingPayload => - log.trace(s"No longer waiting for payload ${l.blockHash}, will load $req instead") - l.nextAttempt.cancel() - l.promise.future.cancel() + case State.LoadingPayload(_, nextAttempt, promise) => + nextAttempt.cancel() + promise.complete(Failure(new NoSuchElementException("Loading was canceled"))) None - case State.Idle(candidate) => - candidate + case State.Idle(candidate) => candidate } - - state = State.LoadingPayload( - req, - requestFromNextChannel(req, candidate, Set.empty).runToFuture, - p - ) - - Task.fromCancelablePromise(p) - case (ch, pm) => - Task.pure(ch -> pm) - } + state = State.LoadingPayload(req, requestFromNextChannel(req, candidate, Set.empty).runToFuture, p) + } + p.future + case (ch, pm) => + CancelablePromise.successful(ch -> pm).future } - .executeOn(sc) + } + + def getPayloadStream: ChannelObservable[ExecutionPayloadInfo] = payloadsResult + // TODO: remove Task private def requestFromNextChannel(req: BlockHash, candidate: Option[Channel], excluded: Set[Channel]): Task[Unit] = Task { candidate.filterNot(excluded).orElse(nextOpenChannel(excluded)) match { case None => @@ -113,12 +87,12 @@ class PayloadObserverImpl(allChannels: DefaultChannelGroup, payloads: ChannelObs object PayloadObserverImpl { - type PayloadWithChannel = (Channel, PayloadMessage) + type PayloadInfoWithChannel = (Channel, ExecutionPayloadInfo) sealed trait State object State { - case class LoadingPayload(blockHash: BlockHash, nextAttempt: Cancelable, promise: CancelablePromise[PayloadWithChannel]) extends State + case class LoadingPayload(blockHash: BlockHash, nextAttempt: Cancelable, promise: CancelablePromise[PayloadInfoWithChannel]) extends State case class Idle(pinnedChannel: Option[Channel]) extends State } diff --git a/src/main/scala/units/network/TrafficLogger.scala b/src/main/scala/units/network/TrafficLogger.scala index d7bb6cf0..1867dff9 100644 --- a/src/main/scala/units/network/TrafficLogger.scala +++ b/src/main/scala/units/network/TrafficLogger.scala @@ -21,7 +21,7 @@ class TrafficLogger(settings: TL.Settings) extends TL(settings) { } protected def stringify(msg: Any): String = msg match { - case pm: PayloadMessage => s"${pm.hash}" + case pm: PayloadMessage => s"PayloadMessage(hash=${pm.hash})" case RawBytes(code, data) => s"RawBytes(${specsByCodes(code).messageName}, ${data.length} bytes)" case other => other.toString } diff --git a/src/test/scala/units/network/TestPayloadObserver.scala b/src/test/scala/units/network/TestPayloadObserver.scala index 5c317b0b..7256f246 100644 --- a/src/test/scala/units/network/TestPayloadObserver.scala +++ b/src/test/scala/units/network/TestPayloadObserver.scala @@ -5,7 +5,7 @@ import com.wavesplatform.utils.ScorexLogging import io.netty.channel.Channel import monix.eval.Task import monix.execution.CancelableFuture -import units.network.PayloadObserverImpl.PayloadWithChannel +import units.network.PayloadObserverImpl.PayloadInfoWithChannel import units.{BlockHash, NetworkBlock} class TestPayloadObserver(override val getPayloadStream: ChannelObservable[NetworkBlock]) extends PayloadObserver with ScorexLogging { @@ -14,7 +14,7 @@ class TestPayloadObserver(override val getPayloadStream: ChannelObservable[Netwo CancelableFuture.never } - def requestPayload(req: BlockHash): Task[PayloadWithChannel] = { + def requestPayload(req: BlockHash): Task[PayloadInfoWithChannel] = { log.debug(s"requestBlock($req)") Task.never }