Skip to content

Commit

Permalink
fix: use unbounded channels in config_watcher (rapiz1#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
rapiz1 authored Feb 7, 2022
1 parent 4c08779 commit 90aa0a4
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions src/config_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,17 @@ impl InstanceConfig for ClientConfig {
}

pub struct ConfigWatcherHandle {
pub event_rx: mpsc::Receiver<ConfigChange>,
pub event_rx: mpsc::UnboundedReceiver<ConfigChange>,
}

impl ConfigWatcherHandle {
pub async fn new(path: &Path, shutdown_rx: broadcast::Receiver<bool>) -> Result<Self> {
let (event_tx, event_rx) = mpsc::channel(16);

let (event_tx, event_rx) = mpsc::unbounded_channel();
let origin_cfg = Config::from_file(path).await?;

// Initial start
event_tx
.send(ConfigChange::General(Box::new(origin_cfg.clone())))
.await
.unwrap();

tokio::spawn(config_watcher(
Expand All @@ -124,7 +122,7 @@ impl ConfigWatcherHandle {
async fn config_watcher(
_path: PathBuf,
mut shutdown_rx: broadcast::Receiver<bool>,
_event_tx: mpsc::Sender<ConfigChange>,
_event_tx: mpsc::UnboundedSender<ConfigChange>,
_old: Config,
) -> Result<()> {
// Do nothing except waiting for ctrl-c
Expand All @@ -137,7 +135,7 @@ async fn config_watcher(
async fn config_watcher(
path: PathBuf,
mut shutdown_rx: broadcast::Receiver<bool>,
event_tx: mpsc::Sender<ConfigChange>,
event_tx: mpsc::UnboundedSender<ConfigChange>,
mut old: Config,
) -> Result<()> {
let (fevent_tx, mut fevent_rx) = mpsc::channel(16);
Expand All @@ -146,7 +144,7 @@ async fn config_watcher(
notify::recommended_watcher(move |res: Result<notify::Event, _>| match res {
Ok(e) => {
if let EventKind::Modify(ModifyKind::Data(_)) = e.kind {
let _ = fevent_tx.blocking_send(true);
let _ = fevent_tx.send(true);
}
}
Err(e) => error!("watch error: {:#}", e),
Expand All @@ -171,7 +169,7 @@ async fn config_watcher(
};

for event in calculate_events(&old, &new) {
event_tx.send(event).await?;
event_tx.send(event)?;
}

old = new;
Expand Down

0 comments on commit 90aa0a4

Please sign in to comment.