Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot committed Dec 9, 2024
1 parent 900c1ac commit 178761e
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 76 deletions.
3 changes: 2 additions & 1 deletion src/main/scala/units/ConsensusClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import units.ConsensusClient.ChainHandler
import units.client.engine.EngineApiClient
import units.network.*

import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.Future
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.util.Try
Expand Down Expand Up @@ -79,7 +80,7 @@ object ConsensusClient {
blockObserver: BlocksObserver,
allChannels: DefaultChannelGroup,
globalScheduler: Scheduler,
eluScheduler: Scheduler,
eluScheduler: ScheduledExecutorService,
ownedResources: AutoCloseable
) extends AutoCloseable {
def this(context: ExtensionContext, deps: ConsensusClientDependencies) =
Expand Down
15 changes: 11 additions & 4 deletions src/main/scala/units/ConsensusClientDependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.wavesplatform.network.{PeerDatabaseImpl, PeerInfo}
import com.wavesplatform.utils.{LoggerFacade, Schedulers}
import io.netty.channel.Channel
import io.netty.channel.group.DefaultChannelGroup
import io.netty.util.concurrent.GlobalEventExecutor
import io.netty.util.concurrent.{DefaultThreadFactory, GlobalEventExecutor}
import monix.execution.Scheduler
import monix.execution.schedulers.SchedulerService
import org.slf4j.LoggerFactory
Expand All @@ -13,7 +13,7 @@ import units.client.JwtAuthenticationBackend
import units.client.engine.{HttpEngineApiClient, LoggedEngineApiClient}
import units.network.{BlocksObserverImpl, HistoryReplier, MessageObserver, NetworkServer}

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{ConcurrentHashMap, Executors, ScheduledExecutorService, ThreadFactory}
import scala.io.Source

// A helper to create ConsensusClient due to Scala secondary constructors limitations
Expand All @@ -23,8 +23,15 @@ class ConsensusClientDependencies(val config: ClientConfig) extends AutoCloseabl
private val blockObserverScheduler =
Schedulers.singleThread(s"block-observer-${config.chainContract}", reporter = { e => log.warn("Error in BlockObserver", e) })
val globalScheduler: Scheduler = monix.execution.Scheduler.global
val eluScheduler: SchedulerService =
Scheduler.singleThread(s"el-updater-${config.chainContract}", reporter = { e => log.warn("Exception in ELUpdater", e) })
val eluScheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor({ (r: Runnable) =>
val t = new Thread(r)
t.setDaemon(true)
t.setName(s"el-updater-${config.chainContract}")
t.setUncaughtExceptionHandler((t: Thread, e: Throwable) => {
log.warn("Exception in ELUpdater", e)
})
t
})

private val httpClientBackend = HttpClientSyncBackend()
private val maybeAuthenticatedBackend = config.jwtSecretFile match {
Expand Down
169 changes: 99 additions & 70 deletions src/main/scala/units/ELUpdater.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import com.wavesplatform.utx.UtxPool
import com.wavesplatform.wallet.Wallet
import io.netty.channel.Channel
import io.netty.channel.group.DefaultChannelGroup
import monix.execution.cancelables.SerialCancelable
import monix.execution.{CancelableFuture, Scheduler}
import play.api.libs.json.*
import units.ELUpdater.State.*
Expand All @@ -38,6 +37,7 @@ import units.network.BlocksObserverImpl.BlockWithChannel
import units.util.HexBytesConverter
import units.util.HexBytesConverter.toHexNoPrefix

import java.util.concurrent.{CompletableFuture, Future, ScheduledExecutorService, TimeUnit}
import scala.annotation.tailrec
import scala.concurrent.duration.*
import scala.util.*
Expand All @@ -52,19 +52,25 @@ class ELUpdater(
wallet: Wallet,
requestBlockFromPeers: BlockHash => CancelableFuture[BlockWithChannel],
broadcastTx: Transaction => TracedResult[ValidationError, Boolean],
scheduler: Scheduler,
scheduler: ScheduledExecutorService,
globalScheduler: Scheduler
) extends StrictLogging {
import ELUpdater.*

private val handleNextUpdate = SerialCancelable()
private val contractAddress = config.chainContractAddress
private val contractAddress = config.chainContractAddress
private val chainContractClient = new ChainContractStateClient(contractAddress, blockchain)

private[units] var state: State = Starting

def consensusLayerChanged(): Unit =
handleNextUpdate := scheduler.scheduleOnce(ClChangedProcessingDelay)(handleConsensusLayerChanged())
@volatile private var handleNextUpdate: Future[?] = CompletableFuture.completedFuture(())
def consensusLayerChanged(): Unit = {
handleNextUpdate.cancel(false)
handleNextUpdate = scheduler.schedule(
handleConsensusLayerChanged(),
ClChangedProcessingDelay.length,
ClChangedProcessingDelay.unit
)
}

def executionBlockReceived(block: NetworkL2Block, ch: Channel): Unit = scheduler.execute { () =>
logger.debug(s"New block ${block.hash}->${block.parentHash} (timestamp=${block.timestamp}, height=${block.height}) appeared")
Expand Down Expand Up @@ -207,7 +213,7 @@ class ELUpdater(
timestamp: Long,
contractFunction: ContractFunction,
chainContractOptions: ChainContractOptions
): Unit = {
): Runnable = () => {
def getWaitingTime: Option[FiniteDuration] = {
val timestampAheadTime = (timestamp - time.correctedTime() / 1000).max(0)
if (timestampAheadTime > 0) {
Expand All @@ -221,8 +227,10 @@ class ELUpdater(
case Working(epochInfo, _, _, _, _, m: Mining, _, _) if m.currentPayloadId == payloadId =>
getWaitingTime match {
case Some(waitingTime) =>
scheduler.scheduleOnce(waitingTime)(
prepareAndApplyPayload(payloadId, referenceHash, timestamp, contractFunction, chainContractOptions)
scheduler.schedule(
(() => prepareAndApplyPayload(payloadId, referenceHash, timestamp, contractFunction, chainContractOptions)): Runnable,
waitingTime.length,
waitingTime.unit
)
case _ =>
(for {
Expand Down Expand Up @@ -374,14 +382,16 @@ class ELUpdater(
)

setState("12", newState)
scheduler.scheduleOnce((miningData.nextBlockUnixTs - time.correctedTime() / 1000).min(1).seconds)(
scheduler.schedule(
prepareAndApplyPayload(
miningData.payloadId,
parentBlock.hash,
miningData.nextBlockUnixTs,
newState.options.startEpochChainFunction(parentBlock.hash, epochInfo.hitSource, nodeChainInfo.toOption),
newState.options
)
),
(miningData.nextBlockUnixTs - time.correctedTime() / 1000).min(1),
TimeUnit.SECONDS
)
}).fold(
err => logger.error(s"Error starting payload build process: ${err.message}"),
Expand All @@ -396,7 +406,7 @@ class ELUpdater(
epochNumber: Int,
parentBlock: EcBlock,
chainContractOptions: ChainContractOptions
): Unit = {
): Runnable = () => {
state match {
case w @ Working(epochInfo, _, finalizedBlock, _, _, m: Mining, _, _) if epochInfo.number == epochNumber && blockchain.height == epochNumber =>
val nextBlockUnixTs = (parentBlock.timestamp + config.blockDelay.toSeconds).max(time.correctedTime() / 1000)
Expand All @@ -413,9 +423,11 @@ class ELUpdater(
).fold[Unit](
err => {
logger.error(s"Error starting payload build process: ${err.message}")
scheduler.scheduleOnce(MiningRetryInterval) {
tryToForgeNextBlock(epochNumber, parentBlock, chainContractOptions)
}
scheduler.schedule(
tryToForgeNextBlock(epochNumber, parentBlock, chainContractOptions),
MiningRetryInterval.length,
MiningRetryInterval.unit
)
},
miningData => {
val newState = w.copy(
Expand All @@ -427,14 +439,16 @@ class ELUpdater(
)
)
setState("11", newState)
scheduler.scheduleOnce((miningData.nextBlockUnixTs - time.correctedTime() / 1000).min(1).seconds)(
scheduler.schedule(
prepareAndApplyPayload(
miningData.payloadId,
parentBlock.hash,
miningData.nextBlockUnixTs,
chainContractOptions.appendFunction(parentBlock.hash),
chainContractOptions
)
),
(miningData.nextBlockUnixTs - time.correctedTime() / 1000).min(1),
TimeUnit.SECONDS
)
}
)
Expand Down Expand Up @@ -484,7 +498,7 @@ class ELUpdater(
}
}

private def handleConsensusLayerChanged(): Unit = {
private def handleConsensusLayerChanged(): Runnable = () => {
state match {
case Starting => updateStartingState()
case w: Working[ChainStatus] => updateWorkingState(w)
Expand Down Expand Up @@ -566,11 +580,16 @@ class ELUpdater(
w.chainStatus match {
case FollowingChain(_, Some(nextExpectedBlock)) =>
logger.debug(s"Waiting for block $nextExpectedBlock from peers")
scheduler.scheduleOnce(WaitRequestedBlockTimeout) {
if (blockchain.height == prevState.epochInfo.number) {
check(missedBlock)
}
}
scheduler.schedule(
(
() =>
if (blockchain.height == prevState.epochInfo.number) {
check(missedBlock)
}
): Runnable,
WaitRequestedBlockTimeout.length,
WaitRequestedBlockTimeout.unit
)
case FollowingChain(nodeChainInfo, _) =>
tryToStartMining(w, Right(nodeChainInfo))
case WaitForNewChain(chainSwitchInfo) =>
Expand All @@ -583,11 +602,16 @@ class ELUpdater(

prevState.chainStatus.nextExpectedBlock match {
case Some(missedBlock) =>
scheduler.scheduleOnce(WaitRequestedBlockTimeout) {
if (blockchain.height == prevState.epochInfo.number) {
check(missedBlock)
}
}
scheduler.schedule(
(
() =>
if (blockchain.height == prevState.epochInfo.number) {
check(missedBlock)
}
): Runnable,
WaitRequestedBlockTimeout.length,
WaitRequestedBlockTimeout.unit
)
case _ =>
tryToStartMining(prevState, Right(prevState.chainStatus.nodeChainInfo))
}
Expand Down Expand Up @@ -888,50 +912,55 @@ class ELUpdater(
chainInfo.lastBlock.height > lastEcBlock.height && !chainContractClient.blockExists(lastEcBlock.hash) ||
chainInfo.lastBlock.height < lastEcBlock.height

private def waitForSyncCompletion(target: ContractBlock): Unit = scheduler.scheduleOnce(5.seconds)(state match {
case SyncingToFinalizedBlock(finalizedBlockHash) if finalizedBlockHash == target.hash =>
logger.debug(s"Checking if EL has synced to ${target.hash} on height ${target.height}")
engineApiClient.getLastExecutionBlock() match {
case Left(error) =>
logger.error(s"Sync to ${target.hash} was not completed, error=${error.message}")
setState("23", Starting)
case Right(lastBlock) if lastBlock.hash == target.hash =>
logger.debug(s"Finished synchronization to ${target.hash} successfully")
calculateEpochInfo match {
case Left(err) =>
logger.error(s"Could not transition to following chain state: $err")
setState("24", Starting)
case Right(newEpochInfo) =>
chainContractClient.getMainChainInfo match {
case Some(mainChainInfo) =>
logger.trace(s"Following main chain ${mainChainInfo.id}")
val fullValidationStatus =
FullValidationStatus(
lastValidatedBlock = target,
lastElWithdrawalIndex = None
)
followChainAndRequestNextBlock(
newEpochInfo,
mainChainInfo,
lastBlock,
mainChainInfo,
target,
fullValidationStatus,
chainContractClient.getOptions,
None
)
case _ =>
logger.error(s"Can't get main chain info")
setState("25", Starting)
private def waitForSyncCompletion(target: ContractBlock): Unit = scheduler.schedule(
() =>
state match {
case SyncingToFinalizedBlock(finalizedBlockHash) if finalizedBlockHash == target.hash =>
logger.debug(s"Checking if EL has synced to ${target.hash} on height ${target.height}")
engineApiClient.getLastExecutionBlock() match {
case Left(error) =>
logger.error(s"Sync to ${target.hash} was not completed, error=${error.message}")
setState("23", Starting)
case Right(lastBlock) if lastBlock.hash == target.hash =>
logger.debug(s"Finished synchronization to ${target.hash} successfully")
calculateEpochInfo match {
case Left(err) =>
logger.error(s"Could not transition to following chain state: $err")
setState("24", Starting)
case Right(newEpochInfo) =>
chainContractClient.getMainChainInfo match {
case Some(mainChainInfo) =>
logger.trace(s"Following main chain ${mainChainInfo.id}")
val fullValidationStatus =
FullValidationStatus(
lastValidatedBlock = target,
lastElWithdrawalIndex = None
)
followChainAndRequestNextBlock(
newEpochInfo,
mainChainInfo,
lastBlock,
mainChainInfo,
target,
fullValidationStatus,
chainContractClient.getOptions,
None
)
case _ =>
logger.error(s"Can't get main chain info")
setState("25", Starting)
}
}
case Right(lastBlock) =>
logger.debug(s"Sync to ${target.hash} is in progress: current last block is ${lastBlock.hash} at height ${lastBlock.height}")
waitForSyncCompletion(target)
}
case Right(lastBlock) =>
logger.debug(s"Sync to ${target.hash} is in progress: current last block is ${lastBlock.hash} at height ${lastBlock.height}")
waitForSyncCompletion(target)
}
case other =>
logger.debug(s"Unexpected state on sync: $other")
})
case other =>
logger.debug(s"Unexpected state on sync: $other")
},
5,
TimeUnit.SECONDS
)

private def validateRandao(block: EcBlock, epochNumber: Int): JobResult[Unit] =
blockchain.vrf(epochNumber) match {
Expand Down
3 changes: 2 additions & 1 deletion src/test/scala/units/ExtensionDomain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import units.network.TestBlocksObserver
import units.test.CustomMatchers

import java.nio.charset.StandardCharsets
import java.util.concurrent.Executors
import scala.annotation.tailrec
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.reflect.ClassTag
Expand Down Expand Up @@ -87,7 +88,7 @@ class ExtensionDomain(
val ecClients = new TestEcClients(ecGenesisBlock, blockchain)

val globalScheduler = TestScheduler(ExecutionModel.AlwaysAsyncExecution)
val eluScheduler = TestScheduler(ExecutionModel.AlwaysAsyncExecution)
val eluScheduler = Executors.newSingleThreadScheduledExecutor()

val elBlockStream = PublishSubject[(Channel, NetworkL2Block)]()
val blockObserver = new TestBlocksObserver(elBlockStream)
Expand Down

0 comments on commit 178761e

Please sign in to comment.