diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index 217bb3807..6101ef47f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -8,14 +8,16 @@ import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.EgressSubscription -import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.LabelsDetector import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.ethereum.subscribe.AggregatedPendingTxes import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumLabelsDetector +import io.emeraldpay.dshackle.upstream.ethereum.subscribe.NoPendingTxes +import io.emeraldpay.dshackle.upstream.ethereum.subscribe.PendingTxesSource import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder import io.emeraldpay.dshackle.upstream.generic.ChainSpecific import io.emeraldpay.dshackle.upstream.generic.GenericUpstream @@ -41,21 +43,20 @@ object EthereumChainSpecific : ChainSpecific { override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription { return { ms -> -// val pendingTxes: PendingTxesSource = (ms.getAll()) -// .mapNotNull { -// it.getIngressSubscription().getPendingTxes() -// }.let { -// if (it.isEmpty()) { -// NoPendingTxes() -// } else if (it.size == 1) { -// it.first() -// } else { -// AggregatedPendingTxes(it) -// } -// } -// return - - EmptyEgressSubscription + val pendingTxes: PendingTxesSource = (ms.getAll()) + .map { it as GenericUpstream } + .mapNotNull { + (it.getIngressSubscription() as EthereumIngressSubscription).getPendingTxes() + }.let { + if (it.isEmpty()) { + NoPendingTxes() + } else if (it.size == 1) { + it.first() + } else { + AggregatedPendingTxes(it) + } + } + EthereumEgressSubscription(ms, headScheduler, pendingTxes) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 5c5df67ef..3be3e9d9e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -8,10 +8,10 @@ import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.startup.UpstreamChangeEvent -import io.emeraldpay.dshackle.startup.UpstreamChangeEvent.ChangeType.UPDATED import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LabelsDetectorBuilder import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability @@ -19,7 +19,6 @@ import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.UpstreamValidatorBuilder import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.calls.CallMethods -import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription import io.emeraldpay.dshackle.upstream.generic.connectors.ConnectorFactory import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnector import org.springframework.context.ApplicationEventPublisher @@ -194,7 +193,7 @@ open class GenericUpstream( } } - fun getIngressSubscription(): EthereumIngressSubscription { + fun getIngressSubscription(): IngressSubscription { return connector.getIngressSubscription() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnector.kt index ca68bf189..0486bb4c3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnector.kt @@ -2,8 +2,8 @@ package io.emeraldpay.dshackle.upstream.generic.connectors import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.Lifecycle -import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription import reactor.core.publisher.Flux interface GenericConnector : Lifecycle { @@ -13,5 +13,5 @@ interface GenericConnector : Lifecycle { fun getIngressReader(): JsonRpcReader - fun getIngressSubscription(): EthereumIngressSubscription + fun getIngressSubscription(): IngressSubscription } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt index 43ee5ea06..6b2732f92 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt @@ -7,9 +7,9 @@ import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.Lifecycle import io.emeraldpay.dshackle.upstream.MergedHead -import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.EthereumWsHead import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator import io.emeraldpay.dshackle.upstream.ethereum.NoEthereumIngressSubscription @@ -145,7 +145,7 @@ class GenericRpcConnector( return directReader } - override fun getIngressSubscription(): EthereumIngressSubscription { + override fun getIngressSubscription(): IngressSubscription { return NoEthereumIngressSubscription.DEFAULT } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt index 3bd33a2c1..2caa57b97 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt @@ -4,6 +4,7 @@ import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.EthereumWsHead import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator @@ -74,7 +75,7 @@ class GenericWsConnector( return reader } - override fun getIngressSubscription(): EthereumIngressSubscription { + override fun getIngressSubscription(): IngressSubscription { return subscriptions } diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/GenericConnectorMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/GenericConnectorMock.groovy index 6612c4e97..d71c57243 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/GenericConnectorMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/GenericConnectorMock.groovy @@ -47,7 +47,7 @@ class GenericConnectorMock implements GenericConnector { } @Override - EthereumIngressSubscription getIngressSubscription() { + error.NonExistentClass getIngressSubscription() { return NoEthereumIngressSubscription.DEFAULT } } \ No newline at end of file