From ed49d7f50d4c110ef557a1689af39866680bc99e Mon Sep 17 00:00:00 2001 From: Vyatcheslav Suharnikov Date: Tue, 29 Oct 2024 16:13:43 +0400 Subject: [PATCH] Logging improvements --- consensus-client-it/build.sbt | 2 +- .../com/wavesplatform/api/HasRetry.scala | 36 +++++ .../wavesplatform/api/LoggingBackend.scala | 47 +++++++ .../com/wavesplatform/api/LoggingUtil.scala | 9 ++ .../com/wavesplatform/api/NodeHttpApi.scala | 125 +++++++++++++----- .../com/wavesplatform/api/WithRetries.scala | 36 ----- .../test/scala/units/BaseItTestSuite.scala | 7 +- .../test/scala/units/BridgeTestSuite.scala | 25 ++-- .../test/scala/units/OneNodeTestSuite.scala | 28 ++-- .../test/scala/units/RewardTestSuite.scala | 10 +- .../test/scala/units/TwoNodesTestSuite.scala | 102 ++++++++++++++ .../units/docker/WavesNodeContainer.scala | 4 +- .../test/scala/units/el/ElBridgeClient.scala | 58 ++++++-- .../test/scala/units/http/OkHttpLogger.scala | 26 ++-- .../scala/units/client/JsonRpcClient.scala | 12 +- src/main/scala/units/client/package.scala | 6 - 16 files changed, 387 insertions(+), 146 deletions(-) create mode 100644 consensus-client-it/src/test/scala/com/wavesplatform/api/HasRetry.scala create mode 100644 consensus-client-it/src/test/scala/com/wavesplatform/api/LoggingBackend.scala create mode 100644 consensus-client-it/src/test/scala/com/wavesplatform/api/LoggingUtil.scala delete mode 100644 consensus-client-it/src/test/scala/com/wavesplatform/api/WithRetries.scala create mode 100644 consensus-client-it/src/test/scala/units/TwoNodesTestSuite.scala delete mode 100644 src/main/scala/units/client/package.scala diff --git a/consensus-client-it/build.sbt b/consensus-client-it/build.sbt index 5ad09114..03df1e85 100644 --- a/consensus-client-it/build.sbt +++ b/consensus-client-it/build.sbt @@ -8,7 +8,7 @@ description := "Consensus client integration tests" libraryDependencies ++= Seq( "org.testcontainers" % "testcontainers" % "1.20.2", - "org.web3j" % "core" % "4.9.8" + "org.web3j" % "core" % "4.9.8" ).map(_ % Test) val logsDirectory = taskKey[File]("The directory for logs") // Evaluates every time, so it recreates the logs directory diff --git a/consensus-client-it/src/test/scala/com/wavesplatform/api/HasRetry.scala b/consensus-client-it/src/test/scala/com/wavesplatform/api/HasRetry.scala new file mode 100644 index 00000000..35be1f74 --- /dev/null +++ b/consensus-client-it/src/test/scala/com/wavesplatform/api/HasRetry.scala @@ -0,0 +1,36 @@ +package com.wavesplatform.api + +import org.scalatest.concurrent.Eventually.PatienceConfig + +import scala.concurrent.duration.{Deadline, DurationInt} +import scala.util.{Failure, Success, Try} + +trait HasRetry { + protected implicit def patienceConfig: PatienceConfig = PatienceConfig(timeout = 30.seconds, interval = 1.second) + + protected def retryWithAttempts[ResponseT](f: Int => ResponseT)(implicit patienceConfig: PatienceConfig): ResponseT = { + var attempt = 0 + retry { + attempt += 1 + f(attempt) + } + } + + // Eventually has issues with handling patienceConfig + protected def retry[ResponseT](f: => ResponseT)(implicit patienceConfig: PatienceConfig): ResponseT = { + val deadline = Deadline.now + patienceConfig.timeout + + var r = Try(f) + while (r.isFailure && deadline.hasTimeLeft()) { + Thread.sleep(patienceConfig.interval.toMillis) + r = Try(f) + } + + r match { + case Failure(e) => throw new RuntimeException(s"All attempts are out: $patienceConfig", e) + case Success(r) => r + } + } + + protected def failRetry(message: String): Nothing = throw new RuntimeException(message) +} diff --git a/consensus-client-it/src/test/scala/com/wavesplatform/api/LoggingBackend.scala b/consensus-client-it/src/test/scala/com/wavesplatform/api/LoggingBackend.scala new file mode 100644 index 00000000..968823a4 --- /dev/null +++ b/consensus-client-it/src/test/scala/com/wavesplatform/api/LoggingBackend.scala @@ -0,0 +1,47 @@ +package com.wavesplatform.api + +import com.wavesplatform.api.LoggingBackend.{LoggingOptions, LoggingOptionsTag} +import com.wavesplatform.utils.ScorexLogging +import sttp.capabilities.Effect +import sttp.client3.* + +class LoggingBackend[F[_], P](delegate: SttpBackend[F, P]) extends DelegateSttpBackend[F, P](delegate) with ScorexLogging { + override def send[T, R >: P & Effect[F]](request: Request[T, R]): F[Response[T]] = { + val l = request.tag(LoggingOptionsTag).collect { case l: LoggingOptions => l } + + l.filter(_.logRequest).foreach { l => + var logStr = s"${l.prefix} ${request.method} ${request.uri}" + if (l.logResponseBody) logStr += s": body=${request.body.show}" + log.debug(logStr) + } + + val requestWithRawJson = request.response(asBothOption(request.response, asStringAlways)) + val withErrorLog = responseMonad.handleError(requestWithRawJson.send(delegate)) { x => + l.foreach { l => log.debug(s"${l.prefix} Error: ${x.getMessage}") } + responseMonad.error(x) + } + + responseMonad.flatMap(withErrorLog) { response => + l.foreach { l => + var logStr = s"${l.prefix} HTTP ${response.code}" + if (l.logResponseBody) logStr += s": body=${response.body._2}" + log.debug(logStr) + } + + responseMonad.unit(response.copy(body = response.body._1)) + } + } +} + +object LoggingBackend { + val LoggingOptionsTag = "logging" + + case class LoggingOptions( + logRequest: Boolean = true, + logRequestBody: Boolean = true, + logResponseBody: Boolean = true, + requestId: Int = LoggingUtil.currRequestId + ) { + val prefix = s"[$requestId]" + } +} diff --git a/consensus-client-it/src/test/scala/com/wavesplatform/api/LoggingUtil.scala b/consensus-client-it/src/test/scala/com/wavesplatform/api/LoggingUtil.scala new file mode 100644 index 00000000..f9f8094c --- /dev/null +++ b/consensus-client-it/src/test/scala/com/wavesplatform/api/LoggingUtil.scala @@ -0,0 +1,9 @@ +package com.wavesplatform.api + +import java.util.concurrent.ThreadLocalRandom + +object LoggingUtil { + val Length = 5 + + def currRequestId: Int = ThreadLocalRandom.current().nextInt(10000, 100000) +} diff --git a/consensus-client-it/src/test/scala/com/wavesplatform/api/NodeHttpApi.scala b/consensus-client-it/src/test/scala/com/wavesplatform/api/NodeHttpApi.scala index d5c0bdca..359da8da 100644 --- a/consensus-client-it/src/test/scala/com/wavesplatform/api/NodeHttpApi.scala +++ b/consensus-client-it/src/test/scala/com/wavesplatform/api/NodeHttpApi.scala @@ -2,7 +2,8 @@ package com.wavesplatform.api import cats.syntax.option.* import com.wavesplatform.account.Address -import com.wavesplatform.api.NodeHttpApi.{AssetBalanceResponse, HeightResponse, TransactionInfoResponse} +import com.wavesplatform.api.LoggingBackend.{LoggingOptions, LoggingOptionsTag} +import com.wavesplatform.api.NodeHttpApi.* import com.wavesplatform.api.http.ApiMarshallers.TransactionJsonWrites import com.wavesplatform.api.http.TransactionsApiRoute.ApplicationStatus import com.wavesplatform.state.DataEntry.Format @@ -10,98 +11,142 @@ import com.wavesplatform.state.{DataEntry, EmptyDataEntry, TransactionId} import com.wavesplatform.transaction.Asset.IssuedAsset import com.wavesplatform.transaction.Transaction import com.wavesplatform.utils.ScorexLogging +import org.scalatest.concurrent.Eventually.PatienceConfig import play.api.libs.json.* import sttp.client3.* import sttp.client3.playJson.* import sttp.model.{StatusCode, Uri} import scala.concurrent.duration.DurationInt +import scala.util.chaining.scalaUtilChainingOps -class NodeHttpApi(apiUri: Uri, backend: SttpBackend[Identity, ?]) extends ScorexLogging { +class NodeHttpApi(apiUri: Uri, backend: SttpBackend[Identity, ?]) extends HasRetry with ScorexLogging { private val averageBlockDelay = 18.seconds - def height: Int = + protected override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = averageBlockDelay, interval = 1.second) + + def waitForHeight(atLeast: Int): Int = { + val loggingOptions: LoggingOptions = LoggingOptions() + log.debug(s"${loggingOptions.prefix} waitForHeight($atLeast)") + val currHeight = heightImpl()(loggingOptions) + if (currHeight >= atLeast) currHeight + else { + val subsequentPatienceConfig = patienceConfig.copy(timeout = averageBlockDelay * (atLeast - currHeight) * 2.5) + val subsequentLoggingOptions = loggingOptions.copy(logRequest = false) + Thread.sleep(patienceConfig.interval.toMillis) + retry { + heightImpl()(subsequentLoggingOptions).tap { r => + if (r < atLeast) failRetry("") + } + }(subsequentPatienceConfig) + } + } + + protected def heightImpl()(implicit loggingOptions: LoggingOptions = LoggingOptions()): Int = basicRequest .get(uri"$apiUri/blocks/height") .response(asJson[HeightResponse]) + .tag(LoggingOptionsTag, loggingOptions) .send(backend) .body match { case Left(e) => throw e case Right(r) => r.height } - def waitForHeight(atLeast: Int): Int = { - val currHeight = height - if (currHeight >= atLeast) currHeight - else - WithRetries( - maxAttempts = (averageBlockDelay.toSeconds.toInt * (atLeast - currHeight) * 2.5).toInt, - message = s"waitForHeight($atLeast)" - ).until(height) { - case h if h >= atLeast => h + def broadcastAndWait(txn: Transaction): TransactionInfoResponse = { + implicit val loggingOptions: LoggingOptions = LoggingOptions(logResponseBody = false) + log.debug(s"${loggingOptions.prefix} broadcastAndWait($txn)") + broadcastImpl(txn)(loggingOptions.copy(logRequestBody = false)) + retryWithAttempts { attempt => + val subsequentLoggingOptions = loggingOptions.copy(logRequest = attempt == 1) + transactionInfoImpl(TransactionId(txn.id()))(subsequentLoggingOptions) match { + case Some(r) if r.applicationStatus == ApplicationStatus.Succeeded => r + case r => failRetry(s"Expected ${ApplicationStatus.Succeeded} status, got: ${r.map(_.applicationStatus)}") } + } } - def broadcast[T <: Transaction](txn: T): T = + protected def broadcastImpl[T <: Transaction](txn: T)(implicit loggingOptions: LoggingOptions = LoggingOptions()): T = basicRequest .post(uri"$apiUri/transactions/broadcast") .body(txn: Transaction) + .response(asJson[BroadcastResponse]) + .tag(LoggingOptionsTag, loggingOptions) .send(backend) .body match { case Left(e) => throw new RuntimeException(e) case _ => txn } - def broadcastAndWait(txn: Transaction): TransactionInfoResponse = { - broadcast(txn) - WithRetries( - message = s"broadcastAndWait(${txn.id()})" - ).until(transactionInfo(TransactionId(txn.id()))) { - case Some(info) if info.applicationStatus == ApplicationStatus.Succeeded => info - } - } - - def transactionInfo(id: TransactionId): Option[TransactionInfoResponse] = + protected def transactionInfoImpl(id: TransactionId)(implicit loggingOptions: LoggingOptions = LoggingOptions()): Option[TransactionInfoResponse] = basicRequest .get(uri"$apiUri/transactions/info/$id") .response(asJson[TransactionInfoResponse]) + .tag(LoggingOptionsTag, loggingOptions) .send(backend) .body match { case Left(HttpError(_, StatusCode.NotFound)) => none - case Left(HttpError(body, statusCode)) => fail(s"Server returned error $body with status ${statusCode.code}") - case Left(DeserializationException(body, error)) => fail(s"failed to parse response $body: $error") + case Left(HttpError(body, statusCode)) => failRetry(s"Server returned error $body with status ${statusCode.code}") + case Left(DeserializationException(body, error)) => failRetry(s"failed to parse response $body: $error") case Right(r) => r.some } - def getDataByKey(address: Address, key: String): Option[DataEntry[?]] = + def getDataByKey(address: Address, key: String)(implicit loggingOptions: LoggingOptions = LoggingOptions()): Option[DataEntry[?]] = { + log.debug(s"${loggingOptions.prefix} getDataByKey($address, $key)") basicRequest .get(uri"$apiUri/addresses/data/$address/$key") .response(asJson[DataEntry[?]]) + .tag(LoggingOptionsTag, loggingOptions) .send(backend) .body match { case Left(HttpError(_, StatusCode.NotFound)) => none - case Left(HttpError(body, statusCode)) => fail(s"Server returned error $body with status ${statusCode.code}") - case Left(DeserializationException(body, error)) => fail(s"failed to parse response $body: $error") - case Right(r) => - r match { + case Left(HttpError(body, statusCode)) => failRetry(s"Server returned error $body with status ${statusCode.code}") + case Left(DeserializationException(body, error)) => failRetry(s"failed to parse response $body: $error") + case Right(response) => + response match { case _: EmptyDataEntry => none - case _ => r.some + case _ => response.some } } + } - def balance(address: Address, asset: IssuedAsset): Long = + def balance(address: Address, asset: IssuedAsset)(implicit loggingOptions: LoggingOptions = LoggingOptions()): Long = { + log.debug(s"${loggingOptions.prefix} balance($address, $asset)") basicRequest .get(uri"$apiUri/assets/balance/$address/$asset") .response(asJson[AssetBalanceResponse]) + .tag(LoggingOptionsTag, loggingOptions) .send(backend) .body match { case Left(HttpError(_, StatusCode.NotFound)) => 0L - case Left(HttpError(body, statusCode)) => fail(s"Server returned error $body with status ${statusCode.code}") - case Left(DeserializationException(body, error)) => fail(s"failed to parse response $body: $error") + case Left(HttpError(body, statusCode)) => failRetry(s"Server returned error $body with status ${statusCode.code}") + case Left(DeserializationException(body, error)) => failRetry(s"failed to parse response $body: $error") case Right(r) => r.balance } + } - private def fail(message: String): Nothing = throw new RuntimeException(s"JSON-RPC error: $message") + def waitForConnectedPeers(atLeast: Int): Unit = { + implicit val loggingOptions: LoggingOptions = LoggingOptions(logRequestBody = false) + log.debug(s"${loggingOptions.prefix} waitForConnectedPeers($atLeast)") + retryWithAttempts { attempt => + val subsequentLoggingOptions = loggingOptions.copy(logRequest = attempt == 1) + connectedPeersImpl()(subsequentLoggingOptions).tap { x => + if (x < atLeast) failRetry(s"Expected at least $atLeast, got $x") + } + } + } + + protected def connectedPeersImpl()(implicit loggingOptions: LoggingOptions = LoggingOptions()): Int = + basicRequest + .get(uri"$apiUri/peers/connected") + .response(asJson[ConnectedPeersResponse]) + .tag(LoggingOptionsTag, loggingOptions) + .send(backend) + .body match { + case Left(HttpError(body, statusCode)) => failRetry(s"Server returned error $body with status ${statusCode.code}") + case Left(DeserializationException(body, error)) => failRetry(s"failed to parse response $body: $error") + case Right(r) => r.peers.length + } } object NodeHttpApi { @@ -110,6 +155,11 @@ object NodeHttpApi { implicit val heightResponseFormat: OFormat[HeightResponse] = Json.format[HeightResponse] } + case class BroadcastResponse(id: String) + object BroadcastResponse { + implicit val broadcastResponseFormat: OFormat[BroadcastResponse] = Json.format[BroadcastResponse] + } + case class TransactionInfoResponse(height: Int, applicationStatus: String) object TransactionInfoResponse { implicit val transactionInfoResponseFormat: OFormat[TransactionInfoResponse] = Json.format[TransactionInfoResponse] @@ -119,4 +169,9 @@ object NodeHttpApi { object AssetBalanceResponse { implicit val assetBalanceResponseFormat: OFormat[AssetBalanceResponse] = Json.format[AssetBalanceResponse] } + + case class ConnectedPeersResponse(peers: List[JsObject]) + object ConnectedPeersResponse { + implicit val connectedPeersResponseFormat: OFormat[ConnectedPeersResponse] = Json.format[ConnectedPeersResponse] + } } diff --git a/consensus-client-it/src/test/scala/com/wavesplatform/api/WithRetries.scala b/consensus-client-it/src/test/scala/com/wavesplatform/api/WithRetries.scala deleted file mode 100644 index 470be5cc..00000000 --- a/consensus-client-it/src/test/scala/com/wavesplatform/api/WithRetries.scala +++ /dev/null @@ -1,36 +0,0 @@ -package com.wavesplatform.api - -import com.wavesplatform.api -import com.wavesplatform.utils.LoggerFacade -import org.slf4j.LoggerFactory - -import java.util.concurrent.ThreadLocalRandom -import scala.concurrent.duration.{DurationInt, FiniteDuration} - -case class WithRetries(maxAttempts: Int = 30, delay: FiniteDuration = 1.second, message: String = "") { - def untilDefined[InputT](f: => Option[InputT]): InputT = until(f) { case Some(r) => r } - - def until[InputT, OutputT](f: => InputT)(cond: PartialFunction[InputT, OutputT]): OutputT = { - val values = Iterator.single(f) ++ Iterator.continually { - Thread.sleep(delay.toMillis) - f - } - - val idPrefix = s"[${ThreadLocalRandom.current().nextInt(0, 100_000)}]" - WithRetries.log.trace(s"$idPrefix $message") - values - .take(maxAttempts) - .tapEach { x => - WithRetries.log.trace(s"$idPrefix $x") - } - .collectFirst(cond) match { - case Some(r) => r - case None => - throw new RuntimeException(s"$idPrefix $message: $maxAttempts attempts are out!") - } - } -} - -object WithRetries { - protected lazy val log = LoggerFacade(LoggerFactory.getLogger(classOf[api.WithRetries].getSimpleName)) -} diff --git a/consensus-client-it/src/test/scala/units/BaseItTestSuite.scala b/consensus-client-it/src/test/scala/units/BaseItTestSuite.scala index c892176c..64a28ecd 100644 --- a/consensus-client-it/src/test/scala/units/BaseItTestSuite.scala +++ b/consensus-client-it/src/test/scala/units/BaseItTestSuite.scala @@ -2,11 +2,11 @@ package units import com.google.common.primitives.{Bytes, Ints} import com.wavesplatform.account.{AddressScheme, KeyPair, SeedKeyPair} +import com.wavesplatform.api.HasRetry import com.wavesplatform.common.state.ByteStr import com.wavesplatform.crypto import com.wavesplatform.utils.ScorexLogging import monix.execution.atomic.AtomicBoolean -import org.scalatest.concurrent.Eventually import org.scalatest.freespec.AnyFreeSpec import org.scalatest.matchers.should.Matchers import org.scalatest.{BeforeAndAfterAll, EitherValues, OptionValues} @@ -17,7 +17,6 @@ import units.eth.{EthAddress, Gwei} import units.test.CustomMatchers import java.nio.charset.StandardCharsets -import scala.concurrent.duration.DurationInt trait BaseItTestSuite extends AnyFreeSpec @@ -27,10 +26,8 @@ trait BaseItTestSuite with CustomMatchers with EitherValues with OptionValues - with Eventually + with HasRetry with HasConsensusLayerDappTxHelpers { - override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 30.seconds, interval = 1.second) - override val currentHitSource: ByteStr = ByteStr.empty override val chainContractAccount: KeyPair = mkKeyPair("devnet-1", 2) protected val rewardAmount: Gwei = Gwei.ofRawGwei(2_000_000_000L) diff --git a/consensus-client-it/src/test/scala/units/BridgeTestSuite.scala b/consensus-client-it/src/test/scala/units/BridgeTestSuite.scala index 59ca11dd..4ed56ef2 100644 --- a/consensus-client-it/src/test/scala/units/BridgeTestSuite.scala +++ b/consensus-client-it/src/test/scala/units/BridgeTestSuite.scala @@ -10,24 +10,30 @@ import units.client.engine.model.GetLogsResponseEntry import units.eth.EthAddress import scala.jdk.CollectionConverters.CollectionHasAsScala -import scala.jdk.OptionConverters.RichOptional -class BridgeTestSuite extends OneNodeTestSuite { +class BridgeTestSuite extends TwoNodesTestSuite { "L2-379 Checking balances in EL->CL transfers" in { val elSender = elRichAccount1 val clRecipient = clRichAccount1 val userAmount = 1 log.info("Broadcast Bridge.sendNative transaction") - val ethAmount = Convert.toWei(userAmount.toString, Convert.Unit.ETHER).toBigIntegerExact - val sendTxnResult = ec1.elBridge.sendNative(elSender, clRecipient.toAddress, ethAmount) - val sendTxnReceipt = eventually { - ec1.web3j.ethGetTransactionReceipt(sendTxnResult.getTransactionHash).send().getTransactionReceipt.toScala.get - } + val ethAmount = Convert.toWei(userAmount.toString, Convert.Unit.ETHER).toBigIntegerExact + val sendTxnReceipt = ec1.elBridge.sendNativeAndWait(elSender, clRecipient.toAddress, ethAmount) val blockHash = BlockHash(sendTxnReceipt.getBlockHash) log.info(s"Block with transaction: $blockHash") + log.info(s"Wait block $blockHash on contract") + val blockConfirmationHeight = retry { + waves1.chainContract.getBlock(blockHash).get.height + } + + log.info(s"Wait block $blockHash finalization") + retry { + blockConfirmationHeight <= waves1.chainContract.getFinalizedBlock.height + } + val rawLogsInBlock = ec1.web3j .ethGetLogs(new EthFilter(blockHash, ec1.elBridge.address.hex).addSingleTopic(Bridge.ElSentNativeEventTopic)) .send() @@ -53,11 +59,6 @@ class BridgeTestSuite extends OneNodeTestSuite { val transferProofs = Bridge.mkTransferProofs(transferEvents, sendTxnLogIndex).reverse val wavesAmount = userAmount * Constants.UnitsInWave - log.info(s"Wait block $blockHash finalization") - eventually { - waves1.chainContract.getBlock(blockHash).get.height <= waves1.chainContract.getFinalizedBlock.height - } - def balance: Long = waves1.api.balance(clRecipient.toAddress, waves1.chainContract.token) val balanceBefore = balance diff --git a/consensus-client-it/src/test/scala/units/OneNodeTestSuite.scala b/consensus-client-it/src/test/scala/units/OneNodeTestSuite.scala index db4bf5c3..1d90043b 100644 --- a/consensus-client-it/src/test/scala/units/OneNodeTestSuite.scala +++ b/consensus-client-it/src/test/scala/units/OneNodeTestSuite.scala @@ -1,8 +1,8 @@ package units +import com.wavesplatform.common.utils.EitherExt2 import units.client.engine.model.BlockNumber import units.docker.{EcContainer, Networks, WavesNodeContainer} -import com.wavesplatform.common.utils.EitherExt2 trait OneNodeTestSuite extends BaseItTestSuite { protected lazy val ec1: EcContainer = new EcContainer( @@ -39,27 +39,25 @@ trait OneNodeTestSuite extends BaseItTestSuite { waves1.api.broadcastAndWait(chainContract.setScript()) log.info("Setup chain contract") - val genesisBlock = ec1.engineApi.getBlockByNumber(BlockNumber.Number(0)).explicitGet().getOrElse(fail("No EL genesis block")) + val genesisBlock = ec1.engineApi.getBlockByNumber(BlockNumber.Number(0)).explicitGet().getOrElse(failRetry("No EL genesis block")) waves1.api.broadcastAndWait( - chainContract - .setup( - genesisBlock = genesisBlock, - elMinerReward = rewardAmount.amount.longValue(), - daoAddress = None, - daoReward = 0, - invoker = chainContractAccount - ) + chainContract.setup( + genesisBlock = genesisBlock, + elMinerReward = rewardAmount.amount.longValue(), + daoAddress = None, + daoReward = 0, + invoker = chainContractAccount + ) ) log.info(s"Token id: ${waves1.chainContract.token}") log.info("Waves miner #1 join") val joinMiner1Result = waves1.api.broadcastAndWait( - chainContract - .join( - minerAccount = miner1Account, - elRewardAddress = miner1RewardAddress - ) + chainContract.join( + minerAccount = miner1Account, + elRewardAddress = miner1RewardAddress + ) ) val epoch1Number = joinMiner1Result.height + 1 diff --git a/consensus-client-it/src/test/scala/units/RewardTestSuite.scala b/consensus-client-it/src/test/scala/units/RewardTestSuite.scala index 727d75a9..11b75c88 100644 --- a/consensus-client-it/src/test/scala/units/RewardTestSuite.scala +++ b/consensus-client-it/src/test/scala/units/RewardTestSuite.scala @@ -5,7 +5,7 @@ import units.client.engine.model.BlockNumber class RewardTestSuite extends OneNodeTestSuite { "L2-234 The reward for a previous epoch is in the first block withdrawals" in { - val epoch1FirstEcBlock = eventually { + val epoch1FirstEcBlock = retry { ec1.engineApi.getBlockByNumber(BlockNumber.Number(1)).explicitGet().get } @@ -13,8 +13,8 @@ class RewardTestSuite extends OneNodeTestSuite { epoch1FirstEcBlock.withdrawals shouldBe empty } - val epoch1FirstContractBlock = eventually { - waves1.chainContract.getBlock(epoch1FirstEcBlock.hash).getOrElse(fail(s"No first block ${epoch1FirstEcBlock.hash} confirmation")) + val epoch1FirstContractBlock = retry { + waves1.chainContract.getBlock(epoch1FirstEcBlock.hash).getOrElse(failRetry(s"No first block ${epoch1FirstEcBlock.hash} confirmation")) } val epoch1Number = epoch1FirstContractBlock.epoch @@ -24,14 +24,14 @@ class RewardTestSuite extends OneNodeTestSuite { waves1.api.waitForHeight(epoch2Number) log.info(s"Wait for epoch #$epoch2Number data on chain contract") - val epoch2FirstContractBlock = eventually { + val epoch2FirstContractBlock = retry { waves1.chainContract.getEpochFirstBlock(epoch2Number).get } val epoch2FirstEcBlock = ec1.engineApi .getBlockByHash(epoch2FirstContractBlock.hash) .explicitGet() - .getOrElse(fail(s"Can't find ${epoch2FirstContractBlock.hash}")) + .getOrElse(failRetry(s"Can't find ${epoch2FirstContractBlock.hash}")) epoch2FirstEcBlock.withdrawals should have length 1 diff --git a/consensus-client-it/src/test/scala/units/TwoNodesTestSuite.scala b/consensus-client-it/src/test/scala/units/TwoNodesTestSuite.scala new file mode 100644 index 00000000..3fd2f584 --- /dev/null +++ b/consensus-client-it/src/test/scala/units/TwoNodesTestSuite.scala @@ -0,0 +1,102 @@ +package units + +import com.wavesplatform.common.utils.EitherExt2 +import units.client.engine.model.BlockNumber +import units.docker.{EcContainer, Networks, WavesNodeContainer} + +trait TwoNodesTestSuite extends BaseItTestSuite { + protected lazy val ec1: EcContainer = new EcContainer( + network = network, + number = 1, + ip = Networks.ipForNode(2) // ipForNode(1) is assigned to Ryuk + ) + + protected lazy val ec2: EcContainer = new EcContainer( + network = network, + number = 2, + ip = Networks.ipForNode(3) + ) + + protected lazy val waves1: WavesNodeContainer = new WavesNodeContainer( + network = network, + number = 1, + ip = Networks.ipForNode(4), + baseSeed = "devnet-1", + chainContractAddress = chainContractAddress, + ecEngineApiUrl = s"http://${ec1.hostName}:${EcContainer.EnginePort}" + ) + + protected lazy val waves2: WavesNodeContainer = new WavesNodeContainer( + network = network, + number = 2, + ip = Networks.ipForNode(5), + baseSeed = "devnet-2", + chainContractAddress = chainContractAddress, + ecEngineApiUrl = s"http://${ec2.hostName}:${EcContainer.EnginePort}" + ) + + override protected def startNodes(): Unit = { + ec1.start() + ec1.logPorts() + + ec2.start() + ec2.logPorts() + + waves1.start() + waves1.logPorts() + + waves2.start() + waves2.logPorts() + + waves1.waitReady() + waves2.waitReady() + + waves1.api.waitForConnectedPeers(1) + } + + override protected def stopNodes(): Unit = { + waves1.stop() + waves2.stop() + ec1.stop() + ec2.stop() + } + + override protected def setupNetwork(): Unit = { + log.info("Set script") + waves1.api.broadcastAndWait(chainContract.setScript()) + + log.info("Setup chain contract") + val genesisBlock = ec1.engineApi.getBlockByNumber(BlockNumber.Number(0)).explicitGet().getOrElse(fail("No EL genesis block")) + waves1.api.broadcastAndWait( + chainContract.setup( + genesisBlock = genesisBlock, + elMinerReward = rewardAmount.amount.longValue(), + daoAddress = None, + daoReward = 0, + invoker = chainContractAccount + ) + ) + + log.info(s"Token id: ${waves1.chainContract.token}") + + log.info("Waves miner #1 join") + waves1.api.broadcastAndWait( + chainContract.join( + minerAccount = miner1Account, + elRewardAddress = miner1RewardAddress + ) + ) + + log.info("Waves miner #2 join") + val joinMiner2Result = waves1.api.broadcastAndWait( + chainContract.join( + minerAccount = miner2Account, + elRewardAddress = miner2RewardAddress + ) + ) + + val epoch1Number = joinMiner2Result.height + 1 + log.info(s"Wait for #$epoch1Number epoch") + waves1.api.waitForHeight(epoch1Number) + } +} diff --git a/consensus-client-it/src/test/scala/units/docker/WavesNodeContainer.scala b/consensus-client-it/src/test/scala/units/docker/WavesNodeContainer.scala index 6b7978d1..35eab4a8 100644 --- a/consensus-client-it/src/test/scala/units/docker/WavesNodeContainer.scala +++ b/consensus-client-it/src/test/scala/units/docker/WavesNodeContainer.scala @@ -2,7 +2,7 @@ package units.docker import com.google.common.io.Files import com.wavesplatform.account.Address -import com.wavesplatform.api.NodeHttpApi +import com.wavesplatform.api.{LoggingBackend, NodeHttpApi} import com.wavesplatform.common.utils.Base58 import org.testcontainers.containers.BindMode import org.testcontainers.containers.Network.NetworkImpl @@ -58,7 +58,7 @@ class WavesNodeContainer( lazy val apiPort = container.getMappedPort(ApiPort) // TODO common from EcContainer - private val httpClientBackend = HttpClientSyncBackend() + private val httpClientBackend = new LoggingBackend(HttpClientSyncBackend()) lazy val api = new NodeHttpApi(uri"http://${container.getHost}:$apiPort", httpClientBackend) diff --git a/consensus-client-it/src/test/scala/units/el/ElBridgeClient.scala b/consensus-client-it/src/test/scala/units/el/ElBridgeClient.scala index 99b09d19..7c3e1380 100644 --- a/consensus-client-it/src/test/scala/units/el/ElBridgeClient.scala +++ b/consensus-client-it/src/test/scala/units/el/ElBridgeClient.scala @@ -1,27 +1,51 @@ package units.el import com.wavesplatform.account.Address +import com.wavesplatform.api.{HasRetry, LoggingUtil} import com.wavesplatform.utils.{EthEncoding, ScorexLogging} +import org.scalatest.Assertions.fail import org.web3j.abi.datatypes.generated.Bytes20 import org.web3j.abi.datatypes.{Type, Function as Web3Function} import org.web3j.crypto.{Credentials, RawTransaction, TransactionEncoder} import org.web3j.protocol.Web3j import org.web3j.protocol.core.DefaultBlockParameterName -import org.web3j.protocol.core.methods.response.EthSendTransaction +import org.web3j.protocol.core.methods.response.{EthSendTransaction, TransactionReceipt} import org.web3j.tx.gas.DefaultGasProvider import org.web3j.utils.Numeric import units.eth.EthAddress -import java.util.concurrent.ThreadLocalRandom import scala.jdk.CollectionConverters.SeqHasAsJava +import scala.jdk.OptionConverters.RichOptional +import scala.util.chaining.scalaUtilChainingOps -class ElBridgeClient(web3j: Web3j, val address: EthAddress) extends ScorexLogging { - def sendNative(sender: Credentials, recipient: Address, amountInEther: BigInt): EthSendTransaction = { - val currRequestId = ThreadLocalRandom.current().nextInt(10000, 100000).toString - +class ElBridgeClient(web3j: Web3j, val address: EthAddress) extends HasRetry with ScorexLogging { + def sendNativeAndWait( + sender: Credentials, + recipient: Address, + amountInEther: BigInt + ): TransactionReceipt = { + val baseRequestId = LoggingUtil.currRequestId val senderAddress = sender.getAddress - log.debug(s"[$currRequestId] sendNative($senderAddress->$recipient: $amountInEther Ether)") + log.debug(s"[$baseRequestId] sendNativeAndWait($senderAddress->$recipient: $amountInEther Ether)") + val sendTxnResult = sendNativeImpl(sender, recipient, amountInEther, ElBridgeClient.currRequestId(baseRequestId)) + retryWithAttempts { attempt => + web3j + .ethGetTransactionReceipt(sendTxnResult.getTransactionHash) + .tap(_.setId(ElBridgeClient.currRequestId(baseRequestId, attempt))) + .send() + .getTransactionReceipt + .toScala + .getOrElse(fail(s"No receipt for ${sendTxnResult.getTransactionHash}")) + } + } + protected def sendNativeImpl( + sender: Credentials, + recipient: Address, + amountInEther: BigInt, + currRequestId: Long + ): EthSendTransaction = { + val senderAddress = sender.getAddress val recipientAddressHex = Numeric.toHexString(recipient.publicKeyHash) val data = org.web3j.abi.FunctionEncoder.encode( new Web3Function( @@ -31,10 +55,8 @@ class ElBridgeClient(web3j: Web3j, val address: EthAddress) extends ScorexLoggin ) ) - val ethGetTransactionCount = web3j - .ethGetTransactionCount(senderAddress, DefaultBlockParameterName.LATEST) - .send() - val nonce = ethGetTransactionCount.getTransactionCount + val ethGetTransactionCount = web3j.ethGetTransactionCount(senderAddress, DefaultBlockParameterName.LATEST).tap(_.setId(currRequestId)).send() + val nonce = ethGetTransactionCount.getTransactionCount val transaction = RawTransaction.createTransaction( nonce, @@ -47,10 +69,20 @@ class ElBridgeClient(web3j: Web3j, val address: EthAddress) extends ScorexLoggin val signedMessage = TransactionEncoder.signMessage(transaction, sender) val hexValue = Numeric.toHexString(signedMessage) - val r = web3j.ethSendRawTransaction(hexValue).send() + val r = web3j.ethSendRawTransaction(hexValue).tap(_.setId(currRequestId)).send() - log.debug(s"[$currRequestId] txn=${r.getTransactionHash}") + log.debug(s"[${ElBridgeClient.baseId(currRequestId)}] txn=${r.getTransactionHash}") r } +} + +object ElBridgeClient { + val AttemptIdLength = 3 // 001-999 + + // HACK: Distinguish the first request (001) and retries (002-999) + def currRequestId(base: Int = LoggingUtil.currRequestId, attempt: Int = 1): Long = base * 1000L + attempt + def idHasAttempt(id: Long): Boolean = id.toString.length == (LoggingUtil.Length + ElBridgeClient.AttemptIdLength) + def baseId(requestId: Long): Int = (requestId / 1000).toInt + def attempt(requestId: Long): Int = (requestId % 1000).toInt } diff --git a/consensus-client-it/src/test/scala/units/http/OkHttpLogger.scala b/consensus-client-it/src/test/scala/units/http/OkHttpLogger.scala index 360a7a0a..62a3f095 100644 --- a/consensus-client-it/src/test/scala/units/http/OkHttpLogger.scala +++ b/consensus-client-it/src/test/scala/units/http/OkHttpLogger.scala @@ -1,22 +1,32 @@ package units.http +import com.wavesplatform.api.LoggingUtil import com.wavesplatform.utils.ScorexLogging import okhttp3.{Interceptor, Request, Response} +import play.api.libs.json.Json +import units.el.ElBridgeClient -import java.util.concurrent.ThreadLocalRandom import scala.util.Try object OkHttpLogger extends Interceptor with ScorexLogging { override def intercept(chain: Interceptor.Chain): Response = { - val currRequestId = ThreadLocalRandom.current().nextInt(10000, 100000).toString - val req = chain.request() - log.debug(s"[$currRequestId] ${req.url()} ${readRequestBody(req)}") + val req = chain.request() + val bodyStr = readRequestBody(req) + + var attempt = 1 + var currRequestId = (Json.parse(bodyStr) \ "id").asOpt[Long].getOrElse(LoggingUtil.currRequestId.toLong) + if (ElBridgeClient.idHasAttempt(currRequestId)) { + attempt = ElBridgeClient.attempt(currRequestId) + currRequestId = ElBridgeClient.baseId(currRequestId) + } + + if (attempt == 1) log.debug(s"[$currRequestId] ${req.method()} ${req.url()}: body=$bodyStr") // HACK: Log only the first attempt val res = chain.proceed(req) - log.debug(s"[$currRequestId] ${res.code()}: ${readResponseBody(res)}") + log.debug(s"[$currRequestId] HTTP ${res.code()}: body=${readResponseBody(res)}") res } - private def readRequestBody(request: Request) = request.body() match { + private def readRequestBody(request: Request): String = request.body() match { case null => "null" case body => val buffer = new okio.Buffer() @@ -26,12 +36,12 @@ object OkHttpLogger extends Interceptor with ScorexLogging { }.getOrElse("Could not read body") } - private def readResponseBody(response: Response) = response.body() match { + private def readResponseBody(response: Response): String = response.body() match { case null => "null" case body => val source = body.source() source.request(Long.MaxValue) // Buffer the entire body. - val buffer = source.buffer().clone() + val buffer = source.getBuffer.clone() buffer.readUtf8() } } diff --git a/src/main/scala/units/client/JsonRpcClient.scala b/src/main/scala/units/client/JsonRpcClient.scala index dcc1f99c..ba8ad3a9 100644 --- a/src/main/scala/units/client/JsonRpcClient.scala +++ b/src/main/scala/units/client/JsonRpcClient.scala @@ -2,14 +2,13 @@ package units.client import cats.Id import cats.syntax.either.* -import units.client.JsonRpcClient.DefaultTimeout -import units.{ClientConfig, ClientError} import play.api.libs.json.{JsError, JsValue, Reads, Writes} import sttp.client3.* import sttp.client3.playJson.* import sttp.model.Uri +import units.client.JsonRpcClient.DefaultTimeout +import units.{ClientConfig, ClientError} -import java.util.concurrent.ThreadLocalRandom import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.util.{Failure, Success, Try} @@ -26,15 +25,12 @@ trait JsonRpcClient { protected def parseJson[A: Reads](jsValue: JsValue): Either[ClientError, A] = Try(jsValue.as[A]).toEither.leftMap(err => ClientError(s"Response parse error: ${err.getMessage}")) - private def mkRequest[A: Writes, B: Reads](requestBody: A, timeout: FiniteDuration): RpcRequest[B] = { - val currRequestId = ThreadLocalRandom.current().nextInt(10000, 100000).toString + private def mkRequest[A: Writes, B: Reads](requestBody: A, timeout: FiniteDuration): RpcRequest[B] = basicRequest .body(requestBody) .post(apiUrl) .response(asJson[JsonRpcResponse[B]]) .readTimeout(timeout) - .tag(RequestIdTag, currRequestId) - } private def sendRequest[RQ: Writes, RS: Reads](request: RpcRequest[RS], retriesLeft: Int): Either[String, Option[RS]] = { def retryIf(cond: Boolean, elseError: String): Either[String, Option[RS]] = @@ -42,7 +38,7 @@ trait JsonRpcClient { val retries = retriesLeft - 1 // TODO: make non-blocking waiting Thread.sleep(config.apiRequestRetryWaitTime.toMillis) - sendRequest(request.tag(RetriesLeftTag, retries), retries) + sendRequest(request, retries) } else Left(elseError) Try { diff --git a/src/main/scala/units/client/package.scala b/src/main/scala/units/client/package.scala deleted file mode 100644 index c9994e08..00000000 --- a/src/main/scala/units/client/package.scala +++ /dev/null @@ -1,6 +0,0 @@ -package units - -package object client { - val RequestIdTag = "requestId" - val RetriesLeftTag = "retriesLeft" -}