From 1b8968caf72a191ab97cef07189f573f8b2e2868 Mon Sep 17 00:00:00 2001 From: David Estes <5317198+dav1do@users.noreply.github.com> Date: Mon, 13 May 2024 12:23:10 -0600 Subject: [PATCH] chore: use "real" car files for events in scenarios (#171) * chore: update some dependencies lots of breaking cid, multihash, ipld stuff between c1 versions in http/base. for now working around it * chore: use real event carfiles for tests * chore: use a random model in recon tests don't hard code model to avoid syncing previous data when starting new test. this is a valid test, but different than what we've done so far. --- Cargo.lock | 335 ++++++++++++++---------- Cargo.toml | 6 +- operator/Cargo.toml | 1 + operator/src/network/ipfs_rpc.rs | 8 +- runner/Cargo.toml | 8 +- runner/src/scenario/ceramic/anchor.rs | 60 +---- runner/src/scenario/ceramic/mod.rs | 15 +- runner/src/scenario/ceramic/util.rs | 23 -- runner/src/scenario/ipfs_block_fetch.rs | 23 +- runner/src/scenario/mod.rs | 3 +- runner/src/scenario/recon_sync.rs | 121 +++------ runner/src/scenario/util.rs | 120 +++++++++ 12 files changed, 404 insertions(+), 319 deletions(-) delete mode 100644 runner/src/scenario/ceramic/util.rs create mode 100644 runner/src/scenario/util.rs diff --git a/Cargo.lock b/Cargo.lock index a7edef44..31f4e2a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -664,7 +664,7 @@ dependencies = [ "hex", "http", "iri-string", - "libipld 0.14.0", + "libipld", "serde", "serde_with 2.3.3", "siwe", @@ -721,21 +721,21 @@ dependencies = [ "regex", "serde", "serde_bytes", - "serde_ipld_dagcbor", + "serde_ipld_dagcbor 0.3.0", "serde_json", "ssi", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] name = "ceramic-core" -version = "0.9.0" -source = "git+https://github.com/ceramicnetwork/rust-ceramic.git?branch=main#dba11dadf30ac9cf6bb98223c92c9f7014f9eb8b" +version = "0.17.0" +source = "git+https://github.com/ceramicnetwork/rust-ceramic.git?branch=main#032981d6699a937a217ab466926668c3eb0d5dbb" dependencies = [ "anyhow", "async-trait", "base64 0.21.7", - "cid 0.10.1", + "cid 0.11.1", "did-method-key", "did-pkh", "hex", @@ -743,14 +743,16 @@ dependencies = [ "libp2p-identity", "minicbor", "multibase 0.9.1", + "multihash-codetable", + "multihash-derive 0.9.0", "once_cell", "regex", "serde", "serde_bytes", - "serde_ipld_dagcbor", + "serde_ipld_dagcbor 0.6.1", "serde_json", "ssi", - "unsigned-varint", + "unsigned-varint 0.8.0", ] [[package]] @@ -759,7 +761,7 @@ version = "0.9.0" source = "git+https://github.com/3box/rust-ceramic?branch=main#dba11dadf30ac9cf6bb98223c92c9f7014f9eb8b" dependencies = [ "anyhow", - "ceramic-core 0.9.0 (git+https://github.com/3box/rust-ceramic?branch=main)", + "ceramic-core 0.9.0", "multihash 0.18.1", "once_cell", "rand", @@ -812,7 +814,7 @@ dependencies = [ "multihash 0.16.3", "serde", "serde_bytes", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] @@ -826,7 +828,21 @@ dependencies = [ "multihash 0.18.1", "serde", "serde_bytes", - "unsigned-varint", + "unsigned-varint 0.7.2", +] + +[[package]] +name = "cid" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3147d8272e8fa0ccd29ce51194dd98f79ddfb8191ba9e3409884e751798acf3a" +dependencies = [ + "core2", + "multibase 0.9.1", + "multihash 0.19.1", + "serde", + "serde_bytes", + "unsigned-varint 0.8.0", ] [[package]] @@ -2415,6 +2431,29 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ipld-core" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ede82a79e134f179f4b29b5fdb1eb92bd1b38c4dfea394c539051150a21b9b" +dependencies = [ + "cid 0.11.1", + "serde", + "serde_bytes", +] + +[[package]] +name = "ipld-dagpb" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "500af0d645ad3c26b544d2ec77c87fefec9319f5eda567d0993b9db59708994f" +dependencies = [ + "bytes", + "ipld-core", + "quick-protobuf", + "thiserror", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -2445,14 +2484,14 @@ dependencies = [ [[package]] name = "iroh-car" -version = "0.9.0" -source = "git+https://github.com/ceramicnetwork/rust-ceramic.git?branch=main#dba11dadf30ac9cf6bb98223c92c9f7014f9eb8b" +version = "0.17.0" +source = "git+https://github.com/ceramicnetwork/rust-ceramic.git?branch=main#032981d6699a937a217ab466926668c3eb0d5dbb" dependencies = [ - "cid 0.10.1", + "cid 0.11.1", "futures", "integer-encoding", - "libipld 0.16.0", - "libipld-cbor 0.16.0", + "serde", + "serde_ipld_dagcbor 0.6.1", "thiserror", "tokio", ] @@ -2772,10 +2811,11 @@ dependencies = [ "k8s-openapi", "keramik-common", "kube", + "libp2p-identity", "mockall", "multiaddr", "multibase 0.9.1", - "multihash 0.18.1", + "multihash 0.19.1", "opentelemetry", "rand", "reqwest", @@ -2797,7 +2837,7 @@ dependencies = [ "anyhow", "async-trait", "base64 0.21.7", - "ceramic-core 0.9.0 (git+https://github.com/ceramicnetwork/rust-ceramic.git?branch=main)", + "ceramic-core 0.17.0", "ceramic-http-client", "chrono", "clap", @@ -2805,22 +2845,27 @@ dependencies = [ "ed25519-dalek", "goose", "hex", + "ipld-core", + "ipld-dagpb", "iroh-car", "keramik-common", - "libipld 0.16.0", "multibase 0.9.1", - "multihash 0.18.1", + "multihash 0.19.1", + "multihash-codetable", "opentelemetry", "rand", "redis", "reqwest", "schemars", "serde", + "serde_ipld_dagcbor 0.6.1", + "serde_ipld_dagjson", "serde_json", "test-log", "tokio", "tracing", "tracing-subscriber", + "unsigned-varint 0.8.0", "uuid", ] @@ -3041,35 +3086,17 @@ dependencies = [ "async-trait", "cached", "fnv", - "libipld-cbor 0.14.0", - "libipld-cbor-derive 0.14.0", - "libipld-core 0.14.0", - "libipld-json 0.14.0", - "libipld-macro 0.14.0", + "libipld-cbor", + "libipld-cbor-derive", + "libipld-core", + "libipld-json", + "libipld-macro", "log", "multihash 0.16.3", "parking_lot", "thiserror", ] -[[package]] -name = "libipld" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1ccd6b8ffb3afee7081fcaec00e1b099fd1c7ccf35ba5729d88538fcc3b4599" -dependencies = [ - "fnv", - "libipld-cbor 0.16.0", - "libipld-cbor-derive 0.16.0", - "libipld-core 0.16.0", - "libipld-json 0.16.0", - "libipld-macro 0.16.0", - "libipld-pb", - "log", - "multihash 0.18.1", - "thiserror", -] - [[package]] name = "libipld-cbor" version = "0.14.0" @@ -3077,18 +3104,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8dd1ab68c9d26f20c7d0dfea6eecbae8c00359875210001b33ca27d4a02f3d09" dependencies = [ "byteorder", - "libipld-core 0.14.0", - "thiserror", -] - -[[package]] -name = "libipld-cbor" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77d98c9d1747aa5eef1cf099cd648c3fd2d235249f5fed07522aaebc348e423b" -dependencies = [ - "byteorder", - "libipld-core 0.16.0", + "libipld-core", "thiserror", ] @@ -3105,19 +3121,6 @@ dependencies = [ "synstructure", ] -[[package]] -name = "libipld-cbor-derive" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d5ba3a729b72973e456a1812b0afe2e176a376c1836cc1528e9fc98ae8cb838" -dependencies = [ - "proc-macro-crate", - "proc-macro2", - "quote", - "syn 1.0.109", - "synstructure", -] - [[package]] name = "libipld-core" version = "0.14.0" @@ -3133,73 +3136,25 @@ dependencies = [ "thiserror", ] -[[package]] -name = "libipld-core" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5acd707e8d8b092e967b2af978ed84709eaded82b75effe6cb6f6cc797ef8158" -dependencies = [ - "anyhow", - "cid 0.10.1", - "core2", - "multibase 0.9.1", - "multihash 0.18.1", - "serde", - "thiserror", -] - [[package]] name = "libipld-json" version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18aa481a87f084d98473dd9ece253a9569c762b75f6bbba8217d54e48c9d63b3" dependencies = [ - "libipld-core 0.14.0", + "libipld-core", "multihash 0.16.3", "serde", "serde_json", ] -[[package]] -name = "libipld-json" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25856def940047b07b25c33d4e66d248597049ab0202085215dc4dca0487731c" -dependencies = [ - "libipld-core 0.16.0", - "multihash 0.18.1", - "serde", - "serde_json", -] - [[package]] name = "libipld-macro" version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "852c011562ae5059b67c3a917f9f5945af5a68df8e39ede4444fff33274d25e2" dependencies = [ - "libipld-core 0.14.0", -] - -[[package]] -name = "libipld-macro" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71171c54214f866ae6722f3027f81dff0931e600e5a61e6b1b6a49ca0b5ed4ae" -dependencies = [ - "libipld-core 0.16.0", -] - -[[package]] -name = "libipld-pb" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3f2d0f866c4cd5dc9aa8068c429ba478d2882a3a4b70ab56f7e9a0eddf5d16f" -dependencies = [ - "bytes", - "libipld-core 0.16.0", - "quick-protobuf", - "thiserror", + "libipld-core", ] [[package]] @@ -3412,20 +3367,20 @@ checksum = "e7627d8bbeb17edbf1c3f74b21488e4af680040da89713b4217d0010e9cbd97e" [[package]] name = "multiaddr" -version = "0.17.1" +version = "0.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b36f567c7099511fa8612bbbb52dda2419ce0bdbacf31714e3a5ffdb766d3bd" +checksum = "8b852bc02a2da5feed68cd14fa50d0774b92790a5bdbfa932a813926c8472070" dependencies = [ "arrayref", "byteorder", "data-encoding", - "log", + "libp2p-identity", "multibase 0.9.1", - "multihash 0.17.0", + "multihash 0.19.1", "percent-encoding", "serde", "static_assertions", - "unsigned-varint", + "unsigned-varint 0.7.2", "url", ] @@ -3462,23 +3417,12 @@ dependencies = [ "blake3", "core2", "digest 0.10.7", - "multihash-derive", + "multihash-derive 0.8.0", "serde", "serde-big-array", "sha2 0.10.8", "sha3", - "unsigned-varint", -] - -[[package]] -name = "multihash" -version = "0.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "835d6ff01d610179fbce3de1694d007e500bf33a7f29689838941d6bf783ae40" -dependencies = [ - "core2", - "multihash-derive", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] @@ -3492,12 +3436,12 @@ dependencies = [ "blake3", "core2", "digest 0.10.7", - "multihash-derive", + "multihash-derive 0.8.0", "serde", "serde-big-array", "sha2 0.10.8", "sha3", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] @@ -3507,7 +3451,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "076d548d76a0e2a0d4ab471d0b1c36c577786dfc4471242035d97a12a735c492" dependencies = [ "core2", - "unsigned-varint", + "serde", + "unsigned-varint 0.7.2", +] + +[[package]] +name = "multihash-codetable" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c11bbdf904c8be009e82ff968c4dab84388cbafc45dfaff61936eca4bf40f1b5" +dependencies = [ + "blake2b_simd 1.0.2", + "blake2s_simd", + "blake3", + "core2", + "digest 0.10.7", + "multihash-derive 0.9.0", + "ripemd", + "sha1", + "sha2 0.10.8", + "sha3", + "strobe-rs", ] [[package]] @@ -3524,6 +3488,31 @@ dependencies = [ "synstructure", ] +[[package]] +name = "multihash-derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "890e72cb7396cb99ed98c1246a97b243cc16394470d94e0bc8b0c2c11d84290e" +dependencies = [ + "core2", + "multihash 0.19.1", + "multihash-derive-impl", +] + +[[package]] +name = "multihash-derive-impl" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38685e08adb338659871ecfc6ee47ba9b22dcc8abcf6975d379cc49145c3040" +dependencies = [ + "proc-macro-crate", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", + "synstructure", +] + [[package]] name = "nanorand" version = "0.7.0" @@ -4573,6 +4562,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ripemd" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd124222d17ad93a644ed9d011a40f4fb64aa54275c08cc216524a9ea82fb09f" +dependencies = [ + "digest 0.10.7", +] + [[package]] name = "ripemd160" version = "0.9.1" @@ -4898,6 +4896,29 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_ipld_dagcbor" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ded35fbe4ab8fdec1f1d14b4daff2206b1eada4d6e708cb451d464d2d965f493" +dependencies = [ + "cbor4ii", + "ipld-core", + "scopeguard", + "serde", +] + +[[package]] +name = "serde_ipld_dagjson" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3359b47ba7f4a306ef5984665e10539e212e97217afa489437d533208eecda36" +dependencies = [ + "ipld-core", + "serde", + "serde_json", +] + [[package]] name = "serde_jcs" version = "0.1.0" @@ -5009,6 +5030,17 @@ dependencies = [ "opaque-debug 0.3.0", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha1_smol" version = "1.0.0" @@ -5389,7 +5421,7 @@ dependencies = [ "simple_asn1", "ssi-crypto", "thiserror", - "unsigned-varint", + "unsigned-varint 0.7.2", "zeroize", ] @@ -5501,7 +5533,7 @@ checksum = "0982f62c7860922026a9d9edc6c604de79693ee4c5c6bd65be11e2ff66b1df09" dependencies = [ "base64 0.12.3", "chrono", - "libipld 0.14.0", + "libipld", "serde", "serde_json", "serde_with 1.14.0", @@ -5527,7 +5559,7 @@ dependencies = [ "chrono", "flate2", "iref", - "libipld 0.14.0", + "libipld", "multihash 0.16.3", "reqwest", "serde", @@ -5577,6 +5609,19 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strobe-rs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fabb238a1cccccfa4c4fb703670c0d157e1256c1ba695abf1b93bd2bb14bab2d" +dependencies = [ + "bitflags 1.3.2", + "byteorder", + "keccak", + "subtle", + "zeroize", +] + [[package]] name = "strsim" version = "0.9.3" @@ -6329,6 +6374,12 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105" +[[package]] +name = "unsigned-varint" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" + [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index e46558e6..ac01214d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,9 +13,11 @@ env_logger = "0.10.0" expect-patch = { path = "./expect-patch/" } hex = "0.4.3" keramik-common = { path = "./common/", default-features = false } -multiaddr = "0.17" +multiaddr = "0.18" multibase = "0.9.1" -multihash = "0.18" +multihash = { version = "0.19" } +multihash-codetable = { version = "0.1", features = ["sha2", "sha3"] } +# multihash-derive = { version = "0.9" } opentelemetry = { version = "0.21", features = ["metrics", "trace"] } opentelemetry-otlp = { version = "0.14", features = [ "metrics", diff --git a/operator/Cargo.toml b/operator/Cargo.toml index 19d562f6..90953032 100644 --- a/operator/Cargo.toml +++ b/operator/Cargo.toml @@ -58,6 +58,7 @@ kube = { version = "0.88", features = [ multiaddr = { workspace = true, optional = true } multibase = { workspace = true, optional = true } multihash = { workspace = true, optional = true } +libp2p-identity = "0.2" opentelemetry = { workspace = true, optional = true } rand = { version = "0.8.5" } reqwest = { workspace = true, optional = true } diff --git a/operator/src/network/ipfs_rpc.rs b/operator/src/network/ipfs_rpc.rs index 3c956901..b085c236 100644 --- a/operator/src/network/ipfs_rpc.rs +++ b/operator/src/network/ipfs_rpc.rs @@ -47,10 +47,10 @@ impl IpfsRpcClient for HttpRpcClient { addresses: Vec, } let data: Response = resp.json().await?; - - let p2p_proto = Protocol::P2p(Multihash::from_bytes( - &multibase::Base::Base58Btc.decode(data.id.clone())?, - )?); + let hash = Multihash::from_bytes(&multibase::Base::Base58Btc.decode(data.id.clone())?)?; + let peer_id = libp2p_identity::PeerId::from_multihash(hash) + .map_err(|e| anyhow!("failed to build multiash: {:?}", e))?; + let p2p_proto = Protocol::P2p(peer_id); // We expect to find at least one non loop back address let p2p_addrs = data .addresses diff --git a/runner/Cargo.toml b/runner/Cargo.toml index feb72d61..b31fe9d4 100644 --- a/runner/Cargo.toml +++ b/runner/Cargo.toml @@ -17,13 +17,17 @@ did-method-key = "0.2" goose = { version = "0.16", features = ["gaggle"] } hex.workspace = true keramik-common = { workspace = true, features = ["telemetry", "tokio-console"] } -libipld = { version = "0.16.0", features = ["serde-codec"] } +ipld-core = "0.4" +ipld-dagpb = "0.2" multihash.workspace = true +multihash-codetable.workspace = true opentelemetry.workspace = true rand = "0.8.5" redis = { version = "0.24", features = ["tokio-comp"] } reqwest.workspace = true serde = { version = "1.0", features = ["derive"] } +serde_ipld_dagcbor = "0.6" +serde_ipld_dagjson = "0.2" schemars.workspace = true serde_json.workspace = true tokio.workspace = true @@ -35,5 +39,7 @@ uuid = { version = "1.6.1", features = ["v4"] } chrono = "0.4.31" ed25519-dalek = "2.1" +unsigned-varint = "0.8" # temporary until we can use our http client updated for new c1 + [dev-dependencies] test-log = "0.2" diff --git a/runner/src/scenario/ceramic/anchor.rs b/runner/src/scenario/ceramic/anchor.rs index d7f2157e..6e3de0b3 100644 --- a/runner/src/scenario/ceramic/anchor.rs +++ b/runner/src/scenario/ceramic/anchor.rs @@ -1,21 +1,19 @@ use anyhow::Result; -use base64::{engine::general_purpose, Engine}; -use ceramic_http_client::ceramic_event::{ - Cid, DidDocument, JwkSigner, Jws, StreamId, StreamIdType, -}; +use ceramic_core::{Cid, DagCborEncoded}; +use ceramic_http_client::ceramic_event::{DidDocument, JwkSigner, Jws, StreamId}; use chrono::Utc; use goose::prelude::*; +use ipld_core::ipld; use iroh_car::{CarHeader, CarWriter}; -use libipld::{cbor::DagCborCodec, ipld, prelude::Codec, Ipld, IpldCodec}; -use multihash::{Code::Sha2_256, MultihashDigest}; -use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use multihash_codetable::{Code, MultihashDigest}; + use redis::{aio::MultiplexedConnection, AsyncCommands}; use serde::{Deserialize, Serialize}; -use std::{collections::BTreeMap, sync::Arc}; +use std::sync::Arc; use tokio::sync::Mutex; use uuid::Uuid; -use crate::scenario::get_redis_client; +use crate::scenario::{get_redis_client, util::DAG_CBOR_CODEC}; #[derive(Serialize, Deserialize)] struct CasAuthPayload { @@ -59,8 +57,8 @@ pub async fn stream_tip_car( "tip": genesis_cid, }); - let ipld_bytes = DagCborCodec.encode(&root_block)?; - let root_cid = Cid::new_v1(IpldCodec::DagCbor.into(), Sha2_256.digest(&ipld_bytes)); + let ipld_bytes = DagCborEncoded::new(&root_block)?; + let root_cid = Cid::new_v1(DAG_CBOR_CODEC, Code::Sha2_256.digest(ipld_bytes.as_ref())); let car_header = CarHeader::new_v1(vec![root_cid]); let mut car_writer = CarWriter::new(car_header, Vec::new()); car_writer.write(root_cid, ipld_bytes).await.unwrap(); @@ -80,14 +78,14 @@ pub async fn create_anchor_request_on_cas( .unwrap_or_else(|_| "https://cas-dev.3boxlabs.com".to_string()); let node_controller = std::env::var("node_controller") .unwrap_or_else(|_| "did:key:z6Mkh3pajt5brscshuDrCCber9nC9Ujpi7EcECveKtJPMEPo".to_string()); - let (stream_id, genesis_cid, genesis_block) = create_stream(StreamIdType::Tile).unwrap(); + let (stream_id, genesis_cid, genesis_block) = crate::scenario::util::create_stream().unwrap(); let (root_cid, car_bytes) = stream_tip_car( stream_id.clone(), genesis_cid, - genesis_block.clone(), + genesis_block.as_ref().to_vec(), genesis_cid, - genesis_block, + genesis_block.as_ref().to_vec(), ) .await .unwrap(); @@ -131,37 +129,3 @@ pub async fn cas_benchmark() -> Result { Ok(scenario!("CeramicCasBenchmark").register_transaction(create_anchor_request)) } - -/// Create a new Ceramic stream -pub fn create_stream(stream_type: StreamIdType) -> Result<(StreamId, Cid, Vec)> { - let controller: String = thread_rng() - .sample_iter(&Alphanumeric) - .take(32) - .map(char::from) - .collect(); - - let genesis_commit = ipld!({ - "header": { - "unique": stream_unique_header(), - "controllers": [controller] - } - }); - // Deserialize the genesis commit, encode it as CBOR, and compute the CID. - let ipld_map: BTreeMap = libipld::serde::from_ipld(genesis_commit)?; - let ipld_bytes = DagCborCodec.encode(&ipld_map)?; - let genesis_cid = Cid::new_v1(IpldCodec::DagCbor.into(), Sha2_256.digest(&ipld_bytes)); - Ok(( - StreamId { - r#type: stream_type, - cid: genesis_cid, - }, - genesis_cid, - ipld_bytes, - )) -} - -fn stream_unique_header() -> String { - let mut data = [0u8; 8]; - thread_rng().fill(&mut data); - general_purpose::STANDARD.encode(data) -} diff --git a/runner/src/scenario/ceramic/mod.rs b/runner/src/scenario/ceramic/mod.rs index 5133cf31..2b93d59b 100644 --- a/runner/src/scenario/ceramic/mod.rs +++ b/runner/src/scenario/ceramic/mod.rs @@ -4,12 +4,11 @@ mod models; pub mod new_streams; pub mod query; pub mod simple; -pub mod util; pub mod write_only; -use ceramic_core::ssi::did::{DIDMethod, DocumentBuilder, Source}; +use ceramic_core::ssi::did::{DIDMethod, Document, DocumentBuilder, Source}; use ceramic_core::ssi::jwk::{self, Base64urlUInt, Params, JWK}; -use ceramic_http_client::ceramic_event::{DidDocument, JwkSigner}; +use ceramic_http_client::ceramic_event::JwkSigner; use ceramic_http_client::CeramicHttpClient; use models::RandomModelInstance; @@ -22,12 +21,12 @@ pub type CeramicClient = CeramicHttpClient; #[derive(Clone)] pub struct Credentials { pub signer: JwkSigner, - pub did: DidDocument, + pub did: Document, } impl Credentials { pub async fn from_env() -> Result { - let did = DidDocument::new(&std::env::var("DID_KEY").expect("DID_KEY is required")); + let did = Document::new(&std::env::var("DID_KEY").expect("DID_KEY is required")); let private_key = std::env::var("DID_PRIVATE_KEY").expect("DID_PRIVATE_KEY is required"); let signer = JwkSigner::new(did.clone(), &private_key).await?; Ok(Self { signer, did }) @@ -60,12 +59,12 @@ impl Credentials { Ok(Self { signer, did }) } - fn generate_did_for_jwk(key: &JWK) -> anyhow::Result { + fn generate_did_for_jwk(key: &JWK) -> anyhow::Result { let did = did_method_key::DIDKey .generate(&Source::Key(key)) .ok_or_else(|| anyhow::anyhow!("Failed to generate DID"))?; - let doc: DidDocument = DocumentBuilder::default() + let doc = DocumentBuilder::default() .id(did) .build() .map_err(|e| anyhow::anyhow!("failed to build DID document: {}", e))?; @@ -74,7 +73,7 @@ impl Credentials { } /// Returns (Private Key, DID Document) - fn generate_did_and_pk() -> anyhow::Result<(String, DidDocument)> { + fn generate_did_and_pk() -> anyhow::Result<(String, Document)> { let key = jwk::JWK::generate_ed25519()?; let private_key = if let Params::OKP(params) = &key.params { let pk = params diff --git a/runner/src/scenario/ceramic/util.rs b/runner/src/scenario/ceramic/util.rs deleted file mode 100644 index 6d69ad66..00000000 --- a/runner/src/scenario/ceramic/util.rs +++ /dev/null @@ -1,23 +0,0 @@ -use goose::GooseError; - -pub fn goose_error(err: anyhow::Error) -> GooseError { - GooseError::Io(std::io::Error::new(std::io::ErrorKind::Other, err)) -} - -/// Macro to transform errors from an expression to a goose transaction failiure -#[macro_export] -macro_rules! goose_try { - ($user:ident, $tag:expr, $request:expr, $func:expr) => { - match $func { - Ok(ret) => Ok(ret), - Err(e) => { - let err = e.to_string(); - if let Err(e) = $user.set_failure($tag, $request, None, Some(&err)) { - Err(e) - } else { - panic!("Unreachable") - } - } - } - }; -} diff --git a/runner/src/scenario/ipfs_block_fetch.rs b/runner/src/scenario/ipfs_block_fetch.rs index 4d57e397..21c84097 100644 --- a/runner/src/scenario/ipfs_block_fetch.rs +++ b/runner/src/scenario/ipfs_block_fetch.rs @@ -1,13 +1,15 @@ +use std::{sync::Arc, time::Duration}; + use anyhow::Result; -use ceramic_core::Cid; +use ceramic_core::{Cid, DagCborEncoded}; use goose::prelude::*; -use libipld::prelude::Codec; -use libipld::{ipld, json::DagJsonCodec}; -use multihash::{Code, MultihashDigest}; -use std::{sync::Arc, time::Duration}; +use ipld_core::ipld; +use multihash_codetable::{Code, MultihashDigest}; use crate::simulate::Topology; +use super::util::DAG_CBOR_CODEC; + pub fn scenario(topo: Topology) -> Result { let put: Transaction = Transaction::new(Arc::new(move |user| { Box::pin(async move { put(topo, user).await }) @@ -40,22 +42,22 @@ fn global_user_id(user: usize, topo: Topology) -> u64 { } /// Produce DAG-JSON IPLD node that contains determisiticly unique data for the user. -fn user_data(local_user: usize, topo: Topology) -> (Cid, Vec) { +fn user_data(local_user: usize, topo: Topology) -> (Cid, DagCborEncoded) { let id = global_user_id(local_user, topo); let data = ipld!({ "user": id, "nonce": topo.nonce, }); - let bytes = DagJsonCodec.encode(&data).unwrap(); - - let hash = Code::Sha2_256.digest(bytes.as_slice()); - (Cid::new_v1(DagJsonCodec.into(), hash), bytes) + let bytes = DagCborEncoded::new(&data).unwrap(); + let cid = Cid::new_v1(DAG_CBOR_CODEC, Code::Sha2_256.digest(bytes.as_ref())); + (cid, bytes) } // Generate determisitic random data and put it into IPFS async fn put(topo: Topology, user: &mut GooseUser) -> TransactionResult { let (cid, data) = user_data(user.weighted_users_index, topo); + let data = data.as_ref().to_vec(); println!( "put id: {} user: {} nonce: {} cid: {}", topo.target_worker, user.weighted_users_index, topo.nonce, cid, @@ -118,6 +120,7 @@ async fn get(mut topo: Topology, user: &mut GooseUser) -> TransactionResult { // Check that all written data is accounted for. async fn check(topo: Topology, user: &mut GooseUser) -> TransactionResult { let (cid, data) = user_data(user.weighted_users_index, topo); + let data = data.as_ref().to_vec(); println!( "stop id: {} user: {} cid: {}", topo.target_worker, user.weighted_users_index, cid, diff --git a/runner/src/scenario/mod.rs b/runner/src/scenario/mod.rs index 4984b331..ee00d7aa 100644 --- a/runner/src/scenario/mod.rs +++ b/runner/src/scenario/mod.rs @@ -1,9 +1,10 @@ -use crate::scenario::ceramic::util::goose_error; +use crate::scenario::util::goose_error; use goose::GooseError; pub mod ceramic; pub mod ipfs_block_fetch; pub mod recon_sync; +pub mod util; static FIRST_USER: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(true); diff --git a/runner/src/scenario/recon_sync.rs b/runner/src/scenario/recon_sync.rs index eeebeb7e..9630696e 100644 --- a/runner/src/scenario/recon_sync.rs +++ b/runner/src/scenario/recon_sync.rs @@ -1,17 +1,17 @@ +use std::str::FromStr; +use std::sync::atomic::AtomicU64; +use std::{sync::Arc, time::Duration}; + +use ceramic_http_client::ceramic_event::{StreamId, StreamIdType}; +use goose::prelude::*; +use tracing::{info, instrument}; + use crate::scenario::ceramic::model_instance::{loop_until_key_value_set, set_key_to_stream_id}; use crate::scenario::{ get_redis_client, is_goose_global_leader, is_goose_lead_user, is_goose_lead_worker, }; -use ceramic_core::{Cid, EventId}; -use ceramic_http_client::ceramic_event::{StreamId, StreamIdType}; -use goose::prelude::*; -use libipld::cid; -use multihash::{Code, MultihashDigest}; -use rand::rngs::ThreadRng; -use rand::Rng; -use std::sync::atomic::AtomicU64; -use std::{sync::Arc, time::Duration}; -use tracing::{info, instrument}; + +use super::util::random_init_event_car; const MODEL_ID_KEY: &str = "event_id_sync_model_id"; pub(crate) const CREATE_EVENT_TX_NAME: &str = "create_new_event"; @@ -22,17 +22,16 @@ pub(crate) const CREATE_EVENT_REQ_NAME: &str = "POST create_new_event"; static NEW_EVENT_CNT: AtomicU64 = AtomicU64::new(0); static TOTAL_BYTES_GENERATED: AtomicU64 = AtomicU64::new(0); -#[derive(Clone)] +#[derive(Debug)] struct ReconCeramicModelInstanceTestUser { model_id: StreamId, - with_data: bool, } -async fn init_scenario(with_data: bool) -> Result { +async fn init_scenario() -> Result { let redis_cli = get_redis_client().await?; let test_start = Transaction::new(Arc::new(move |user| { - Box::pin(setup(user, redis_cli.clone(), with_data)) + Box::pin(setup(user, redis_cli.clone())) })) .set_name("setup") .set_on_start(); @@ -54,7 +53,7 @@ async fn log_results(_user: &mut GooseUser) -> TransactionResult { } pub async fn event_sync_scenario() -> Result { - let test_start = init_scenario(true).await?; + let test_start = init_scenario().await?; let create_new_event = transaction!(create_new_event).set_name(CREATE_EVENT_TX_NAME); let stop = transaction!(log_results) .set_name("log_results") @@ -68,20 +67,21 @@ pub async fn event_sync_scenario() -> Result { /// One user on one node creates a model. /// One user on each node subscribes to the model via Recon #[instrument(skip_all, fields(user.index = user.weighted_users_index), ret)] -async fn setup( - user: &mut GooseUser, - redis_cli: redis::Client, - with_data: bool, -) -> TransactionResult { +async fn setup(user: &mut GooseUser, redis_cli: redis::Client) -> TransactionResult { let mut conn = redis_cli.get_async_connection().await.unwrap(); let first = is_goose_global_leader(is_goose_lead_user()); + let model_id = if first { info!("creating model for event ID sync test"); // We only need a model ID we do not need it to be a real model. + // CID version mismatch between http/c1 versions right now + let cid = random_cid().unwrap(); + // could hard code an ID if we wanted the test to be different let model_id = StreamId { r#type: StreamIdType::Model, - cid: random_cid(), + cid, }; + set_key_to_stream_id(&mut conn, MODEL_ID_KEY, &model_id).await; // TODO: set a real model @@ -91,15 +91,11 @@ async fn setup( loop_until_key_value_set(&mut conn, MODEL_ID_KEY).await }; - tracing::debug!(%model_id, "syncing model"); + tracing::info!(%model_id, "syncing model"); let path = format!("/ceramic/interests/model/{}", model_id); - let user_data = ReconCeramicModelInstanceTestUser { - model_id, - with_data, - }; + let user_data = ReconCeramicModelInstanceTestUser { model_id }; user.set_session_data(user_data); - let request_builder = user .get_request_builder(&GooseMethod::Post, &path)? .timeout(Duration::from_secs(5)); @@ -124,15 +120,16 @@ async fn create_new_event(user: &mut GooseUser) -> TransactionResult { let user_data: &ReconCeramicModelInstanceTestUser = user .get_session_data() .expect("we are missing sync_event_id user data"); - + let data = random_init_event_car( + SORT_KEY, + user_data.model_id.to_vec().unwrap(), + Some(TEST_CONTROLLER.to_string()), + ) + .await + .unwrap(); + let event_key_body = serde_json::json!({"data": data}); // eventId needs to be a multibase encoded string for the API to accept it - let event_id = format!("F{}", random_event_id(&user_data.model_id.to_string())); - let event_key_body = if user_data.with_data { - let payload = random_car_1kb_body().await; - serde_json::json!({"id": event_id, "data": payload}) - } else { - serde_json::json!({"id": event_id}) - }; + let cnt = NEW_EVENT_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if cnt == 0 || cnt % 1000 == 0 { @@ -153,54 +150,18 @@ async fn create_new_event(user: &mut GooseUser) -> TransactionResult { } } -fn random_cid() -> cid::Cid { - let mut data = [0u8; 8]; - rand::Rng::fill(&mut rand::thread_rng(), &mut data); - let hash = Code::Sha2_256.digest(data.as_slice()); - Cid::new_v1(0x00, hash) -} - const SORT_KEY: &str = "model"; // hard code test controller in case we want to find/prune later const TEST_CONTROLLER: &str = "did:key:z6MkoFUppcKEVYTS8oVidrja94UoJTatNhnhxJRKF7NYPScS"; -fn random_event_id(sort_value: &str) -> ceramic_core::EventId { - let cid = random_cid(); - EventId::new( - &ceramic_core::Network::Local(0), - SORT_KEY, - sort_value, - TEST_CONTROLLER, - &cid, - 0, - &cid, - ) -} - -fn random_block() -> (Cid, Vec) { - let mut rng = rand::thread_rng(); - TOTAL_BYTES_GENERATED.fetch_add(1000, std::sync::atomic::Ordering::Relaxed); - let unique: [u8; 1000] = gen_rand_bytes(&mut rng); +// TODO: delete. mismatch between http and c1 versions currently. in flight updates. +fn random_cid() -> anyhow::Result { + use multihash_codetable::MultihashDigest; - let hash = ::multihash::MultihashDigest::digest(&::multihash::Code::Sha2_256, &unique); - (Cid::new_v1(0x00, hash), unique.to_vec()) -} - -async fn random_car_1kb_body() -> String { - let mut bytes = Vec::with_capacity(1500); - let (cid, block) = random_block(); - let roots = vec![cid]; - let mut writer = iroh_car::CarWriter::new(iroh_car::CarHeader::V1(roots.into()), &mut bytes); - writer.write(cid, block).await.unwrap(); - writer.finish().await.unwrap(); - - multibase::encode(multibase::Base::Base36Lower, bytes) -} - -fn gen_rand_bytes(rng: &mut ThreadRng) -> [u8; SIZE] { - let mut arr = [0; SIZE]; - for x in &mut arr { - *x = rng.gen_range(0..=255); - } - arr + let mut data = [0u8; 8]; + rand::Rng::fill(&mut rand::thread_rng(), &mut data); + let hash = multihash_codetable::Code::Sha2_256.digest(data.as_slice()); + let hash = multibase::encode(multibase::Base::Base36Lower, hash.to_bytes()); + let cid = ceramic_http_client::ceramic_event::Cid::from_str(&hash)?; + Ok(cid) } diff --git a/runner/src/scenario/util.rs b/runner/src/scenario/util.rs new file mode 100644 index 00000000..a603356d --- /dev/null +++ b/runner/src/scenario/util.rs @@ -0,0 +1,120 @@ +use std::io::Write; + +use ceramic_core::{Cid, DagCborEncoded}; +use goose::GooseError; +use ipld_core::ipld; +use multihash_codetable::{Code, MultihashDigest}; +use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use unsigned_varint::encode; + +pub fn goose_error(err: anyhow::Error) -> GooseError { + GooseError::Io(std::io::Error::new(std::io::ErrorKind::Other, err)) +} + +/// Macro to transform errors from an expression to a goose transaction failiure +#[macro_export] +macro_rules! goose_try { + ($user:ident, $tag:expr, $request:expr, $func:expr) => { + match $func { + Ok(ret) => Ok(ret), + Err(e) => { + let err = e.to_string(); + if let Err(e) = $user.set_failure($tag, $request, None, Some(&err)) { + Err(e) + } else { + panic!("Unreachable") + } + } + } + }; +} + +pub(crate) const DAG_CBOR_CODEC: u64 = 0x71; + +/// Create a new Ceramic stream +pub fn create_stream() -> anyhow::Result<( + ceramic_http_client::ceramic_event::StreamId, + Cid, + DagCborEncoded, +)> { + let controller: String = thread_rng() + .sample_iter(&Alphanumeric) + .take(32) + .map(char::from) + .collect(); + + let genesis_commit = ipld!({ + "header": { + "unique": gen_rand_bytes::<12>().as_slice(), + "controllers": [controller] + } + }); + + let bytes = DagCborEncoded::new(&genesis_commit)?; + let cid = Cid::new_v1(DAG_CBOR_CODEC, Code::Sha2_256.digest(bytes.as_ref())); + + let stream_id = write_stream_bytes(&cid)?; + let stream_id = ceramic_http_client::ceramic_event::StreamId::try_from(stream_id.as_slice())?; + Ok((stream_id, cid, bytes)) +} + +const STREAMID_CODEC: u64 = 206; + +pub fn write_stream_bytes(cid: &Cid) -> anyhow::Result> { + let mut writer = std::io::BufWriter::new(Vec::new()); + let mut buf = encode::u64_buffer(); + let v = encode::u64(STREAMID_CODEC, &mut buf); + writer.write_all(v)?; + let v = encode::u64(3, &mut buf); // Model instance doc + writer.write_all(v)?; + cid.write_bytes(&mut writer)?; + writer.flush()?; + Ok(writer.into_inner()?) +} + +pub(crate) async fn random_init_event_car( + sort_key: &str, + model: Vec, + controller: Option, +) -> anyhow::Result { + let controller = if let Some(owner) = controller { + owner + } else { + thread_rng() + .sample_iter(&Alphanumeric) + .take(32) + .map(char::from) + .collect() + }; + + let unique = gen_rand_bytes::<12>(); + let init = ipld!({ + "header": { + "controllers": [controller], + "model": model, + "sep": sort_key, + "unique": unique.as_slice(), + } + }); + + let block = DagCborEncoded::new(&init)?; + let cid = Cid::new_v1(DAG_CBOR_CODEC, Code::Sha2_256.digest(block.as_ref())); + + let mut buf = Vec::new(); + let roots = vec![cid]; + let mut writer = iroh_car::CarWriter::new(iroh_car::CarHeader::V1(roots.into()), &mut buf); + writer.write(cid, block).await?; + writer.finish().await.unwrap(); + + Ok(multibase::encode(multibase::Base::Base36Lower, buf)) +} + +fn gen_rand_bytes() -> [u8; SIZE] { + // can't take &mut rng cause of Send even if we drop it + let mut rng = thread_rng(); + let mut arr = [0; SIZE]; + for x in &mut arr { + *x = rng.gen_range(0..=255); + } + arr +}