Skip to content

Commit

Permalink
ethereum as generic upstream - subs
Browse files Browse the repository at this point in the history
  • Loading branch information
a10zn8 committed Oct 31, 2023
1 parent 02567c4 commit 4b6d787
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.CachingReader
import io.emeraldpay.dshackle.upstream.Capability
import io.emeraldpay.dshackle.upstream.EgressSubscription
import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.LabelsDetector
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamValidator
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.AggregatedPendingTxes
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumLabelsDetector
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.NoPendingTxes
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.PendingTxesSource
import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder
import io.emeraldpay.dshackle.upstream.generic.ChainSpecific
import io.emeraldpay.dshackle.upstream.generic.GenericUpstream
Expand All @@ -41,21 +43,20 @@ object EthereumChainSpecific : ChainSpecific {

override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription {
return { ms ->
// val pendingTxes: PendingTxesSource = (ms.getAll())
// .mapNotNull {
// it.getIngressSubscription().getPendingTxes()
// }.let {
// if (it.isEmpty()) {
// NoPendingTxes()
// } else if (it.size == 1) {
// it.first()
// } else {
// AggregatedPendingTxes(it)
// }
// }
// return

EmptyEgressSubscription
val pendingTxes: PendingTxesSource = (ms.getAll())
.map { it as GenericUpstream }
.mapNotNull {
(it.getIngressSubscription() as EthereumIngressSubscription).getPendingTxes()
}.let {
if (it.isEmpty()) {
NoPendingTxes()
} else if (it.size == 1) {
it.first()
} else {
AggregatedPendingTxes(it)
}
}
EthereumEgressSubscription(ms, headScheduler, pendingTxes)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@ import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent.ChangeType.UPDATED
import io.emeraldpay.dshackle.upstream.Capability
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LabelsDetectorBuilder
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.UpstreamValidator
import io.emeraldpay.dshackle.upstream.UpstreamValidatorBuilder
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription
import io.emeraldpay.dshackle.upstream.generic.connectors.ConnectorFactory
import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnector
import org.springframework.context.ApplicationEventPublisher
Expand Down Expand Up @@ -194,7 +193,7 @@ open class GenericUpstream(
}
}

fun getIngressSubscription(): EthereumIngressSubscription {
fun getIngressSubscription(): IngressSubscription {
return connector.getIngressSubscription()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package io.emeraldpay.dshackle.upstream.generic.connectors

import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription
import reactor.core.publisher.Flux

interface GenericConnector : Lifecycle {
Expand All @@ -13,5 +13,5 @@ interface GenericConnector : Lifecycle {

fun getIngressReader(): JsonRpcReader

fun getIngressSubscription(): EthereumIngressSubscription
fun getIngressSubscription(): IngressSubscription
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.MergedHead
import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription
import io.emeraldpay.dshackle.upstream.ethereum.EthereumWsHead
import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator
import io.emeraldpay.dshackle.upstream.ethereum.NoEthereumIngressSubscription
Expand Down Expand Up @@ -145,7 +145,7 @@ class GenericRpcConnector(
return directReader
}

override fun getIngressSubscription(): EthereumIngressSubscription {
override fun getIngressSubscription(): IngressSubscription {
return NoEthereumIngressSubscription.DEFAULT
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription
import io.emeraldpay.dshackle.upstream.ethereum.EthereumWsHead
import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator
Expand Down Expand Up @@ -74,7 +75,7 @@ class GenericWsConnector(
return reader
}

override fun getIngressSubscription(): EthereumIngressSubscription {
override fun getIngressSubscription(): IngressSubscription {
return subscriptions
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class GenericConnectorMock implements GenericConnector {
}

@Override
EthereumIngressSubscription getIngressSubscription() {
error.NonExistentClass getIngressSubscription() {
return NoEthereumIngressSubscription.DEFAULT
}
}

0 comments on commit 4b6d787

Please sign in to comment.