Add input buffer param, cleanup args, lower active tasks
brad-richardson committed Sep 4, 2024
1 parent 211b044 commit 384b910
Showing 5 changed files with 41 additions and 31 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -45,11 +45,11 @@ cargo run --release -- --input your.osm.pbf --output ./parquet


## Benchmarks
- osm-pbf-parquet prioritizes transcode speed over file size, file count or preserving ordering. Here is a comparison against similar tools on the 2024-06-24 OSM planet PBF:
+ osm-pbf-parquet prioritizes transcode speed over file size, file count or preserving ordering. Here is a comparison against similar tools on the 2024-06-24 OSM planet PBF with a target file size of 500MB:
| | Time (wall) | Output size | File count |
| - | - | - | - |
- | **osm-pbf-parquet** (zstd:3) | 36 minutes | 182GB | 600 |
- | **osm-pbf-parquet** (zstd:9) | 72 minutes | 165GB | 600 |
+ | **osm-pbf-parquet** (zstd:3) | 30 minutes | 182GB | ~600 |
+ | **osm-pbf-parquet** (zstd:9) | 60 minutes | 165GB | ~600 |
| [osm-parquetizer](https://github.com/adrianulbona/osm-parquetizer) | 196 minutes | 285GB | 3 |
| [osm2orc](https://github.com/mojodna/osm2orc) | 385 minutes | 110GB | 1 |
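The new `input_buffer_size_mb` argument (added in `src/util.rs` below) makes the read buffer tunable per run. Assuming clap exposes it as a long flag, an invocation might look like `cargo run --release -- --input planet.osm.pbf --output ./parquet --input-buffer-size-mb 32`; the flag name is inferred from the field name, not quoted from the repo's docs.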

14 changes: 4 additions & 10 deletions src/lib.rs
@@ -20,9 +20,7 @@ pub mod sink;
pub mod util;
use crate::osm_arrow::OSMType;
use crate::sink::ElementSink;
-use crate::util::{
-    default_worker_thread_count, Args, ARGS, DEFAULT_BUF_READER_SIZE, ELEMENT_COUNTER,
-};
+use crate::util::{Args, ARGS, ELEMENT_COUNTER};

type SinkpoolStore = HashMap<OSMType, Arc<Mutex<Vec<ElementSink>>>>;

@@ -92,7 +90,7 @@ async fn create_s3_buf_reader(url: Url) -> Result<BufReader, anyhow::Error> {
Ok(BufReader::with_capacity(
Arc::new(s3_store),
&meta,
-        DEFAULT_BUF_READER_SIZE,
+        ARGS.get().unwrap().get_input_buffer_size_bytes(),
))
}

@@ -104,7 +102,7 @@ async fn create_local_buf_reader(path: &str) -> Result<BufReader, anyhow::Error>
Ok(BufReader::with_capacity(
Arc::new(local_store),
&meta,
-        DEFAULT_BUF_READER_SIZE,
+        ARGS.get().unwrap().get_input_buffer_size_bytes(),
))
}
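Both readers now share the same configurable capacity in place of the old fixed 8MB constant. Sizing the buffer to cover at least one full compressed blob matters most for the S3 path, where each refill is a ranged GET; the 16MB default (see `get_input_buffer_size_bytes` in `src/util.rs`) assumes a 32MB uncompressed blob at a 2:1 compression ratio.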

@@ -124,11 +122,7 @@ async fn process_blobs(
]));

// Avoid too many tasks in memory
-    let active_tasks = 2 * ARGS
-        .get()
-        .unwrap()
-        .worker_threads
-        .unwrap_or(default_worker_thread_count());
+    let active_tasks = (1.5 * ARGS.get().unwrap().get_worker_threads() as f32) as usize;
let semaphore = Arc::new(Semaphore::new(active_tasks));

let mut join_set = JoinSet::new();
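Dropping the ceiling from 2x to 1.5x the worker count trims peak memory while still keeping every worker busy. Below is a minimal sketch of the permit-before-spawn pattern this value feeds, assuming a tokio runtime; `process_blob` and `process_all` are placeholders, not the crate's actual functions:

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;
use tokio::task::JoinSet;

// Placeholder for the real per-blob decode-and-write work.
async fn process_blob(_blob: u64) {}

async fn process_all(blobs: Vec<u64>, worker_threads: usize) {
    // Cap in-flight tasks at 1.5x the worker threads so decoded blobs
    // don't accumulate faster than the sinks can flush them.
    let active_tasks = (1.5 * worker_threads as f32) as usize;
    let semaphore = Arc::new(Semaphore::new(active_tasks));
    let mut join_set = JoinSet::new();

    for blob in blobs {
        // Acquire a permit *before* spawning: once `active_tasks` tasks
        // are outstanding, this loop waits until one finishes.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        join_set.spawn(async move {
            process_blob(blob).await;
            drop(permit); // releasing the permit admits the next task
        });
    }
    // Drain the remaining tasks.
    while join_set.join_next().await.is_some() {}
}
```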
6 changes: 2 additions & 4 deletions src/main.rs
@@ -2,18 +2,16 @@ use clap::Parser;
use env_logger::{Builder, Env};

use osm_pbf_parquet::driver;
-use osm_pbf_parquet::util::{default_worker_thread_count, Args};
+use osm_pbf_parquet::util::Args;

fn main() {
Builder::from_env(Env::default().default_filter_or("info")).init();

let args = Args::parse();
println!("{:?}", args);

-    let worker_threads = args.worker_threads.unwrap_or(default_worker_thread_count());
-
    tokio::runtime::Builder::new_multi_thread()
-        .worker_threads(worker_threads)
+        .worker_threads(args.get_worker_threads())
.enable_all()
.build()
.unwrap()
11 changes: 3 additions & 8 deletions src/sink.rs
@@ -15,7 +15,7 @@ use url::Url;
use crate::osm_arrow::osm_arrow_schema;
use crate::osm_arrow::OSMArrowBuilder;
use crate::osm_arrow::OSMType;
-use crate::util::{default_record_batch_size_mb, ARGS};
+use crate::util::ARGS;

pub struct ElementSink {
// Config for writing file
@@ -42,11 +42,6 @@ impl ElementSink {
let buf_writer = Self::create_buf_writer(&full_path)?;
let writer = Self::create_writer(buf_writer, args.compression, args.max_row_group_count)?;

-        let target_record_batch_bytes = args
-            .record_batch_target_mb
-            .unwrap_or(default_record_batch_size_mb())
-            * 1_000_000usize;

Ok(ElementSink {
osm_type,
filenum,
@@ -56,8 +51,8 @@

estimated_record_batch_bytes: 0usize,
estimated_file_bytes: 0usize,
-            target_record_batch_bytes,
-            target_file_bytes: args.file_target_mb * 1_000_000usize,
+            target_record_batch_bytes: args.get_record_batch_target_bytes(),
+            target_file_bytes: args.get_file_target_bytes(),
last_write_cycle: Instant::now(),
})
}
35 changes: 29 additions & 6 deletions src/util.rs
@@ -10,13 +10,13 @@ pub static ARGS: OnceLock<Args> = OnceLock::new();
// Element counter to track read progress
pub static ELEMENT_COUNTER: AtomicU64 = AtomicU64::new(0);

-// Max recommended size of an uncompressed single blob is 16MB, assumes compression ratio of 2:1 or better
-pub const DEFAULT_BUF_READER_SIZE: usize = 1024 * 1024 * 8;
+static BYTES_IN_MB: usize = 1024 * 1024;

#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
pub struct Args {
/// Path to input PBF
+    /// S3 URIs and filesystem paths are supported
#[arg(short, long)]
pub input: String,

@@ -33,6 +33,9 @@ pub struct Args {
#[arg(long)]
pub worker_threads: Option<usize>,

+    /// Input buffer size, default 16MB
+    #[arg(long)]
+    pub input_buffer_size_mb: Option<usize>,

/// Override target record batch size, balance this with available memory
/// default is total memory (MB) / CPU count / 8
#[arg(long)]
@@ -42,7 +45,7 @@
#[arg(long)]
pub max_row_group_count: Option<usize>,

-    /// Override target parquet file size
+    /// Override target parquet file size, default 500MB
#[arg(long, default_value_t = 500usize)]
pub file_target_mb: usize,
}
@@ -54,20 +57,40 @@ impl Args {
output,
compression,
worker_threads: None,
+            input_buffer_size_mb: None,
record_batch_target_mb: None,
max_row_group_count: None,
file_target_mb: 500usize,
}
}

+    pub fn get_worker_threads(&self) -> usize {
+        self.worker_threads.unwrap_or(default_worker_thread_count())
+    }
+
+    pub fn get_input_buffer_size_bytes(&self) -> usize {
+        // Max size of an uncompressed single blob is 32MB, assumes compression ratio of 2:1 or better
+        self.input_buffer_size_mb.unwrap_or(16) * BYTES_IN_MB
+    }
+
+    pub fn get_record_batch_target_bytes(&self) -> usize {
+        self.record_batch_target_mb
+            .unwrap_or(default_record_batch_size_mb())
+            * BYTES_IN_MB
+    }
+
+    pub fn get_file_target_bytes(&self) -> usize {
+        self.file_target_mb * BYTES_IN_MB
+    }
}
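A hedged sketch of how these getters resolve their defaults, using clap's `parse_from` with stand-in paths; the asserted values follow from the defaults shown above:

```rust
use clap::Parser;
use osm_pbf_parquet::util::Args;

fn main() {
    // Parse with no size overrides; the input and output paths are stand-ins.
    let args = Args::parse_from([
        "osm-pbf-parquet",
        "--input", "planet.osm.pbf",
        "--output", "./parquet",
    ]);
    // With nothing overridden, the getters fall back to their defaults.
    assert_eq!(args.get_input_buffer_size_bytes(), 16 * 1024 * 1024); // 16MB
    assert_eq!(args.get_file_target_bytes(), 500 * 1024 * 1024); // 500MB
}
```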

-pub fn default_record_batch_size_mb() -> usize {
+fn default_record_batch_size_mb() -> usize {
let system = System::new_all();
// Estimate per thread available memory, leaving overhead for copies and system processes
-    return ((system.total_memory() as usize / 1_000_000usize) / system.cpus().len()) / 8usize;
+    return ((system.total_memory() as usize / BYTES_IN_MB) / system.cpus().len()) / 8usize;
}
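For example, on a machine with 64GiB of memory and 16 CPUs this default works out to (65536 / 16) / 8 = 512MB per record batch.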

-pub fn default_worker_thread_count() -> usize {
+fn default_worker_thread_count() -> usize {
let system = System::new_all();
return system.cpus().len();
}
