Skip to content

Commit

Permalink
Logging improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
vsuharnikov committed Oct 29, 2024
1 parent 724e339 commit ed49d7f
Show file tree
Hide file tree
Showing 16 changed files with 387 additions and 146 deletions.
2 changes: 1 addition & 1 deletion consensus-client-it/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ description := "Consensus client integration tests"

libraryDependencies ++= Seq(
"org.testcontainers" % "testcontainers" % "1.20.2",
"org.web3j" % "core" % "4.9.8"
"org.web3j" % "core" % "4.9.8"
).map(_ % Test)

val logsDirectory = taskKey[File]("The directory for logs") // Evaluates every time, so it recreates the logs directory
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.wavesplatform.api

import org.scalatest.concurrent.Eventually.PatienceConfig

import scala.concurrent.duration.{Deadline, DurationInt}
import scala.util.{Failure, Success, Try}

trait HasRetry {
protected implicit def patienceConfig: PatienceConfig = PatienceConfig(timeout = 30.seconds, interval = 1.second)

protected def retryWithAttempts[ResponseT](f: Int => ResponseT)(implicit patienceConfig: PatienceConfig): ResponseT = {
var attempt = 0
retry {
attempt += 1
f(attempt)
}
}

// Eventually has issues with handling patienceConfig
protected def retry[ResponseT](f: => ResponseT)(implicit patienceConfig: PatienceConfig): ResponseT = {
val deadline = Deadline.now + patienceConfig.timeout

var r = Try(f)
while (r.isFailure && deadline.hasTimeLeft()) {
Thread.sleep(patienceConfig.interval.toMillis)
r = Try(f)
}

r match {
case Failure(e) => throw new RuntimeException(s"All attempts are out: $patienceConfig", e)
case Success(r) => r
}
}

protected def failRetry(message: String): Nothing = throw new RuntimeException(message)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.wavesplatform.api

import com.wavesplatform.api.LoggingBackend.{LoggingOptions, LoggingOptionsTag}
import com.wavesplatform.utils.ScorexLogging
import sttp.capabilities.Effect
import sttp.client3.*

class LoggingBackend[F[_], P](delegate: SttpBackend[F, P]) extends DelegateSttpBackend[F, P](delegate) with ScorexLogging {
override def send[T, R >: P & Effect[F]](request: Request[T, R]): F[Response[T]] = {
val l = request.tag(LoggingOptionsTag).collect { case l: LoggingOptions => l }

l.filter(_.logRequest).foreach { l =>
var logStr = s"${l.prefix} ${request.method} ${request.uri}"
if (l.logResponseBody) logStr += s": body=${request.body.show}"
log.debug(logStr)
}

val requestWithRawJson = request.response(asBothOption(request.response, asStringAlways))
val withErrorLog = responseMonad.handleError(requestWithRawJson.send(delegate)) { x =>
l.foreach { l => log.debug(s"${l.prefix} Error: ${x.getMessage}") }
responseMonad.error(x)
}

responseMonad.flatMap(withErrorLog) { response =>
l.foreach { l =>
var logStr = s"${l.prefix} HTTP ${response.code}"
if (l.logResponseBody) logStr += s": body=${response.body._2}"
log.debug(logStr)
}

responseMonad.unit(response.copy(body = response.body._1))
}
}
}

object LoggingBackend {
val LoggingOptionsTag = "logging"

case class LoggingOptions(
logRequest: Boolean = true,
logRequestBody: Boolean = true,
logResponseBody: Boolean = true,
requestId: Int = LoggingUtil.currRequestId
) {
val prefix = s"[$requestId]"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.wavesplatform.api

import java.util.concurrent.ThreadLocalRandom

object LoggingUtil {
val Length = 5

def currRequestId: Int = ThreadLocalRandom.current().nextInt(10000, 100000)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,106 +2,151 @@ package com.wavesplatform.api

import cats.syntax.option.*
import com.wavesplatform.account.Address
import com.wavesplatform.api.NodeHttpApi.{AssetBalanceResponse, HeightResponse, TransactionInfoResponse}
import com.wavesplatform.api.LoggingBackend.{LoggingOptions, LoggingOptionsTag}
import com.wavesplatform.api.NodeHttpApi.*
import com.wavesplatform.api.http.ApiMarshallers.TransactionJsonWrites
import com.wavesplatform.api.http.TransactionsApiRoute.ApplicationStatus
import com.wavesplatform.state.DataEntry.Format
import com.wavesplatform.state.{DataEntry, EmptyDataEntry, TransactionId}
import com.wavesplatform.transaction.Asset.IssuedAsset
import com.wavesplatform.transaction.Transaction
import com.wavesplatform.utils.ScorexLogging
import org.scalatest.concurrent.Eventually.PatienceConfig
import play.api.libs.json.*
import sttp.client3.*
import sttp.client3.playJson.*
import sttp.model.{StatusCode, Uri}

import scala.concurrent.duration.DurationInt
import scala.util.chaining.scalaUtilChainingOps

class NodeHttpApi(apiUri: Uri, backend: SttpBackend[Identity, ?]) extends ScorexLogging {
class NodeHttpApi(apiUri: Uri, backend: SttpBackend[Identity, ?]) extends HasRetry with ScorexLogging {
private val averageBlockDelay = 18.seconds

def height: Int =
protected override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = averageBlockDelay, interval = 1.second)

def waitForHeight(atLeast: Int): Int = {
val loggingOptions: LoggingOptions = LoggingOptions()
log.debug(s"${loggingOptions.prefix} waitForHeight($atLeast)")
val currHeight = heightImpl()(loggingOptions)
if (currHeight >= atLeast) currHeight
else {
val subsequentPatienceConfig = patienceConfig.copy(timeout = averageBlockDelay * (atLeast - currHeight) * 2.5)
val subsequentLoggingOptions = loggingOptions.copy(logRequest = false)
Thread.sleep(patienceConfig.interval.toMillis)
retry {
heightImpl()(subsequentLoggingOptions).tap { r =>
if (r < atLeast) failRetry("")
}
}(subsequentPatienceConfig)
}
}

protected def heightImpl()(implicit loggingOptions: LoggingOptions = LoggingOptions()): Int =
basicRequest
.get(uri"$apiUri/blocks/height")
.response(asJson[HeightResponse])
.tag(LoggingOptionsTag, loggingOptions)
.send(backend)
.body match {
case Left(e) => throw e
case Right(r) => r.height
}

def waitForHeight(atLeast: Int): Int = {
val currHeight = height
if (currHeight >= atLeast) currHeight
else
WithRetries(
maxAttempts = (averageBlockDelay.toSeconds.toInt * (atLeast - currHeight) * 2.5).toInt,
message = s"waitForHeight($atLeast)"
).until(height) {
case h if h >= atLeast => h
def broadcastAndWait(txn: Transaction): TransactionInfoResponse = {
implicit val loggingOptions: LoggingOptions = LoggingOptions(logResponseBody = false)
log.debug(s"${loggingOptions.prefix} broadcastAndWait($txn)")
broadcastImpl(txn)(loggingOptions.copy(logRequestBody = false))
retryWithAttempts { attempt =>
val subsequentLoggingOptions = loggingOptions.copy(logRequest = attempt == 1)
transactionInfoImpl(TransactionId(txn.id()))(subsequentLoggingOptions) match {
case Some(r) if r.applicationStatus == ApplicationStatus.Succeeded => r
case r => failRetry(s"Expected ${ApplicationStatus.Succeeded} status, got: ${r.map(_.applicationStatus)}")
}
}
}

def broadcast[T <: Transaction](txn: T): T =
protected def broadcastImpl[T <: Transaction](txn: T)(implicit loggingOptions: LoggingOptions = LoggingOptions()): T =
basicRequest
.post(uri"$apiUri/transactions/broadcast")
.body(txn: Transaction)
.response(asJson[BroadcastResponse])
.tag(LoggingOptionsTag, loggingOptions)
.send(backend)
.body match {
case Left(e) => throw new RuntimeException(e)
case _ => txn
}

def broadcastAndWait(txn: Transaction): TransactionInfoResponse = {
broadcast(txn)
WithRetries(
message = s"broadcastAndWait(${txn.id()})"
).until(transactionInfo(TransactionId(txn.id()))) {
case Some(info) if info.applicationStatus == ApplicationStatus.Succeeded => info
}
}

def transactionInfo(id: TransactionId): Option[TransactionInfoResponse] =
protected def transactionInfoImpl(id: TransactionId)(implicit loggingOptions: LoggingOptions = LoggingOptions()): Option[TransactionInfoResponse] =
basicRequest
.get(uri"$apiUri/transactions/info/$id")
.response(asJson[TransactionInfoResponse])
.tag(LoggingOptionsTag, loggingOptions)
.send(backend)
.body match {
case Left(HttpError(_, StatusCode.NotFound)) => none
case Left(HttpError(body, statusCode)) => fail(s"Server returned error $body with status ${statusCode.code}")
case Left(DeserializationException(body, error)) => fail(s"failed to parse response $body: $error")
case Left(HttpError(body, statusCode)) => failRetry(s"Server returned error $body with status ${statusCode.code}")
case Left(DeserializationException(body, error)) => failRetry(s"failed to parse response $body: $error")
case Right(r) => r.some
}

def getDataByKey(address: Address, key: String): Option[DataEntry[?]] =
def getDataByKey(address: Address, key: String)(implicit loggingOptions: LoggingOptions = LoggingOptions()): Option[DataEntry[?]] = {
log.debug(s"${loggingOptions.prefix} getDataByKey($address, $key)")
basicRequest
.get(uri"$apiUri/addresses/data/$address/$key")
.response(asJson[DataEntry[?]])
.tag(LoggingOptionsTag, loggingOptions)
.send(backend)
.body match {
case Left(HttpError(_, StatusCode.NotFound)) => none
case Left(HttpError(body, statusCode)) => fail(s"Server returned error $body with status ${statusCode.code}")
case Left(DeserializationException(body, error)) => fail(s"failed to parse response $body: $error")
case Right(r) =>
r match {
case Left(HttpError(body, statusCode)) => failRetry(s"Server returned error $body with status ${statusCode.code}")
case Left(DeserializationException(body, error)) => failRetry(s"failed to parse response $body: $error")
case Right(response) =>
response match {
case _: EmptyDataEntry => none
case _ => r.some
case _ => response.some
}
}
}

def balance(address: Address, asset: IssuedAsset): Long =
def balance(address: Address, asset: IssuedAsset)(implicit loggingOptions: LoggingOptions = LoggingOptions()): Long = {
log.debug(s"${loggingOptions.prefix} balance($address, $asset)")
basicRequest
.get(uri"$apiUri/assets/balance/$address/$asset")
.response(asJson[AssetBalanceResponse])
.tag(LoggingOptionsTag, loggingOptions)
.send(backend)
.body match {
case Left(HttpError(_, StatusCode.NotFound)) => 0L
case Left(HttpError(body, statusCode)) => fail(s"Server returned error $body with status ${statusCode.code}")
case Left(DeserializationException(body, error)) => fail(s"failed to parse response $body: $error")
case Left(HttpError(body, statusCode)) => failRetry(s"Server returned error $body with status ${statusCode.code}")
case Left(DeserializationException(body, error)) => failRetry(s"failed to parse response $body: $error")
case Right(r) => r.balance
}
}

private def fail(message: String): Nothing = throw new RuntimeException(s"JSON-RPC error: $message")
def waitForConnectedPeers(atLeast: Int): Unit = {
implicit val loggingOptions: LoggingOptions = LoggingOptions(logRequestBody = false)
log.debug(s"${loggingOptions.prefix} waitForConnectedPeers($atLeast)")
retryWithAttempts { attempt =>
val subsequentLoggingOptions = loggingOptions.copy(logRequest = attempt == 1)
connectedPeersImpl()(subsequentLoggingOptions).tap { x =>
if (x < atLeast) failRetry(s"Expected at least $atLeast, got $x")
}
}
}

protected def connectedPeersImpl()(implicit loggingOptions: LoggingOptions = LoggingOptions()): Int =
basicRequest
.get(uri"$apiUri/peers/connected")
.response(asJson[ConnectedPeersResponse])
.tag(LoggingOptionsTag, loggingOptions)
.send(backend)
.body match {
case Left(HttpError(body, statusCode)) => failRetry(s"Server returned error $body with status ${statusCode.code}")
case Left(DeserializationException(body, error)) => failRetry(s"failed to parse response $body: $error")
case Right(r) => r.peers.length
}
}

object NodeHttpApi {
Expand All @@ -110,6 +155,11 @@ object NodeHttpApi {
implicit val heightResponseFormat: OFormat[HeightResponse] = Json.format[HeightResponse]
}

case class BroadcastResponse(id: String)
object BroadcastResponse {
implicit val broadcastResponseFormat: OFormat[BroadcastResponse] = Json.format[BroadcastResponse]
}

case class TransactionInfoResponse(height: Int, applicationStatus: String)
object TransactionInfoResponse {
implicit val transactionInfoResponseFormat: OFormat[TransactionInfoResponse] = Json.format[TransactionInfoResponse]
Expand All @@ -119,4 +169,9 @@ object NodeHttpApi {
object AssetBalanceResponse {
implicit val assetBalanceResponseFormat: OFormat[AssetBalanceResponse] = Json.format[AssetBalanceResponse]
}

case class ConnectedPeersResponse(peers: List[JsObject])
object ConnectedPeersResponse {
implicit val connectedPeersResponseFormat: OFormat[ConnectedPeersResponse] = Json.format[ConnectedPeersResponse]
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package units

import com.google.common.primitives.{Bytes, Ints}
import com.wavesplatform.account.{AddressScheme, KeyPair, SeedKeyPair}
import com.wavesplatform.api.HasRetry
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.crypto
import com.wavesplatform.utils.ScorexLogging
import monix.execution.atomic.AtomicBoolean
import org.scalatest.concurrent.Eventually
import org.scalatest.freespec.AnyFreeSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, EitherValues, OptionValues}
Expand All @@ -17,7 +17,6 @@ import units.eth.{EthAddress, Gwei}
import units.test.CustomMatchers

import java.nio.charset.StandardCharsets
import scala.concurrent.duration.DurationInt

trait BaseItTestSuite
extends AnyFreeSpec
Expand All @@ -27,10 +26,8 @@ trait BaseItTestSuite
with CustomMatchers
with EitherValues
with OptionValues
with Eventually
with HasRetry
with HasConsensusLayerDappTxHelpers {
override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 30.seconds, interval = 1.second)

override val currentHitSource: ByteStr = ByteStr.empty
override val chainContractAccount: KeyPair = mkKeyPair("devnet-1", 2)
protected val rewardAmount: Gwei = Gwei.ofRawGwei(2_000_000_000L)
Expand Down
Loading

0 comments on commit ed49d7f

Please sign in to comment.