From 818546cc06b770fd88695c364b8bf78026c62104 Mon Sep 17 00:00:00 2001 From: Chris H-C Date: Tue, 19 Nov 2024 15:16:18 -0500 Subject: [PATCH] bug 1928288 - Permit shutdowns to interrupt an RLB UploadManager Wait task This permits much faster orderly shutdowns if we happen to be throttled. This is acceptable because the point of having a 30s external timeout on upload shutdown is to allow a session time to attempt upload of at least one ping. If we are throttled, it's because we've sent more than one (default: 15) ping. --- .dictionary | 3 +- CHANGELOG.md | 3 + glean-core/rlb/Cargo.toml | 2 +- glean-core/rlb/src/net/mod.rs | 17 +- .../rlb/tests/interruptible_shutdown.rs | 146 ++++++++++++++++++ 5 files changed, 168 insertions(+), 3 deletions(-) create mode 100644 glean-core/rlb/tests/interruptible_shutdown.rs diff --git a/.dictionary b/.dictionary index ff4448b813..e9c78ca5b7 100644 --- a/.dictionary +++ b/.dictionary @@ -1,4 +1,4 @@ -personal_ws-1.1 en 293 utf-8 +personal_ws-1.1 en 294 utf-8 AAR AARs ABI @@ -97,6 +97,7 @@ UUIDs Unbreak Underflowing UniFFI +UploadManager Uploaders VPN Walkthrough diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a3732c48c..8676d0bfd7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ [Full changelog](https://github.com/mozilla/glean/compare/v62.0.0...main) +* Rust + * Permit Glean shutdown to interrupt UploadManager Wait tasks ([bug 1928288](https://bugzilla.mozilla.org/show_bug.cgi?id=1928288)) + # v62.0.0 (2024-11-05) [Full changelog](https://github.com/mozilla/glean/compare/v61.2.0...v62.0.0) diff --git a/glean-core/rlb/Cargo.toml b/glean-core/rlb/Cargo.toml index d3bc34d3de..46bffb9b4a 100644 --- a/glean-core/rlb/Cargo.toml +++ b/glean-core/rlb/Cargo.toml @@ -26,13 +26,13 @@ path = ".." 
version = "62.0.0" [dependencies] +crossbeam-channel = "0.5" inherent = "1" log = "0.4.8" once_cell = "1.18.0" whatsys = "0.3.0" [dev-dependencies] -crossbeam-channel = "0.5" env_logger = { version = "0.10.0", default-features = false, features = ["humantime"] } flate2 = "1.0.19" jsonschema-valid = "0.5.0" diff --git a/glean-core/rlb/src/net/mod.rs b/glean-core/rlb/src/net/mod.rs index 5546078e63..ee75e2b47e 100644 --- a/glean-core/rlb/src/net/mod.rs +++ b/glean-core/rlb/src/net/mod.rs @@ -7,6 +7,7 @@ //! This doesn't perform the actual upload but rather handles //! retries, upload limitations and error tracking. +use crossbeam_channel::{Receiver, Sender}; use std::sync::{atomic::Ordering, Arc, Mutex}; use std::thread::{self, JoinHandle}; use std::time::Duration; @@ -59,6 +60,8 @@ struct Inner { uploader: Box, thread_running: AtomicState, handle: Mutex>>, + rx: Receiver<()>, + tx: Sender<()>, } impl UploadManager { @@ -72,12 +75,15 @@ impl UploadManager { server_endpoint: String, new_uploader: Box, ) -> Self { + let (tx, rx) = crossbeam_channel::bounded(1); Self { inner: Arc::new(Inner { server_endpoint, uploader: new_uploader, thread_running: AtomicState::new(State::Stopped), handle: Mutex::new(None), + rx, + tx, }), } } @@ -141,7 +147,13 @@ impl UploadManager { } PingUploadTask::Wait { time } => { log::trace!("Instructed to wait for {:?}ms", time); - thread::sleep(Duration::from_millis(time)); + let _ = inner.rx.recv_timeout(Duration::from_millis(time)); + + let status = inner.thread_running.load(Ordering::SeqCst); + // asked to shut down. let's do it. + if status == State::ShuttingDown { + break; + } } PingUploadTask::Done { .. } => { log::trace!("Received PingUploadTask::Done. Exiting."); @@ -192,6 +204,9 @@ impl UploadManager { let thread = handle.take(); if let Some(thread) = thread { + // poke the thread in case it's in `Wait`. 
+ let _ = self.inner.tx.send(()); + thread .join() .expect("couldn't join on the uploader thread."); diff --git a/glean-core/rlb/tests/interruptible_shutdown.rs b/glean-core/rlb/tests/interruptible_shutdown.rs new file mode 100644 index 0000000000..db950ba116 --- /dev/null +++ b/glean-core/rlb/tests/interruptible_shutdown.rs @@ -0,0 +1,146 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! This integration test should model how the RLB is used when embedded in another Rust application +//! (e.g. FOG/Firefox Desktop). +//! +//! We write a single test scenario per file to avoid any state keeping across runs +//! (different files run as different processes). + +mod common; + +use std::io::Read; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Instant; + +use crossbeam_channel::{bounded, Sender}; +use flate2::read::GzDecoder; +use serde_json::Value as JsonValue; + +use glean::net; +use glean::{ConfigurationBuilder, PingRateLimit}; + +mod metrics { + #![allow(non_upper_case_globals)] + use glean::{private::BooleanMetric, CommonMetricData, Lifetime}; + use once_cell::sync::Lazy; + + pub static sample_boolean: Lazy = Lazy::new(|| { + BooleanMetric::new(CommonMetricData { + name: "sample_boolean".into(), + category: "test.metrics".into(), + send_in_pings: vec!["validation".into()], + disabled: false, + lifetime: Lifetime::Ping, + ..Default::default() + }) + }); +} + +mod pings { + use glean::private::PingType; + use once_cell::sync::Lazy; + + #[allow(non_upper_case_globals)] + pub static validation: Lazy = Lazy::new(|| { + glean::private::PingType::new("validation", true, true, true, true, true, vec![], vec![]) + }); +} + +// Define a fake uploader that reports when and what it uploads. 
+#[derive(Debug)] +struct ReportingUploader { + calls: AtomicUsize, + sender: Sender, +} + +impl net::PingUploader for ReportingUploader { + fn upload(&self, upload_request: net::PingUploadRequest) -> net::UploadResult { + let calls = self.calls.fetch_add(1, Ordering::SeqCst); + let body = upload_request.body; + let decode = |body: Vec| { + let mut gzip_decoder = GzDecoder::new(&body[..]); + let mut s = String::with_capacity(body.len()); + + gzip_decoder + .read_to_string(&mut s) + .ok() + .map(|_| &s[..]) + .or_else(|| std::str::from_utf8(&body).ok()) + .and_then(|payload| serde_json::from_str(payload).ok()) + .unwrap() + }; + + match calls { + // First goes through immediately. + 0 => { + self.sender.send(decode(body)).unwrap(); + net::UploadResult::http_status(200) + } + // Second SHOULD NEVER SEND + 1 => panic!("We should shutdown before getting to the second ping"), + // Any others ought to be impossible. + _ => panic!("Wat."), + } + } +} + +/// Test scenario: We can interrupt a long glean.upload Wait during shutdown. +/// +/// The app is initialized, in turn Glean gets initialized without problems. +/// A custom ping is submitted once, triggering ping throttling. +/// It is submitted a second time to convince `glean.upload` to be in a `Wait` task. +/// From this position we ask for Glean to promptly shut down. +/// We expect this to happen reasonably quickly (within 2s) instead of waiting for the +/// entire throttling interval _or_ for uploader_shutdown's last-ditch timeout. +#[test] +fn interruptible_shutdown() { + common::enable_test_logging(); + + // Create a custom configuration to use our reporting uploader. 
+ let dir = tempfile::tempdir().unwrap(); + let tmpname = dir.path().to_path_buf(); + let (tx, rx) = bounded(1); + + let mut cfg = ConfigurationBuilder::new(true, tmpname.clone(), "glean-interruptible-shutdown") + .with_server_endpoint("invalid-test-host") + .with_use_core_mps(false) + .with_uploader(ReportingUploader { + calls: AtomicUsize::new(0), + sender: tx, + }) + .build(); + cfg.rate_limit = Some(PingRateLimit { + seconds_per_interval: 600, // Needs only be longer than the timeout in `glean::uploader_shutdown()`. + pings_per_interval: 1, // throttle thyself immediately. + }); + common::initialize(cfg); + + // Wait for init to finish, + // otherwise we might be too quick with calling `shutdown`. + let _ = metrics::sample_boolean.test_get_value(None); + + // fast + pings::validation.submit(None); + // wait for it to be uploaded + let _body = rx.recv().unwrap(); + + // Now we're in throttling territory. + pings::validation.submit(None); + + // Now we shut down. + // This should complete really fast because we'll interrupt the `glean.upload` thread + // from its 600-second Wait task. + // ...so long as everything's working properly. So let's time it. + + let pre_shutdown = Instant::now(); + + glean::shutdown(); + + let shutdown_elapsed = pre_shutdown.elapsed(); + assert!( + shutdown_elapsed.as_secs() < 2, + "We shouldn't have been slow on shutdown." + ); +}