Skip to content

Commit

Permalink
rename zeroValueIfNull to zeroCurrencyValueIfNull, add zeroCurrencyVa…
Browse files Browse the repository at this point in the history
…lueIfNullSafe that does not fail on null values, add debug prints
  • Loading branch information
soad003 committed Jan 31, 2024
1 parent c75113c commit ee96e41
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 10 deletions.
41 changes: 39 additions & 2 deletions src/main/scala/org/graphsense/TransformHelpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import org.apache.spark.sql.functions.{
struct,
substring,
sum,
typedLit
typedLit,
when
}
import org.apache.spark.sql.types.{DataType, FloatType, IntegerType}
import org.graphsense.models.ExchangeRatesRaw
Expand Down Expand Up @@ -123,7 +124,18 @@ object TransformHelpers {
currencies.rdd.map(r => r(0).asInstanceOf[Seq[String]]).collect()(0)
}

def zeroValueIfNull(
def getZeroCurrencyValue(
length: Int,
castValueTo: DataType = IntegerType
) = {
struct(
lit(0).cast(castValueTo).as("value"),
typedLit(Array.fill[Float](length)(0))
.as("fiatValues")
)
}

def zeroCurrencyValueIfNull(
columnName: String,
length: Int,
castValueTo: DataType = IntegerType
Expand All @@ -143,6 +155,31 @@ object TransformHelpers {
)
}

def zeroCurrencyValueIfNullSafe(
columnName: String,
length: Int,
castValueTo: DataType = IntegerType
)(
df: DataFrame
): DataFrame = {
val zero = getZeroCurrencyValue(length, castValueTo)
df.withColumn(
columnName,
when(
col(f"${columnName}.value").isNull || col(
f"${columnName}.fiatValues"
).isNull,
zero
)
.otherwise(
coalesce(
col(columnName),
zero
)
)
)
}

def aggregateValues(
valueColumn: String,
fiatValueColumn: String,
Expand Down
15 changes: 11 additions & 4 deletions src/main/scala/org/graphsense/account/eth/Transformation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ class EthTransformation(spark: SparkSession, bucketSize: Int) {
)
)
}
traces
val etxs = traces
.filter(col("status") === 1)
.withColumnRenamed("txHash", "transaction")
.join(
Expand Down Expand Up @@ -451,6 +451,13 @@ class EthTransformation(spark: SparkSession, bucketSize: Int) {
.join(broadcast(exchangeRates), Seq("blockId"), "left")
.transform(toFiatCurrency("value", "fiatValues"))
.as[EncodedTransaction]

println(
"Transactions with null values; This should be empty; Maybe missing exchange rates"
)
etxs.filter($"value".isNull || $"fiatValues".isNull).show(100)

etxs
}

def computeBlockTransactions(
Expand Down Expand Up @@ -706,14 +713,14 @@ class EthTransformation(spark: SparkSession, bucketSize: Int) {
.na
.fill(false, Seq("isContract"))
.transform(
TransformHelpers.zeroValueIfNull(
TransformHelpers.zeroCurrencyValueIfNullSafe(
"totalReceived",
noFiatCurrencies.get,
castValueTo = DecimalType(38, 0)
)
)
.transform(
TransformHelpers.zeroValueIfNull(
TransformHelpers.zeroCurrencyValueIfNullSafe(
"totalSpent",
noFiatCurrencies.get,
castValueTo = DecimalType(38, 0)
Expand Down Expand Up @@ -816,7 +823,7 @@ class EthTransformation(spark: SparkSession, bucketSize: Int) {
)
)
.transform(
TransformHelpers.zeroValueIfNull(
TransformHelpers.zeroCurrencyValueIfNullSafe(
"value",
noFiatCurrencies.get,
castValueTo = DecimalType(38, 0)
Expand Down
11 changes: 7 additions & 4 deletions src/main/scala/org/graphsense/utxo/Transformation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,12 @@ class Transformation(
.na
.fill(0)
.transform(
TransformHelpers.zeroValueIfNull(F.totalReceived, noFiatCurrencies.get)
TransformHelpers
.zeroCurrencyValueIfNull(F.totalReceived, noFiatCurrencies.get)
)
.transform(
TransformHelpers.zeroValueIfNull(F.totalSpent, noFiatCurrencies.get)
TransformHelpers
.zeroCurrencyValueIfNull(F.totalSpent, noFiatCurrencies.get)
)
}

Expand Down Expand Up @@ -531,10 +533,11 @@ class Transformation(
.join(totalSpentAdj, Seq("clusterId"), "left")
.transform(
TransformHelpers
.zeroValueIfNull(F.totalReceivedAdj, noFiatCurrencies.get)
.zeroCurrencyValueIfNull(F.totalReceivedAdj, noFiatCurrencies.get)
)
.transform(
TransformHelpers.zeroValueIfNull(F.totalSpentAdj, noFiatCurrencies.get)
TransformHelpers
.zeroCurrencyValueIfNull(F.totalSpentAdj, noFiatCurrencies.get)
)
.transform(
TransformHelpers.withIdGroup(F.clusterId, F.clusterIdGroup, bucketSize)
Expand Down

0 comments on commit ee96e41

Please sign in to comment.