Skip to content

Commit

Permalink
refactor(bootstrap_cache): improve peer source handling and test network isolation

Browse files Browse the repository at this point in the history

* Refactor CacheStore::from_args to handle peer sources more consistently
* Ensure test network mode is properly isolated from cache system
* Fix default behavior to use URL endpoint when no peers provided
* Add proper handling for local and first node modes
* Prevent cache operations when in test network mode

This change ensures that:
- Test network peers are isolated from cache operations
- Default behavior (no args) correctly uses URL endpoints
- Local and first node modes return empty stores
- Explicit peers take precedence over default behavior
- Cache operations only occur in non-test network mode

The changes make the peer source handling more predictable and maintain
proper isolation between different network modes (test, local, default).
  • Loading branch information
dirvine committed Nov 24, 2024
1 parent f935470 commit 02a8bdd
Show file tree
Hide file tree
Showing 6 changed files with 1,074 additions and 60 deletions.
12 changes: 12 additions & 0 deletions bootstrap_cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,20 @@ tempfile = "3.8.1"
thiserror = "1.0"
tokio = { version = "1.0", features = ["full", "sync"] }
tracing = "0.1"
url = "2.4.0"

[dev-dependencies]
wiremock = "0.5"
tokio = { version = "1.0", features = ["full", "test-util"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[lints.rust]
unsafe_code = "forbid"
missing_docs = "warn"

[lints.clippy]
all = "warn"
pedantic = "warn"
nursery = "warn"
unwrap_used = "warn"
missing_docs_in_private_items = "warn"
199 changes: 157 additions & 42 deletions bootstrap_cache/src/cache_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tempfile::NamedTempFile;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};

const PEER_EXPIRY_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours

Expand Down Expand Up @@ -152,6 +153,119 @@ impl CacheStore {
Ok(store)
}

/// Create a `CacheStore` without reading or seeding any peer data.
///
/// Ensures the directory for `config.cache_file_path` exists. If that
/// directory cannot be created, falls back to `~/.safe/bootstrap_cache.json`
/// by recursing with a config pointing at the user's home directory.
/// The returned store starts with an empty, default `CacheData`; call
/// `init` to actually load peers.
///
/// # Errors
/// Returns `Error::Io` only when the home-directory fallback itself cannot
/// be created. NOTE(review): when the primary directory creation fails and
/// `dirs::home_dir()` returns `None`, the failure is swallowed and the store
/// is built anyway, so later disk writes may fail — confirm this best-effort
/// behavior is intended.
pub async fn new_without_init(config: crate::BootstrapConfig) -> Result<Self> {
    tracing::info!("Creating new CacheStore with config: {:?}", config);
    let cache_path = config.cache_file_path.clone();
    let config = Arc::new(config);

    // Create cache directory if it doesn't exist
    if let Some(parent) = cache_path.parent() {
        tracing::info!("Attempting to create cache directory at {:?}", parent);
        // Try to create the directory
        match fs::create_dir_all(parent) {
            Ok(_) => {
                tracing::info!("Successfully created cache directory");
            }
            Err(e) => {
                tracing::warn!("Failed to create cache directory at {:?}: {}", parent, e);
                // Try user's home directory as fallback
                if let Some(home) = dirs::home_dir() {
                    let user_path = home.join(".safe").join("bootstrap_cache.json");
                    tracing::info!("Falling back to user directory: {:?}", user_path);
                    if let Some(user_parent) = user_path.parent() {
                        if let Err(e) = fs::create_dir_all(user_parent) {
                            tracing::error!("Failed to create user cache directory: {}", e);
                            return Err(Error::Io(e));
                        }
                        tracing::info!("Successfully created user cache directory");
                    }
                    // An async fn cannot directly await a recursive call to
                    // itself (the future type would be infinitely sized), so
                    // the recursion is boxed before awaiting.
                    let future = Self::new_without_init(crate::BootstrapConfig::with_cache_path(user_path));
                    return Box::pin(future).await;
                }
            }
        }
    }

    // Start with empty in-memory data; `init` populates it later.
    let store = Self {
        cache_path,
        config,
        data: Arc::new(RwLock::new(CacheData::default())),
    };

    tracing::info!("Successfully created CacheStore");
    Ok(store)
}

/// Load the cache contents into the in-memory store.
///
/// Reads the cache file if it exists; when the file is missing, unreadable,
/// or empty-but-writable, the peer list is rebuilt from the default
/// endpoints via `fallback_to_default`. The loaded set is capped at
/// `max_peers`, and peers older than `PEER_EXPIRY_DURATION` are pruned
/// unless the cache file is read-only.
///
/// # Errors
/// Propagates failures from `fallback_to_default`.
pub async fn init(&self) -> Result<()> {
    // Checked once up front; the file's permissions are not expected to
    // change while init runs.
    let is_readonly = self.is_cache_readonly();

    let mut data = if self.cache_path.exists() {
        tracing::info!("Cache file exists at {:?}, attempting to load", self.cache_path);
        match Self::load_cache_data(&self.cache_path).await {
            Ok(mut data) => {
                tracing::info!("Successfully loaded cache data with {} peers", data.peers.len());
                if data.peers.is_empty() && !is_readonly {
                    // Empty but writable: repopulate from the default endpoints.
                    tracing::info!("Cache is empty and not read-only, falling back to default");
                    Self::fallback_to_default(&self.config).await?
                } else {
                    // Enforce the configured cap. Rebuilding the map in place
                    // avoids the previous redundant intermediate Vec.
                    if data.peers.len() > self.config.max_peers {
                        tracing::info!(
                            "Trimming cache from {} to {} peers",
                            data.peers.len(),
                            self.config.max_peers
                        );
                        data.peers = data
                            .peers
                            .into_iter()
                            .take(self.config.max_peers)
                            .collect();
                    }
                    data
                }
            }
            Err(e) => {
                // Unreadable or corrupt cache file: rebuild from defaults.
                tracing::warn!("Failed to load cache data: {}", e);
                Self::fallback_to_default(&self.config).await?
            }
        }
    } else {
        tracing::info!("Cache file does not exist at {:?}, falling back to default", self.cache_path);
        // If cache file doesn't exist, fallback to default
        Self::fallback_to_default(&self.config).await?
    };

    // Only clean up stale peers if the file is not read-only.
    if !is_readonly {
        let now = SystemTime::now();
        data.peers.retain(|_, peer| {
            // A last_seen in the future yields Err and drops the peer.
            now.duration_since(peer.last_seen)
                .map(|age| age < PEER_EXPIRY_DURATION)
                .unwrap_or(false)
        });
    }

    // Publish the loaded data to the shared store.
    *self.data.write().await = data;

    Ok(())
}

/// Whether the cache file currently exists and is marked read-only.
/// Missing files (or unreadable metadata) count as writable.
fn is_cache_readonly(&self) -> bool {
    self.cache_path
        .metadata()
        .map(|m| m.permissions().readonly())
        .unwrap_or(false)
}

async fn fallback_to_default(config: &crate::BootstrapConfig) -> Result<CacheData> {
tracing::info!("Falling back to default peers from endpoints");
let mut data = CacheData {
Expand Down Expand Up @@ -313,59 +427,35 @@ impl CacheStore {
}

pub async fn add_peer(&self, addr: Multiaddr) -> Result<()> {
// Check if the cache file is read-only before attempting any modifications
let is_readonly = self
.cache_path
.metadata()
.map(|m| m.permissions().readonly())
.unwrap_or(false);

if is_readonly {
tracing::warn!("Cannot add peer: cache file is read-only");
return Ok(());
}

let mut data = self.data.write().await;
let addr_str = addr.to_string();

tracing::debug!(
"Adding peer {}, current peers: {}",
addr_str,
data.peers.len()
);

// If the peer already exists, just update its last_seen time
if let Some(peer) = data.peers.get_mut(&addr_str) {
tracing::debug!("Updating existing peer {}", addr_str);
peer.last_seen = SystemTime::now();
return self.save_to_disk(&data).await;
// Check if we already have this peer
if data.peers.contains_key(&addr_str) {
debug!("Updating existing peer {}", addr_str);
if let Some(peer) = data.peers.get_mut(&addr_str) {
peer.last_seen = SystemTime::now();
}
return Ok(());
}

// Only add new peers if we haven't reached max_peers
if data.peers.len() < self.config.max_peers {
tracing::debug!("Adding new peer {} (under max_peers limit)", addr_str);
data.peers
.insert(addr_str.clone(), BootstrapPeer::new(addr));
self.save_to_disk(&data).await?;
} else {
// If we're at max_peers, replace the oldest peer
if let Some((oldest_addr, oldest_peer)) =
data.peers.iter().min_by_key(|(_, peer)| peer.last_seen)
// If we're at max peers, remove the oldest peer
if data.peers.len() >= self.config.max_peers {
debug!("At max peers limit ({}), removing oldest peer", self.config.max_peers);
if let Some((oldest_addr, _)) = data.peers
.iter()
.min_by_key(|(_, peer)| peer.last_seen)
{
tracing::debug!(
"Replacing oldest peer {} (last seen: {:?}) with new peer {}",
oldest_addr,
oldest_peer.last_seen,
addr_str
);
let oldest_addr = oldest_addr.clone();
data.peers.remove(&oldest_addr);
data.peers
.insert(addr_str.clone(), BootstrapPeer::new(addr));
self.save_to_disk(&data).await?;
}
}

// Add the new peer
debug!("Adding new peer {} (under max_peers limit)", addr_str);
data.peers.insert(addr_str, BootstrapPeer::new(addr));
self.save_to_disk(&data).await?;

Ok(())
}

Expand Down Expand Up @@ -542,6 +632,31 @@ impl CacheStore {
// Lock will be automatically released when file is dropped
Ok(())
}

/// Drop every peer from the in-memory cache.
///
/// Only the in-memory state is affected; the on-disk cache file is left
/// untouched until the next save.
pub async fn clear_peers(&self) -> Result<()> {
    let mut guard = self.data.write().await;
    guard.peers.clear();
    Ok(())
}

/// Atomically write the current in-memory cache to `self.cache_path`.
///
/// The data is serialized into an exclusively-locked temporary file which
/// is then renamed over the cache file, so readers never observe a partial
/// write.
///
/// # Errors
/// Returns an error if the temp file cannot be created, locked, written,
/// synced, unlocked, or persisted over the cache path.
pub async fn save_cache(&self) -> Result<()> {
    let data = self.data.read().await;
    let temp_file = NamedTempFile::new()?;
    let file = File::create(&temp_file)?;
    file.lock_exclusive()?;

    serde_json::to_writer_pretty(&file, &*data)?;
    file.sync_all()?;
    file.unlock()?;
    // Close our handle before renaming: on Windows, persisting while a
    // second handle to the file is still open fails with a sharing
    // violation.
    drop(file);

    // Atomically replace the cache file
    temp_file.persist(&self.cache_path)?;
    info!("Successfully wrote cache file at {:?}", self.cache_path);

    Ok(())
}
}

#[cfg(test)]
Expand Down
14 changes: 6 additions & 8 deletions bootstrap_cache/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,16 @@ pub enum Error {
Io(#[from] std::io::Error),
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
#[error("Request error: {0}")]
Request(#[from] reqwest::Error),
#[error("Failed to acquire or release file lock")]
LockError,
#[error("Cache file is corrupted: {0}")]
CacheCorrupted(serde_json::Error),
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[error("Timeout error: {0}")]
Timeout(#[from] tokio::time::error::Elapsed),
#[error("Failed to persist file: {0}")]
Persist(#[from] tempfile::PersistError),
#[error("Failed to acquire or release file lock")]
LockError,
#[error("Circuit breaker open for endpoint: {0}")]
CircuitBreakerOpen(String),
#[error("Endpoint temporarily unavailable: {0}")]
EndpointUnavailable(String),
#[error("Request failed: {0}")]
RequestFailed(String),
#[error("Request timed out")]
Expand Down
Loading

0 comments on commit 02a8bdd

Please sign in to comment.