Skip to content

Commit

Permalink
Support offloaded timelines during shard split (#9489)
Browse files Browse the repository at this point in the history
Before, we didn't copy over the `index-part.json` of offloaded timelines
to the new shard's location, resulting in the new shard not knowing the
timeline even exists.

In #9444, we copy over the manifest, but we also need to do this for
`index-part.json`.

As the operations to do are mostly the same between offloaded and
non-offloaded timelines, we can iterate over all of them in the same
loop, after the introduction of a `TimelineOrOffloadedArcRef` type to
generalize over the two cases. This is analogous to the deletion code
added in #8907.

The added test also ensures that the sharded archival config endpoint
works, something that has not yet been ensured by tests.

Part of #8088
  • Loading branch information
arpad-m authored Oct 25, 2024
1 parent b3bedda commit 4d9036b
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 31 deletions.
113 changes: 86 additions & 27 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,25 +597,29 @@ pub enum TimelineOrOffloaded {
}

impl TimelineOrOffloaded {
pub fn tenant_shard_id(&self) -> TenantShardId {
pub fn arc_ref(&self) -> TimelineOrOffloadedArcRef<'_> {
match self {
TimelineOrOffloaded::Timeline(timeline) => timeline.tenant_shard_id,
TimelineOrOffloaded::Offloaded(offloaded) => offloaded.tenant_shard_id,
TimelineOrOffloaded::Timeline(timeline) => {
TimelineOrOffloadedArcRef::Timeline(timeline)
}
TimelineOrOffloaded::Offloaded(offloaded) => {
TimelineOrOffloadedArcRef::Offloaded(offloaded)
}
}
}
pub fn tenant_shard_id(&self) -> TenantShardId {
self.arc_ref().tenant_shard_id()
}
pub fn timeline_id(&self) -> TimelineId {
match self {
TimelineOrOffloaded::Timeline(timeline) => timeline.timeline_id,
TimelineOrOffloaded::Offloaded(offloaded) => offloaded.timeline_id,
}
self.arc_ref().timeline_id()
}
pub fn delete_progress(&self) -> &Arc<tokio::sync::Mutex<DeleteTimelineFlow>> {
match self {
TimelineOrOffloaded::Timeline(timeline) => &timeline.delete_progress,
TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.delete_progress,
}
}
pub fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
match self {
TimelineOrOffloaded::Timeline(timeline) => timeline.remote_client.clone(),
TimelineOrOffloaded::Offloaded(offloaded) => match offloaded.remote_client.clone() {
Expand All @@ -632,6 +636,38 @@ impl TimelineOrOffloaded {
}
}

pub enum TimelineOrOffloadedArcRef<'a> {
Timeline(&'a Arc<Timeline>),
Offloaded(&'a Arc<OffloadedTimeline>),
}

impl TimelineOrOffloadedArcRef<'_> {
pub fn tenant_shard_id(&self) -> TenantShardId {
match self {
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.tenant_shard_id,
TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.tenant_shard_id,
}
}
pub fn timeline_id(&self) -> TimelineId {
match self {
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.timeline_id,
TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.timeline_id,
}
}
}

impl<'a> From<&'a Arc<Timeline>> for TimelineOrOffloadedArcRef<'a> {
fn from(timeline: &'a Arc<Timeline>) -> Self {
Self::Timeline(timeline)
}
}

impl<'a> From<&'a Arc<OffloadedTimeline>> for TimelineOrOffloadedArcRef<'a> {
fn from(timeline: &'a Arc<OffloadedTimeline>) -> Self {
Self::Offloaded(timeline)
}
}

#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum GetTimelineError {
#[error("Timeline is shutting down")]
Expand Down Expand Up @@ -2940,33 +2976,58 @@ impl Tenant {
&self,
child_shards: &Vec<TenantShardId>,
) -> anyhow::Result<()> {
let timelines = self.timelines.lock().unwrap().clone();
for timeline in timelines.values() {
let (timelines, offloaded) = {
let timelines = self.timelines.lock().unwrap();
let offloaded = self.timelines_offloaded.lock().unwrap();
(timelines.clone(), offloaded.clone())
};
let timelines_iter = timelines
.values()
.map(TimelineOrOffloadedArcRef::<'_>::from)
.chain(
offloaded
.values()
.map(TimelineOrOffloadedArcRef::<'_>::from),
);
for timeline in timelines_iter {
// We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
// to ensure that they do not start a split if currently in the process of doing these.

// Upload an index from the parent: this is partly to provide freshness for the
// child tenants that will copy it, and partly for general ease-of-debugging: there will
// always be a parent shard index in the same generation as we wrote the child shard index.
tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index");
timeline
.remote_client
.schedule_index_upload_for_file_changes()?;
timeline.remote_client.wait_completion().await?;
let timeline_id = timeline.timeline_id();

if let TimelineOrOffloadedArcRef::Timeline(timeline) = timeline {
// Upload an index from the parent: this is partly to provide freshness for the
// child tenants that will copy it, and partly for general ease-of-debugging: there will
// always be a parent shard index in the same generation as we wrote the child shard index.
tracing::info!(%timeline_id, "Uploading index");
timeline
.remote_client
.schedule_index_upload_for_file_changes()?;
timeline.remote_client.wait_completion().await?;
}

let remote_client = match timeline {
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.remote_client.clone(),
TimelineOrOffloadedArcRef::Offloaded(offloaded) => {
let remote_client = self
.build_timeline_client(offloaded.timeline_id, self.remote_storage.clone());
Arc::new(remote_client)
}
};

// Shut down the timeline's remote client: this means that the indices we write
// for child shards will not be invalidated by the parent shard deleting layers.
tracing::info!(timeline_id=%timeline.timeline_id, "Shutting down remote storage client");
timeline.remote_client.shutdown().await;
tracing::info!(%timeline_id, "Shutting down remote storage client");
remote_client.shutdown().await;

// Download methods can still be used after shutdown, as they don't flow through the remote client's
// queue. In principal the RemoteTimelineClient could provide this without downloading it, but this
// operation is rare, so it's simpler to just download it (and robustly guarantees that the index
// we use here really is the remotely persistent one).
tracing::info!(timeline_id=%timeline.timeline_id, "Downloading index_part from parent");
let result = timeline.remote_client
tracing::info!(%timeline_id, "Downloading index_part from parent");
let result = remote_client
.download_index_file(&self.cancel)
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))
.await?;
let index_part = match result {
MaybeDeletedIndexPart::Deleted(_) => {
Expand All @@ -2976,11 +3037,11 @@ impl Tenant {
};

for child_shard in child_shards {
tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index_part for child {}", child_shard.to_index());
tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index());
upload_index_part(
&self.remote_storage,
child_shard,
&timeline.timeline_id,
&timeline_id,
self.generation,
&index_part,
&self.cancel,
Expand All @@ -2989,8 +3050,6 @@ impl Tenant {
}
}

// TODO: also copy index files of offloaded timelines

let tenant_manifest = self.tenant_manifest();
// TODO: generation support
let generation = remote_timeline_client::TENANT_MANIFEST_GENERATION;
Expand Down
6 changes: 5 additions & 1 deletion pageserver/src/tenant/remote_timeline_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1278,10 +1278,14 @@ impl RemoteTimelineClient {
let fut = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = match &mut *guard {
UploadQueue::Stopped(_) => return,
UploadQueue::Stopped(_) => {
scopeguard::ScopeGuard::into_inner(sg);
return;
}
UploadQueue::Uninitialized => {
// transition into Stopped state
self.stop_impl(&mut guard);
scopeguard::ScopeGuard::into_inner(sg);
return;
}
UploadQueue::Initialized(ref mut init) => init,
Expand Down
27 changes: 26 additions & 1 deletion test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,14 @@

from fixtures import overlayfs
from fixtures.auth_tokens import AuthKeys, TokenScope
from fixtures.common_types import Lsn, NodeId, TenantId, TenantShardId, TimelineId
from fixtures.common_types import (
Lsn,
NodeId,
TenantId,
TenantShardId,
TimelineArchivalState,
TimelineId,
)
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
Expand Down Expand Up @@ -2132,6 +2139,24 @@ def step_down(self):
response.raise_for_status()
return response.json()

def timeline_archival_config(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
state: TimelineArchivalState,
):
config = {"state": state.value}
log.info(
f"requesting timeline archival config {config} for tenant {tenant_id} and timeline {timeline_id}"
)
res = self.request(
"PUT",
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/archival_config",
json=config,
headers=self.headers(TokenScope.ADMIN),
)
return res.json()

def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
if isinstance(config_strings, tuple):
pairs = [config_strings]
Expand Down
25 changes: 25 additions & 0 deletions test_runner/fixtures/pageserver/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,19 @@ def from_json(cls, d: dict[str, Any]) -> TenantConfig:
)


@dataclass
class TimelinesInfoAndOffloaded:
timelines: list[dict[str, Any]]
offloaded: list[dict[str, Any]]

@classmethod
def from_json(cls, d: dict[str, Any]) -> TimelinesInfoAndOffloaded:
return TimelinesInfoAndOffloaded(
timelines=d["timelines"],
offloaded=d["offloaded"],
)


class PageserverHttpClient(requests.Session, MetricsGetter):
def __init__(
self,
Expand Down Expand Up @@ -464,6 +477,18 @@ def timeline_list(
assert isinstance(res_json, list)
return res_json

def timeline_and_offloaded_list(
self,
tenant_id: Union[TenantId, TenantShardId],
) -> TimelinesInfoAndOffloaded:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline_and_offloaded",
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return TimelinesInfoAndOffloaded.from_json(res_json)

def timeline_create(
self,
pg_version: PgVersion,
Expand Down
Loading

1 comment on commit 4d9036b

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5247 tests run: 5030 passed, 3 failed, 214 skipped (full report)


Failures on Postgres 17

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_physical_replication[debug-pg17] or test_tenant_import[debug-pg17-None-real_s3] or test_many_timelines[release-pg17]"

Test coverage report is not available

The comment gets automatically updated with the latest test results
4d9036b at 2024-10-25T11:47:56.178Z :recycle:

Please sign in to comment.