Skip to content

Commit

Permalink
bug 1928288 - Permit shutdowns to interrupt an RLB UploadManager Wait…
Browse files Browse the repository at this point in the history
… task

This permits much faster orderly shutdowns if we happen to be throttled.

This is acceptable because the point of having a 30s external timeout on upload
shutdown is to allow a session time to attempt upload of at least one ping.
If we are throttled, it's because we've sent more than one (default: 15) ping.
  • Loading branch information
chutten authored and badboy committed Nov 22, 2024
1 parent 705da12 commit 818546c
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 3 deletions.
3 changes: 2 additions & 1 deletion .dictionary
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
personal_ws-1.1 en 293 utf-8
personal_ws-1.1 en 294 utf-8
AAR
AARs
ABI
Expand Down Expand Up @@ -97,6 +97,7 @@ UUIDs
Unbreak
Underflowing
UniFFI
UploadManager
Uploaders
VPN
Walkthrough
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

[Full changelog](https://github.com/mozilla/glean/compare/v62.0.0...main)

* Rust
* Permit Glean shutdown to interrupt UploadManager Wait tasks ([bug 1928288](https://bugzilla.mozilla.org/show_bug.cgi?id=1928288))

# v62.0.0 (2024-11-05)

[Full changelog](https://github.com/mozilla/glean/compare/v61.2.0...v62.0.0)
Expand Down
2 changes: 1 addition & 1 deletion glean-core/rlb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ path = ".."
version = "62.0.0"

[dependencies]
crossbeam-channel = "0.5"
inherent = "1"
log = "0.4.8"
once_cell = "1.18.0"
whatsys = "0.3.0"

[dev-dependencies]
crossbeam-channel = "0.5"
env_logger = { version = "0.10.0", default-features = false, features = ["humantime"] }
flate2 = "1.0.19"
jsonschema-valid = "0.5.0"
Expand Down
17 changes: 16 additions & 1 deletion glean-core/rlb/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! This doesn't perform the actual upload but rather handles
//! retries, upload limitations and error tracking.

use crossbeam_channel::{Receiver, Sender};
use std::sync::{atomic::Ordering, Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;
Expand Down Expand Up @@ -59,6 +60,8 @@ struct Inner {
uploader: Box<dyn PingUploader + 'static>,
thread_running: AtomicState,
handle: Mutex<Option<JoinHandle<()>>>,
rx: Receiver<()>,
tx: Sender<()>,
}

impl UploadManager {
Expand All @@ -72,12 +75,15 @@ impl UploadManager {
server_endpoint: String,
new_uploader: Box<dyn PingUploader + 'static>,
) -> Self {
let (tx, rx) = crossbeam_channel::bounded(1);
Self {
inner: Arc::new(Inner {
server_endpoint,
uploader: new_uploader,
thread_running: AtomicState::new(State::Stopped),
handle: Mutex::new(None),
rx,
tx,
}),
}
}
Expand Down Expand Up @@ -141,7 +147,13 @@ impl UploadManager {
}
PingUploadTask::Wait { time } => {
log::trace!("Instructed to wait for {:?}ms", time);
thread::sleep(Duration::from_millis(time));
let _ = inner.rx.recv_timeout(Duration::from_millis(time));

let status = inner.thread_running.load(Ordering::SeqCst);
// asked to shut down. let's do it.
if status == State::ShuttingDown {
break;
}
}
PingUploadTask::Done { .. } => {
log::trace!("Received PingUploadTask::Done. Exiting.");
Expand Down Expand Up @@ -192,6 +204,9 @@ impl UploadManager {
let thread = handle.take();

if let Some(thread) = thread {
// poke the thread in case it's in `Wait`.
let _ = self.inner.tx.send(());

thread
.join()
.expect("couldn't join on the uploader thread.");
Expand Down
146 changes: 146 additions & 0 deletions glean-core/rlb/tests/interruptible_shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! This integration test should model how the RLB is used when embedded in another Rust application
//! (e.g. FOG/Firefox Desktop).
//!
//! We write a single test scenario per file to avoid any state keeping across runs
//! (different files run as different processes).

mod common;

use std::io::Read;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_channel::{bounded, Sender};
use flate2::read::GzDecoder;
use serde_json::Value as JsonValue;

use glean::net;
use glean::{ConfigurationBuilder, PingRateLimit};

mod metrics {
    //! Metric definitions used by this test scenario.

    use glean::{private::BooleanMetric, CommonMetricData, Lifetime};
    use once_cell::sync::Lazy;

    /// Boolean metric `test.metrics.sample_boolean`, sent in the `validation` ping.
    ///
    /// It is built lazily on first access. The name is lower-case on purpose,
    /// hence the lint allowance.
    #[allow(non_upper_case_globals)]
    pub static sample_boolean: Lazy<BooleanMetric> = Lazy::new(|| {
        let meta = CommonMetricData {
            category: "test.metrics".into(),
            name: "sample_boolean".into(),
            send_in_pings: vec!["validation".into()],
            lifetime: Lifetime::Ping,
            disabled: false,
            // Every remaining field keeps its default value.
            ..Default::default()
        };

        BooleanMetric::new(meta)
    });
}

mod pings {
    //! Ping definitions used by this test scenario.
    #![allow(non_upper_case_globals)]

    use glean::private::PingType;
    use once_cell::sync::Lazy;

    /// The custom `validation` ping, built lazily on first access.
    ///
    /// All arguments after the name are positional: five flags (all `true`)
    /// followed by two empty lists. See `PingType::new` for what each one means.
    pub static validation: Lazy<PingType> = Lazy::new(|| {
        PingType::new(
            "validation",
            true,
            true,
            true,
            true,
            true,
            vec![],
            vec![],
        )
    });
}

/// A fake uploader that reports when and what it uploads.
///
/// It never touches the network: the first upload's decoded body is handed
/// back to the test over `sender`, and any further upload is a test failure.
#[derive(Debug)]
struct ReportingUploader {
    /// Number of `upload` calls seen so far.
    calls: AtomicUsize,
    /// Channel on which the decoded JSON body of the first upload is reported.
    sender: Sender<JsonValue>,
}

impl net::PingUploader for ReportingUploader {
fn upload(&self, upload_request: net::PingUploadRequest) -> net::UploadResult {
let calls = self.calls.fetch_add(1, Ordering::SeqCst);
let body = upload_request.body;
let decode = |body: Vec<u8>| {
let mut gzip_decoder = GzDecoder::new(&body[..]);
let mut s = String::with_capacity(body.len());

gzip_decoder
.read_to_string(&mut s)
.ok()
.map(|_| &s[..])
.or_else(|| std::str::from_utf8(&body).ok())
.and_then(|payload| serde_json::from_str(payload).ok())
.unwrap()
};

match calls {
// First goes through immediately.
0 => {
self.sender.send(decode(body)).unwrap();
net::UploadResult::http_status(200)
}
// Second SHOULD NEVER SEND
1 => panic!("We should shutdown before getting to the second ping"),
// Any others ought to be impossible.
_ => panic!("Wat."),
}
}
}

/// Test scenario: We can interrupt a long glean.upload Wait during shutdown.
///
/// The app is initialized, in turn Glean gets initialized without problems.
/// A custom ping is submitted once, triggering ping throttling.
/// It is submitted a second time to convince `glean.upload` to be in a `Wait` task.
/// From this position we ask for Glean to promptly shut down.
/// We expect this to happen reasonably quickly (within 2s) instead of waiting for the
/// entire throttling interval _or_ for uploader_shutdown's last-ditch timeout.
#[test]
fn interruptible_shutdown() {
    common::enable_test_logging();

    // Create a custom configuration to use our reporting uploader.
    // `dir` must stay alive until the end of the test so the data path keeps existing.
    let dir = tempfile::tempdir().unwrap();
    let tmpname = dir.path().to_path_buf();
    let (tx, rx) = bounded(1);

    let mut cfg = ConfigurationBuilder::new(true, tmpname, "glean-interruptible-shutdown")
        .with_server_endpoint("invalid-test-host")
        .with_use_core_mps(false)
        .with_uploader(ReportingUploader {
            calls: AtomicUsize::new(0),
            sender: tx,
        })
        .build();
    cfg.rate_limit = Some(PingRateLimit {
        seconds_per_interval: 600, // Needs only be longer than the timeout in `glean::uploader_shutdown()`.
        pings_per_interval: 1,     // throttle thyself immediately.
    });
    common::initialize(cfg);

    // Wait for init to finish,
    // otherwise we might be too quick with calling `shutdown`.
    let _ = metrics::sample_boolean.test_get_value(None);

    // fast
    pings::validation.submit(None);
    // wait for it to be uploaded
    let _body = rx.recv().unwrap();

    // Now we're in throttling territory.
    pings::validation.submit(None);

    // Now we shut down.
    // This should complete really fast because we'll interrupt the `glean.upload` thread
    // from its 600-second Wait task.
    // ...so long as everything's working properly. So let's time it.

    let pre_shutdown = Instant::now();

    glean::shutdown();

    let shutdown_elapsed = pre_shutdown.elapsed();
    // Report the measured duration so a failure on a slow CI machine can be
    // told apart from a genuinely uninterrupted Wait.
    assert!(
        shutdown_elapsed.as_secs() < 2,
        "We shouldn't have been slow on shutdown. It took {:?}.",
        shutdown_elapsed
    );
}

0 comments on commit 818546c

Please sign in to comment.