Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple consensus clients in one extension #24

Merged
merged 4 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions docker/waves-testnet.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,25 @@ waves {
port = 6869
api-key-hash = ${?WAVES_API_KEY_HASH}
}
}

l2 {
chain-contract = 3Msx4Aq69zWUKy4d1wyKnQ4ofzEDAfv5Ngf
execution-client-address = "http://execution-client:8551"
jwt-secret-file = /etc/secrets/jwtsecret
units {
chains = [
{
chain-contract = 3Msx4Aq69zWUKy4d1wyKnQ4ofzEDAfv5Ngf
execution-client-address = "http://execution-client:8551"
jwt-secret-file = /etc/secrets/jwtsecret

network {
port = 6865
declared-address = ${?UNITS_DECLARED_ADDRESS}
known-peers = [
"testnet-l2-htz-hel1-1.wavesnodes.com:6865"
"testnet-l2-htz-hel1-2.wavesnodes.com:6865"
"testnet-htz-nbg1-1.wavesnodes.com:6865"
]
network {
port = 6865
declared-address = ${?UNITS_DECLARED_ADDRESS}
known-peers = [
"testnet-l2-htz-hel1-1.wavesnodes.com:6865"
"testnet-l2-htz-hel1-2.wavesnodes.com:6865"
"testnet-htz-nbg1-1.wavesnodes.com:6865"
]
}
mining-enable = no
}
mining-enable = no
}
]
}
36 changes: 23 additions & 13 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
waves {
l2 {
chain-contract = ""

execution-client-address = "http://127.0.0.1:8551"
units {
defaults {
api-request-retries = 2
api-request-retry-wait-time = 2s

block-delay = 6s
block-sync-request-timeout = 500ms

network = ${waves.network}
network {
known-peers = [] # Clean L1 peers
file = ${waves.directory}"/peersL2.dat"
port = 6865
}

mining-enable = true
sync-interval = 2s

network = ${waves.network} {
known-peers = []
file = null
}
}

chains = [
# sample chain config:
# {
# chain-contract = ""
# execution-client-address = "http://127.0.0.1:8551"
# mining-enable = true
# network = {
# port = 6865
# declared-address = "1.2.3.4:6865"
# known-peers = [
# "5.6.7.8:6865"
# ]
# }
# }
]
}
1 change: 0 additions & 1 deletion src/main/scala/units/ClientConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ case class ClientConfig(
blockSyncRequestTimeout: FiniteDuration,
network: NetworkSettings,
miningEnable: Boolean,
syncInterval: FiniteDuration,
jwtSecretFile: Option[String]
) {
lazy val chainContractAddress: Address = Address.fromString(chainContract).explicitGet()
Expand Down
174 changes: 67 additions & 107 deletions src/main/scala/units/ConsensusClient.scala
Original file line number Diff line number Diff line change
@@ -1,150 +1,110 @@
package units

import com.typesafe.scalalogging.StrictLogging
import com.wavesplatform.block.{Block, MicroBlock}
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.events.BlockchainUpdateTriggers
import com.wavesplatform.extensions.{Extension, Context as ExtensionContext}
import com.wavesplatform.network.{PeerDatabaseImpl, PeerInfo}
import com.wavesplatform.state.{Blockchain, StateSnapshot}
import com.wavesplatform.utils.{LoggerFacade, Schedulers}
import io.netty.channel.Channel
import io.netty.channel.group.DefaultChannelGroup
import io.netty.util.concurrent.GlobalEventExecutor
import monix.execution.schedulers.SchedulerService
import monix.execution.{CancelableFuture, Scheduler}
import net.ceedubs.ficus.Ficus.*
import org.slf4j.LoggerFactory
import sttp.client3.HttpClientSyncBackend
import units.client.JwtAuthenticationBackend
import units.client.engine.{EngineApiClient, HttpEngineApiClient, LoggedEngineApiClient}
import units.ConsensusClient.ChainHandler
import units.client.engine.EngineApiClient
import units.network.*

import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
import scala.io.Source

class ConsensusClient(
config: ClientConfig,
context: ExtensionContext,
engineApiClient: EngineApiClient,
blockObserver: BlocksObserver,
allChannels: DefaultChannelGroup,
globalScheduler: Scheduler,
eluScheduler: Scheduler,
ownedResources: AutoCloseable
) extends Extension
with BlockchainUpdateTriggers {

def this(context: ExtensionContext, deps: ConsensusClientDependencies) =
this(
deps.config,
context,
deps.engineApiClient,
deps.blockObserver,
deps.allChannels,
deps.globalScheduler,
deps.eluScheduler,
deps
)
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.util.Try

def this(context: ExtensionContext) = this(context, new ConsensusClientDependencies(context))
class ConsensusClient(context: ExtensionContext) extends StrictLogging with Extension with BlockchainUpdateTriggers {
import scala.concurrent.ExecutionContext.Implicits.global

private[units] val elu =
new ELUpdater(
engineApiClient,
context.blockchain,
context.utx,
allChannels,
config,
context.time,
context.wallet,
blockObserver.loadBlock,
context.broadcastTransaction,
eluScheduler,
globalScheduler
)
private def requireUnique[A](configs: Iterable[ClientConfig], key: ClientConfig => A, keyName: String): Unit = {
val duplicateKeys = configs.groupBy(key).collect { case (k, confs) if confs.size > 1 => k -> confs.size }
require(duplicateKeys.isEmpty, s"The following $keyName were used several times in config: ${duplicateKeys.mkString(",")}")
}

private val chainHandlers: Seq[ChainHandler] = {
val defaultConfig = context.settings.config.getConfig("units.defaults")

val legacyChainConfig =
Try(context.settings.config.getConfig("waves.l2")).toOption.map(_.withFallback(defaultConfig).as[ClientConfig]).tapEach { _ =>
logger.info("Consensus client settings at waves.l2 path have been deprecated, please update your config file")
}

val newChainConfigs = context.settings.config
.getConfigList("units.chains")
.asScala
.map(cfg => cfg.withFallback(defaultConfig).resolve().as[ClientConfig])

val allChainConfigs = legacyChainConfig ++ newChainConfigs

requireUnique(allChainConfigs, _.chainContract, "chain contract addresses")
requireUnique(allChainConfigs, _.executionClientAddress, "execution client addresses")

private val blocksStreamCancelable: CancelableFuture[Unit] =
blockObserver.getBlockStream.foreach { case (ch, block) => elu.executionBlockReceived(block, ch) }(globalScheduler)
allChainConfigs.map(cfg => new ConsensusClient.ChainHandler(context, new ConsensusClientDependencies(cfg))).toVector
}

override def start(): Unit = {}

def shutdown(): Future[Unit] = Future {
blocksStreamCancelable.cancel()
ownedResources.close()
}(globalScheduler)
def shutdown(): Future[Unit] = Future.sequence(chainHandlers.map(h => Future(h.close()))).map(_ => ())

override def onProcessBlock(
block: Block,
snapshot: StateSnapshot,
reward: Option[Long],
hitSource: ByteStr,
blockchainBeforeWithReward: Blockchain
): Unit = elu.consensusLayerChanged()
): Unit = chainHandlers.foreach(_.elu.consensusLayerChanged())

override def onProcessMicroBlock(
microBlock: MicroBlock,
snapshot: StateSnapshot,
blockchainBeforeWithReward: Blockchain,
totalBlockId: ByteStr,
totalTransactionsRoot: ByteStr
): Unit = elu.consensusLayerChanged()
): Unit = chainHandlers.foreach(_.elu.consensusLayerChanged())

override def onRollback(blockchainBefore: Blockchain, toBlockId: ByteStr, toHeight: Int): Unit = {}

override def onMicroBlockRollback(blockchainBefore: Blockchain, toBlockId: ByteStr): Unit = {}
}

// A helper to create ConsensusClient due to Scala secondary constructors limitations
class ConsensusClientDependencies(context: ExtensionContext) extends AutoCloseable {
protected lazy val log: LoggerFacade = LoggerFacade(LoggerFactory.getLogger(classOf[ConsensusClient]))

val config: ClientConfig = context.settings.config.as[ClientConfig]("waves.l2")

private val blockObserverScheduler = Schedulers.singleThread("block-observer-l2", reporter = { e => log.warn("Error in BlockObserver", e) })
val globalScheduler: Scheduler = monix.execution.Scheduler.global
val eluScheduler: SchedulerService = Scheduler.singleThread("el-updater", reporter = { e => log.warn("Exception in ELUpdater", e) })

private val httpClientBackend = HttpClientSyncBackend()
private val maybeAuthenticatedBackend = config.jwtSecretFile match {
case Some(secretFile) =>
val src = Source.fromFile(secretFile)
try new JwtAuthenticationBackend(src.getLines().next(), httpClientBackend)
finally src.close()
case _ =>
log.warn("JWT secret is not set")
httpClientBackend
}

val engineApiClient = new LoggedEngineApiClient(new HttpEngineApiClient(config, maybeAuthenticatedBackend))

val allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
val peerDatabase = new PeerDatabaseImpl(config.network)
val messageObserver = new MessageObserver()
private val networkServer = NetworkServer(
config,
new HistoryReplier(engineApiClient)(globalScheduler),
peerDatabase,
messageObserver,
allChannels,
new ConcurrentHashMap[Channel, PeerInfo]
)

val blockObserver = new BlocksObserverImpl(allChannels, messageObserver.blocks, config.blockSyncRequestTimeout)(blockObserverScheduler)

override def close(): Unit = {
log.info("Closing HTTP/Engine API")
httpClientBackend.close()

log.debug("Closing peer database L2")
peerDatabase.close()
object ConsensusClient {
class ChainHandler(
context: ExtensionContext,
config: ClientConfig,
engineApiClient: EngineApiClient,
blockObserver: BlocksObserver,
allChannels: DefaultChannelGroup,
globalScheduler: Scheduler,
eluScheduler: Scheduler,
ownedResources: AutoCloseable
) extends AutoCloseable {
def this(context: ExtensionContext, deps: ConsensusClientDependencies) =
this(context, deps.config, deps.engineApiClient, deps.blockObserver, deps.allChannels, deps.globalScheduler, deps.eluScheduler, deps)

val elu = new ELUpdater(
engineApiClient,
context.blockchain,
context.utx,
allChannels,
config,
context.time,
context.wallet,
blockObserver.loadBlock,
context.broadcastTransaction,
eluScheduler,
globalScheduler
)

log.info("Stopping network services L2")
networkServer.shutdown()
messageObserver.shutdown()
private val blocksStreamCancelable: CancelableFuture[Unit] =
blockObserver.getBlockStream.foreach { case (ch, block) => elu.executionBlockReceived(block, ch) }(globalScheduler)

log.info("Closing schedulers")
blockObserverScheduler.shutdown()
eluScheduler.shutdown()
override def close(): Unit = {
blocksStreamCancelable.cancel()
ownedResources.close()
}
}
}
71 changes: 71 additions & 0 deletions src/main/scala/units/ConsensusClientDependencies.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package units

import com.wavesplatform.network.{PeerDatabaseImpl, PeerInfo}
import com.wavesplatform.utils.{LoggerFacade, Schedulers}
import io.netty.channel.Channel
import io.netty.channel.group.DefaultChannelGroup
import io.netty.util.concurrent.GlobalEventExecutor
import monix.execution.Scheduler
import monix.execution.schedulers.SchedulerService
import org.slf4j.LoggerFactory
import sttp.client3.HttpClientSyncBackend
import units.client.JwtAuthenticationBackend
import units.client.engine.{HttpEngineApiClient, LoggedEngineApiClient}
import units.network.{BlocksObserverImpl, HistoryReplier, MessageObserver, NetworkServer}

import java.util.concurrent.ConcurrentHashMap
import scala.io.Source

// A helper to create ConsensusClient due to Scala secondary constructors limitations
class ConsensusClientDependencies(val config: ClientConfig) extends AutoCloseable {
protected lazy val log: LoggerFacade = LoggerFacade(LoggerFactory.getLogger(classOf[ConsensusClient]))

private val blockObserverScheduler =
Schedulers.singleThread(s"block-observer-${config.chainContract}", reporter = { e => log.warn("Error in BlockObserver", e) })
val globalScheduler: Scheduler = monix.execution.Scheduler.global
val eluScheduler: SchedulerService =
Scheduler.singleThread(s"el-updater-${config.chainContract}", reporter = { e => log.warn("Exception in ELUpdater", e) })

private val httpClientBackend = HttpClientSyncBackend()
private val maybeAuthenticatedBackend = config.jwtSecretFile match {
case Some(secretFile) =>
val src = Source.fromFile(secretFile)
try new JwtAuthenticationBackend(src.getLines().next(), httpClientBackend)
finally src.close()
case _ =>
log.warn("JWT secret is not set")
httpClientBackend
}

val engineApiClient = new LoggedEngineApiClient(new HttpEngineApiClient(config, maybeAuthenticatedBackend))

val allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
val peerDatabase = new PeerDatabaseImpl(config.network)
val messageObserver = new MessageObserver()
private val networkServer = NetworkServer(
config,
new HistoryReplier(engineApiClient)(globalScheduler),
peerDatabase,
messageObserver,
allChannels,
new ConcurrentHashMap[Channel, PeerInfo]
)

val blockObserver = new BlocksObserverImpl(allChannels, messageObserver.blocks, config.blockSyncRequestTimeout)(blockObserverScheduler)

override def close(): Unit = {
log.info("Closing HTTP/Engine API")
httpClientBackend.close()

log.debug("Closing peer database L2")
peerDatabase.close()

log.info("Stopping network services L2")
networkServer.shutdown()
messageObserver.shutdown()

log.info("Closing schedulers")
blockObserverScheduler.shutdown()
eluScheduler.shutdown()
}
}
6 changes: 6 additions & 0 deletions src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
waves {
wallet.seed = "2j8DJP266rEEfdDSfPNbSAqmxMfiyPySHWxCaqP2SHcen8hFbhF1KkqqFH"
l2 {
execution-client-address = "http://127.0.0.1:8551"
api-request-retries = 2
api-request-retry-wait-time = 2s
block-delay = 6s
block-sync-request-timeout = 500ms
chain-contract = "3MsD16zWCbJ4G7QnJizA9x7tM6yDHShoSYB" # Seed: chain-contract
mining-enable = false
network = ${waves.network}
}
}
Loading