Skip to content

Commit

Permalink
Rename block classes (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-mashonskiy committed Sep 23, 2024
1 parent 216e1da commit 5722acb
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 88 deletions.
28 changes: 14 additions & 14 deletions src/main/scala/units/ELUpdater.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import units.client.engine.EngineApiClient.PayloadId
import units.client.engine.model.*
import units.client.engine.model.Withdrawal.WithdrawalIndex
import units.eth.{EmptyPayload, EthAddress, EthereumConstants}
import units.network.PayloadObserverImpl.PayloadWithChannel
import units.network.PayloadObserverImpl.PayloadInfoWithChannel
import units.util.HexBytesConverter
import units.util.HexBytesConverter.toHexNoPrefix

Expand All @@ -43,17 +43,17 @@ import scala.concurrent.duration.*
import scala.util.*

class ELUpdater(
engineApiClient: EngineApiClient,
blockchain: Blockchain,
utx: UtxPool,
allChannels: DefaultChannelGroup,
config: ClientConfig,
time: Time,
wallet: Wallet,
requestPayloadFromPeers: BlockHash => CancelableFuture[PayloadWithChannel],
broadcastTx: Transaction => TracedResult[ValidationError, Boolean],
scheduler: Scheduler,
globalScheduler: Scheduler
engineApiClient: EngineApiClient,
blockchain: Blockchain,
utx: UtxPool,
allChannels: DefaultChannelGroup,
config: ClientConfig,
time: Time,
wallet: Wallet,
requestPayloadFromPeers: BlockHash => CancelableFuture[PayloadInfoWithChannel],
broadcastTx: Transaction => TracedResult[ValidationError, Boolean],
scheduler: Scheduler,
globalScheduler: Scheduler
) extends StrictLogging {
import ELUpdater.*

Expand Down Expand Up @@ -1561,8 +1561,8 @@ object ELUpdater {
}
}

case class WaitingForSyncHead(target: ContractBlock, task: CancelableFuture[PayloadWithChannel]) extends State
case class SyncingToFinalizedBlock(target: BlockHash) extends State
case class WaitingForSyncHead(target: ContractBlock, task: CancelableFuture[PayloadInfoWithChannel]) extends State
case class SyncingToFinalizedBlock(target: BlockHash) extends State
}

private case class RollbackBlock(hash: BlockHash, parentPayload: ExecutionPayload)
Expand Down
15 changes: 10 additions & 5 deletions src/main/scala/units/network/MessageObserver.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
package units.network

import com.wavesplatform.utils.Schedulers
import com.wavesplatform.utils.{Schedulers, ScorexLogging}
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.{Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter}
import monix.execution.schedulers.SchedulerService
import monix.reactive.subjects.{ConcurrentSubject, Subject}
import units.ExecutionPayloadInfo

@Sharable
class MessageObserver extends ChannelInboundHandlerAdapter {
class MessageObserver extends ChannelInboundHandlerAdapter with ScorexLogging {

private implicit val scheduler: SchedulerService = Schedulers.fixedPool(2, "message-observer-l2")

val payloads: Subject[(Channel, PayloadMessage), (Channel, PayloadMessage)] = ConcurrentSubject.publish[(Channel, PayloadMessage)]
val payloads: Subject[(Channel, ExecutionPayloadInfo), (Channel, ExecutionPayloadInfo)] = ConcurrentSubject.publish[(Channel, ExecutionPayloadInfo)]

override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match {
case b: PayloadMessage => payloads.onNext((ctx.channel(), b))
case _ => super.channelRead(ctx, msg)
case pm: PayloadMessage =>
pm.payloadInfo match {
case Right(epi) => payloads.onNext((ctx.channel(), epi))
case Left(err) => log.warn(err)
}
case _ => super.channelRead(ctx, msg)
}

def shutdown(): Unit = {
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/units/network/PayloadMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PayloadMessage private (
val hash: BlockHash,
val signature: Option[ByteStr]
) {
lazy val payload: Either[String, ExecutionPayloadInfo] = {
lazy val payloadInfo: Either[String, ExecutionPayloadInfo] = {
(for {
timestamp <- (payloadJson \ "timestamp").asOpt[String].map(toLong).toRight("timestamp not defined")
height <- (payloadJson \ "blockNumber").asOpt[String].map(toLong).toRight("height not defined")
Expand Down Expand Up @@ -46,7 +46,7 @@ class PayloadMessage private (
),
payloadJson
)
}).leftMap(err => s"Error creating payload for block $hash: $err")
}).leftMap(err => s"Error creating payload info for block $hash: $err")
}

def toBytes: Array[Byte] = {
Expand All @@ -65,7 +65,7 @@ object PayloadMessage {
def apply(payloadJson: JsObject, signature: Option[ByteStr]): Either[String, PayloadMessage] =
(payloadJson \ "blockHash")
.asOpt[BlockHash]
.toRight("Error creating payload: block hash not defined")
.toRight("Error creating payload message: block hash not defined")
.map(PayloadMessage(payloadJson, _, signature))

def apply(payloadBytes: Array[Byte], signature: Option[ByteStr]): Either[String, PayloadMessage] = for {
Expand Down
10 changes: 4 additions & 6 deletions src/main/scala/units/network/PayloadObserver.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package units.network

import units.network.PayloadObserverImpl.PayloadWithChannel
import units.network.PayloadObserverImpl.PayloadInfoWithChannel
import com.wavesplatform.network.ChannelObservable
import monix.eval.Task
import monix.execution.CancelableFuture
import units.BlockHash
import units.{BlockHash, ExecutionPayloadInfo}

trait PayloadObserver {
def getPayloadStream: ChannelObservable[PayloadMessage]
def getPayloadStream: ChannelObservable[ExecutionPayloadInfo]

def requestPayload(req: BlockHash): Task[PayloadWithChannel]

def loadPayload(req: BlockHash): CancelableFuture[PayloadWithChannel]
def loadPayload(req: BlockHash): CancelableFuture[PayloadInfoWithChannel]
}
88 changes: 31 additions & 57 deletions src/main/scala/units/network/PayloadObserverImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,93 +8,67 @@ import io.netty.channel.group.DefaultChannelGroup
import monix.eval.Task
import monix.execution.{Cancelable, CancelableFuture, CancelablePromise, Scheduler}
import monix.reactive.subjects.ConcurrentSubject
import units.network.PayloadObserverImpl.{PayloadWithChannel, State}
import units.BlockHash
import units.network.PayloadObserverImpl.{PayloadInfoWithChannel, State}
import units.{BlockHash, ExecutionPayloadInfo}

import java.time.Duration
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters.*
import scala.util.{Failure, Success}

class PayloadObserverImpl(allChannels: DefaultChannelGroup, payloads: ChannelObservable[PayloadMessage], syncTimeout: FiniteDuration)(implicit
class PayloadObserverImpl(allChannels: DefaultChannelGroup, payloads: ChannelObservable[ExecutionPayloadInfo], syncTimeout: FiniteDuration)(implicit
sc: Scheduler
) extends PayloadObserver
with ScorexLogging {

private var state: State = State.Idle(None)
private val payloadsResult: ConcurrentSubject[PayloadWithChannel, PayloadWithChannel] =
ConcurrentSubject.publish[PayloadWithChannel]
private val payloadsResult: ConcurrentSubject[PayloadInfoWithChannel, PayloadInfoWithChannel] =
ConcurrentSubject.publish[PayloadInfoWithChannel]

private val knownPayloadCache = CacheBuilder
.newBuilder()
.expireAfterWrite(Duration.ofMinutes(10))
.maximumSize(100)
.build[BlockHash, PayloadWithChannel]()
.build[BlockHash, PayloadInfoWithChannel]()

payloads
.foreach { case v @ (ch, pm) =>
.foreach { case v @ (ch, epi) =>
state = state match {
case State.LoadingPayload(expectedHash, nextAttempt, p) if expectedHash == pm.hash =>
case State.LoadingPayload(expectedHash, nextAttempt, p) if expectedHash == epi.payload.hash =>
nextAttempt.cancel()
p.complete(Success(ch -> pm))
p.complete(Success(ch -> epi))
State.Idle(Some(ch))
case other => other
}

knownPayloadCache.put(pm.hash, v)
knownPayloadCache.put(epi.payload.hash, v)
payloadsResult.onNext(v)
}

def loadPayload(req: BlockHash): CancelableFuture[PayloadWithChannel] = knownPayloadCache.getIfPresent(req) match {
case null =>
val p = CancelablePromise[PayloadWithChannel]()
sc.execute { () =>
val candidate = state match {
case State.LoadingPayload(_, nextAttempt, promise) =>
nextAttempt.cancel()
promise.complete(Failure(new NoSuchElementException("Loading was canceled")))
None
case State.Idle(candidate) => candidate
}
state = State.LoadingPayload(req, requestFromNextChannel(req, candidate, Set.empty).runToFuture, p)
}
p.future
case (ch, pm) =>
CancelablePromise.successful(ch -> pm).future
}

def getPayloadStream: ChannelObservable[PayloadMessage] = payloadsResult

def requestPayload(req: BlockHash): Task[PayloadWithChannel] = Task
.defer {
log.info(s"Loading payload $req")
knownPayloadCache.getIfPresent(req) match {
case null =>
val p = CancelablePromise[PayloadWithChannel]()

def loadPayload(req: BlockHash): CancelableFuture[PayloadInfoWithChannel] = {
log.info(s"Loading payload $req")
knownPayloadCache.getIfPresent(req) match {
case null =>
val p = CancelablePromise[PayloadInfoWithChannel]()
sc.execute { () =>
val candidate = state match {
case l: State.LoadingPayload =>
log.trace(s"No longer waiting for payload ${l.blockHash}, will load $req instead")
l.nextAttempt.cancel()
l.promise.future.cancel()
case State.LoadingPayload(_, nextAttempt, promise) =>
nextAttempt.cancel()
promise.complete(Failure(new NoSuchElementException("Loading was canceled")))
None
case State.Idle(candidate) =>
candidate
case State.Idle(candidate) => candidate
}

state = State.LoadingPayload(
req,
requestFromNextChannel(req, candidate, Set.empty).runToFuture,
p
)

Task.fromCancelablePromise(p)
case (ch, pm) =>
Task.pure(ch -> pm)
}
state = State.LoadingPayload(req, requestFromNextChannel(req, candidate, Set.empty).runToFuture, p)
}
p.future
case (ch, pm) =>
CancelablePromise.successful(ch -> pm).future
}
.executeOn(sc)
}

def getPayloadStream: ChannelObservable[ExecutionPayloadInfo] = payloadsResult

// TODO: remove Task
private def requestFromNextChannel(req: BlockHash, candidate: Option[Channel], excluded: Set[Channel]): Task[Unit] = Task {
candidate.filterNot(excluded).orElse(nextOpenChannel(excluded)) match {
case None =>
Expand All @@ -113,12 +87,12 @@ class PayloadObserverImpl(allChannels: DefaultChannelGroup, payloads: ChannelObs

object PayloadObserverImpl {

type PayloadWithChannel = (Channel, PayloadMessage)
type PayloadInfoWithChannel = (Channel, ExecutionPayloadInfo)

sealed trait State

object State {
case class LoadingPayload(blockHash: BlockHash, nextAttempt: Cancelable, promise: CancelablePromise[PayloadWithChannel]) extends State
case class LoadingPayload(blockHash: BlockHash, nextAttempt: Cancelable, promise: CancelablePromise[PayloadInfoWithChannel]) extends State

case class Idle(pinnedChannel: Option[Channel]) extends State
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/units/network/TrafficLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class TrafficLogger(settings: TL.Settings) extends TL(settings) {
}

protected def stringify(msg: Any): String = msg match {
case pm: PayloadMessage => s"${pm.hash}"
case pm: PayloadMessage => s"PayloadMessage(hash=${pm.hash})"
case RawBytes(code, data) => s"RawBytes(${specsByCodes(code).messageName}, ${data.length} bytes)"
case other => other.toString
}
Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/units/network/TestPayloadObserver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.wavesplatform.utils.ScorexLogging
import io.netty.channel.Channel
import monix.eval.Task
import monix.execution.CancelableFuture
import units.network.PayloadObserverImpl.PayloadWithChannel
import units.network.PayloadObserverImpl.PayloadInfoWithChannel
import units.{BlockHash, NetworkBlock}

class TestPayloadObserver(override val getPayloadStream: ChannelObservable[NetworkBlock]) extends PayloadObserver with ScorexLogging {
Expand All @@ -14,7 +14,7 @@ class TestPayloadObserver(override val getPayloadStream: ChannelObservable[Netwo
CancelableFuture.never
}

def requestPayload(req: BlockHash): Task[PayloadWithChannel] = {
def requestPayload(req: BlockHash): Task[PayloadInfoWithChannel] = {
log.debug(s"requestBlock($req)")
Task.never
}
Expand Down

0 comments on commit 5722acb

Please sign in to comment.