Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better logs #21

Open
wants to merge 3 commits into
base: rename-block-classes
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/main/scala/units/ClientError.scala

This file was deleted.

181 changes: 96 additions & 85 deletions src/main/scala/units/ELUpdater.scala

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions src/main/scala/units/client/JsonRpcClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package units.client
import cats.Id
import cats.syntax.either.*
import units.client.JsonRpcClient.DefaultTimeout
import units.{ClientConfig, ClientError}
import units.ClientConfig
import play.api.libs.json.{JsError, JsValue, Reads, Writes}
import sttp.client3.*
import sttp.client3.playJson.*
Expand All @@ -23,8 +23,8 @@ trait JsonRpcClient {
protected def sendRequest[RQ: Writes, RP: Reads](requestBody: RQ, timeout: FiniteDuration = DefaultTimeout): Either[String, Option[RP]] =
sendRequest(mkRequest(requestBody, timeout), config.apiRequestRetries)

protected def parseJson[A: Reads](jsValue: JsValue): Either[ClientError, A] =
Try(jsValue.as[A]).toEither.leftMap(err => ClientError(s"Response parse error: ${err.getMessage}"))
protected def parseJson[A: Reads](jsValue: JsValue): Either[String, A] =
Try(jsValue.as[A]).toEither.leftMap(err => s"Response parse error: ${err.getMessage}")

private def mkRequest[A: Writes, B: Reads](requestBody: A, timeout: FiniteDuration): RpcRequest[B] = {
val currRequestId = ThreadLocalRandom.current().nextInt(10000, 100000).toString
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/units/client/contract/ContractFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ import com.wavesplatform.lang.v1.FunctionHeader
import com.wavesplatform.lang.v1.compiler.Terms.{CONST_BYTESTR, CONST_LONG, CONST_STRING, EVALUATED, FUNCTION_CALL}
import org.web3j.utils.Numeric.cleanHexPrefix
import units.util.HexBytesConverter.toHexNoPrefix
import units.{BlockHash, ClientError, JobResult}
import units.BlockHash

abstract class ContractFunction(name: String, reference: BlockHash, extraArgs: Either[CommonError, List[EVALUATED]]) {
def toFunctionCall(blockHash: BlockHash, transfersRootHash: Digest, lastC2ETransferIndex: Long): JobResult[FUNCTION_CALL] = (for {
def toFunctionCall(blockHash: BlockHash, transfersRootHash: Digest, lastC2ETransferIndex: Long): Either[String, FUNCTION_CALL] = (for {
hash <- CONST_STRING(cleanHexPrefix(blockHash))
ref <- CONST_STRING(cleanHexPrefix(reference))
trh <- CONST_STRING(toHexNoPrefix(transfersRootHash))
xtra <- extraArgs
} yield FUNCTION_CALL(
FunctionHeader.User(name),
List(hash, ref) ++ xtra ++ List(trh, CONST_LONG(lastC2ETransferIndex))
)).leftMap(e => ClientError(s"Error building function call for $name: $e"))
)).leftMap(e => s"Error building function call for $name: $e")
}

object ContractFunction {
Expand Down
24 changes: 12 additions & 12 deletions src/main/scala/units/client/engine/EngineApiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import play.api.libs.json.*
import units.client.engine.EngineApiClient.PayloadId
import units.client.engine.model.*
import units.eth.EthAddress
import units.{BlockHash, JobResult}
import units.BlockHash

trait EngineApiClient {
def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): JobResult[PayloadStatus]
def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): Either[String, PayloadStatus]

def forkChoiceUpdatedWithPayloadId(
lastBlockHash: BlockHash,
Expand All @@ -16,25 +16,25 @@ trait EngineApiClient {
suggestedFeeRecipient: EthAddress,
prevRandao: String,
withdrawals: Vector[Withdrawal] = Vector.empty
): JobResult[PayloadId]
): Either[String, PayloadId]

def getPayload(payloadId: PayloadId): JobResult[JsObject]
def getPayload(payloadId: PayloadId): Either[String, JsObject]

def applyNewPayload(payloadJson: JsObject): JobResult[Option[BlockHash]]
def applyNewPayload(payloadJson: JsObject): Either[String, Option[BlockHash]]

def getPayloadBodyByHash(hash: BlockHash): JobResult[Option[JsObject]]
def getPayloadBodyByHash(hash: BlockHash): Either[String, Option[JsObject]]

def getBlockByNumber(number: BlockNumber): JobResult[Option[ExecutionPayload]]
def getBlockByNumber(number: BlockNumber): Either[String, Option[ExecutionPayload]]

def getBlockByHash(hash: BlockHash): JobResult[Option[ExecutionPayload]]
def getBlockByHash(hash: BlockHash): Either[String, Option[ExecutionPayload]]

def getLatestBlock: JobResult[ExecutionPayload]
def getLatestBlock: Either[String, ExecutionPayload]

def getBlockJsonByHash(hash: BlockHash): JobResult[Option[JsObject]]
def getBlockJsonByHash(hash: BlockHash): Either[String, Option[JsObject]]

def getPayloadJsonDataByHash(hash: BlockHash): JobResult[PayloadJsonData]
def getPayloadJsonDataByHash(hash: BlockHash): Either[String, PayloadJsonData]

def getLogs(hash: BlockHash, address: EthAddress, topic: String): JobResult[List[GetLogsResponseEntry]]
def getLogs(hash: BlockHash, address: EthAddress, topic: String): Either[String, List[GetLogsResponseEntry]]
}

object EngineApiClient {
Expand Down
60 changes: 30 additions & 30 deletions src/main/scala/units/client/engine/HttpEngineApiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@ import units.client.engine.model.*
import units.client.engine.model.ForkChoiceUpdatedRequest.ForkChoiceAttributes
import units.client.engine.model.PayloadStatus.{Syncing, Valid}
import units.eth.EthAddress
import units.{BlockHash, ClientConfig, ClientError, JobResult}
import units.{BlockHash, ClientConfig}

import scala.concurrent.duration.{DurationInt, FiniteDuration}

class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Identity, ?]) extends EngineApiClient with JsonRpcClient {

val apiUrl: Uri = uri"${config.executionClientAddress}"

def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): JobResult[PayloadStatus] = {
def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): Either[String, PayloadStatus] = {
sendEngineRequest[ForkChoiceUpdatedRequest, ForkChoiceUpdatedResponse](
ForkChoiceUpdatedRequest(blockHash, finalizedBlockHash, None),
BlockExecutionTimeout
)
.flatMap {
case ForkChoiceUpdatedResponse(ps @ PayloadState(Valid | Syncing, _, _), None) => Right(ps.status)
case ForkChoiceUpdatedResponse(PayloadState(_, _, Some(validationError)), _) =>
Left(ClientError(s"Payload validation error: $validationError"))
case ForkChoiceUpdatedResponse(payloadState, _) => Left(ClientError(s"Unexpected payload status ${payloadState.status}"))
Left(s"Payload validation error: $validationError")
case ForkChoiceUpdatedResponse(payloadState, _) => Left(s"Unexpected payload status ${payloadState.status}")
}
}

Expand All @@ -40,7 +40,7 @@ class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Ide
suggestedFeeRecipient: EthAddress,
prevRandao: String,
withdrawals: Vector[Withdrawal]
): JobResult[PayloadId] = {
): Either[String, PayloadId] = {
sendEngineRequest[ForkChoiceUpdatedRequest, ForkChoiceUpdatedResponse](
ForkChoiceUpdatedRequest(
lastBlockHash,
Expand All @@ -52,74 +52,74 @@ class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Ide
case ForkChoiceUpdatedResponse(PayloadState(Valid, _, _), Some(payloadId)) =>
Right(payloadId)
case ForkChoiceUpdatedResponse(_, None) =>
Left(ClientError(s"Payload id for $lastBlockHash is not defined"))
Left(s"Payload id for $lastBlockHash is not defined")
case ForkChoiceUpdatedResponse(PayloadState(_, _, Some(validationError)), _) =>
Left(ClientError(s"Payload validation error for $lastBlockHash: $validationError"))
Left(s"Payload validation error for $lastBlockHash: $validationError")
case ForkChoiceUpdatedResponse(payloadState, _) =>
Left(ClientError(s"Unexpected payload status for $lastBlockHash: ${payloadState.status}"))
Left(s"Unexpected payload status for $lastBlockHash: ${payloadState.status}")
}
}

def getPayload(payloadId: PayloadId): JobResult[JsObject] = {
def getPayload(payloadId: PayloadId): Either[String, JsObject] = {
sendEngineRequest[GetPayloadRequest, GetPayloadResponse](GetPayloadRequest(payloadId), NonBlockExecutionTimeout).map(_.executionPayload)
}

def applyNewPayload(payloadJson: JsObject): JobResult[Option[BlockHash]] = {
def applyNewPayload(payloadJson: JsObject): Either[String, Option[BlockHash]] = {
sendEngineRequest[NewPayloadRequest, PayloadState](NewPayloadRequest(payloadJson), BlockExecutionTimeout).flatMap {
case PayloadState(_, _, Some(validationError)) => Left(ClientError(s"Payload validation error: $validationError"))
case PayloadState(_, _, Some(validationError)) => Left(s"Payload validation error: $validationError")
case PayloadState(Valid, Some(latestValidHash), _) => Right(Some(latestValidHash))
case PayloadState(Syncing, latestValidHash, _) => Right(latestValidHash)
case PayloadState(status, None, _) => Left(ClientError(s"Latest valid hash is not defined at status $status"))
case PayloadState(status, _, _) => Left(ClientError(s"Unexpected payload status: $status"))
case PayloadState(status, None, _) => Left(s"Latest valid hash is not defined at status $status")
case PayloadState(status, _, _) => Left(s"Unexpected payload status: $status")
}
}

def getPayloadBodyByHash(hash: BlockHash): JobResult[Option[JsObject]] = {
def getPayloadBodyByHash(hash: BlockHash): Either[String, Option[JsObject]] = {
sendEngineRequest[GetPayloadBodyByHash, JsArray](GetPayloadBodyByHash(hash), NonBlockExecutionTimeout)
.map(_.value.headOption.flatMap(_.asOpt[JsObject]))
}

def getBlockByNumber(number: BlockNumber): JobResult[Option[ExecutionPayload]] = {
def getBlockByNumber(number: BlockNumber): Either[String, Option[ExecutionPayload]] = {
for {
json <- sendRequest[GetBlockByNumberRequest, JsObject](GetBlockByNumberRequest(number.str))
.leftMap(err => ClientError(s"Error getting payload by number $number: $err"))
.leftMap(err => s"Error getting payload by number $number: $err")
blockMeta <- json.traverse(parseJson[ExecutionPayload](_))
} yield blockMeta
}

def getBlockByHash(hash: BlockHash): JobResult[Option[ExecutionPayload]] = {
def getBlockByHash(hash: BlockHash): Either[String, Option[ExecutionPayload]] = {
sendRequest[GetBlockByHashRequest, ExecutionPayload](GetBlockByHashRequest(hash))
.leftMap(err => ClientError(s"Error getting payload by hash $hash: $err"))
.leftMap(err => s"Error getting payload by hash $hash: $err")
}

def getLatestBlock: JobResult[ExecutionPayload] = for {
def getLatestBlock: Either[String, ExecutionPayload] = for {
lastPayloadOpt <- getBlockByNumber(BlockNumber.Latest)
lastPayload <- Either.fromOption(lastPayloadOpt, ClientError("Impossible: EC doesn't have payloads"))
lastPayload <- Either.fromOption(lastPayloadOpt, "Impossible: EC doesn't have payloads")
} yield lastPayload

def getBlockJsonByHash(hash: BlockHash): JobResult[Option[JsObject]] = {
def getBlockJsonByHash(hash: BlockHash): Either[String, Option[JsObject]] = {
sendRequest[GetBlockByHashRequest, JsObject](GetBlockByHashRequest(hash))
.leftMap(err => ClientError(s"Error getting block json by hash $hash: $err"))
.leftMap(err => s"Error getting block json by hash $hash: $err")
}

def getPayloadJsonDataByHash(hash: BlockHash): JobResult[PayloadJsonData] = {
def getPayloadJsonDataByHash(hash: BlockHash): Either[String, PayloadJsonData] = {
for {
blockJsonOpt <- getBlockJsonByHash(hash)
blockJson <- Either.fromOption(blockJsonOpt, ClientError("block not found"))
blockJson <- Either.fromOption(blockJsonOpt, "block not found")
payloadBodyJsonOpt <- getPayloadBodyByHash(hash)
payloadBodyJson <- Either.fromOption(payloadBodyJsonOpt, ClientError("payload body not found"))
payloadBodyJson <- Either.fromOption(payloadBodyJsonOpt, "payload body not found")
} yield PayloadJsonData(blockJson, payloadBodyJson)
}

def getLogs(hash: BlockHash, address: EthAddress, topic: String): JobResult[List[GetLogsResponseEntry]] =
def getLogs(hash: BlockHash, address: EthAddress, topic: String): Either[String, List[GetLogsResponseEntry]] =
sendRequest[GetLogsRequest, List[GetLogsResponseEntry]](GetLogsRequest(hash, address, List(topic)))
.leftMap(err => ClientError(s"Error getting block logs by hash $hash: $err"))
.leftMap(err => s"Error getting block logs by hash $hash: $err")
.map(_.getOrElse(List.empty))

private def sendEngineRequest[A: Writes, B: Reads](request: A, timeout: FiniteDuration): JobResult[B] = {
private def sendEngineRequest[A: Writes, B: Reads](request: A, timeout: FiniteDuration): Either[String, B] = {
sendRequest(request, timeout) match {
case Right(response) => response.toRight(ClientError(s"Unexpected engine API empty response"))
case Left(err) => Left(ClientError(s"Engine API request error: $err"))
case Right(response) => response.toRight(s"Unexpected engine API empty response")
case Left(err) => Left(s"Engine API request error: $err")
}
}
}
Expand Down
54 changes: 26 additions & 28 deletions src/main/scala/units/client/engine/LoggedEngineApiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import units.client.engine.EngineApiClient.PayloadId
import units.client.engine.LoggedEngineApiClient.excludedJsonFields
import units.client.engine.model.*
import units.eth.EthAddress
import units.{BlockHash, JobResult}
import units.BlockHash

import java.util.concurrent.ThreadLocalRandom
import scala.util.chaining.scalaUtilChainingOps

class LoggedEngineApiClient(underlying: EngineApiClient) extends EngineApiClient {
protected val log: LoggerFacade = LoggerFacade(LoggerFactory.getLogger(underlying.getClass))

override def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): JobResult[PayloadStatus] =
wrap(s"forkChoiceUpdated($blockHash, f=$finalizedBlockHash)", underlying.forkChoiceUpdated(blockHash, finalizedBlockHash))
override def forkChoiceUpdated(blockHash: BlockHash, finalizedBlockHash: BlockHash): Either[String, PayloadStatus] =
wrap(s"forkChoiceUpdated($blockHash, f=$finalizedBlockHash)", underlying.forkChoiceUpdated(blockHash, finalizedBlockHash))()

override def forkChoiceUpdatedWithPayloadId(
lastBlockHash: BlockHash,
Expand All @@ -25,50 +25,48 @@ class LoggedEngineApiClient(underlying: EngineApiClient) extends EngineApiClient
suggestedFeeRecipient: EthAddress,
prevRandao: String,
withdrawals: Vector[Withdrawal]
): JobResult[PayloadId] = wrap(
): Either[String, PayloadId] = wrap(
s"forkChoiceUpdatedWithPayloadId(l=$lastBlockHash, f=$finalizedBlockHash, ts=$unixEpochSeconds, m=$suggestedFeeRecipient, " +
s"r=$prevRandao, w={${withdrawals.mkString(", ")}}",
underlying.forkChoiceUpdatedWithPayloadId(lastBlockHash, finalizedBlockHash, unixEpochSeconds, suggestedFeeRecipient, prevRandao, withdrawals)
)
)()

override def getPayload(payloadId: PayloadId): JobResult[JsObject] =
wrap(s"getPayload($payloadId)", underlying.getPayload(payloadId), filteredJsonStr)
override def getPayload(payloadId: PayloadId): Either[String, JsObject] =
wrap(s"getPayload($payloadId)", underlying.getPayload(payloadId))(filteredJsonStr)

override def applyNewPayload(payloadJson: JsObject): JobResult[Option[BlockHash]] =
wrap(s"applyNewPayload(${filteredJsonStr(payloadJson)})", underlying.applyNewPayload(payloadJson), _.fold("None")(identity))
override def applyNewPayload(payloadJson: JsObject): Either[String, Option[BlockHash]] =
wrap(s"applyNewPayload(${filteredJsonStr(payloadJson)})", underlying.applyNewPayload(payloadJson))(_.fold("None")(identity))

override def getPayloadBodyByHash(hash: BlockHash): JobResult[Option[JsObject]] =
wrap(s"getPayloadBodyByHash($hash)", underlying.getPayloadBodyByHash(hash), _.fold("None")(filteredJsonStr))
override def getPayloadBodyByHash(hash: BlockHash): Either[String, Option[JsObject]] =
wrap(s"getPayloadBodyByHash($hash)", underlying.getPayloadBodyByHash(hash))(_.fold("None")(filteredJsonStr))

override def getBlockByNumber(number: BlockNumber): JobResult[Option[ExecutionPayload]] =
wrap(s"getBlockByNumber($number)", underlying.getBlockByNumber(number), _.fold("None")(_.toString))
override def getBlockByNumber(number: BlockNumber): Either[String, Option[ExecutionPayload]] =
wrap(s"getBlockByNumber($number)", underlying.getBlockByNumber(number))(_.fold("None")(_.toString))

override def getBlockByHash(hash: BlockHash): JobResult[Option[ExecutionPayload]] =
wrap(s"getBlockByHash($hash)", underlying.getBlockByHash(hash), _.fold("None")(_.toString))
override def getBlockByHash(hash: BlockHash): Either[String, Option[ExecutionPayload]] =
wrap(s"getBlockByHash($hash)", underlying.getBlockByHash(hash))(_.fold("None")(_.toString))

override def getLatestBlock: JobResult[ExecutionPayload] =
wrap("getLatestBlock", underlying.getLatestBlock)
override def getLatestBlock: Either[String, ExecutionPayload] =
wrap("getLatestBlock", underlying.getLatestBlock)()

override def getBlockJsonByHash(hash: BlockHash): JobResult[Option[JsObject]] =
wrap(s"getBlockJsonByHash($hash)", underlying.getBlockJsonByHash(hash), _.fold("None")(filteredJsonStr))
override def getBlockJsonByHash(hash: BlockHash): Either[String, Option[JsObject]] =
wrap(s"getBlockJsonByHash($hash)", underlying.getBlockJsonByHash(hash))(_.fold("None")(filteredJsonStr))

override def getPayloadJsonDataByHash(hash: BlockHash): JobResult[PayloadJsonData] = {
wrap(
s"getPayloadJsonDataByHash($hash)",
underlying.getPayloadJsonDataByHash(hash),
pjd => PayloadJsonData(filteredJson(pjd.blockJson), filteredJson(pjd.bodyJson)).toString
override def getPayloadJsonDataByHash(hash: BlockHash): Either[String, PayloadJsonData] = {
wrap(s"getPayloadJsonDataByHash($hash)", underlying.getPayloadJsonDataByHash(hash))(pjd =>
PayloadJsonData(filteredJson(pjd.blockJson), filteredJson(pjd.bodyJson)).toString
)
}

override def getLogs(hash: BlockHash, address: EthAddress, topic: String): JobResult[List[GetLogsResponseEntry]] =
wrap(s"getLogs($hash, a=$address, t=$topic)", underlying.getLogs(hash, address, topic), _.view.map(_.data).mkString("{", ", ", "}"))
override def getLogs(hash: BlockHash, address: EthAddress, topic: String): Either[String, List[GetLogsResponseEntry]] =
wrap(s"getLogs($hash, a=$address, t=$topic)", underlying.getLogs(hash, address, topic))(_.view.map(_.data).mkString("{", ", ", "}"))

protected def wrap[R](method: String, f: => JobResult[R], toMsg: R => String = (_: R).toString): JobResult[R] = {
protected def wrap[R](method: String, f: => Either[String, R])(toMsg: R => String = (_: R).toString): Either[String, R] = {
val currRequestId = ThreadLocalRandom.current().nextInt(10000, 100000).toString
log.debug(s"[$currRequestId] $method")

f.tap {
case Left(e) => log.debug(s"[$currRequestId] Error: ${e.message}")
case Left(e) => log.debug(s"[$currRequestId] Error: $e")
case Right(r) => log.debug(s"[$currRequestId] Success: ${toMsg(r)}")
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/main/scala/units/network/HistoryReplier.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package units.network

import cats.syntax.either.*
import com.wavesplatform.network.id
import com.wavesplatform.utils.ScorexLogging
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
import monix.execution.Scheduler
import units.client.engine.EngineApiClient
import units.{BlockHash, ClientError}
import units.BlockHash

import scala.concurrent.Future
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -39,9 +38,9 @@ class HistoryReplier(engineApiClient: EngineApiClient)(implicit sc: Scheduler) e
case _ => super.channelRead(ctx, msg)
}

private def loadPayload(hash: BlockHash): Future[Either[ClientError, PayloadMessage]] = Future {
private def loadPayload(hash: BlockHash): Future[Either[String, PayloadMessage]] = Future {
engineApiClient.getPayloadJsonDataByHash(hash).flatMap { payloadJsonData =>
PayloadMessage(payloadJsonData.toPayloadJson).leftMap(ClientError.apply)
PayloadMessage(payloadJsonData.toPayloadJson)
}
}
}
3 changes: 1 addition & 2 deletions src/main/scala/units/package.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
package object units {
type BlockHash = BlockHash.Type
type JobResult[A] = Either[ClientError, A]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't say that I totally disagree with this decision, but wondering, why do we remove JobResult? It looks concise and hides the left part, so we haven't write it each time.
We can even rename it to Result to make it shorter and remove Job part, because it is not a "job" anymore, just a computed value.

type BlockHash = BlockHash.Type
}
Loading