-
Notifications
You must be signed in to change notification settings - Fork 262
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
3217: Harden council and restart running jobs with a missing dep graph (ENG-2274) r=nickgerace a=nickgerace ## Summary This PR is a first pass at hardening council. It primarily encloses council's core loop in an infallible wrapper as well as handles situations where we have a missing dependency graph and need to restart all running jobs. The restart broadcast is also published whenever council boots up in case of a panic or restart. <img src="https://media2.giphy.com/media/JkADkL9786mQBORwCS/giphy.gif"/> ## Description Council: - Upon startup, council will publish a message for all subscribers (all running pinga instances) to restart actively running jobs - The core loop for "bin/council" has been wrapped with an infallible wrapper (within our control, the only way to escape the wrapper is via a shutdown call or a closed connection) - Bubbled up errors are logged in otel - Explicit panics are now disallowed in "lib/council-server" with additional clippy lints - All "unwrap" usages have been replaced and are either bubbled up or handled Missing Dependency Graph: - In the two instances where a depenendecy graph can have missing data within council, we need to restart the actively running job Management Client: - There is a new client for council exclusively for subscribing to management messages from council - This is solely used for restarting actively running jobs Subject Generation: - For both client and server, all subject management has been centralized into a new module in "lib/council-server" - There are two new subjects: "<prefix>.council.management" and "<prefix>.council.management.reply" Co-authored-by: Nick Gerace <[email protected]>
- Loading branch information
Showing
13 changed files
with
436 additions
and
128 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
//! This module contains [`ManagementClient`], which is used to subscribe to management-type messages from a council | ||
//! server (example: notify all running pinga instances that they should [`restart`](ManagementResponse::Restart) | ||
//! actively running jobs). | ||
use futures::StreamExt; | ||
use si_data_nats::{NatsClient, Subject, Subscriber}; | ||
use std::time::Duration; | ||
use telemetry::prelude::*; | ||
|
||
use crate::client::{ClientError, ClientResult}; | ||
use crate::{ManagementResponse, SubjectGenerator}; | ||
|
||
#[derive(Debug)] | ||
pub struct ManagementClient { | ||
management_channel: Subject, | ||
management_subscriber: Subscriber, | ||
} | ||
|
||
impl ManagementClient { | ||
pub async fn new(nats: &NatsClient, subject_prefix: Option<String>) -> ClientResult<Self> { | ||
let management_channel = SubjectGenerator::for_management_client(subject_prefix); | ||
Ok(Self { | ||
management_subscriber: nats.subscribe(management_channel.clone()).await?, | ||
management_channel: management_channel.into(), | ||
}) | ||
} | ||
|
||
// None means subscriber has been unsubscribed or that the connection has been closed | ||
pub async fn fetch_response(&mut self) -> ClientResult<Option<ManagementResponse>> { | ||
// TODO: timeout so we don't get stuck here forever if council goes away | ||
// TODO: handle message.data() empty with Status header as 503: https://github.com/nats-io/nats.go/pull/576 | ||
let msg = loop { | ||
let res = | ||
tokio::time::timeout(Duration::from_secs(60), self.management_subscriber.next()) | ||
.await; | ||
|
||
match res { | ||
Ok(msg) => break msg, | ||
Err(_) => { | ||
warn!(management_channel = ?self.management_channel, "Council client waiting for response on management channel for 60 seconds"); | ||
} | ||
} | ||
}; | ||
|
||
match msg { | ||
Some(msg) => { | ||
if msg.payload().is_empty() { | ||
return Err(ClientError::NoListenerAvailable); | ||
} | ||
Ok(Some(serde_json::from_slice::<ManagementResponse>( | ||
msg.payload(), | ||
)?)) | ||
} | ||
None => Ok(None), | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.