diff --git a/cli/src/cli.rs b/cli/src/cli.rs index 48fca9a0b..11ba2e261 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -280,7 +280,7 @@ impl KaspaCli { msg = multiplexer.receiver.recv().fuse() => { if let Ok(msg) = msg { - match msg { + match *msg { Events::UtxoProcStart => {}, Events::UtxoProcStop => {}, Events::UtxoProcError { message } => { diff --git a/cli/src/modules/monitor.rs b/cli/src/modules/monitor.rs index 4b51b2435..5c2e11df9 100644 --- a/cli/src/modules/monitor.rs +++ b/cli/src/modules/monitor.rs @@ -91,7 +91,7 @@ impl Monitor { Ok(()) } - async fn redraw(self: &Arc, ctx: &Arc, events: &Arc>>) -> Result<()> { + async fn redraw(self: &Arc, ctx: &Arc, events: &Arc>>>) -> Result<()> { tprint!(ctx, "{}", ClearScreen); tprint!(ctx, "{}", Goto(1, 1)); @@ -108,7 +108,7 @@ impl Monitor { ctx.list().await?; let events = events.lock().unwrap(); - events.iter().for_each(|event| match event { + events.iter().for_each(|event| match event.deref() { Events::DAAScoreChange { .. } => {} Events::Balance { balance, id, mature_utxo_size, pending_utxo_size } => { let network_id = wallet.network_id().expect("missing network type"); diff --git a/wallet/core/src/runtime/sync.rs b/wallet/core/src/runtime/sync.rs index 68da4bef5..91c3c910b 100644 --- a/wallet/core/src/runtime/sync.rs +++ b/wallet/core/src/runtime/sync.rs @@ -6,7 +6,7 @@ use regex::Regex; struct Inner { task_ctl: DuplexChannel, rpc: Arc, - multiplexer: Multiplexer, + multiplexer: Multiplexer>, running: AtomicBool, is_synced: AtomicBool, state_observer: StateObserver, @@ -18,7 +18,7 @@ pub struct SyncMonitor { } impl SyncMonitor { - pub fn new(rpc: &Arc, multiplexer: &Multiplexer) -> Self { + pub fn new(rpc: &Arc, multiplexer: &Multiplexer>) -> Self { Self { inner: Arc::new(Inner { rpc: rpc.clone(), @@ -74,19 +74,19 @@ impl SyncMonitor { &self.inner.rpc } - pub fn multiplexer(&self) -> &Multiplexer { + pub fn multiplexer(&self) -> &Multiplexer> { &self.inner.multiplexer } pub async fn notify(&self, event: Events) -> Result<()> { self.multiplexer() - .try_broadcast(event) + .try_broadcast(Box::new(event)) .map_err(|_| Error::Custom("multiplexer channel error during update_balance".to_string()))?; Ok(()) } - async fn handle_event(&self, event: Events) -> Result<()> { - match &event { + async fn handle_event(&self, event: Box) -> Result<()> { + match *event { Events::UtxoProcStart { .. } => {} Events::UtxoProcStop { .. } => {} _ => {} diff --git a/wallet/core/src/runtime/wallet.rs b/wallet/core/src/runtime/wallet.rs index 7c7cd3f17..b0197c77e 100644 --- a/wallet/core/src/runtime/wallet.rs +++ b/wallet/core/src/runtime/wallet.rs @@ -102,7 +102,7 @@ pub struct Inner { settings: SettingsStore, utxo_processor: Arc, rpc: Arc, - multiplexer: Multiplexer, + multiplexer: Multiplexer>, } /// `Wallet` data structure @@ -131,7 +131,7 @@ impl Wallet { Arc::new(KaspaRpcClient::new_with_args(WrpcEncoding::Borsh, NotificationMode::MultiListeners, "wrpc://127.0.0.1:17110")?) }; - let multiplexer = Multiplexer::new(); + let multiplexer = Multiplexer::>::new(); let utxo_processor = Arc::new(UtxoProcessor::new(&rpc, network_id, Some(multiplexer.clone()))); let wallet = Wallet { @@ -293,7 +293,7 @@ impl Wallet { &self.inner.rpc } - pub fn multiplexer(&self) -> &Multiplexer { + pub fn multiplexer(&self) -> &Multiplexer> { &self.inner.multiplexer } @@ -527,7 +527,7 @@ impl Wallet { pub async fn notify(&self, event: Events) -> Result<()> { self.multiplexer() - .try_broadcast(event) + .try_broadcast(Box::new(event)) .map_err(|_| Error::Custom("multiplexer channel error during update_balance".to_string()))?; Ok(()) } @@ -540,8 +540,8 @@ impl Wallet { self.utxo_processor().is_connected() } - async fn handle_event(self: &Arc, event: Events) -> Result<()> { - match &event { + async fn handle_event(self: &Arc, event: Box) -> Result<()> { + match &*event { Events::Pending { record, is_outgoing } | Events::Maturity { record, is_outgoing } => { // if `is_outgoint` is set, this means that this pending and maturity // event is for the change UTXOs of the outgoing transaction. diff --git a/wallet/core/src/tx/generator/generator.rs b/wallet/core/src/tx/generator/generator.rs index 62967dd03..d7921b90e 100644 --- a/wallet/core/src/tx/generator/generator.rs +++ b/wallet/core/src/tx/generator/generator.rs @@ -210,7 +210,7 @@ struct Inner { // Utxo Context utxo_context: Option, // Event multiplexer - multiplexer: Option>, + multiplexer: Option>>, // typically a number of keys required to sign the transaction sig_op_count: u8, // number of minimum signatures required to sign the transaction @@ -359,7 +359,7 @@ impl Generator { } /// Core [`Multiplexer`] (if available) - pub fn multiplexer(&self) -> &Option> { + pub fn multiplexer(&self) -> &Option>> { &self.inner.multiplexer } diff --git a/wallet/core/src/tx/generator/settings.rs b/wallet/core/src/tx/generator/settings.rs index ae054ff70..206173416 100644 --- a/wallet/core/src/tx/generator/settings.rs +++ b/wallet/core/src/tx/generator/settings.rs @@ -12,7 +12,7 @@ pub struct GeneratorSettings { // Network type pub network_type: NetworkType, // Event multiplexer - pub multiplexer: Option>, + pub multiplexer: Option>>, // Utxo iterator pub utxo_iterator: Box + Send + Sync + 'static>, // Utxo Context @@ -71,7 +71,7 @@ impl GeneratorSettings { final_transaction_destination: PaymentDestination, final_priority_fee: Fees, final_transaction_payload: Option>, - multiplexer: Option>, + multiplexer: Option>>, ) -> Result { let network_type = utxo_context.processor().network_id()?.into(); let utxo_iterator = UtxoIterator::new(&utxo_context); @@ -101,7 +101,7 @@ impl GeneratorSettings { final_transaction_destination: PaymentDestination, final_priority_fee: Fees, final_transaction_payload: Option>, - multiplexer: Option>, + multiplexer: Option>>, ) -> Result { let network_type = NetworkType::try_from(change_address.prefix)?; diff --git a/wallet/core/src/utxo/processor.rs b/wallet/core/src/utxo/processor.rs index 9148580e2..af4edd4f0 100644 --- a/wallet/core/src/utxo/processor.rs +++ b/wallet/core/src/utxo/processor.rs @@ -37,11 +37,11 @@ pub struct Inner { task_ctl: DuplexChannel, notification_channel: Channel, sync_proc: SyncMonitor, - multiplexer: Multiplexer, + multiplexer: Multiplexer>, } impl Inner { - pub fn new(rpc: &Arc, network_id: Option, multiplexer: Multiplexer) -> Self { + pub fn new(rpc: &Arc, network_id: Option, multiplexer: Multiplexer>) -> Self { Self { pending: DashMap::new(), address_to_utxo_context_map: DashMap::new(), @@ -65,7 +65,7 @@ pub struct UtxoProcessor { } impl UtxoProcessor { - pub fn new(rpc: &Arc, network_id: Option, multiplexer: Option>) -> Self { + pub fn new(rpc: &Arc, network_id: Option, multiplexer: Option>>) -> Self { let multiplexer = multiplexer.unwrap_or_else(Multiplexer::new); UtxoProcessor { inner: Arc::new(Inner::new(rpc, network_id, multiplexer)) } } @@ -78,7 +78,7 @@ impl UtxoProcessor { self.rpc().clone().downcast_arc::().expect("unable to downcast DynRpcApi to KaspaRpcClient") } - pub fn multiplexer(&self) -> &Multiplexer { + pub fn multiplexer(&self) -> &Multiplexer> { &self.inner.multiplexer } @@ -154,13 +154,15 @@ impl UtxoProcessor { } pub async fn notify(&self, event: Events) -> Result<()> { - self.multiplexer().try_broadcast(event).map_err(|_| Error::Custom("multiplexer channel error during notify".to_string()))?; + self.multiplexer() + .try_broadcast(Box::new(event)) + .map_err(|_| Error::Custom("multiplexer channel error during notify".to_string()))?; Ok(()) } pub fn try_notify(&self, event: Events) -> Result<()> { self.multiplexer() - .try_broadcast(event) + .try_broadcast(Box::new(event)) .map_err(|_| Error::Custom("multiplexer channel error during try_notify".to_string()))?; Ok(()) } @@ -401,17 +403,17 @@ impl UtxoProcessor { Ok(msg) => { match msg { Ctl::Open => { - this.inner.multiplexer.try_broadcast(Events::Connect { + this.inner.multiplexer.try_broadcast(Box::new(Events::Connect { network_id : this.network_id().expect("network id expected during connection"), url : this.rpc_client().url().to_string() - }).unwrap_or_else(|err| log_error!("{err}")); + })).unwrap_or_else(|err| log_error!("{err}")); this.handle_connect().await.unwrap_or_else(|err| log_error!("{err}")); }, Ctl::Close => { - this.inner.multiplexer.try_broadcast(Events::Disconnect { + this.inner.multiplexer.try_broadcast(Box::new(Events::Disconnect { network_id : this.network_id().expect("network id expected during connection"), url : this.rpc_client().url().to_string() - }).unwrap_or_else(|err| log_error!("{err}")); + })).unwrap_or_else(|err| log_error!("{err}")); this.handle_disconnect().await.unwrap_or_else(|err| log_error!("{err}")); } } diff --git a/wallet/core/src/wasm/tx/generator/generator.rs b/wallet/core/src/wasm/tx/generator/generator.rs index 3cf7bded0..e18c2d3d5 100644 --- a/wallet/core/src/wasm/tx/generator/generator.rs +++ b/wallet/core/src/wasm/tx/generator/generator.rs @@ -156,7 +156,7 @@ enum GeneratorSource { /// Converts [`GeneratorSettingsObject`] to a series of properties intended for use by the [`Generator`]. struct GeneratorSettings { pub source: GeneratorSource, - pub multiplexer: Option>, + pub multiplexer: Option>>, pub final_transaction_destination: PaymentDestination, pub change_address: Option
, pub final_priority_fee: Fees,