Skip to content

Commit

Permalink
change eth block_transactions to long format
Browse files Browse the repository at this point in the history
  • Loading branch information
soad003 committed Feb 7, 2024
1 parent ee96e41 commit 8b67cfc
Show file tree
Hide file tree
Showing 13 changed files with 43 additions and 156 deletions.
3 changes: 0 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ LABEL org.opencontainers.image.description="The GraphSense Transformation Pipeli
LABEL org.opencontainers.image.source="https://github.com/graphsense/graphsense-spark"

ARG UID=10000
ADD requirements.txt /tmp/requirements.txt

ARG SPARK_UI_PORT=8080

Expand All @@ -17,7 +16,6 @@ RUN apt-get update && \
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | apt-key add && \
apt-get update && \
apt-get install -y --no-install-recommends -y python3-pip python3-setuptools python3-wheel sbt && \
pip3 install -r /tmp/requirements.txt && \
pip3 install cqlsh && \
useradd -m -d /home/dockeruser -r -u $UID dockeruser

Expand Down Expand Up @@ -54,7 +52,6 @@ RUN sbt package && \
cp target/scala-2.12/graphsense-spark*.jar graphsense-spark.jar

ADD docker/ .
ADD scripts/ ./scripts

USER dockeruser

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
RELEASE := 'v24.01.1'
RELEASE := 'v24.02.beta1'
# RELEASESEM := 'v1.6.2'

all: format lint build
Expand Down
8 changes: 0 additions & 8 deletions docker/docker-entrypoint.sh

This file was deleted.

1 change: 0 additions & 1 deletion requirements.txt

This file was deleted.

10 changes: 5 additions & 5 deletions src/main/scala/org/graphsense/account/Model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ case class Balance(
currency: String
)

// case class BlockTransaction(
// blockIdGroup: Int,
// blockId: Int,
// txs: Seq[types.TransactionIdType]
// )
case class BlockTransaction(
blockIdGroup: Int,
blockId: Int,
txs: Seq[types.TransactionIdType]
)
case class BlockTransactionRelational(
blockIdGroup: Int,
blockId: Int,
txId: types.TransactionIdType
Expand Down
18 changes: 3 additions & 15 deletions src/main/scala/org/graphsense/account/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import org.graphsense.account.models.{
AddressTransactionSecondaryIds,
Balance,
BlockTransaction,
BlockTransactionRelational,
Configuration,
SummaryStatistics,
TokenConfiguration,
Expand All @@ -33,9 +32,8 @@ trait AccountSink {
): Unit
def saveAddressIdsByPrefix(ids: Dataset[AddressIdByAddressPrefix]): Unit
def saveBalances(balances: Dataset[Balance]): Unit
def saveBlockTransactions(blockTxs: Dataset[BlockTransaction]): Unit
def saveBlockTransactionsRelational(
blockTxs: Dataset[BlockTransactionRelational]
def saveBlockTransactions(
blockTxs: Dataset[BlockTransaction]
): Unit
def saveAddressTransactions(addressTxs: Dataset[AddressTransaction]): Unit
def saveAddressTransactionBySecondaryId(
Expand Down Expand Up @@ -143,7 +141,7 @@ class CassandraAccountSink(store: CassandraStorage, keyspace: String)
store.isTableEmpty(keyspace, "block_transactions")
}

override def saveBlockTransactions(
def saveBlockTransactions(
blockTxs: Dataset[BlockTransaction]
): Unit = {
store.store(
Expand All @@ -153,16 +151,6 @@ class CassandraAccountSink(store: CassandraStorage, keyspace: String)
)
}

def saveBlockTransactionsRelational(
blockTxs: Dataset[BlockTransactionRelational]
): Unit = {
store.store(
keyspace,
"block_transactions",
blockTxs
)
}

override def areAddressTransactionsEmtpy(): Boolean = {
store.isTableEmpty(keyspace, "address_transactions")
}
Expand Down
25 changes: 10 additions & 15 deletions src/main/scala/org/graphsense/account/eth/Transformation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import org.apache.spark.sql.functions.{
min,
row_number,
size,
sort_array,
sum,
to_date,
transform,
Expand Down Expand Up @@ -464,20 +463,16 @@ class EthTransformation(spark: SparkSession, bucketSize: Int) {
blocks: Dataset[Block],
encodedTransactions: Dataset[EncodedTransaction]
): Dataset[BlockTransaction] = {
encodedTransactions
.groupBy("blockId")
.agg(collect_set("transactionId").as("txs"))
.withColumn("txs", sort_array(col("txs")))
.join(
blocks.select(col("blockId")),
Seq("blockId"),
"right"
)
.transform(
TransformHelpers.withIdGroup("blockId", "blockIdGroup", bucketSize)
)
.sort("blockId")
.as[BlockTransaction]
TransformHelpers.toDSEager(
encodedTransactions
.select("blockId", "transactionId")
.withColumnRenamed("transactionId", "txId")
.filter($"txId".isNotNull)
.dropDuplicates("blockId", "txId")
.transform(
TransformHelpers.withIdGroup("blockId", "blockIdGroup", bucketSize)
)
)
}

def computeAddressTransactions(
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/org/graphsense/account/trx/Job.scala
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ class TronJob(

printDatasetStats(blockTransactions, "blockTransactions")

sink.saveBlockTransactionsRelational(blockTransactions)
sink.saveBlockTransactions(blockTransactions)
blockTransactions.unpersist(true)
} else {
println("Warning - blockTransactions not empty skipping stage.")
Expand Down
13 changes: 2 additions & 11 deletions src/main/scala/org/graphsense/account/trx/Transformation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -689,17 +689,8 @@ class TrxTransformation(spark: SparkSession, bucketSize: Int) {
def computeBlockTransactions(
blocks: Dataset[Block],
encodedTransactions: Dataset[EncodedTransaction]
): Dataset[BlockTransactionRelational] = {
TransformHelpers.toDSEager(
encodedTransactions
.select("blockId", "transactionId")
.withColumnRenamed("transactionId", "txId")
.filter($"txId".isNotNull)
.dropDuplicates("blockId", "txId")
.transform(
TransformHelpers.withIdGroup("blockId", "blockIdGroup", bucketSize)
)
)
): Dataset[BlockTransaction] = {
ethTransform.computeBlockTransactions(blocks, encodedTransactions)
}

def computeAddressTransactions(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,84 +1,10 @@
{"blockId":46147,"txs":[0],"blockIdGroup":23073}
{"blockId":46148,"txs":[],"blockIdGroup":23074}
{"blockId":46149,"txs":[],"blockIdGroup":23074}
{"blockId":46150,"txs":[],"blockIdGroup":23075}
{"blockId":46151,"txs":[],"blockIdGroup":23075}
{"blockId":46152,"txs":[],"blockIdGroup":23076}
{"blockId":46153,"txs":[],"blockIdGroup":23076}
{"blockId":46154,"txs":[],"blockIdGroup":23077}
{"blockId":46155,"txs":[],"blockIdGroup":23077}
{"blockId":46156,"txs":[],"blockIdGroup":23078}
{"blockId":46157,"txs":[],"blockIdGroup":23078}
{"blockId":46158,"txs":[],"blockIdGroup":23079}
{"blockId":46159,"txs":[],"blockIdGroup":23079}
{"blockId":46160,"txs":[],"blockIdGroup":23080}
{"blockId":46161,"txs":[],"blockIdGroup":23080}
{"blockId":46162,"txs":[],"blockIdGroup":23081}
{"blockId":46163,"txs":[],"blockIdGroup":23081}
{"blockId":46164,"txs":[],"blockIdGroup":23082}
{"blockId":46165,"txs":[],"blockIdGroup":23082}
{"blockId":46166,"txs":[],"blockIdGroup":23083}
{"blockId":46167,"txs":[],"blockIdGroup":23083}
{"blockId":46168,"txs":[],"blockIdGroup":23084}
{"blockId":46169,"txs":[1],"blockIdGroup":23084}
{"blockId":46170,"txs":[2],"blockIdGroup":23085}
{"blockId":46171,"txs":[],"blockIdGroup":23085}
{"blockId":46172,"txs":[],"blockIdGroup":23086}
{"blockId":46173,"txs":[],"blockIdGroup":23086}
{"blockId":46174,"txs":[],"blockIdGroup":23087}
{"blockId":46175,"txs":[],"blockIdGroup":23087}
{"blockId":46176,"txs":[],"blockIdGroup":23088}
{"blockId":46177,"txs":[],"blockIdGroup":23088}
{"blockId":46178,"txs":[],"blockIdGroup":23089}
{"blockId":46179,"txs":[],"blockIdGroup":23089}
{"blockId":46180,"txs":[],"blockIdGroup":23090}
{"blockId":46181,"txs":[],"blockIdGroup":23090}
{"blockId":46182,"txs":[],"blockIdGroup":23091}
{"blockId":46183,"txs":[],"blockIdGroup":23091}
{"blockId":46184,"txs":[],"blockIdGroup":23092}
{"blockId":46185,"txs":[],"blockIdGroup":23092}
{"blockId":46186,"txs":[],"blockIdGroup":23093}
{"blockId":46187,"txs":[],"blockIdGroup":23093}
{"blockId":46188,"txs":[],"blockIdGroup":23094}
{"blockId":46189,"txs":[],"blockIdGroup":23094}
{"blockId":46190,"txs":[],"blockIdGroup":23095}
{"blockId":46191,"txs":[],"blockIdGroup":23095}
{"blockId":46192,"txs":[],"blockIdGroup":23096}
{"blockId":46193,"txs":[],"blockIdGroup":23096}
{"blockId":46194,"txs":[3],"blockIdGroup":23097}
{"blockId":46195,"txs":[],"blockIdGroup":23097}
{"blockId":46196,"txs":[],"blockIdGroup":23098}
{"blockId":46197,"txs":[],"blockIdGroup":23098}
{"blockId":46198,"txs":[],"blockIdGroup":23099}
{"blockId":46199,"txs":[],"blockIdGroup":23099}
{"blockId":46200,"txs":[],"blockIdGroup":23100}
{"blockId":46201,"txs":[],"blockIdGroup":23100}
{"blockId":46202,"txs":[],"blockIdGroup":23101}
{"blockId":46203,"txs":[],"blockIdGroup":23101}
{"blockId":46204,"txs":[],"blockIdGroup":23102}
{"blockId":46205,"txs":[4],"blockIdGroup":23102}
{"blockId":46206,"txs":[],"blockIdGroup":23103}
{"blockId":46207,"txs":[],"blockIdGroup":23103}
{"blockId":46208,"txs":[],"blockIdGroup":23104}
{"blockId":46209,"txs":[],"blockIdGroup":23104}
{"blockId":46210,"txs":[],"blockIdGroup":23105}
{"blockId":46211,"txs":[],"blockIdGroup":23105}
{"blockId":46212,"txs":[],"blockIdGroup":23106}
{"blockId":46213,"txs":[],"blockIdGroup":23106}
{"blockId":46214,"txs":[5],"blockIdGroup":23107}
{"blockId":46215,"txs":[],"blockIdGroup":23107}
{"blockId":46216,"txs":[],"blockIdGroup":23108}
{"blockId":46217,"txs":[6],"blockIdGroup":23108}
{"blockId":46218,"txs":[],"blockIdGroup":23109}
{"blockId":46219,"txs":[7],"blockIdGroup":23109}
{"blockId":46220,"txs":[8],"blockIdGroup":23110}
{"blockId":46221,"txs":[],"blockIdGroup":23110}
{"blockId":46222,"txs":[],"blockIdGroup":23111}
{"blockId":46223,"txs":[],"blockIdGroup":23111}
{"blockId":46224,"txs":[],"blockIdGroup":23112}
{"blockId":46225,"txs":[],"blockIdGroup":23112}
{"blockId":46226,"txs":[],"blockIdGroup":23113}
{"blockId":46227,"txs":[],"blockIdGroup":23113}
{"blockId":46228,"txs":[],"blockIdGroup":23114}
{"blockId":46229,"txs":[],"blockIdGroup":23114}
{"blockId":46230,"txs":[9],"blockIdGroup":23115}
{"blockId": 46147, "blockIdGroup": 23073, "txId": 0}
{"blockId": 46169, "blockIdGroup": 23084, "txId": 1}
{"blockId": 46170, "blockIdGroup": 23085, "txId": 2}
{"blockId": 46194, "blockIdGroup": 23097, "txId": 3}
{"blockId": 46205, "blockIdGroup": 23102, "txId": 4}
{"blockId": 46214, "blockIdGroup": 23107, "txId": 5}
{"blockId": 46217, "blockIdGroup": 23108, "txId": 6}
{"blockId": 46219, "blockIdGroup": 23109, "txId": 7}
{"blockId": 46220, "blockIdGroup": 23110, "txId": 8}
{"blockId": 46230, "blockIdGroup": 23115, "txId": 9}
5 changes: 2 additions & 3 deletions src/test/scala/org/graphsense/account/eth/TokensTest.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.graphsense.account.eth

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{array_distinct, col, forall, lit, size}
import org.apache.spark.sql.functions.{col, forall, lit}
import org.graphsense.account.Implicits._
import org.graphsense.account.models.{Address, AddressRelation, TokenTransfer}
import org.graphsense.TestBase
Expand Down Expand Up @@ -114,8 +114,7 @@ class TokenTest extends TestBase {

assert(
blockTransactions
.filter(size(col("txs")) =!= size(array_distinct(col("txs"))))
.count() === 0,
.count() === blockTransactions.dropDuplicates("txId").count(),
"duplicates in block transactions"
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.graphsense.account.eth

import org.apache.spark.sql.functions.{array_distinct, col, max, size}
import org.apache.spark.sql.functions.{col, max}
import org.graphsense.account.models.{
Address,
AddressId,
Expand Down Expand Up @@ -180,13 +180,11 @@ class TransformationTest extends TestBase {
)
}

test("no duplicates in block txs") {
assert(
blockTransactions
.filter(size(col("txs")) =!= size(array_distinct(col("txs"))))
.count() === 0
)
}
assert(
blockTransactions
.count() === blockTransactions.dropDuplicates("txId").count(),
"duplicates in block transactions"
)

test("Address IDs") {
assertDataFrameEquality(addressIds, addressIdsRef)
Expand All @@ -210,6 +208,8 @@ class TransformationTest extends TestBase {

note("Test blocks")

blockTransactions.show(100)
blockTransactionsRef.show(100)
test("Block transactions") {
assertDataFrameEquality(blockTransactions, blockTransactionsRef)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class TransformationTest extends TestBase {
encodedTransactions: Dataset[EncodedTransaction],
encodedTokenTransfers: Dataset[EncodedTokenTransfer],
addressTransactions: Dataset[AddressTransaction],
blockTransactions: Dataset[BlockTransactionRelational],
blockTransactions: Dataset[BlockTransaction],
addressRelations: Dataset[AddressRelation]
)

Expand Down

0 comments on commit 8b67cfc

Please sign in to comment.