From d72d097dde2c61feab2015228467a110d0486219 Mon Sep 17 00:00:00 2001 From: Dennis Kievits Date: Mon, 18 Sep 2023 00:50:10 +0200 Subject: [PATCH] progress indexed db --- .../src/storage/local/transaction/indexdb.rs | 201 +++++++++++++++++- 1 file changed, 192 insertions(+), 9 deletions(-) diff --git a/wallet/core/src/storage/local/transaction/indexdb.rs b/wallet/core/src/storage/local/transaction/indexdb.rs index 82c015cf7..7f641474a 100644 --- a/wallet/core/src/storage/local/transaction/indexdb.rs +++ b/wallet/core/src/storage/local/transaction/indexdb.rs @@ -4,11 +4,14 @@ use crate::storage::interface::StorageStream; use crate::storage::{Binding, TransactionRecordStore}; use crate::storage::{TransactionMetadata, TransactionRecord}; use idb::TransactionMode; +use kaspa_wallet_core::storage::transaction::TransactionData; const IDB_TRANSACTIONS_STORE: &str = "transactions"; const IDB_ID_INDEX: &str = "id"; const IDB_VERSION_INDEX: &str = "version"; const IDB_TIMESTAMP_INDEX: &str = "timestamp"; +const IDB_BINDING_INDEX: &str = "binding"; +const IDB_NETWORK_ID_INDEX: &str = "networkId"; const IDB_BLOCK_DAA_SCORE_INDEX: &str = "blockDaaScore"; const IDB_TRANSACTION_DATA_INDEX: &str = "data"; const IDB_TRANSACTION_METADATA_INDEX: &str = "metadata"; @@ -111,6 +114,8 @@ impl TransactionStore { store.create_index(IDB_VERSION_INDEX, idb::KeyPath::new_single(IDB_VERSION_INDEX), None).unwrap(); store.create_index(IDB_TIMESTAMP_INDEX, idb::KeyPath::new_single(IDB_TIMESTAMP_INDEX), None).unwrap(); + store.create_index(IDB_BINDING_INDEX, idb::KeyPath::new_single(IDB_BINDING_INDEX), None).unwrap(); + store.create_index(IDB_NETWORK_ID_INDEX, idb::KeyPath::new_single(IDB_NETWORK_ID_INDEX), None).unwrap(); store.create_index(IDB_BLOCK_DAA_SCORE_INDEX, idb::KeyPath::new_single(IDB_BLOCK_DAA_SCORE_INDEX), None).unwrap(); store.create_index(IDB_TRANSACTION_DATA_INDEX, idb::KeyPath::new_single(IDB_TRANSACTION_DATA_INDEX), None).unwrap(); store @@ -150,7 +155,7 @@ impl TransactionRecordStore for TransactionStore { let db_mutex = self.init_or_get_db(binding, network_id).await?; let hex_id = id.to_hex(); - let (tx, rx) = oneshot::>>(); + let (tx, rx) = oneshot(); dispatch(async move { let db = match db_mutex.lock() { Ok(db) => db, @@ -191,29 +196,207 @@ impl TransactionRecordStore for TransactionStore { } }; - println!("value: {:?}", value); + let value = match value.dyn_into::() { + Ok(value) => value, + Err(err) => { + tx.send(Err(Error::Custom(format!("Error converting value to object: {}", err.as_string().unwrap_or_default())))) + .await + .unwrap(); + return; + } + }; + let version = match value.get_u16(IDB_VERSION_INDEX) { + Ok(version) => version, + Err(err) => { + tx.send(Err(Error::Custom(format!("Error getting version: {}", err)))).await.unwrap(); + return; + } + }; + let id = match value.get_string(IDB_ID_INDEX) { + Ok(id) => id, + Err(err) => { + tx.send(Err(Error::Custom(format!("Error getting id: {}", err)))).await.unwrap(); + return; + } + }; + let unixtime = match value.get_u64(IDB_TIMESTAMP_INDEX) { + Ok(unixtime) => (unixtime != 0).then(|| unixtime), + Err(err) => { + tx.send(Err(Error::Custom(format!("Error getting unixtime: {}", err)))).await.unwrap(); + return; + } + }; + let binding_str = match value.get_string(IDB_BINDING_INDEX) { + Ok(binding) => binding, + Err(err) => { + tx.send(Err(Error::Custom(format!("Error getting binding: {}", err)))).await.unwrap(); + return; + } + }; + let binding: Binding = match serde_json::from_str(&binding_str) { + Ok(binding) => binding, + Err(err) => { + tx.send(Err(Error::Custom(format!("Error deserializing binding: {}", err)))).await.unwrap(); + return; + } + }; + let block_daa_score = match value.get_u64(IDB_BLOCK_DAA_SCORE_INDEX) { + Ok(block_daa_score) => block_daa_score, + Err(err) => { + tx.send(Err(Error::Custom(format!("Error getting block daa score: {}", err)))).await.unwrap(); + return; + } + }; + let network_id_str = match value.get_string(IDB_NETWORK_ID_INDEX) { + Ok(network_id) => network_id, + Err(err) => { + tx.send(Err(Error::Custom(format!("Error getting network id: {}", err)))).await.unwrap(); + return; + } + }; + let network_id: NetworkId = match serde_json::from_str(&network_id_str) { + Ok(network_id) => network_id, + Err(err) => { + tx.send(Err(Error::Custom(format!("Error deserializing network id: {}", err)))).await.unwrap(); + return; + } + }; + let data_str = match value.get_string(IDB_TRANSACTION_DATA_INDEX) { + Ok(data) => data, + Err(err) => { + tx.send(Err(Error::Custom(format!("Error getting data: {}", err)))).await.unwrap(); + return; + } + }; + let data: TransactionData = match serde_json::from_str(&data_str) { + Ok(data) => data, + Err(err) => { + tx.send(Err(Error::Custom(format!("Error deserializing data: {}", err)))).await.unwrap(); + return; + } + }; + + // let transaction_record = Arc::new(TransactionRecord { + // version, + // id: TransactionId::from_hex(&id).map_err(|err| Error::Custom(format!("Error parsing id: {}", err))).unwrap(), + // unixtime, + // binding, + // block_daa_score, + // network_id, + // transaction_data: data, + // metadata: None, + // }); tx.send(Err(Error::NotImplemented)).await.unwrap(); }); - rx.recv().await.map_err(|err| Error::Custom(format!("Error opening database recv error oneshot channel: {}", err)))? + rx.recv().await.map_err(|err| Error::Custom(format!("Recv error oneshot channel: {}", err)))? } async fn load_multiple( &self, - _binding: &Binding, - _network_id: &NetworkId, - _ids: &[TransactionId], + binding: &Binding, + network_id: &NetworkId, + ids: &[TransactionId], ) -> Result>> { - Ok(vec![]) + let db_mutex = self.init_or_get_db(binding, network_id).await?; + let hex_ids = ids.iter().map(|id| id.to_hex()).collect::>(); + + let (tx, rx) = oneshot(); + dispatch(async move { + let db = match db_mutex.lock() { + Ok(db) => db, + Err(err) => { + tx.send(Err(Error::Custom(format!("Error locking database mutex: {}", err)))).await.unwrap(); + return; + } + }; + let transaction = match db.transaction(&[IDB_TRANSACTIONS_STORE], TransactionMode::ReadOnly) { + Ok(transaction) => transaction, + Err(err) => { + tx.send(Err(Error::IdbError(err.to_string()))).await.unwrap(); + return; + } + }; + let object_store = match transaction.object_store(IDB_TRANSACTIONS_STORE) { + Ok(object_store) => object_store, + Err(err) => { + tx.send(Err(Error::IdbError(err.to_string()))).await.unwrap(); + return; + } + }; + + for hex_id in hex_ids { + let id_js_value = JsValue::from_str(&hex_id); + let value_opt = match object_store.get(id_js_value).await { + Ok(value_opt) => value_opt, + Err(err) => { + tx.send(Err(Error::IdbError(err.to_string()))).await.unwrap(); + return; + } + }; + + let value = match value_opt { + Some(value) => value, + None => { + tx.send(Err(Error::Custom(format!("Transaction not found: {}", hex_id)))).await.unwrap(); + return; + } + }; + + println!("value: {:?}", value); + } + + tx.send(Err(Error::NotImplemented)).await.unwrap(); + }); + + rx.recv().await.map_err(|err| Error::Custom(format!("Recv error oneshot channel: {}", err)))? } async fn store(&self, _transaction_records: &[&TransactionRecord]) -> Result<()> { Ok(()) } - async fn remove(&self, _binding: &Binding, _network_id: &NetworkId, _ids: &[&TransactionId]) -> Result<()> { - Ok(()) + async fn remove(&self, binding: &Binding, network_id: &NetworkId, ids: &[&TransactionId]) -> Result<()> { + let db_mutex = self.init_or_get_db(binding, network_id).await?; + let hex_ids = ids.iter().map(|id| id.to_hex()).collect::>(); + let (tx, rx) = oneshot(); + + dispatch(async move { + let db = match db_mutex.lock() { + Ok(db) => db, + Err(err) => { + tx.send(Err(Error::Custom(format!("Error locking database mutex: {}", err)))).await.unwrap(); + return; + } + }; + let transaction = match db.transaction(&[IDB_TRANSACTIONS_STORE], TransactionMode::ReadOnly) { + Ok(transaction) => transaction, + Err(err) => { + tx.send(Err(Error::IdbError(err.to_string()))).await.unwrap(); + return; + } + }; + let object_store = match transaction.object_store(IDB_TRANSACTIONS_STORE) { + Ok(object_store) => object_store, + Err(err) => { + tx.send(Err(Error::IdbError(err.to_string()))).await.unwrap(); + return; + } + }; + + for hex_id in hex_ids { + let id_js_value = JsValue::from_str(&hex_id); + if let Err(err) = object_store.delete(id_js_value).await { + tx.send(Err(Error::IdbError(err.to_string()))).await.unwrap(); + return; + }; + } + + tx.send(Err(Error::NotImplemented)).await.unwrap(); + }); + + rx.recv().await.map_err(|err| Error::Custom(format!("Recv error oneshot channel: {}", err)))? } async fn store_transaction_metadata(&self, _id: TransactionId, _metadata: TransactionMetadata) -> Result<()> {