diff --git a/bootstrap_cache/src/cache_store.rs b/bootstrap_cache/src/cache_store.rs index b90e91e309..6d4b6877bf 100644 --- a/bootstrap_cache/src/cache_store.rs +++ b/bootstrap_cache/src/cache_store.rs @@ -6,7 +6,8 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::{BootstrapPeer, Error, Result, InitialPeerDiscovery}; +use crate::{BootstrapPeer, Error, InitialPeerDiscovery, Result}; +use fs2::FileExt; use libp2p::Multiaddr; use serde::{Deserialize, Serialize}; use std::fs::{self, File, OpenOptions}; @@ -14,7 +15,6 @@ use std::io::{self, Read}; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, SystemTime}; -use fs2::FileExt; use tempfile::NamedTempFile; use tokio::sync::RwLock; @@ -57,8 +57,7 @@ impl CacheStore { // Create cache directory if it doesn't exist if let Some(parent) = cache_path.parent() { - fs::create_dir_all(parent) - .map_err(|e| Error::Io(e))?; + fs::create_dir_all(parent).map_err(|e| Error::Io(e))?; } let data = if cache_path.exists() { @@ -66,7 +65,8 @@ impl CacheStore { Ok(data) => { // If cache data exists but has no peers and file is not read-only, // fallback to default - let is_readonly = cache_path.metadata() + let is_readonly = cache_path + .metadata() .map(|m| m.permissions().readonly()) .unwrap_or(false); @@ -77,7 +77,8 @@ impl CacheStore { let mut filtered_data = data; if filtered_data.peers.len() > config.max_peers { let peers: Vec<_> = filtered_data.peers.into_iter().collect(); - filtered_data.peers = peers.into_iter() + filtered_data.peers = peers + .into_iter() .take(config.max_peers) .map(|(k, v)| (k, v)) .collect(); @@ -103,7 +104,9 @@ impl CacheStore { }; // Only clean up stale peers if the file is not read-only - let is_readonly = store.cache_path.metadata() + let is_readonly = store + .cache_path + .metadata() .map(|m| m.permissions().readonly()) .unwrap_or(false); @@ -194,12 +197,20 @@ impl CacheStore { // If we have no reliable peers and the cache file is not read-only, // try to refresh from default endpoints - if reliable_peers.is_empty() && !self.cache_path.metadata().map(|m| m.permissions().readonly()).unwrap_or(false) { + if reliable_peers.is_empty() + && !self + .cache_path + .metadata() + .map(|m| m.permissions().readonly()) + .unwrap_or(false) + { drop(data); if let Ok(new_data) = Self::fallback_to_default(&self.config).await { let mut data = self.data.write().await; *data = new_data; - return data.peers.values() + return data + .peers + .values() .filter(|peer| peer.success_count > peer.failure_count) .cloned() .collect(); @@ -211,7 +222,9 @@ impl CacheStore { pub async fn update_peer_status(&self, addr: &str, success: bool) -> Result<()> { // Check if the file is read-only before attempting to modify - let is_readonly = self.cache_path.metadata() + let is_readonly = self + .cache_path + .metadata() .map(|m| m.permissions().readonly()) .unwrap_or(false); @@ -221,26 +234,29 @@ impl CacheStore { } let mut data = self.data.write().await; - + match addr.parse::() { Ok(addr) => { - let peer = data.peers.entry(addr.to_string()).or_insert_with(|| { - BootstrapPeer::new(addr) - }); + let peer = data + .peers + .entry(addr.to_string()) + .or_insert_with(|| BootstrapPeer::new(addr)); peer.update_status(success); self.save_to_disk(&data).await?; Ok(()) } Err(e) => Err(Error::from(std::io::Error::new( std::io::ErrorKind::InvalidInput, - format!("Invalid multiaddr: {}", e) + format!("Invalid multiaddr: {}", e), ))), } } pub async fn add_peer(&self, addr: Multiaddr) -> Result<()> { // Check if the cache file is read-only before attempting any modifications - let is_readonly = self.cache_path.metadata() + let is_readonly = self + .cache_path + .metadata() .map(|m| m.permissions().readonly()) .unwrap_or(false); @@ -252,7 +268,11 @@ impl CacheStore { let mut data = self.data.write().await; let addr_str = addr.to_string(); - tracing::debug!("Adding peer {}, current peers: {}", addr_str, data.peers.len()); + tracing::debug!( + "Adding peer {}, current peers: {}", + addr_str, + data.peers.len() + ); // If the peer already exists, just update its last_seen time if let Some(peer) = data.peers.get_mut(&addr_str) { @@ -264,17 +284,24 @@ impl CacheStore { // Only add new peers if we haven't reached max_peers if data.peers.len() < self.config.max_peers { tracing::debug!("Adding new peer {} (under max_peers limit)", addr_str); - data.peers.insert(addr_str.clone(), BootstrapPeer::new(addr)); + data.peers + .insert(addr_str.clone(), BootstrapPeer::new(addr)); self.save_to_disk(&data).await?; } else { // If we're at max_peers, replace the oldest peer - if let Some((oldest_addr, oldest_peer)) = data.peers.iter() - .min_by_key(|(_, peer)| peer.last_seen) { - tracing::debug!("Replacing oldest peer {} (last seen: {:?}) with new peer {}", - oldest_addr, oldest_peer.last_seen, addr_str); + if let Some((oldest_addr, oldest_peer)) = + data.peers.iter().min_by_key(|(_, peer)| peer.last_seen) + { + tracing::debug!( + "Replacing oldest peer {} (last seen: {:?}) with new peer {}", + oldest_addr, + oldest_peer.last_seen, + addr_str + ); let oldest_addr = oldest_addr.clone(); data.peers.remove(&oldest_addr); - data.peers.insert(addr_str.clone(), BootstrapPeer::new(addr)); + data.peers + .insert(addr_str.clone(), BootstrapPeer::new(addr)); self.save_to_disk(&data).await?; } } @@ -284,7 +311,9 @@ impl CacheStore { pub async fn remove_peer(&self, addr: &str) -> Result<()> { // Check if the file is read-only before attempting to modify - let is_readonly = self.cache_path.metadata() + let is_readonly = self + .cache_path + .metadata() .map(|m| m.permissions().readonly()) .unwrap_or(false); @@ -301,7 +330,9 @@ impl CacheStore { pub async fn cleanup_unreliable_peers(&self) -> Result<()> { // Check if the file is read-only before attempting to modify - let is_readonly = self.cache_path.metadata() + let is_readonly = self + .cache_path + .metadata() .map(|m| m.permissions().readonly()) .unwrap_or(false); @@ -328,7 +359,9 @@ impl CacheStore { pub async fn cleanup_stale_peers(&self) -> Result<()> { // Check if the file is read-only before attempting to modify - let is_readonly = self.cache_path.metadata() + let is_readonly = self + .cache_path + .metadata() .map(|m| m.permissions().readonly()) .unwrap_or(false); @@ -361,7 +394,9 @@ impl CacheStore { pub async fn save_to_disk(&self, data: &CacheData) -> Result<()> { // Check if the file is read-only before attempting to write - let is_readonly = self.cache_path.metadata() + let is_readonly = self + .cache_path + .metadata() .map(|m| m.permissions().readonly()) .unwrap_or(false); @@ -380,18 +415,16 @@ impl CacheStore { } async fn acquire_shared_lock(file: &File) -> Result<()> { - let file = file.try_clone() - .map_err(|e| Error::from(e))?; - - tokio::task::spawn_blocking(move || { - file.try_lock_shared() - .map_err(|e| Error::from(e)) - }) - .await - .map_err(|e| Error::from(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Failed to spawn blocking task: {}", e) - )))? + let file = file.try_clone().map_err(|e| Error::from(e))?; + + tokio::task::spawn_blocking(move || file.try_lock_shared().map_err(|e| Error::from(e))) + .await + .map_err(|e| { + Error::from(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to spawn blocking task: {}", e), + )) + })? } async fn acquire_exclusive_lock(file: &File) -> Result<()> { @@ -418,17 +451,14 @@ impl CacheStore { async fn atomic_write(&self, data: &CacheData) -> Result<()> { // Create parent directory if it doesn't exist if let Some(parent) = self.cache_path.parent() { - fs::create_dir_all(parent) - .map_err(|e| Error::from(e))?; + fs::create_dir_all(parent).map_err(|e| Error::from(e))?; } // Create a temporary file in the same directory as the cache file - let temp_file = NamedTempFile::new() - .map_err(|e| Error::from(e))?; + let temp_file = NamedTempFile::new().map_err(|e| Error::from(e))?; // Write data to temporary file - serde_json::to_writer_pretty(&temp_file, &data) - .map_err(|e| Error::from(e))?; + serde_json::to_writer_pretty(&temp_file, &data).map_err(|e| Error::from(e))?; // Open the target file with proper permissions let file = OpenOptions::new() @@ -442,11 +472,12 @@ impl CacheStore { Self::acquire_exclusive_lock(&file).await?; // Perform atomic rename - temp_file.persist(&self.cache_path) - .map_err(|e| Error::from(std::io::Error::new( + temp_file.persist(&self.cache_path).map_err(|e| { + Error::from(std::io::Error::new( std::io::ErrorKind::Other, - format!("Failed to persist cache file: {}", e) - )))?; + format!("Failed to persist cache file: {}", e), + )) + })?; // Lock will be automatically released when file is dropped Ok(()) @@ -479,16 +510,20 @@ mod tests { async fn test_peer_update_and_save() { let (store, _) = create_test_store().await; let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); - + // Manually add a peer without using fallback { let mut data = store.data.write().await; - data.peers.insert(addr.to_string(), BootstrapPeer::new(addr.clone())); + data.peers + .insert(addr.to_string(), BootstrapPeer::new(addr.clone())); store.save_to_disk(&data).await.unwrap(); } - - store.update_peer_status(&addr.to_string(), true).await.unwrap(); - + + store + .update_peer_status(&addr.to_string(), true) + .await + .unwrap(); + let peers = store.get_peers().await; assert_eq!(peers.len(), 1); assert_eq!(peers[0].addr, addr); @@ -501,20 +536,26 @@ mod tests { let (store, _) = create_test_store().await; let good_addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); let bad_addr: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap(); - + // Add peers store.add_peer(good_addr.clone()).await.unwrap(); store.add_peer(bad_addr.clone()).await.unwrap(); - + // Make one peer reliable and one unreliable - store.update_peer_status(&good_addr.to_string(), true).await.unwrap(); + store + .update_peer_status(&good_addr.to_string(), true) + .await + .unwrap(); for _ in 0..5 { - store.update_peer_status(&bad_addr.to_string(), false).await.unwrap(); + store + .update_peer_status(&bad_addr.to_string(), false) + .await + .unwrap(); } - + // Clean up unreliable peers store.cleanup_unreliable_peers().await.unwrap(); - + // Get all peers (not just reliable ones) let peers = store.get_peers().await; assert_eq!(peers.len(), 1); @@ -525,7 +566,7 @@ mod tests { async fn test_stale_peer_cleanup() { let (store, _) = create_test_store().await; let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); - + // Add a peer with more failures than successes let mut peer = BootstrapPeer::new(addr.clone()); peer.success_count = 1; @@ -535,10 +576,10 @@ mod tests { data.peers.insert(addr.to_string(), peer); store.save_to_disk(&data).await.unwrap(); } - + // Clean up unreliable peers store.cleanup_unreliable_peers().await.unwrap(); - + // Should have no peers since the only peer was unreliable let peers = store.get_reliable_peers().await; assert_eq!(peers.len(), 0); @@ -549,35 +590,39 @@ mod tests { let (store, _) = create_test_store().await; let store = Arc::new(store); let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); - + // Manually add a peer without using fallback { let mut data = store.data.write().await; - data.peers.insert(addr.to_string(), BootstrapPeer::new(addr.clone())); + data.peers + .insert(addr.to_string(), BootstrapPeer::new(addr.clone())); store.save_to_disk(&data).await.unwrap(); } - + let mut handles = vec![]; - + // Spawn multiple tasks to update peer status concurrently for i in 0..10 { let store = Arc::clone(&store); let addr = addr.clone(); - + handles.push(tokio::spawn(async move { - store.update_peer_status(&addr.to_string(), i % 2 == 0).await.unwrap(); + store + .update_peer_status(&addr.to_string(), i % 2 == 0) + .await + .unwrap(); })); } - + // Wait for all tasks to complete for handle in handles { handle.await.unwrap(); } - + // Verify the final state - should have one peer let peers = store.get_peers().await; assert_eq!(peers.len(), 1); - + // The peer should have a mix of successes and failures assert!(peers[0].success_count > 0); assert!(peers[0].failure_count > 0); diff --git a/bootstrap_cache/src/circuit_breaker.rs b/bootstrap_cache/src/circuit_breaker.rs index 88c6b6cdc3..5584b117ca 100644 --- a/bootstrap_cache/src/circuit_breaker.rs +++ b/bootstrap_cache/src/circuit_breaker.rs @@ -45,10 +45,7 @@ impl EndpointState { self.last_failure = Instant::now(); self.last_attempt = Instant::now(); // Exponential backoff with max limit - self.backoff_duration = std::cmp::min( - self.backoff_duration * 2, - max_backoff, - ); + self.backoff_duration = std::cmp::min(self.backoff_duration * 2, max_backoff); } fn record_success(&mut self, min_backoff: Duration) { @@ -97,11 +94,13 @@ impl CircuitBreaker { pub async fn check_endpoint(&self, endpoint: &str) -> bool { let mut states = self.states.write().await; - let state = states.entry(endpoint.to_string()).or_insert_with(|| { - EndpointState::new(self.config.min_backoff) - }); - - if state.is_open(self.config.max_failures, self.config.reset_timeout) && !state.should_retry() { + let state = states + .entry(endpoint.to_string()) + .or_insert_with(|| EndpointState::new(self.config.min_backoff)); + + if state.is_open(self.config.max_failures, self.config.reset_timeout) + && !state.should_retry() + { false } else { true @@ -117,15 +116,16 @@ impl CircuitBreaker { pub async fn record_failure(&self, endpoint: &str) { let mut states = self.states.write().await; - let state = states.entry(endpoint.to_string()).or_insert_with(|| { - EndpointState::new(self.config.min_backoff) - }); + let state = states + .entry(endpoint.to_string()) + .or_insert_with(|| EndpointState::new(self.config.min_backoff)); state.record_failure(self.config.max_backoff); } pub async fn get_backoff_duration(&self, endpoint: &str) -> Duration { let states = self.states.read().await; - states.get(endpoint) + states + .get(endpoint) .map(|state| state.backoff_duration) .unwrap_or(self.config.min_backoff) } @@ -176,11 +176,17 @@ mod tests { // Record a failure cb.record_failure(endpoint).await; - assert_eq!(cb.get_backoff_duration(endpoint).await, config.min_backoff * 2); + assert_eq!( + cb.get_backoff_duration(endpoint).await, + config.min_backoff * 2 + ); // Record another failure cb.record_failure(endpoint).await; - assert_eq!(cb.get_backoff_duration(endpoint).await, config.min_backoff * 4); + assert_eq!( + cb.get_backoff_duration(endpoint).await, + config.min_backoff * 4 + ); // Success should reset backoff cb.record_success(endpoint).await; diff --git a/bootstrap_cache/src/config.rs b/bootstrap_cache/src/config.rs index 659104f0a7..7c3328bbc3 100644 --- a/bootstrap_cache/src/config.rs +++ b/bootstrap_cache/src/config.rs @@ -6,8 +6,8 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use std::time::Duration; use std::path::{Path, PathBuf}; +use std::time::Duration; /// Configuration for the bootstrap cache #[derive(Clone, Debug)] @@ -146,7 +146,7 @@ mod tests { Duration::from_secs(5), 5, ); - + assert_eq!(config.endpoints, endpoints); assert_eq!(config.max_peers, 2000); assert_eq!(config.cache_file_path, path); diff --git a/bootstrap_cache/src/initial_peer_discovery.rs b/bootstrap_cache/src/initial_peer_discovery.rs index 3406c54796..fe106e402b 100644 --- a/bootstrap_cache/src/initial_peer_discovery.rs +++ b/bootstrap_cache/src/initial_peer_discovery.rs @@ -6,14 +6,18 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::{BootstrapPeer, BootstrapEndpoints, Error, Result, circuit_breaker::{CircuitBreaker, CircuitBreakerConfig}}; +use crate::{ + circuit_breaker::{CircuitBreaker, CircuitBreakerConfig}, + BootstrapEndpoints, BootstrapPeer, Error, Result, +}; use libp2p::Multiaddr; use reqwest::Client; use serde_json; -use tracing::{info, warn}; use tokio::time::timeout; +use tracing::{info, warn}; -const DEFAULT_JSON_ENDPOINT: &str = "https://sn-testnet.s3.eu-west-2.amazonaws.com/bootstrap_cache.json"; +const DEFAULT_JSON_ENDPOINT: &str = + "https://sn-testnet.s3.eu-west-2.amazonaws.com/bootstrap_cache.json"; const DEFAULT_BOOTSTRAP_ENDPOINTS: &[&str] = &[ DEFAULT_JSON_ENDPOINT, @@ -34,7 +38,10 @@ pub struct InitialPeerDiscovery { impl InitialPeerDiscovery { pub fn new() -> Self { Self { - endpoints: DEFAULT_BOOTSTRAP_ENDPOINTS.iter().map(|s| s.to_string()).collect(), + endpoints: DEFAULT_BOOTSTRAP_ENDPOINTS + .iter() + .map(|s| s.to_string()) + .collect(), client: Client::new(), circuit_breaker: CircuitBreaker::new(), } @@ -48,7 +55,10 @@ impl InitialPeerDiscovery { } } - pub fn with_config(endpoints: Vec, circuit_breaker_config: CircuitBreakerConfig) -> Self { + pub fn with_config( + endpoints: Vec, + circuit_breaker_config: CircuitBreakerConfig, + ) -> Self { Self { endpoints, client: Client::new(), @@ -86,9 +96,14 @@ impl InitialPeerDiscovery { if peers.is_empty() { if let Some(e) = last_error { - Err(Error::NoPeersFound(format!("No valid peers found from any endpoint: {}", e))) + Err(Error::NoPeersFound(format!( + "No valid peers found from any endpoint: {}", + e + ))) } else { - Err(Error::NoPeersFound("No valid peers found from any endpoint".to_string())) + Err(Error::NoPeersFound( + "No valid peers found from any endpoint".to_string(), + )) } } else { Ok(peers) @@ -114,33 +129,38 @@ impl InitialPeerDiscovery { let content = response.text().await?; info!("Received response content: {}", content); - + // Try parsing as JSON first if content.trim().starts_with('{') { match serde_json::from_str::(&content) { Ok(json_endpoints) => { info!("Successfully parsed JSON response"); - let peers = json_endpoints.peers.into_iter() - .filter_map(|addr| { - match addr.parse::() { - Ok(addr) => Some(BootstrapPeer::new(addr)), - Err(e) => { - warn!("Failed to parse multiaddr {}: {}", addr, e); - None - } + let peers = json_endpoints + .peers + .into_iter() + .filter_map(|addr| match addr.parse::() { + Ok(addr) => Some(BootstrapPeer::new(addr)), + Err(e) => { + warn!("Failed to parse multiaddr {}: {}", addr, e); + None } }) .collect::>(); if peers.is_empty() { - Err(Error::NoPeersFound("No valid peers found in JSON response".to_string())) + Err(Error::NoPeersFound( + "No valid peers found in JSON response".to_string(), + )) } else { Ok(peers) } } Err(e) => { warn!("Failed to parse JSON response: {}", e); - Err(Error::InvalidResponse(format!("Invalid JSON format: {}", e))) + Err(Error::InvalidResponse(format!( + "Invalid JSON format: {}", + e + ))) } } } else { @@ -148,24 +168,25 @@ impl InitialPeerDiscovery { let peers = content .lines() .filter(|line| !line.trim().is_empty()) - .filter_map(|line| { - match line.trim().parse::() { - Ok(addr) => Some(BootstrapPeer::new(addr)), - Err(e) => { - warn!("Failed to parse multiaddr {}: {}", line, e); - None - } + .filter_map(|line| match line.trim().parse::() { + Ok(addr) => Some(BootstrapPeer::new(addr)), + Err(e) => { + warn!("Failed to parse multiaddr {}: {}", line, e); + None } }) .collect::>(); if peers.is_empty() { - Err(Error::NoPeersFound("No valid peers found in plain text response".to_string())) + Err(Error::NoPeersFound( + "No valid peers found in plain text response".to_string(), + )) } else { Ok(peers) } } - }.await; + } + .await; match result { Ok(peers) => { @@ -194,9 +215,10 @@ mod tests { Mock::given(method("GET")) .and(path("/")) - .respond_with(ResponseTemplate::new(200).set_body_string( - "/ip4/127.0.0.1/tcp/8080\n/ip4/127.0.0.2/tcp/8080", - )) + .respond_with( + ResponseTemplate::new(200) + .set_body_string("/ip4/127.0.0.1/tcp/8080\n/ip4/127.0.0.2/tcp/8080"), + ) .mount(&mock_server) .await; @@ -227,9 +249,7 @@ mod tests { // Second endpoint succeeds Mock::given(method("GET")) .and(path("/")) - .respond_with(ResponseTemplate::new(200).set_body_string( - "/ip4/127.0.0.1/tcp/8080", - )) + .respond_with(ResponseTemplate::new(200).set_body_string("/ip4/127.0.0.1/tcp/8080")) .mount(&mock_server2) .await; @@ -249,9 +269,11 @@ mod tests { Mock::given(method("GET")) .and(path("/")) - .respond_with(ResponseTemplate::new(200).set_body_string( - "/ip4/127.0.0.1/tcp/8080\ninvalid-addr\n/ip4/127.0.0.2/tcp/8080", - )) + .respond_with( + ResponseTemplate::new(200).set_body_string( + "/ip4/127.0.0.1/tcp/8080\ninvalid-addr\n/ip4/127.0.0.2/tcp/8080", + ), + ) .mount(&mock_server) .await; @@ -286,9 +308,9 @@ mod tests { Mock::given(method("GET")) .and(path("/")) - .respond_with(ResponseTemplate::new(200).set_body_string( - "\n \n/ip4/127.0.0.1/tcp/8080\n \n", - )) + .respond_with( + ResponseTemplate::new(200).set_body_string("\n \n/ip4/127.0.0.1/tcp/8080\n \n"), + ) .mount(&mock_server) .await; diff --git a/bootstrap_cache/src/lib.rs b/bootstrap_cache/src/lib.rs index 1f51482cef..a710af21a6 100644 --- a/bootstrap_cache/src/lib.rs +++ b/bootstrap_cache/src/lib.rs @@ -6,17 +6,17 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -mod initial_peer_discovery; mod cache_store; +mod circuit_breaker; pub mod config; mod error; -mod circuit_breaker; +mod initial_peer_discovery; +use chrono; use libp2p::Multiaddr; use serde::{Deserialize, Serialize}; use std::{fmt, time::SystemTime}; use thiserror::Error; -use chrono; pub use cache_store::CacheStore; pub use config::BootstrapConfig; diff --git a/bootstrap_cache/tests/cache_tests.rs b/bootstrap_cache/tests/cache_tests.rs index 3b03e80666..186eaa263a 100644 --- a/bootstrap_cache/tests/cache_tests.rs +++ b/bootstrap_cache/tests/cache_tests.rs @@ -17,13 +17,20 @@ async fn test_cache_store_operations() -> Result<(), Box> let cache_store = CacheStore::new(config).await?; // Test adding and retrieving peers - let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE".parse()?; + let addr: Multiaddr = + "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE" + .parse()?; cache_store.add_peer(addr.clone()).await?; - cache_store.update_peer_status(&addr.to_string(), true).await?; + cache_store + .update_peer_status(&addr.to_string(), true) + .await?; let peers = cache_store.get_reliable_peers().await; assert!(!peers.is_empty(), "Cache should contain the added peer"); - assert!(peers.iter().any(|p| p.addr == addr), "Cache should contain our specific peer"); + assert!( + peers.iter().any(|p| p.addr == addr), + "Cache should contain our specific peer" + ); Ok(()) } @@ -41,16 +48,23 @@ async fn test_cache_persistence() -> Result<(), Box> { let cache_store1 = CacheStore::new(config.clone()).await?; // Add a peer and mark it as reliable - let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE".parse()?; + let addr: Multiaddr = + "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE" + .parse()?; cache_store1.add_peer(addr.clone()).await?; - cache_store1.update_peer_status(&addr.to_string(), true).await?; + cache_store1 + .update_peer_status(&addr.to_string(), true) + .await?; // Create a new cache store with the same path let cache_store2 = CacheStore::new(config).await?; let peers = cache_store2.get_reliable_peers().await; - + assert!(!peers.is_empty(), "Cache should persist across instances"); - assert!(peers.iter().any(|p| p.addr == addr), "Specific peer should persist"); + assert!( + peers.iter().any(|p| p.addr == addr), + "Specific peer should persist" + ); Ok(()) } @@ -66,24 +80,36 @@ async fn test_cache_reliability_tracking() -> Result<(), Box Result<(), Box> { // We should have the two most recently added peers (addresses[1] and addresses[2]) for peer in peers { let addr_str = peer.addr.to_string(); - assert!(addresses[1..].iter().any(|a| a.to_string() == addr_str), - "Should have one of the two most recent peers, got {}", addr_str); + assert!( + addresses[1..].iter().any(|a| a.to_string() == addr_str), + "Should have one of the two most recent peers, got {}", + addr_str + ); } Ok(()) @@ -186,7 +215,9 @@ async fn test_cache_file_corruption() -> Result<(), Box> let cache_store = CacheStore::new(config.clone()).await?; // Add a peer - let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UER1".parse()?; + let addr: Multiaddr = + "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UER1" + .parse()?; cache_store.add_peer(addr.clone()).await?; // Corrupt the cache file @@ -200,9 +231,11 @@ async fn test_cache_file_corruption() -> Result<(), Box> // Should be able to add peers again new_cache_store.add_peer(addr).await?; let peers = new_cache_store.get_peers().await; - assert_eq!(peers.len(), 1, "Should be able to add peers after corruption"); + assert_eq!( + peers.len(), + 1, + "Should be able to add peers after corruption" + ); Ok(()) } - - diff --git a/bootstrap_cache/tests/integration_tests.rs b/bootstrap_cache/tests/integration_tests.rs index 273d9600e9..672ae2cc36 100644 --- a/bootstrap_cache/tests/integration_tests.rs +++ b/bootstrap_cache/tests/integration_tests.rs @@ -61,11 +61,19 @@ async fn test_individual_s3_endpoints() { let endpoint = format!("{}/peers", mock_server.uri()); let discovery = InitialPeerDiscovery::with_endpoints(vec![endpoint.clone()]); - + match discovery.fetch_peers().await { Ok(peers) => { - println!("Successfully fetched {} peers from {}", peers.len(), endpoint); - assert!(!peers.is_empty(), "Expected to find peers from {}", endpoint); + println!( + "Successfully fetched {} peers from {}", + peers.len(), + endpoint + ); + assert!( + !peers.is_empty(), + "Expected to find peers from {}", + endpoint + ); // Verify first peer's multiaddr format if let Some(first_peer) = peers.first() { @@ -77,7 +85,10 @@ async fn test_individual_s3_endpoints() { assert!(addr_str.contains("/p2p/"), "Expected peer ID"); // Try to parse it back to ensure it's valid - assert!(addr_str.parse::().is_ok(), "Should be valid multiaddr"); + assert!( + addr_str.parse::().is_ok(), + "Should be valid multiaddr" + ); } } Err(e) => { @@ -104,7 +115,10 @@ async fn test_response_format() { assert!(components.contains(&"ip4"), "Missing IP4 component"); assert!(components.contains(&"udp"), "Missing UDP component"); assert!(components.contains(&"quic-v1"), "Missing QUIC component"); - assert!(components.iter().any(|&c| c == "p2p"), "Missing P2P component"); + assert!( + components.iter().any(|&c| c == "p2p"), + "Missing P2P component" + ); // Ensure we can parse it back into a multiaddr let parsed: Multiaddr = addr_str.parse().expect("Should be valid multiaddr"); @@ -132,44 +146,54 @@ async fn test_json_endpoint_format() { // Mount the mock Mock::given(method("GET")) - .and(path("/")) // Use root path instead of /peers + .and(path("/")) // Use root path instead of /peers .respond_with(ResponseTemplate::new(200).set_body_string(json_response)) .mount(&mock_server) .await; let endpoint = format!("{}", mock_server.uri()); let discovery = InitialPeerDiscovery::with_endpoints(vec![endpoint.clone()]); - + let peers = discovery.fetch_peers().await.unwrap(); assert_eq!(peers.len(), 2); // Verify peer addresses let addrs: Vec = peers.iter().map(|p| p.addr.to_string()).collect(); - assert!(addrs.contains(&"/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE".to_string())); - assert!(addrs.contains(&"/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERF".to_string())); + assert!(addrs.contains( + &"/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE" + .to_string() + )); + assert!(addrs.contains( + &"/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERF" + .to_string() + )); } #[tokio::test] async fn test_s3_json_format() { init_logging(); - + // Fetch and parse the bootstrap cache JSON - let response = reqwest::get("https://sn-testnet.s3.eu-west-2.amazonaws.com/bootstrap_cache.json") - .await - .unwrap(); + let response = + reqwest::get("https://sn-testnet.s3.eu-west-2.amazonaws.com/bootstrap_cache.json") + .await + .unwrap(); let json_str = response.text().await.unwrap(); - + // Parse using our BootstrapEndpoints struct let endpoints: BootstrapEndpoints = serde_json::from_str(&json_str).unwrap(); - + // Verify we got all the peers assert_eq!(endpoints.peers.len(), 24); - + // Verify we can parse each peer address for peer in endpoints.peers { peer.parse::().unwrap(); } - + // Verify metadata - assert_eq!(endpoints.metadata.description, "Safe Network testnet bootstrap cache"); + assert_eq!( + endpoints.metadata.description, + "Safe Network testnet bootstrap cache" + ); }