Skip to content

Commit

Permalink
Merge branch 'stefan/psw_parallel_creation_and_sending' into stefan/i…
Browse files Browse the repository at this point in the history
…mproved_parallel_forknet
  • Loading branch information
stedfn committed Jan 6, 2025
2 parents a052e36 + c085f2a commit 356ef6b
Show file tree
Hide file tree
Showing 17 changed files with 262 additions and 82 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 64 additions & 4 deletions chain/chain/src/flat_storage_resharder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex};
use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle};
use near_chain_primitives::Error;

use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

use crate::resharding::event_type::{ReshardingEventType, ReshardingSplitShardParams};
use crate::resharding::types::{
Expand Down Expand Up @@ -1025,9 +1025,10 @@ fn copy_kv_to_child(

// Sanity check we are truly writing to one of the expected children shards.
if new_shard_uid != *left_child_shard && new_shard_uid != *right_child_shard {
let err_msg = "account id doesn't map to any child shard!";
error!(target: "resharding", ?new_shard_uid, ?left_child_shard, ?right_child_shard, ?shard_layout, ?account_id, err_msg);
return Err(Error::ReshardingError(err_msg.to_string()));
let err_msg = "account id doesn't map to any child shard! - skipping it";
warn!(target: "resharding", ?new_shard_uid, ?left_child_shard, ?right_child_shard, ?shard_layout, ?account_id, err_msg);
// Do not fail resharding. Just skip this entry.
return Ok(());
}
// Add the new flat store entry.
store_update.set(new_shard_uid, key, value);
Expand Down Expand Up @@ -2561,4 +2562,63 @@ mod tests {
Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }))
);
}

/// This test asserts that resharding doesn't fail if flat storage iteration goes over an account
/// which is not part of any children shards after the shard layout changes.
#[test]
fn unrelated_account_do_not_fail_splitting() {
init_test_logger();
let (mut chain, resharder, sender) =
create_chain_resharder_sender::<DelayedSender>(simple_shard_layout());
let new_shard_layout = shard_layout_after_split();
let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout);
let ReshardingSplitShardParams {
parent_shard, left_child_shard, right_child_shard, ..
} = match resharding_event_type.clone() {
ReshardingEventType::SplitShard(params) => params,
};
let flat_store = resharder.runtime.store().flat_store();

// Add two blocks on top of genesis. This will make the resharding block (height 0) final.
add_blocks_to_chain(
&mut chain,
2,
PreviousBlockHeight::ChainHead,
NextBlockHeight::ChainHeadPlusOne,
);

// Inject an account which doesn't belong to the parent shard into its flat storage.
let mut store_update = flat_store.store_update();
let test_value = Some(FlatStateValue::Inlined(vec![0]));
let key = TrieKey::Account { account_id: account!("ab") };
store_update.set(parent_shard, key.to_vec(), test_value);
store_update.commit().unwrap();

// Perform resharding.
assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());
sender.call_split_shard_task();

// Check final status of parent flat storage.
let parent = ShardUId { version: 3, shard_id: 1 };
assert_eq!(flat_store.get_flat_storage_status(parent), Ok(FlatStorageStatus::Empty));
assert_eq!(flat_store.iter(parent).count(), 0);
assert!(resharder
.runtime
.get_flat_storage_manager()
.get_flat_storage_for_shard(parent)
.is_none());

// Check intermediate status of children flat storages.
// If children reached the catching up state, it means that the split task succeeded.
for child in [left_child_shard, right_child_shard] {
assert_eq!(
flat_store.get_flat_storage_status(child),
Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
chain.final_head().unwrap().into()
)))
);
// However, the unrelated account should not end up in any child.
assert!(flat_store.get(child, &key.to_vec()).is_ok_and(|val| val.is_none()));
}
}
}
19 changes: 10 additions & 9 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,15 +638,16 @@ pub(crate) static BLOCK_PRODUCER_MISSING_ENDORSEMENT_COUNT: LazyLock<HistogramVe
.unwrap()
});

pub(crate) static PARTIAL_WITNESS_ENCODE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
"near_partial_witness_encode_time",
"Partial state witness generation from encoded state witness time in seconds",
&["shard_id"],
Some(linear_buckets(0.0, 0.005, 20).unwrap()),
)
.unwrap()
});
pub(crate) static PARTIAL_WITNESS_ENCODE_AND_SEND_TIME: LazyLock<HistogramVec> =
LazyLock::new(|| {
try_create_histogram_vec(
"near_partial_witness_encode_and_send_time",
"Partial state witness generation from encoded state witness time in seconds",
&["shard_id"],
Some(linear_buckets(0.0, 0.005, 20).unwrap()),
)
.unwrap()
});

pub(crate) static PARTIAL_WITNESS_DECODE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use near_store::adapter::trie_store::TrieStoreAdapter;
use near_store::{DBCol, StorageError, TrieDBStorage, TrieStorage};
use near_vm_runner::{get_contract_cache_key, ContractCode, ContractRuntimeCache};
use rand::Rng;
use rayon::{iter::ParallelIterator, prelude::*};

use crate::client_actor::ClientSenderForPartialWitness;
use crate::metrics;
Expand Down Expand Up @@ -251,14 +252,14 @@ impl PartialWitnessActor {
}

// Function to generate the parts of the state witness and return them as a tuple of chunk_validator and part.
fn generate_state_witness_parts(
fn generate_and_send_state_witness_parts(
&mut self,
epoch_id: EpochId,
chunk_header: &ShardChunkHeader,
witness_bytes: EncodedChunkStateWitness,
chunk_validators: &[AccountId],
signer: &ValidatorSigner,
) -> Vec<(AccountId, PartialEncodedStateWitness)> {
) {
tracing::debug!(
target: "client",
chunk_hash=?chunk_header.chunk_hash(),
Expand All @@ -270,24 +271,28 @@ impl PartialWitnessActor {
let encoder = self.witness_encoders.entry(chunk_validators.len());
let (parts, encoded_length) = encoder.encode(&witness_bytes);

chunk_validators
.iter()
.zip_eq(parts)
.enumerate()
.map(|(part_ord, (chunk_validator, part))| {
chunk_validators.par_iter().zip_eq(parts.into_par_iter()).enumerate().for_each(
|(part_ord, (chunk_validator, part))| {
// It's fine to unwrap part here as we just constructed the parts above and we expect
// all of them to be present.
let partial_witness = PartialEncodedStateWitness::new(
epoch_id,
chunk_header.clone(),
part_ord,
part.unwrap().to_vec(),
part.unwrap().into_vec(),
encoded_length,
signer,
);
(chunk_validator.clone(), partial_witness)
})
.collect_vec()

// Send the parts to the corresponding chunk validator owners.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitness(
chunk_validator.clone(),
partial_witness,
),
));
},
);
}

fn generate_contract_deploys_parts(
Expand Down Expand Up @@ -342,10 +347,10 @@ impl PartialWitnessActor {

// Record time taken to encode the state witness parts.
let shard_id_label = chunk_header.shard_id().to_string();
let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_TIME
let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_AND_SEND_TIME
.with_label_values(&[shard_id_label.as_str()])
.start_timer();
let validator_witness_tuple = self.generate_state_witness_parts(
self.generate_and_send_state_witness_parts(
epoch_id,
chunk_header,
witness_bytes,
Expand All @@ -359,13 +364,8 @@ impl PartialWitnessActor {
self.state_witness_tracker.record_witness_sent(
chunk_hash,
witness_size_in_bytes,
validator_witness_tuple.len(),
chunk_validators.len(),
);

// Send the parts to the corresponding chunk validator owners.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple),
));
}

/// Function to handle receiving partial_encoded_state_witness message from chunk producer.
Expand Down
14 changes: 6 additions & 8 deletions chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,14 +757,12 @@ fn process_peer_manager_message_default(
}
}
}
NetworkRequests::PartialEncodedStateWitness(partial_witnesses) => {
for (account, partial_witness) in partial_witnesses {
for (i, name) in validators.iter().enumerate() {
if name == account {
connectors[i]
.partial_witness_sender
.send(PartialEncodedStateWitnessMessage(partial_witness.clone()));
}
NetworkRequests::PartialEncodedStateWitness(account, partial_witness) => {
for (i, name) in validators.iter().enumerate() {
if name == account {
connectors[i]
.partial_witness_sender
.send(PartialEncodedStateWitnessMessage(partial_witness.clone()));
}
}
}
Expand Down
39 changes: 19 additions & 20 deletions chain/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ near-store.workspace = true
near-schema-checker-lib.workspace = true

[dev-dependencies]
near-chain.workspace = true
assert_matches.workspace = true
bolero.workspace = true
criterion.workspace = true
Expand All @@ -76,29 +77,27 @@ serde_json.workspace = true

[features]
nightly_protocol = [
"near-async/nightly_protocol",
"near-chain-configs/nightly_protocol",
"near-fmt/nightly_protocol",
"near-o11y/nightly_protocol",
"near-primitives/nightly_protocol",
"near-store/nightly_protocol",
"near-async/nightly_protocol",
"near-chain-configs/nightly_protocol",
"near-fmt/nightly_protocol",
"near-o11y/nightly_protocol",
"near-primitives/nightly_protocol",
"near-store/nightly_protocol",
]
nightly = [
"near-async/nightly",
"near-chain-configs/nightly",
"near-fmt/nightly",
"near-o11y/nightly",
"near-primitives/nightly",
"near-store/nightly",
"nightly_protocol",
]
performance_stats = [
"near-performance-metrics/performance_stats",
"near-async/nightly",
"near-chain-configs/nightly",
"near-fmt/nightly",
"near-o11y/nightly",
"near-primitives/nightly",
"near-store/nightly",
"nightly_protocol",
]
performance_stats = ["near-performance-metrics/performance_stats"]
test_features = []
protocol_schema = [
"near-schema-checker-lib/protocol_schema",
"near-crypto/protocol_schema",
"near-primitives/protocol_schema",
"near-store/protocol_schema",
"near-schema-checker-lib/protocol_schema",
"near-crypto/protocol_schema",
"near-primitives/protocol_schema",
"near-store/protocol_schema",
]
Loading

0 comments on commit 356ef6b

Please sign in to comment.