Skip to content

Commit

Permalink
add max-block parameter for all currencies
Browse files Browse the repository at this point in the history
  • Loading branch information
soad003 committed Nov 14, 2024
1 parent e3fd7b5 commit 9b72531
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 12 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## Unreleased
## [24.11.0] 2024-11-14
### Changed
- Upgrade to Spark 3.5.3
- Upgrade DataStax Spark Cassandra connector to 3.5.1
### Added
- `max-block` CLI parameter for UTXO currencies and ETH, to allow testing with smaller datasets.

## [24.02.0] 2024-03-04
### Fixed
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
RELEASE := 'v24.02.0'
RELEASE := 'v24.11.0'
# RELEASESEM := 'v1.6.2'

all: format lint build
Expand Down
19 changes: 13 additions & 6 deletions src/main/scala/org/graphsense/account/eth/Job.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,21 @@ class EthereumJob(

val maxBlockExchangeRates =
exchangeRates.select(max(col("blockId"))).first.getInt(0)

val maxBlockToProcess =
Math.min(
maxBlockExchangeRates,
to.getOrElse(maxBlockExchangeRates)
)

val blocksFiltered =
blocks.filter(col("blockId") <= maxBlockExchangeRates).persist()
blocks.filter(col("blockId") <= maxBlockToProcess).persist()
val transactionsFiltered =
transactions.filter(col("blockId") <= maxBlockExchangeRates).persist()
transactions.filter(col("blockId") <= maxBlockToProcess).persist()
val tracesFiltered =
traces.filter(col("blockId") <= maxBlockExchangeRates).persist()
traces.filter(col("blockId") <= maxBlockToProcess).persist()
val tokenTransfersFiltered = tokenTransfers
.filter(col("blockId") <= maxBlockExchangeRates)
.filter(col("blockId") <= maxBlockToProcess)
.persist()

val maxBlock = blocksFiltered
Expand All @@ -88,11 +95,11 @@ class EthereumJob(
val maxBlockDatetime =
maxBlock.select(col("maxBlockDatetime")).first.getString(0)

val noBlocks = maxBlockExchangeRates.toLong + 1
val noBlocks = maxBlockToProcess.toLong + 1
val noTransactions = transactionsFiltered.count()

println(s"Max block timestamp: ${maxBlockDatetime}")
println(s"Max block ID: ${maxBlockExchangeRates}")
println(s"Max block ID: ${maxBlockToProcess}")
println(s"Max transaction ID: ${noTransactions - 1}")

println("Computing transaction IDs")
Expand Down
6 changes: 6 additions & 0 deletions src/main/scala/org/graphsense/utxo/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,11 @@ class UtxoConf(arguments: Seq[String]) extends ScallopConf(arguments) {
noshort = true,
descr = "Spark checkpoint directory (HFDS in non-local mode)"
)
/** Optional `--max-block` command-line option (long form only, no short flag).
  *
  * Unset by default. When provided, the transformation job processes blocks
  * up to the smaller of this value and the highest block ID present in the
  * exchange rates.
  */
val maxBlock: ScallopOption[Int] = opt[Int](
"max-block",
default = None,
noshort = true,
descr = "Max block that is used for the transformation job"
)
verify()
}
15 changes: 11 additions & 4 deletions src/main/scala/org/graphsense/utxo/TransformationJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,18 @@ object TransformationJob {

val maxBlockExchangeRates =
exchangeRates.select(max(col(F.blockId))).first.getInt(0)

val maxBlockToProcess =
Math.min(
maxBlockExchangeRates,
conf.maxBlock.toOption.getOrElse(maxBlockExchangeRates)
)

val transactionsFiltered =
transactions.filter(col(F.blockId) <= maxBlockExchangeRates).persist()
transactions.filter(col(F.blockId) <= maxBlockToProcess).persist()

val maxBlock = blocks
.filter(col(F.blockId) <= maxBlockExchangeRates)
.filter(col(F.blockId) <= maxBlockToProcess)
.select(
max(col(F.blockId)).as("maxBlockId"),
max(col(F.timestamp)).as("maxBlockTimestamp")
Expand All @@ -87,11 +94,11 @@ object TransformationJob {
maxBlock.select(col("maxBlockDatetime")).first.getString(0)
val maxTransactionId =
transactionsFiltered.select(max(F.txId)).first.getLong(0)
val noBlocks = maxBlockExchangeRates + 1
val noBlocks = maxBlockToProcess + 1
val noTransactions = maxTransactionId + 1

println(s"Max block timestamp: ${maxBlockDatetime}")
println(s"Max block ID: ${maxBlockExchangeRates}")
println(s"Max block ID: ${maxBlockToProcess}")
println(s"Max transaction ID: ${maxTransactionId}")

println("Extracting transaction inputs")
Expand Down

0 comments on commit 9b72531

Please sign in to comment.