Skip to content

Commit

Permalink
Remove case where no metadata stored inside database regarding timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
Charles-Schleich committed Jul 3, 2024
1 parent 975db55 commit 06c44eb
Showing 1 changed file with 8 additions and 27 deletions.
35 changes: 8 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use config::VolumeConfig;
use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB};
use std::borrow::Cow;
use std::collections::HashMap;
Expand Down Expand Up @@ -71,14 +72,14 @@ pub struct RocksDbBackend {}
zenoh_plugin_trait::declare_plugin!(RocksDbBackend);

impl Plugin for RocksDbBackend {
type StartArgs = Runtime;
type StartArgs = VolumeConfig;
type Instance = VolumeInstance;

const DEFAULT_NAME: &'static str = "rocks_backend";
const PLUGIN_VERSION: &'static str = plugin_version!();
const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();

fn start(_name: &str, runtime: &Self::StartArgs) -> ZResult<Self::Instance> {
fn start(_name: &str, _config: &Self::StartArgs) -> ZResult<Self::Instance> {
try_init_log_from_env();
debug!("RocksDB backend {}", Self::PLUGIN_LONG_VERSION);

Expand All @@ -97,18 +98,13 @@ impl Plugin for RocksDbBackend {
.into_iter()
.map(|(k, v)| (k, serde_json::Value::String(v)))
.collect();
Ok(Box::new(RocksdbVolume {
admin_status,
root,
runtime: runtime.clone(),
}))
Ok(Box::new(RocksdbVolume { admin_status, root }))
}
}

pub struct RocksdbVolume {
admin_status: serde_json::Value,
root: PathBuf,
runtime: Runtime,
}

#[async_trait]
Expand Down Expand Up @@ -201,7 +197,6 @@ impl Volume for RocksdbVolume {
on_closure,
read_only,
db,
runtime: self.runtime.clone(),
}))
}
}
Expand All @@ -212,7 +207,6 @@ struct RocksdbStorage {
read_only: bool,
// Note: rocksdb isn't thread-safe. See https://github.com/rust-rocksdb/rust-rocksdb/issues/404
db: Arc<Mutex<Option<DB>>>,
runtime: Runtime,
}

#[async_trait]
Expand Down Expand Up @@ -285,7 +279,7 @@ impl Storage for RocksdbStorage {

// Get the matching key/value
debug!("getting key `{:?}` with parameters `{}`", key, _parameters);
match get_kv(db, key.clone(), self.runtime.clone()) {
match get_kv(db, key.clone()) {
Ok(Some((value, timestamp))) => Ok(vec![StoredData { value, timestamp }]),
Ok(None) => Ok(vec![]),
Err(e) => Err(format!("Error when getting key {:?} : {}", key, e).into()),
Expand Down Expand Up @@ -442,11 +436,7 @@ fn delete_kv(db: &DB, key: Option<OwnedKeyExpr>) -> ZResult<StorageInsertionResu
}
}

fn get_kv(
db: &DB,
key: Option<OwnedKeyExpr>,
runtime: Runtime,
) -> ZResult<Option<(Value, Timestamp)>> {
fn get_kv(db: &DB, key: Option<OwnedKeyExpr>) -> ZResult<Option<(Value, Timestamp)>> {
trace!("Get key {:?} from {:?}", key, db);
// TODO: use MultiGet when available (see https://github.com/rust-rocksdb/rust-rocksdb/issues/485)
let key = match key {
Expand Down Expand Up @@ -475,20 +465,11 @@ fn get_kv(
Ok(Some((Value::new(payload, encoding), timestamp)))
}
}
(Ok(Some(payload)), Ok(None)) => {
(Ok(_), Ok(None)) => {
trace!("second ok");
// Only the payload is present in DB!
// Possibly legacy data. Consider as encoding as APP_OCTET_STREAM and create timestamp from now()
match runtime.new_timestamp() {
Some(timestamp) => Ok(Some((
Value::new(payload, Encoding::APPLICATION_OCTET_STREAM),
timestamp,
))),
None => {
warn!("DB only contains Payload data and Cannot get a timestamp from Runtime, Returning Ok(None)");
Ok(None)
}
}
Ok(None)
}
(Ok(None), _) => Ok(None),
(Err(err), _) | (_, Err(err)) => Err(rocksdb_err_to_zerr(err)),
Expand Down

0 comments on commit 06c44eb

Please sign in to comment.