Skip to content

Commit

Permalink
Update rocksdb to use timestamp_rework
Browse files Browse the repository at this point in the history
  • Loading branch information
Charles-Schleich committed Jul 2, 2024
1 parent 4667d27 commit 975db55
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 46 deletions.
52 changes: 26 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ zenoh_backend_traits = { git = "https://github.com/eclipse-zenoh/zenoh", branch
zenoh-plugin-trait = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "dev/1.0.0" }
zenoh-codec = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "dev/1.0.0" }


[build-dependencies]
rustc_version = "0.4.0"

Expand Down
46 changes: 26 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ use tracing::{debug, error, trace, warn};
use uhlc::NTP64;
use zenoh::bytes::ZBytes;
use zenoh::encoding::Encoding;
use zenoh::internal::runtime::Runtime;
use zenoh::internal::{bail, zenoh_home, zerror, Value};
use zenoh::key_expr::OwnedKeyExpr;
use zenoh::selector::Parameters;
use zenoh::time::{new_timestamp, Timestamp};
use zenoh::time::Timestamp;
use zenoh::{try_init_log_from_env, Error, Result as ZResult};
use zenoh_backend_traits::config::{StorageConfig, VolumeConfig};
use zenoh_backend_traits::config::StorageConfig;
use zenoh_backend_traits::*;
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin};

Expand Down Expand Up @@ -70,14 +71,14 @@ pub struct RocksDbBackend {}
zenoh_plugin_trait::declare_plugin!(RocksDbBackend);

impl Plugin for RocksDbBackend {
type StartArgs = VolumeConfig;
type StartArgs = Runtime;
type Instance = VolumeInstance;

const DEFAULT_NAME: &'static str = "rocks_backend";
const PLUGIN_VERSION: &'static str = plugin_version!();
const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();

fn start(_name: &str, _config: &Self::StartArgs) -> ZResult<Self::Instance> {
fn start(_name: &str, runtime: &Self::StartArgs) -> ZResult<Self::Instance> {
try_init_log_from_env();
debug!("RocksDB backend {}", Self::PLUGIN_LONG_VERSION);

Expand All @@ -96,13 +97,18 @@ impl Plugin for RocksDbBackend {
.into_iter()
.map(|(k, v)| (k, serde_json::Value::String(v)))
.collect();
Ok(Box::new(RocksdbVolume { admin_status, root }))
Ok(Box::new(RocksdbVolume {
admin_status,
root,
runtime: runtime.clone(),
}))
}
}

pub struct RocksdbVolume {
admin_status: serde_json::Value,
root: PathBuf,
runtime: Runtime,
}

#[async_trait]
Expand Down Expand Up @@ -195,6 +201,7 @@ impl Volume for RocksdbVolume {
on_closure,
read_only,
db,
runtime: self.runtime.clone(),
}))
}
}
Expand All @@ -205,6 +212,7 @@ struct RocksdbStorage {
read_only: bool,
// Note: rocksdb isn't thread-safe. See https://github.com/rust-rocksdb/rust-rocksdb/issues/404
db: Arc<Mutex<Option<DB>>>,
runtime: Runtime,
}

#[async_trait]
Expand Down Expand Up @@ -277,7 +285,7 @@ impl Storage for RocksdbStorage {

// Get the matching key/value
debug!("getting key `{:?}` with parameters `{}`", key, _parameters);
match get_kv(db, key.clone()) {
match get_kv(db, key.clone(), self.runtime.clone()) {
Ok(Some((value, timestamp))) => Ok(vec![StoredData { value, timestamp }]),
Ok(None) => Ok(vec![]),
Err(e) => Err(format!("Error when getting key {:?} : {}", key, e).into()),
Expand Down Expand Up @@ -434,7 +442,11 @@ fn delete_kv(db: &DB, key: Option<OwnedKeyExpr>) -> ZResult<StorageInsertionResu
}
}

fn get_kv(db: &DB, key: Option<OwnedKeyExpr>) -> ZResult<Option<(Value, Timestamp)>> {
fn get_kv(
db: &DB,
key: Option<OwnedKeyExpr>,
runtime: Runtime,
) -> ZResult<Option<(Value, Timestamp)>> {
trace!("Get key {:?} from {:?}", key, db);
// TODO: use MultiGet when available (see https://github.com/rust-rocksdb/rust-rocksdb/issues/485)
let key = match key {
Expand Down Expand Up @@ -467,20 +479,14 @@ fn get_kv(db: &DB, key: Option<OwnedKeyExpr>) -> ZResult<Option<(Value, Timestam
trace!("second ok");
// Only the payload is present in DB!
// Possibly legacy data. Consider as encoding as APP_OCTET_STREAM and create timestamp from now()

match std::num::NonZeroU128::new(1u128)
.map(new_timestamp)
.map(|timestamp| {
(
Value::new(payload, Encoding::APPLICATION_OCTET_STREAM),
timestamp,
)
})
.map(|x| Ok(Some(x)))
{
Some(x) => x,
match runtime.new_timestamp() {
Some(timestamp) => Ok(Some((
Value::new(payload, Encoding::APPLICATION_OCTET_STREAM),
timestamp,
))),
None => {
bail!("Error Creating Timestamp ID for 0x01");
warn!("DB only contains Payload data and Cannot get a timestamp from Runtime, Returning Ok(None)");
Ok(None)
}
}
}
Expand Down

0 comments on commit 975db55

Please sign in to comment.