diff --git a/.gitignore b/.gitignore index bf0d0deed0..a13bb1aa5c 100644 --- a/.gitignore +++ b/.gitignore @@ -36,8 +36,7 @@ sn_node_manager/.vagrant .venv/ uv.lock *.so -*.pyc - *.pyc *.swp +/vendor/ \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 40750e1775..da84a78db1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ resolver = "2" members = [ "autonomi", "autonomi-cli", + "bootstrap_cache", "evmlib", "evm_testnet", "sn_build_info", diff --git a/bootstrap_cache/src/cache.rs b/bootstrap_cache/src/cache.rs index 914f150a0b..74e76a4717 100644 --- a/bootstrap_cache/src/cache.rs +++ b/bootstrap_cache/src/cache.rs @@ -48,7 +48,7 @@ impl CacheManager { Ok(path) } - /// Reads the cache file with file locking + /// Reads the cache file with file locking, handling potential corruption pub fn read_cache(&self) -> Result { debug!("Reading bootstrap cache from {:?}", self.cache_path); @@ -71,10 +71,12 @@ impl CacheManager { })?; let mut contents = String::new(); - file.read_to_string(&mut contents).map_err(|e| { + if let Err(e) = file.read_to_string(&mut contents) { error!("Failed to read cache file: {}", e); - Error::Io(e) - })?; + // Release lock before returning + let _ = file.unlock(); + return Err(Error::Io(e)); + } // Release lock file.unlock().map_err(|e| { @@ -82,10 +84,39 @@ impl CacheManager { Error::LockError })?; - serde_json::from_str(&contents).map_err(|e| { - error!("Failed to parse cache file: {}", e); - Error::Json(e) - }) + // Try to parse the cache, if it fails it might be corrupted + match serde_json::from_str(&contents) { + Ok(cache) => Ok(cache), + Err(e) => { + error!("Cache file appears to be corrupted: {}", e); + Err(Error::CacheCorrupted(e)) + } + } + } + + /// Rebuilds the cache using provided peers or fetches new ones if none provided + pub async fn rebuild_cache(&self, peers: Option>) -> Result { + info!("Rebuilding bootstrap cache"); + + let cache = if let Some(peers) = peers { + info!("Rebuilding cache with {} in-memory peers", peers.len()); + BootstrapCache { + last_updated: chrono::Utc::now(), + peers, + } + } else { + info!("No in-memory peers available, fetching from endpoints"); + let discovery = InitialPeerDiscovery::new(); + let peers = discovery.fetch_peers().await?; + BootstrapCache { + last_updated: chrono::Utc::now(), + peers, + } + }; + + // Write the rebuilt cache + self.write_cache(&cache)?; + Ok(cache) } /// Writes the cache file with file locking and atomic replacement @@ -140,7 +171,9 @@ impl CacheManager { mod tests { use super::*; use chrono::Utc; + use std::fs::OpenOptions; use tempfile::tempdir; + use tokio; #[test] fn test_cache_read_write() { @@ -168,4 +201,164 @@ mod tests { let cache = manager.read_cache().unwrap(); assert!(cache.peers.is_empty()); } + + #[test] + fn test_corrupted_cache_file() { + let dir = tempdir().unwrap(); + let cache_path = dir.path().join("corrupted.json"); + + // Write corrupted JSON + let mut file = OpenOptions::new() + .write(true) + .create(true) + .open(&cache_path) + .unwrap(); + file.write_all(b"{invalid json}").unwrap(); + + let manager = CacheManager { cache_path }; + match manager.read_cache() { + Err(Error::CacheCorrupted(_)) => (), + other => panic!("Expected CacheCorrupted error, got {:?}", other), + } + } + + #[test] + fn test_partially_corrupted_cache() { + let dir = tempdir().unwrap(); + let cache_path = dir.path().join("partial_corrupt.json"); + + // Write partially valid JSON + let mut file = OpenOptions::new() + .write(true) + .create(true) + .open(&cache_path) + .unwrap(); + file.write_all(b"{\"last_updated\":\"2024-01-01T00:00:00Z\",\"peers\":[{}]}").unwrap(); + + let manager = CacheManager { cache_path }; + match manager.read_cache() { + Err(Error::CacheCorrupted(_)) => (), + other => panic!("Expected CacheCorrupted error, got {:?}", other), + } + } + + #[tokio::test] + async fn test_rebuild_cache_with_memory_peers() { + let dir = tempdir().unwrap(); + let cache_path = dir.path().join("rebuild.json"); + let manager = CacheManager { cache_path }; + + // Create some test peers + let test_peers = vec![ + BootstrapPeer { + addr: "/ip4/127.0.0.1/tcp/8080".parse().unwrap(), + success_count: 1, + failure_count: 0, + last_success: Some(Utc::now()), + last_failure: None, + } + ]; + + // Rebuild cache with in-memory peers + let rebuilt = manager.rebuild_cache(Some(test_peers.clone())).await.unwrap(); + assert_eq!(rebuilt.peers.len(), 1); + assert_eq!(rebuilt.peers[0].addr, test_peers[0].addr); + + // Verify the cache was written to disk + let read_cache = manager.read_cache().unwrap(); + assert_eq!(read_cache.peers.len(), 1); + assert_eq!(read_cache.peers[0].addr, test_peers[0].addr); + } + + #[tokio::test] + async fn test_rebuild_cache_from_endpoints() { + let dir = tempdir().unwrap(); + let cache_path = dir.path().join("rebuild_endpoints.json"); + let manager = CacheManager { cache_path }; + + // Write corrupted cache first + let mut file = OpenOptions::new() + .write(true) + .create(true) + .open(&cache_path) + .unwrap(); + file.write_all(b"{corrupted}").unwrap(); + + // Verify corrupted cache is detected + match manager.read_cache() { + Err(Error::CacheCorrupted(_)) => (), + other => panic!("Expected CacheCorrupted error, got {:?}", other), + } + + // Mock the InitialPeerDiscovery for testing + // Note: In a real implementation, you might want to use a trait for InitialPeerDiscovery + // and mock it properly. This test will actually try to fetch from real endpoints. + match manager.rebuild_cache(None).await { + Ok(cache) => { + // Verify the cache was rebuilt and written + let read_cache = manager.read_cache().unwrap(); + assert_eq!(read_cache.peers.len(), cache.peers.len()); + } + Err(Error::NoPeersFound(_)) => { + // This is also acceptable if no endpoints are reachable during test + () + } + Err(e) => panic!("Unexpected error: {:?}", e), + } + } + + #[test] + fn test_concurrent_cache_access() { + let dir = tempdir().unwrap(); + let cache_path = dir.path().join("concurrent.json"); + let manager = CacheManager { cache_path.clone() }; + + // Initial cache + let cache = BootstrapCache { + last_updated: Utc::now(), + peers: vec![], + }; + manager.write_cache(&cache).unwrap(); + + // Try to read while holding write lock + let file = OpenOptions::new() + .write(true) + .open(&cache_path) + .unwrap(); + file.lock_exclusive().unwrap(); + + // This should fail with a lock error + match manager.read_cache() { + Err(Error::LockError) => (), + other => panic!("Expected LockError, got {:?}", other), + } + + // Release lock + file.unlock().unwrap(); + } + + #[test] + fn test_cache_file_permissions() { + let dir = tempdir().unwrap(); + let cache_path = dir.path().join("permissions.json"); + let manager = CacheManager { cache_path: cache_path.clone() }; + + // Write initial cache + let cache = BootstrapCache { + last_updated: Utc::now(), + peers: vec![], + }; + manager.write_cache(&cache).unwrap(); + + // Make file read-only + let mut perms = fs::metadata(&cache_path).unwrap().permissions(); + perms.set_readonly(true); + fs::set_permissions(&cache_path, perms).unwrap(); + + // Try to write to read-only file + match manager.write_cache(&cache) { + Err(Error::Io(_)) => (), + other => panic!("Expected Io error, got {:?}", other), + } + } } diff --git a/bootstrap_cache/src/cache_store.rs b/bootstrap_cache/src/cache_store.rs index b14551e458..b90e91e309 100644 --- a/bootstrap_cache/src/cache_store.rs +++ b/bootstrap_cache/src/cache_store.rs @@ -57,27 +57,42 @@ impl CacheStore { // Create cache directory if it doesn't exist if let Some(parent) = cache_path.parent() { - fs::create_dir_all(parent)?; + fs::create_dir_all(parent) + .map_err(|e| Error::Io(e))?; } let data = if cache_path.exists() { - // Load existing cache - let mut file = OpenOptions::new() - .read(true) - .open(&cache_path) - .map_err(|e| Error::IoError(format!("Failed to open cache file: {}", e)))?; - - // Acquire shared lock for reading - Self::acquire_shared_lock(&file).await?; - - let mut contents = String::new(); - file.read_to_string(&mut contents) - .map_err(|e| Error::IoError(format!("Failed to read cache file: {}", e)))?; - - // Lock will be automatically released when file is dropped - serde_json::from_str(&contents) - .map_err(|e| Error::IoError(format!("Failed to parse cache data: {}", e)))? + match Self::load_cache_data(&cache_path).await { + Ok(data) => { + // If cache data exists but has no peers and file is not read-only, + // fallback to default + let is_readonly = cache_path.metadata() + .map(|m| m.permissions().readonly()) + .unwrap_or(false); + + if data.peers.is_empty() && !is_readonly { + Self::fallback_to_default(&config).await? + } else { + // Ensure we don't exceed max_peers + let mut filtered_data = data; + if filtered_data.peers.len() > config.max_peers { + let peers: Vec<_> = filtered_data.peers.into_iter().collect(); + filtered_data.peers = peers.into_iter() + .take(config.max_peers) + .map(|(k, v)| (k, v)) + .collect(); + } + filtered_data + } + } + Err(e) => { + tracing::warn!("Failed to load cache data: {}", e); + // If we can't read or parse the cache file, return empty cache + CacheData::default() + } + } } else { + // If cache file doesn't exist, fallback to default Self::fallback_to_default(&config).await? }; @@ -87,33 +102,80 @@ impl CacheStore { data: Arc::new(RwLock::new(data)), }; - // Clean up any stale peers on startup - store.cleanup_stale_peers().await?; + // Only clean up stale peers if the file is not read-only + let is_readonly = store.cache_path.metadata() + .map(|m| m.permissions().readonly()) + .unwrap_or(false); + + if !is_readonly { + if let Err(e) = store.cleanup_stale_peers().await { + tracing::warn!("Failed to clean up stale peers: {}", e); + } + } Ok(store) } async fn fallback_to_default(config: &crate::BootstrapConfig) -> Result { + let mut data = CacheData { + peers: std::collections::HashMap::new(), + last_updated: SystemTime::now(), + version: default_version(), + }; + + // If no endpoints are configured, just return empty cache if config.endpoints.is_empty() { - return Ok(CacheData { - peers: std::collections::HashMap::new(), - last_updated: SystemTime::now(), - version: default_version(), - }); + return Ok(data); } - + + // Try to discover peers from configured endpoints let discovery = InitialPeerDiscovery::with_endpoints(config.endpoints.clone()); - - let peers = discovery.fetch_peers().await?; - let mut peer_map = std::collections::HashMap::new(); - for peer in peers { - peer_map.insert(peer.addr.to_string(), peer); + match discovery.fetch_peers().await { + Ok(peers) => { + // Only add up to max_peers from the discovered peers + for peer in peers.into_iter().take(config.max_peers) { + data.peers.insert(peer.addr.to_string(), peer); + } + Ok(data) + } + Err(e) => { + tracing::warn!("Failed to fetch peers from endpoints: {}", e); + Ok(data) // Return empty cache on error + } + } + } + + async fn load_cache_data(cache_path: &PathBuf) -> Result { + // Try to open the file with read permissions + let mut file = match OpenOptions::new().read(true).open(cache_path) { + Ok(f) => f, + Err(e) => { + tracing::warn!("Failed to open cache file: {}", e); + return Err(Error::from(e)); + } + }; + + // Acquire shared lock for reading + if let Err(e) = Self::acquire_shared_lock(&file).await { + tracing::warn!("Failed to acquire shared lock: {}", e); + return Err(Error::from(e)); + } + + // Read the file contents + let mut contents = String::new(); + if let Err(e) = file.read_to_string(&mut contents) { + tracing::warn!("Failed to read cache file: {}", e); + return Err(Error::from(e)); + } + + // Parse the cache data + match serde_json::from_str::(&contents) { + Ok(data) => Ok(data), + Err(e) => { + tracing::warn!("Failed to parse cache data: {}", e); + Err(Error::Io(io::Error::new(io::ErrorKind::InvalidData, e))) + } } - Ok(CacheData { - peers: peer_map, - last_updated: SystemTime::now(), - version: default_version(), - }) } pub async fn get_peers(&self) -> Vec { @@ -130,8 +192,9 @@ impl CacheStore { .cloned() .collect(); - // If we have no reliable peers, try to refresh from default endpoints - if reliable_peers.is_empty() { + // If we have no reliable peers and the cache file is not read-only, + // try to refresh from default endpoints + if reliable_peers.is_empty() && !self.cache_path.metadata().map(|m| m.permissions().readonly()).unwrap_or(false) { drop(data); if let Ok(new_data) = Self::fallback_to_default(&self.config).await { let mut data = self.data.write().await; @@ -147,34 +210,89 @@ impl CacheStore { } pub async fn update_peer_status(&self, addr: &str, success: bool) -> Result<()> { + // Check if the file is read-only before attempting to modify + let is_readonly = self.cache_path.metadata() + .map(|m| m.permissions().readonly()) + .unwrap_or(false); + + if is_readonly { + tracing::warn!("Cannot update peer status: cache file is read-only"); + return Ok(()); + } + let mut data = self.data.write().await; match addr.parse::() { - Ok(_) => { + Ok(addr) => { let peer = data.peers.entry(addr.to_string()).or_insert_with(|| { - BootstrapPeer::new(addr.parse().expect("Already validated")) + BootstrapPeer::new(addr) }); peer.update_status(success); self.save_to_disk(&data).await?; Ok(()) } - Err(e) => Err(Error::InvalidMultiaddr(e)), + Err(e) => Err(Error::from(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Invalid multiaddr: {}", e) + ))), } } pub async fn add_peer(&self, addr: Multiaddr) -> Result<()> { + // Check if the cache file is read-only before attempting any modifications + let is_readonly = self.cache_path.metadata() + .map(|m| m.permissions().readonly()) + .unwrap_or(false); + + if is_readonly { + tracing::warn!("Cannot add peer: cache file is read-only"); + return Ok(()); + } + let mut data = self.data.write().await; let addr_str = addr.to_string(); - if !data.peers.contains_key(&addr_str) { - data.peers.insert(addr_str, BootstrapPeer::new(addr)); + tracing::debug!("Adding peer {}, current peers: {}", addr_str, data.peers.len()); + + // If the peer already exists, just update its last_seen time + if let Some(peer) = data.peers.get_mut(&addr_str) { + tracing::debug!("Updating existing peer {}", addr_str); + peer.last_seen = SystemTime::now(); + return self.save_to_disk(&data).await; + } + + // Only add new peers if we haven't reached max_peers + if data.peers.len() < self.config.max_peers { + tracing::debug!("Adding new peer {} (under max_peers limit)", addr_str); + data.peers.insert(addr_str.clone(), BootstrapPeer::new(addr)); self.save_to_disk(&data).await?; + } else { + // If we're at max_peers, replace the oldest peer + if let Some((oldest_addr, oldest_peer)) = data.peers.iter() + .min_by_key(|(_, peer)| peer.last_seen) { + tracing::debug!("Replacing oldest peer {} (last seen: {:?}) with new peer {}", + oldest_addr, oldest_peer.last_seen, addr_str); + let oldest_addr = oldest_addr.clone(); + data.peers.remove(&oldest_addr); + data.peers.insert(addr_str.clone(), BootstrapPeer::new(addr)); + self.save_to_disk(&data).await?; + } } Ok(()) } pub async fn remove_peer(&self, addr: &str) -> Result<()> { + // Check if the file is read-only before attempting to modify + let is_readonly = self.cache_path.metadata() + .map(|m| m.permissions().readonly()) + .unwrap_or(false); + + if is_readonly { + tracing::warn!("Cannot remove peer: cache file is read-only"); + return Ok(()); + } + let mut data = self.data.write().await; data.peers.remove(addr); self.save_to_disk(&data).await?; @@ -182,6 +300,16 @@ impl CacheStore { } pub async fn cleanup_unreliable_peers(&self) -> Result<()> { + // Check if the file is read-only before attempting to modify + let is_readonly = self.cache_path.metadata() + .map(|m| m.permissions().readonly()) + .unwrap_or(false); + + if is_readonly { + tracing::warn!("Cannot cleanup unreliable peers: cache file is read-only"); + return Ok(()); + } + let mut data = self.data.write().await; let unreliable_peers: Vec = data .peers @@ -199,6 +327,16 @@ impl CacheStore { } pub async fn cleanup_stale_peers(&self) -> Result<()> { + // Check if the file is read-only before attempting to modify + let is_readonly = self.cache_path.metadata() + .map(|m| m.permissions().readonly()) + .unwrap_or(false); + + if is_readonly { + tracing::warn!("Cannot cleanup stale peers: cache file is read-only"); + return Ok(()); + } + let mut data = self.data.write().await; let stale_peers: Vec = data .peers @@ -221,16 +359,39 @@ impl CacheStore { Ok(()) } + pub async fn save_to_disk(&self, data: &CacheData) -> Result<()> { + // Check if the file is read-only before attempting to write + let is_readonly = self.cache_path.metadata() + .map(|m| m.permissions().readonly()) + .unwrap_or(false); + + if is_readonly { + tracing::warn!("Cannot save to disk: cache file is read-only"); + return Ok(()); + } + + match self.atomic_write(data).await { + Ok(_) => Ok(()), + Err(e) => { + tracing::error!("Failed to save cache to disk: {}", e); + Err(e) + } + } + } + async fn acquire_shared_lock(file: &File) -> Result<()> { let file = file.try_clone() - .map_err(|e| Error::IoError(format!("Failed to clone file handle: {}", e)))?; + .map_err(|e| Error::from(e))?; tokio::task::spawn_blocking(move || { file.try_lock_shared() - .map_err(|e| Error::IoError(format!("Failed to acquire shared lock: {}", e))) + .map_err(|e| Error::from(e)) }) .await - .map_err(|e| Error::IoError(format!("Failed to spawn blocking task: {}", e)))? + .map_err(|e| Error::from(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to spawn blocking task: {}", e) + )))? } async fn acquire_exclusive_lock(file: &File) -> Result<()> { @@ -238,37 +399,36 @@ impl CacheStore { let max_attempts = 5; let mut attempts = 0; - while attempts < max_attempts { + loop { match file.try_lock_exclusive() { Ok(_) => return Ok(()), + Err(_) if attempts >= max_attempts => { + return Err(Error::LockError); + } Err(e) if e.kind() == io::ErrorKind::WouldBlock => { attempts += 1; - if attempts == max_attempts { - return Err(Error::IoError("Failed to acquire exclusive lock after max attempts".into())); - } tokio::time::sleep(backoff).await; backoff *= 2; } - Err(e) => return Err(Error::IoError(format!("Failed to acquire exclusive lock: {}", e))), + Err(_) => return Err(Error::LockError), } } - Ok(()) } async fn atomic_write(&self, data: &CacheData) -> Result<()> { // Create parent directory if it doesn't exist if let Some(parent) = self.cache_path.parent() { fs::create_dir_all(parent) - .map_err(|e| Error::IoError(format!("Failed to create directory: {}", e)))?; + .map_err(|e| Error::from(e))?; } // Create a temporary file in the same directory as the cache file let temp_file = NamedTempFile::new() - .map_err(|e| Error::IoError(format!("Failed to create temp file: {}", e)))?; + .map_err(|e| Error::from(e))?; // Write data to temporary file serde_json::to_writer_pretty(&temp_file, &data) - .map_err(|e| Error::IoError(format!("Failed to write to temp file: {}", e)))?; + .map_err(|e| Error::from(e))?; // Open the target file with proper permissions let file = OpenOptions::new() @@ -276,23 +436,21 @@ impl CacheStore { .create(true) .truncate(true) .open(&self.cache_path) - .map_err(|e| Error::IoError(format!("Failed to open cache file: {}", e)))?; + .map_err(|e| Error::from(e))?; // Acquire exclusive lock Self::acquire_exclusive_lock(&file).await?; // Perform atomic rename temp_file.persist(&self.cache_path) - .map_err(|e| Error::IoError(format!("Failed to persist cache file: {}", e)))?; + .map_err(|e| Error::from(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to persist cache file: {}", e) + )))?; // Lock will be automatically released when file is dropped Ok(()) } - - async fn save_to_disk(&self, data: &CacheData) -> Result<()> { - self.atomic_write(data).await?; - Ok(()) - } } #[cfg(test)] diff --git a/bootstrap_cache/src/error.rs b/bootstrap_cache/src/error.rs index ab8c656e77..fecaa86b19 100644 --- a/bootstrap_cache/src/error.rs +++ b/bootstrap_cache/src/error.rs @@ -10,29 +10,26 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum Error { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), + + #[error("Failed to acquire or release file lock")] + LockError, + #[error("No peers found: {0}")] NoPeersFound(String), - - #[error("Invalid multiaddr: {0}")] - InvalidMultiaddr(#[from] libp2p::multiaddr::Error), - - #[error("HTTP request failed: {0}")] - HttpRequest(#[from] reqwest::Error), - - #[error("JSON parsing error: {0}")] - JsonParsing(#[from] serde_json::Error), - - #[error("I/O error: {0}")] - Io(#[from] std::io::Error), - - #[error("IO error: {0}")] - IoError(String), - - #[error("Request timed out: {0}")] + + #[error("Cache file is corrupted: {0}")] + CacheCorrupted(serde_json::Error), + + #[error("HTTP error: {0}")] + Http(#[from] reqwest::Error), + + #[error("Timeout error: {0}")] Timeout(#[from] tokio::time::error::Elapsed), - - #[error("Failed to acquire file lock")] - LockError, } pub type Result = std::result::Result; diff --git a/bootstrap_cache/tests/cache_tests.rs b/bootstrap_cache/tests/cache_tests.rs new file mode 100644 index 0000000000..3b03e80666 --- /dev/null +++ b/bootstrap_cache/tests/cache_tests.rs @@ -0,0 +1,208 @@ +use bootstrap_cache::{BootstrapConfig, CacheStore}; +use libp2p::Multiaddr; +use std::time::Duration; +use tempfile::TempDir; +use tokio::time::sleep; + +#[tokio::test] +async fn test_cache_store_operations() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let cache_path = temp_dir.path().join("cache.json"); + + // Create cache store with config + let config = BootstrapConfig { + cache_file_path: cache_path.clone(), + ..Default::default() + }; + let cache_store = CacheStore::new(config).await?; + + // Test adding and retrieving peers + let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE".parse()?; + cache_store.add_peer(addr.clone()).await?; + cache_store.update_peer_status(&addr.to_string(), true).await?; + + let peers = cache_store.get_reliable_peers().await; + assert!(!peers.is_empty(), "Cache should contain the added peer"); + assert!(peers.iter().any(|p| p.addr == addr), "Cache should contain our specific peer"); + + Ok(()) +} + +#[tokio::test] +async fn test_cache_persistence() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let cache_path = temp_dir.path().join("cache.json"); + + // Create first cache store + let config = BootstrapConfig { + cache_file_path: cache_path.clone(), + ..Default::default() + }; + let cache_store1 = CacheStore::new(config.clone()).await?; + + // Add a peer and mark it as reliable + let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE".parse()?; + cache_store1.add_peer(addr.clone()).await?; + cache_store1.update_peer_status(&addr.to_string(), true).await?; + + // Create a new cache store with the same path + let cache_store2 = CacheStore::new(config).await?; + let peers = cache_store2.get_reliable_peers().await; + + assert!(!peers.is_empty(), "Cache should persist across instances"); + assert!(peers.iter().any(|p| p.addr == addr), "Specific peer should persist"); + + Ok(()) +} + +#[tokio::test] +async fn test_cache_reliability_tracking() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let cache_path = temp_dir.path().join("cache.json"); + + let config = BootstrapConfig { + cache_file_path: cache_path, + ..Default::default() + }; + let cache_store = CacheStore::new(config).await?; + + let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE".parse()?; + cache_store.add_peer(addr.clone()).await?; + + // Test successful connections + for _ in 0..3 { + cache_store.update_peer_status(&addr.to_string(), true).await?; + } + + let peers = cache_store.get_reliable_peers().await; + assert!(peers.iter().any(|p| p.addr == addr), "Peer should be reliable after successful connections"); + + // Test failed connections + for _ in 0..5 { + cache_store.update_peer_status(&addr.to_string(), false).await?; + } + + let peers = cache_store.get_reliable_peers().await; + assert!(!peers.iter().any(|p| p.addr == addr), "Peer should not be reliable after failed connections"); + + Ok(()) +} + +#[tokio::test] +async fn test_cache_max_peers() -> Result<(), Box> { + let _ = tracing_subscriber::fmt() + .with_env_filter("bootstrap_cache=debug") + .try_init(); + + let temp_dir = TempDir::new()?; + let cache_path = temp_dir.path().join("cache.json"); + + // Create cache with small max_peers limit + let config = BootstrapConfig { + cache_file_path: cache_path, + max_peers: 2, + ..Default::default() + }; + let cache_store = CacheStore::new(config).await?; + + // Add three peers with distinct timestamps + let mut addresses = Vec::new(); + for i in 1..=3 { + let addr: Multiaddr = format!("/ip4/127.0.0.1/udp/808{}/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UER{}", i, i).parse()?; + addresses.push(addr.clone()); + cache_store.add_peer(addr).await?; + // Add a delay to ensure distinct timestamps + sleep(Duration::from_millis(100)).await; + } + + let peers = cache_store.get_peers().await; + assert_eq!(peers.len(), 2, "Cache should respect max_peers limit"); + + // Get the addresses of the peers we have + let peer_addrs: Vec<_> = peers.iter().map(|p| p.addr.to_string()).collect(); + tracing::debug!("Final peers: {:?}", peer_addrs); + + // We should have the two most recently added peers (addresses[1] and addresses[2]) + for peer in peers { + let addr_str = peer.addr.to_string(); + assert!(addresses[1..].iter().any(|a| a.to_string() == addr_str), + "Should have one of the two most recent peers, got {}", addr_str); + } + + Ok(()) +} + +#[tokio::test] +async fn test_cache_concurrent_access() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let cache_path = temp_dir.path().join("cache.json"); + + let config = BootstrapConfig { + cache_file_path: cache_path, + ..Default::default() + }; + let cache_store = CacheStore::new(config).await?; + let cache_store_clone = cache_store.clone(); + + // Create multiple addresses + let addrs: Vec = (1..=5) + .map(|i| format!("/ip4/127.0.0.1/udp/808{}/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UER{}", i, i).parse().unwrap()) + .collect(); + + // Spawn a task that adds peers + let add_task = tokio::spawn(async move { + for addr in addrs { + if let Err(e) = cache_store.add_peer(addr).await { + eprintln!("Error adding peer: {}", e); + } + sleep(Duration::from_millis(10)).await; + } + }); + + // Spawn another task that reads peers + let read_task = tokio::spawn(async move { + for _ in 0..10 { + let _ = cache_store_clone.get_peers().await; + sleep(Duration::from_millis(5)).await; + } + }); + + // Wait for both tasks to complete + tokio::try_join!(add_task, read_task)?; + + Ok(()) +} + +#[tokio::test] +async fn test_cache_file_corruption() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let cache_path = temp_dir.path().join("cache.json"); + + // Create cache with some peers + let config = BootstrapConfig { + cache_file_path: cache_path.clone(), + ..Default::default() + }; + let cache_store = CacheStore::new(config.clone()).await?; + + // Add a peer + let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UER1".parse()?; + cache_store.add_peer(addr.clone()).await?; + + // Corrupt the cache file + tokio::fs::write(&cache_path, "invalid json content").await?; + + // Create a new cache store - it should handle the corruption gracefully + let new_cache_store = CacheStore::new(config).await?; + let peers = new_cache_store.get_peers().await; + assert!(peers.is_empty(), "Cache should be empty after corruption"); + + // Should be able to add peers again + new_cache_store.add_peer(addr).await?; + let peers = new_cache_store.get_peers().await; + assert_eq!(peers.len(), 1, "Should be able to add peers after corruption"); + + Ok(()) +} + +