Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: Broadcast message to logged in users before rebooting #479

Merged
merged 1 commit into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dist/systemd/system/zincati.service
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ After=systemd-machine-id-commit.service
[Service]
User=zincati
Group=zincati
SupplementaryGroups=tty
Environment=ZINCATI_VERBOSITY="-v"
Type=notify
ExecStart=/usr/libexec/zincati agent ${ZINCATI_VERBOSITY}
Expand Down
23 changes: 20 additions & 3 deletions src/update_agent/actor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Update agent actor.

use super::{UpdateAgent, UpdateAgentState};
use super::{broadcast, UpdateAgent, UpdateAgentState};
use crate::rpm_ostree::{self, Release};
use actix::prelude::*;
use failure::Error;
Expand Down Expand Up @@ -379,10 +379,27 @@ impl UpdateAgent {
return Box::pin(actix::fut::err(()));
}

log::info!(
"staged deployment '{}' available, proceeding to finalize it",
// Warn logged in users of imminent reboot.
let msg = format!(
"staged deployment '{}' available, proceeding to finalize it and reboot",
release.version
);
log::info!("{}", &msg);
match broadcast(&msg) {
Ok((sessions_total, sessions_broadcasted)) => {
if sessions_total != sessions_broadcasted {
log::warn!(
"{} sessions found, but only broadcasted to {}",
sessions_total,
sessions_broadcasted
);
}
}
Err(e) => {
log::error!("failed to broadcast to user sessions: {}", e);
}
}

let msg = rpm_ostree::FinalizeDeployment { release };
let upgrade = self
.rpm_ostree_actor
Expand Down
97 changes: 97 additions & 0 deletions src/update_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use crate::rpm_ostree::{Release, RpmOstreeClient};
use crate::strategy::UpdateStrategy;
use actix::Addr;
use chrono::prelude::*;
use failure::{bail, Fallible, ResultExt};
use prometheus::IntGauge;
use serde::{Deserialize, Deserializer};
use std::fs;
use std::time::Duration;

/// Default refresh interval for steady state (in seconds).
Expand Down Expand Up @@ -37,6 +40,28 @@ lazy_static::lazy_static! {
)).unwrap();
}

/// JSON output from `loginctl list-sessions --output=json`
#[derive(Debug, Deserialize)]
pub struct SessionsJSON {
user: String,
#[serde(deserialize_with = "empty_string_as_none")]
tty: Option<String>,
}

/// Function to deserialize field to `Option<String>`, where empty strings are
/// deserialized into `None`.
fn empty_string_as_none<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
if s.is_empty() {
Ok(None)
} else {
Ok(Some(s))
}
}

/// State machine for the agent.
#[derive(Clone, Debug, PartialEq, Eq)]
enum UpdateAgentState {
Expand Down Expand Up @@ -238,6 +263,78 @@ impl UpdateAgent {
}
}

/// Attempt to broadcast msg to all sessions registered in systemd's login manager.
/// Returns a Result with a tuple of total sessions found and sessions broadcasted to,
/// if no error.
fn broadcast(msg: &str) -> Fallible<(usize, usize)> {
let sessions = get_user_sessions()?;
let sessions_total = sessions.len();
let mut sessions_broadcasted: usize = 0;

let broadcast_msg = format!(
"\nBroadcast message from Zincati at {}:\n{}\n",
chrono::Utc::now().format("%a %Y-%m-%d %H:%M:%S %Z"),
msg
);

// Iterate over sessions and attempt to write to each session's tty.
for session in sessions.into_iter() {
let user = session.user;
let tty_dev = match session.tty {
Some(mut tty) => {
tty.insert_str(0, "/dev/");
tty
}
None => {
log::debug!(
"found user {} with no tty, skipping broadcast to this user",
user
);
continue;
}
};

log::trace!(
"Attempting to broadcast a message to user {} at {}",
user,
tty_dev
);

{
if let Err(e) = fs::write(&tty_dev, &broadcast_msg) {
log::error!("failed to write to {}: {}", &tty_dev, e);
continue;
};
}

sessions_broadcasted = sessions_broadcasted.saturating_add(1);
}

Ok((sessions_total, sessions_broadcasted))
}

/// Get sessions with users logged in using `loginctl`.
/// Returns a Result with vector of `SessionsJSON`, if no error.
fn get_user_sessions() -> Fallible<Vec<SessionsJSON>> {
let cmdrun = std::process::Command::new("loginctl")
.arg("list-sessions")
.arg("--output=json")
.output()
.context("failed to run `loginctl` binary")?;

if !cmdrun.status.success() {
bail!(
"`loginctl` failed to list current sessions: {}",
String::from_utf8_lossy(&cmdrun.stderr)
);
}

let sessions = serde_json::from_slice(&cmdrun.stdout)
.context("failed to deserialize output of `loginctl`")?;

Ok(sessions)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down