diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index ff35567ff388..d162ab5c65b3 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -1159,9 +1159,12 @@ impl Service { let (waiters, response_shards) = { let mut locked = self.inner.write().unwrap(); - let (_nodes, tenants, scheduler) = locked.parts_mut(); + let result_tx = locked.result_tx.clone(); + let compute_hook = locked.compute_hook.clone(); + let (nodes, tenants, scheduler) = locked.parts_mut(); let mut response_shards = Vec::new(); + let mut schcedule_error = None; for tenant_shard_id in create_ids { tracing::info!("Creating shard {tenant_shard_id}..."); @@ -1198,23 +1201,20 @@ impl Service { continue; } Entry::Vacant(entry) => { - let mut state = TenantState::new( + let state = entry.insert(TenantState::new( tenant_shard_id, ShardIdentity::from_params( tenant_shard_id.shard_number, &create_req.shard_parameters, ), placement_policy.clone(), - ); + )); state.generation = initial_generation; state.config = create_req.config.clone(); - - state.schedule(scheduler).map_err(|e| { - ApiError::Conflict(format!( - "Failed to schedule shard {tenant_shard_id}: {e}" - )) - })?; + if let Err(e) = state.schedule(scheduler) { + schcedule_error = Some(e); + } // Only include shards in result if we are attaching: the purpose // of the response is to tell the caller where the shards are attached. @@ -1228,24 +1228,27 @@ impl Service { generation: generation.into().unwrap(), }); } - entry.insert(state) } }; } - // Take a snapshot of pageservers - let pageservers = locked.nodes.clone(); - - let result_tx = locked.result_tx.clone(); - let compute_hook = locked.compute_hook.clone(); + // If we failed to schedule shards, then they are still created in the controller, + // but we return an error to the requester to avoid a silent failure when someone + // tries to e.g. create a tenant whose placement policy requires more nodes than + // are present in the system. We do this here rather than in the above loop, to + // avoid situations where we only create a subset of shards in the tenant. + if let Some(e) = schcedule_error { + return Err(ApiError::Conflict(format!( + "Failed to schedule shard(s): {e}" + ))); + } - let waiters = locked - .tenants + let waiters = tenants .range_mut(TenantShardId::tenant_range(tenant_id)) .filter_map(|(_shard_id, shard)| { shard.maybe_reconcile( result_tx.clone(), - &pageservers, + nodes, &compute_hook, &self.config, &self.persistence, @@ -2516,6 +2519,19 @@ impl Service { let compute_hook = locked.compute_hook.clone(); let (nodes, tenants, scheduler) = locked.parts_mut(); + let Some(node) = nodes.get(&migrate_req.node_id) else { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "Node {} not found", + migrate_req.node_id + ))); + }; + + if node.availability != NodeAvailability::Active { + // Warn but proceed: the caller may intend to manually adjust the placement of + // a shard even if the node is down, e.g. if intervening during an incident. + tracing::warn!("Migrating to an unavailable node ({})", node.id); + } + let Some(shard) = tenants.get_mut(&tenant_shard_id) else { return Err(ApiError::NotFound( anyhow::anyhow!("Tenant shard not found").into(), @@ -2645,6 +2661,18 @@ impl Service { .map(|t| t.to_persistent()) .collect::>(); + // This method can only validate the state of an idle system: if a reconcile is in + // progress, fail out early to avoid giving false errors on state that won't match + // between database and memory under a ReconcileResult is processed. + for t in locked.tenants.values() { + if t.reconciler.is_some() { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Shard {} reconciliation in progress", + t.tenant_shard_id + ))); + } + } + (expect_nodes, expect_shards) }; diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index e14a2f22cf1a..6c46b83622cc 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -976,7 +976,7 @@ impl LayerInner { } self.consecutive_failures.store(0, Ordering::Relaxed); - tracing::info!("on-demand download successful"); + tracing::info!(size=%self.desc.file_size, "on-demand download successful"); Ok(permit) }