Skip to content

Commit

Permalink
Box wallet events
Browse files Browse the repository at this point in the history
  • Loading branch information
aspect committed Sep 8, 2023
1 parent 51c1488 commit b3366a7
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 31 deletions.
2 changes: 1 addition & 1 deletion cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl KaspaCli {
msg = multiplexer.receiver.recv().fuse() => {

if let Ok(msg) = msg {
match msg {
match *msg {
Events::UtxoProcStart => {},
Events::UtxoProcStop => {},
Events::UtxoProcError { message } => {
Expand Down
4 changes: 2 additions & 2 deletions cli/src/modules/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl Monitor {
Ok(())
}

async fn redraw(self: &Arc<Self>, ctx: &Arc<KaspaCli>, events: &Arc<Mutex<VecDeque<Events>>>) -> Result<()> {
async fn redraw(self: &Arc<Self>, ctx: &Arc<KaspaCli>, events: &Arc<Mutex<VecDeque<Box<Events>>>>) -> Result<()> {
tprint!(ctx, "{}", ClearScreen);
tprint!(ctx, "{}", Goto(1, 1));

Expand All @@ -108,7 +108,7 @@ impl Monitor {
ctx.list().await?;

let events = events.lock().unwrap();
events.iter().for_each(|event| match event {
events.iter().for_each(|event| match event.deref() {
Events::DAAScoreChange { .. } => {}
Events::Balance { balance, id, mature_utxo_size, pending_utxo_size } => {
let network_id = wallet.network_id().expect("missing network type");
Expand Down
12 changes: 6 additions & 6 deletions wallet/core/src/runtime/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use regex::Regex;
struct Inner {
task_ctl: DuplexChannel,
rpc: Arc<DynRpcApi>,
multiplexer: Multiplexer<Events>,
multiplexer: Multiplexer<Box<Events>>,
running: AtomicBool,
is_synced: AtomicBool,
state_observer: StateObserver,
Expand All @@ -18,7 +18,7 @@ pub struct SyncMonitor {
}

impl SyncMonitor {
pub fn new(rpc: &Arc<DynRpcApi>, multiplexer: &Multiplexer<Events>) -> Self {
pub fn new(rpc: &Arc<DynRpcApi>, multiplexer: &Multiplexer<Box<Events>>) -> Self {
Self {
inner: Arc::new(Inner {
rpc: rpc.clone(),
Expand Down Expand Up @@ -74,19 +74,19 @@ impl SyncMonitor {
&self.inner.rpc
}

pub fn multiplexer(&self) -> &Multiplexer<Events> {
pub fn multiplexer(&self) -> &Multiplexer<Box<Events>> {
&self.inner.multiplexer
}

pub async fn notify(&self, event: Events) -> Result<()> {
self.multiplexer()
.try_broadcast(event)
.try_broadcast(Box::new(event))
.map_err(|_| Error::Custom("multiplexer channel error during update_balance".to_string()))?;
Ok(())
}

async fn handle_event(&self, event: Events) -> Result<()> {
match &event {
async fn handle_event(&self, event: Box<Events>) -> Result<()> {
match *event {
Events::UtxoProcStart { .. } => {}
Events::UtxoProcStop { .. } => {}
_ => {}
Expand Down
12 changes: 6 additions & 6 deletions wallet/core/src/runtime/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub struct Inner {
settings: SettingsStore<WalletSettings>,
utxo_processor: Arc<UtxoProcessor>,
rpc: Arc<DynRpcApi>,
multiplexer: Multiplexer<Events>,
multiplexer: Multiplexer<Box<Events>>,
}

/// `Wallet` data structure
Expand Down Expand Up @@ -131,7 +131,7 @@ impl Wallet {
Arc::new(KaspaRpcClient::new_with_args(WrpcEncoding::Borsh, NotificationMode::MultiListeners, "wrpc://127.0.0.1:17110")?)
};

let multiplexer = Multiplexer::new();
let multiplexer = Multiplexer::<Box<Events>>::new();
let utxo_processor = Arc::new(UtxoProcessor::new(&rpc, network_id, Some(multiplexer.clone())));

let wallet = Wallet {
Expand Down Expand Up @@ -293,7 +293,7 @@ impl Wallet {
&self.inner.rpc
}

pub fn multiplexer(&self) -> &Multiplexer<Events> {
pub fn multiplexer(&self) -> &Multiplexer<Box<Events>> {
&self.inner.multiplexer
}

Expand Down Expand Up @@ -527,7 +527,7 @@ impl Wallet {

pub async fn notify(&self, event: Events) -> Result<()> {
self.multiplexer()
.try_broadcast(event)
.try_broadcast(Box::new(event))
.map_err(|_| Error::Custom("multiplexer channel error during update_balance".to_string()))?;
Ok(())
}
Expand All @@ -540,8 +540,8 @@ impl Wallet {
self.utxo_processor().is_connected()
}

async fn handle_event(self: &Arc<Self>, event: Events) -> Result<()> {
match &event {
async fn handle_event(self: &Arc<Self>, event: Box<Events>) -> Result<()> {
match &*event {
Events::Pending { record, is_outgoing } | Events::Maturity { record, is_outgoing } => {
// if `is_outgoint` is set, this means that this pending and maturity
// event is for the change UTXOs of the outgoing transaction.
Expand Down
4 changes: 2 additions & 2 deletions wallet/core/src/tx/generator/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ struct Inner {
// Utxo Context
utxo_context: Option<UtxoContext>,
// Event multiplexer
multiplexer: Option<Multiplexer<Events>>,
multiplexer: Option<Multiplexer<Box<Events>>>,
// typically a number of keys required to sign the transaction
sig_op_count: u8,
// number of minimum signatures required to sign the transaction
Expand Down Expand Up @@ -359,7 +359,7 @@ impl Generator {
}

/// Core [`Multiplexer<Events>`] (if available)
pub fn multiplexer(&self) -> &Option<Multiplexer<Events>> {
pub fn multiplexer(&self) -> &Option<Multiplexer<Box<Events>>> {
&self.inner.multiplexer
}

Expand Down
6 changes: 3 additions & 3 deletions wallet/core/src/tx/generator/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct GeneratorSettings {
// Network type
pub network_type: NetworkType,
// Event multiplexer
pub multiplexer: Option<Multiplexer<Events>>,
pub multiplexer: Option<Multiplexer<Box<Events>>>,
// Utxo iterator
pub utxo_iterator: Box<dyn Iterator<Item = UtxoEntryReference> + Send + Sync + 'static>,
// Utxo Context
Expand Down Expand Up @@ -71,7 +71,7 @@ impl GeneratorSettings {
final_transaction_destination: PaymentDestination,
final_priority_fee: Fees,
final_transaction_payload: Option<Vec<u8>>,
multiplexer: Option<Multiplexer<Events>>,
multiplexer: Option<Multiplexer<Box<Events>>>,
) -> Result<Self> {
let network_type = utxo_context.processor().network_id()?.into();
let utxo_iterator = UtxoIterator::new(&utxo_context);
Expand Down Expand Up @@ -101,7 +101,7 @@ impl GeneratorSettings {
final_transaction_destination: PaymentDestination,
final_priority_fee: Fees,
final_transaction_payload: Option<Vec<u8>>,
multiplexer: Option<Multiplexer<Events>>,
multiplexer: Option<Multiplexer<Box<Events>>>,
) -> Result<Self> {
let network_type = NetworkType::try_from(change_address.prefix)?;

Expand Down
22 changes: 12 additions & 10 deletions wallet/core/src/utxo/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ pub struct Inner {
task_ctl: DuplexChannel,
notification_channel: Channel<Notification>,
sync_proc: SyncMonitor,
multiplexer: Multiplexer<Events>,
multiplexer: Multiplexer<Box<Events>>,
}

impl Inner {
pub fn new(rpc: &Arc<DynRpcApi>, network_id: Option<NetworkId>, multiplexer: Multiplexer<Events>) -> Self {
pub fn new(rpc: &Arc<DynRpcApi>, network_id: Option<NetworkId>, multiplexer: Multiplexer<Box<Events>>) -> Self {
Self {
pending: DashMap::new(),
address_to_utxo_context_map: DashMap::new(),
Expand All @@ -65,7 +65,7 @@ pub struct UtxoProcessor {
}

impl UtxoProcessor {
pub fn new(rpc: &Arc<DynRpcApi>, network_id: Option<NetworkId>, multiplexer: Option<Multiplexer<Events>>) -> Self {
pub fn new(rpc: &Arc<DynRpcApi>, network_id: Option<NetworkId>, multiplexer: Option<Multiplexer<Box<Events>>>) -> Self {
let multiplexer = multiplexer.unwrap_or_else(Multiplexer::new);
UtxoProcessor { inner: Arc::new(Inner::new(rpc, network_id, multiplexer)) }
}
Expand All @@ -78,7 +78,7 @@ impl UtxoProcessor {
self.rpc().clone().downcast_arc::<KaspaRpcClient>().expect("unable to downcast DynRpcApi to KaspaRpcClient")
}

pub fn multiplexer(&self) -> &Multiplexer<Events> {
pub fn multiplexer(&self) -> &Multiplexer<Box<Events>> {
&self.inner.multiplexer
}

Expand Down Expand Up @@ -154,13 +154,15 @@ impl UtxoProcessor {
}

pub async fn notify(&self, event: Events) -> Result<()> {
self.multiplexer().try_broadcast(event).map_err(|_| Error::Custom("multiplexer channel error during notify".to_string()))?;
self.multiplexer()
.try_broadcast(Box::new(event))
.map_err(|_| Error::Custom("multiplexer channel error during notify".to_string()))?;
Ok(())
}

pub fn try_notify(&self, event: Events) -> Result<()> {
self.multiplexer()
.try_broadcast(event)
.try_broadcast(Box::new(event))
.map_err(|_| Error::Custom("multiplexer channel error during try_notify".to_string()))?;
Ok(())
}
Expand Down Expand Up @@ -401,17 +403,17 @@ impl UtxoProcessor {
Ok(msg) => {
match msg {
Ctl::Open => {
this.inner.multiplexer.try_broadcast(Events::Connect {
this.inner.multiplexer.try_broadcast(Box::new(Events::Connect {
network_id : this.network_id().expect("network id expected during connection"),
url : this.rpc_client().url().to_string()
}).unwrap_or_else(|err| log_error!("{err}"));
})).unwrap_or_else(|err| log_error!("{err}"));
this.handle_connect().await.unwrap_or_else(|err| log_error!("{err}"));
},
Ctl::Close => {
this.inner.multiplexer.try_broadcast(Events::Disconnect {
this.inner.multiplexer.try_broadcast(Box::new(Events::Disconnect {
network_id : this.network_id().expect("network id expected during connection"),
url : this.rpc_client().url().to_string()
}).unwrap_or_else(|err| log_error!("{err}"));
})).unwrap_or_else(|err| log_error!("{err}"));
this.handle_disconnect().await.unwrap_or_else(|err| log_error!("{err}"));
}
}
Expand Down
2 changes: 1 addition & 1 deletion wallet/core/src/wasm/tx/generator/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ enum GeneratorSource {
/// Converts [`GeneratorSettingsObject`] to a series of properties intended for use by the [`Generator`].
struct GeneratorSettings {
pub source: GeneratorSource,
pub multiplexer: Option<Multiplexer<Events>>,
pub multiplexer: Option<Multiplexer<Box<Events>>>,
pub final_transaction_destination: PaymentDestination,
pub change_address: Option<Address>,
pub final_priority_fee: Fees,
Expand Down

0 comments on commit b3366a7

Please sign in to comment.