Skip to content

Commit

Permalink
remote_storage config: move handling of empty inline table {} to ca…
Browse files Browse the repository at this point in the history
…llers (#8193)

Before this PR, `RemoteStorageConfig::from_toml` would support
deserializing an
empty `{}` TOML inline table to a `None`, otherwise try `Some()`.

We can instead let
* in proxy: let clap derive handle the Option
* in PS & SK: assume that if the field is specified, it must be a valid
  RemtoeStorageConfig

(This PR started with a much simpler goal of factoring out the
`deserialize_item` function because I need that in another PR).
  • Loading branch information
problame authored and VladLazar committed Jul 8, 2024
1 parent 712ba95 commit e7e8fec
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 55 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 6 additions & 19 deletions libs/remote_storage/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{fmt::Debug, num::NonZeroUsize, str::FromStr, time::Duration};

use anyhow::bail;
use aws_sdk_s3::types::StorageClass;
use camino::Utf8PathBuf;

Expand Down Expand Up @@ -176,28 +175,16 @@ fn serialize_storage_class<S: serde::Serializer>(
impl RemoteStorageConfig {
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);

pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
let document: toml_edit::Document = match toml {
toml_edit::Item::Table(toml) => toml.clone().into(),
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
toml.clone().into_table().into()
}
_ => bail!("toml not a table or inline table"),
};

if document.is_empty() {
return Ok(None);
}

Ok(Some(toml_edit::de::from_document(document)?))
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
Ok(utils::toml_edit_ext::deserialize_item(toml)?)
}
}

#[cfg(test)]
mod tests {
use super::*;

fn parse(input: &str) -> anyhow::Result<Option<RemoteStorageConfig>> {
fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
let toml = input.parse::<toml_edit::Document>().unwrap();
RemoteStorageConfig::from_toml(toml.as_item())
}
Expand All @@ -207,7 +194,7 @@ mod tests {
let input = "local_path = '.'
timeout = '5s'";

let config = parse(input).unwrap().expect("it exists");
let config = parse(input).unwrap();

assert_eq!(
config,
Expand All @@ -229,7 +216,7 @@ timeout = '5s'";
timeout = '7s'
";

let config = parse(toml).unwrap().expect("it exists");
let config = parse(toml).unwrap();

assert_eq!(
config,
Expand Down Expand Up @@ -257,7 +244,7 @@ timeout = '5s'";
timeout = '7s'
";

let config = parse(toml).unwrap().expect("it exists");
let config = parse(toml).unwrap();

assert_eq!(
config,
Expand Down
1 change: 1 addition & 0 deletions libs/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ thiserror.workspace = true
tokio.workspace = true
tokio-tar.workspace = true
tokio-util.workspace = true
toml_edit.workspace = true
tracing.workspace = true
tracing-error.workspace = true
tracing-subscriber = { workspace = true, features = ["json", "registry"] }
Expand Down
2 changes: 2 additions & 0 deletions libs/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub mod env;

pub mod poison;

pub mod toml_edit_ext;

/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:
Expand Down
22 changes: 22 additions & 0 deletions libs/utils/src/toml_edit_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("item is not a document")]
ItemIsNotADocument,
#[error(transparent)]
Serde(toml_edit::de::Error),
}

pub fn deserialize_item<T>(item: &toml_edit::Item) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let document: toml_edit::Document = match item {
toml_edit::Item::Table(toml) => toml.clone().into(),
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
toml.clone().into_table().into()
}
_ => return Err(Error::ItemIsNotADocument),
};

toml_edit::de::from_document(document).map_err(Error::Serde)
}
2 changes: 1 addition & 1 deletion pageserver/ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async fn main() -> anyhow::Result<()> {
let toml_item = toml_document
.get("remote_storage")
.expect("need remote_storage");
let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config");
let config = RemoteStorageConfig::from_toml(toml_item)?;
let storage = remote_storage::GenericRemoteStorage::from_config(&config);
let cancel = CancellationToken::new();
storage
Expand Down
19 changes: 16 additions & 3 deletions pageserver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pub mod defaults {
#ephemeral_bytes_per_memory_kb = {DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB}
[remote_storage]
#[remote_storage]
"#
);
Expand Down Expand Up @@ -918,7 +918,7 @@ impl PageServerConf {
"http_auth_type" => builder.http_auth_type(parse_toml_from_str(key, item)?),
"pg_auth_type" => builder.pg_auth_type(parse_toml_from_str(key, item)?),
"remote_storage" => {
builder.remote_storage_config(RemoteStorageConfig::from_toml(item)?)
builder.remote_storage_config(Some(RemoteStorageConfig::from_toml(item).context("remote_storage")?))
}
"tenant_config" => {
t_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("failed to parse: '{key}'"))?;
Expand Down Expand Up @@ -946,7 +946,7 @@ impl PageServerConf {
builder.metric_collection_endpoint(Some(endpoint));
},
"metric_collection_bucket" => {
builder.metric_collection_bucket(RemoteStorageConfig::from_toml(item)?)
builder.metric_collection_bucket(Some(RemoteStorageConfig::from_toml(item)?))
}
"synthetic_size_calculation_interval" =>
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
Expand Down Expand Up @@ -1681,6 +1681,19 @@ threshold = "20m"
}
}

#[test]
fn empty_remote_storage_is_error() {
let tempdir = tempdir().unwrap();
let (workdir, _) = prepare_fs(&tempdir).unwrap();
let input = r#"
remote_storage = {}
"#;
let doc = toml_edit::Document::from_str(input).unwrap();
let err = PageServerConf::parse_and_validate(&doc, &workdir)
.expect_err("empty remote_storage field should fail, don't specify it if you want no remote_storage");
assert!(format!("{err}").contains("remote_storage"), "{err}");
}

fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
let tempdir_path = tempdir.path();

Expand Down
9 changes: 4 additions & 5 deletions proxy/src/bin/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use proxy::usage_metrics;
use anyhow::bail;
use proxy::config::{self, ProxyConfig};
use proxy::serverless;
use remote_storage::RemoteStorageConfig;
use std::net::SocketAddr;
use std::pin::pin;
use std::sync::Arc;
Expand Down Expand Up @@ -205,8 +206,8 @@ struct ProxyCliArgs {
/// remote storage configuration for backup metric collection
/// Encoded as toml (same format as pageservers), eg
/// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
#[clap(long, default_value = "{}")]
metric_backup_collection_remote_storage: String,
#[clap(long, value_parser = remote_storage_from_toml)]
metric_backup_collection_remote_storage: Option<RemoteStorageConfig>,
/// chunk size for backup metric collection
/// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
#[clap(long, default_value = "4194304")]
Expand Down Expand Up @@ -511,9 +512,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
}
let backup_metric_collection_config = config::MetricBackupCollectionConfig {
interval: args.metric_backup_collection_interval,
remote_storage_config: remote_storage_from_toml(
&args.metric_backup_collection_remote_storage,
)?,
remote_storage_config: args.metric_backup_collection_remote_storage.clone(),
chunk_size: args.metric_backup_collection_chunk_size,
};

Expand Down
8 changes: 2 additions & 6 deletions proxy/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,15 +399,11 @@ impl FromStr for EndpointCacheConfig {
#[derive(Debug)]
pub struct MetricBackupCollectionConfig {
pub interval: Duration,
pub remote_storage_config: OptRemoteStorageConfig,
pub remote_storage_config: Option<RemoteStorageConfig>,
pub chunk_size: usize,
}

/// Hack to avoid clap being smarter. If you don't use this type alias, clap assumes more about the optional state and you get
/// runtime type errors from the value parser we use.
pub type OptRemoteStorageConfig = Option<RemoteStorageConfig>;

pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<RemoteStorageConfig> {
RemoteStorageConfig::from_toml(&s.parse()?)
}

Expand Down
15 changes: 6 additions & 9 deletions proxy/src/context/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@ use parquet::{
record::RecordWriter,
};
use pq_proto::StartupMessageParams;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel};
use serde::ser::SerializeMap;
use tokio::{sync::mpsc, time};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, Span};
use utils::backoff;

use crate::{
config::{remote_storage_from_toml, OptRemoteStorageConfig},
context::LOG_CHAN_DISCONNECT,
};
use crate::{config::remote_storage_from_toml, context::LOG_CHAN_DISCONNECT};

use super::{RequestMonitoring, LOG_CHAN};

Expand All @@ -33,11 +30,11 @@ pub struct ParquetUploadArgs {
/// Storage location to upload the parquet files to.
/// Encoded as toml (same format as pageservers), eg
/// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
#[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
parquet_upload_remote_storage: OptRemoteStorageConfig,
#[clap(long, value_parser = remote_storage_from_toml)]
parquet_upload_remote_storage: Option<RemoteStorageConfig>,

#[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
parquet_upload_disconnect_events_remote_storage: OptRemoteStorageConfig,
#[clap(long, value_parser = remote_storage_from_toml)]
parquet_upload_disconnect_events_remote_storage: Option<RemoteStorageConfig>,

/// How many rows to include in a row group
#[clap(long, default_value_t = 8192)]
Expand Down
13 changes: 2 additions & 11 deletions safekeeper/src/bin/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use sd_notify::NotifyState;
use tokio::runtime::Handle;
use tokio::signal::unix::{signal, SignalKind};
use tokio::task::JoinError;
use toml_edit::Document;
use utils::logging::SecretString;

use std::env::{var, VarError};
Expand Down Expand Up @@ -126,7 +125,7 @@ struct Args {
peer_recovery: bool,
/// Remote storage configuration for WAL backup (offloading to s3) as TOML
/// inline table, e.g.
/// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "<BUCKETNAME>", "bucket_region":"<REGION>", "concurrency_limit": 119}
/// {max_concurrent_syncs = 17, max_sync_errors = 13, bucket_name = "<BUCKETNAME>", bucket_region = "<REGION>", concurrency_limit = 119}
/// Safekeeper offloads WAL to
/// [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring
/// structure on the file system.
Expand Down Expand Up @@ -553,16 +552,8 @@ fn set_id(workdir: &Utf8Path, given_id: Option<NodeId>) -> Result<NodeId> {
Ok(my_id)
}

// Parse RemoteStorage from TOML table.
fn parse_remote_storage(storage_conf: &str) -> anyhow::Result<RemoteStorageConfig> {
// funny toml doesn't consider plain inline table as valid document, so wrap in a key to parse
let storage_conf_toml = format!("remote_storage = {storage_conf}");
let parsed_toml = storage_conf_toml.parse::<Document>()?; // parse
let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
RemoteStorageConfig::from_toml(storage_conf_parsed_toml).and_then(|parsed_config| {
// XXX: Don't print the original toml here, there might be some sensitive data
parsed_config.context("Incorrectly parsed remote storage toml as no remote storage config")
})
RemoteStorageConfig::from_toml(&storage_conf.parse()?)
}

#[test]
Expand Down
4 changes: 3 additions & 1 deletion test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,9 @@ def __init__(self, config: NeonEnvBuilder):
if config.auth_enabled:
sk_cfg["auth_enabled"] = True
if self.safekeepers_remote_storage is not None:
sk_cfg["remote_storage"] = self.safekeepers_remote_storage.to_toml_inline_table()
sk_cfg[
"remote_storage"
] = self.safekeepers_remote_storage.to_toml_inline_table().strip()
self.safekeepers.append(Safekeeper(env=self, id=id, port=port))
cfg["safekeepers"].append(sk_cfg)

Expand Down

0 comments on commit e7e8fec

Please sign in to comment.