Skip to content

Commit

Permalink
fix: close stream receiver if lagged (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored Oct 26, 2023
2 parents 6c68c3c + 6fb561e commit 8ab6ed2
Show file tree
Hide file tree
Showing 18 changed files with 110 additions and 6 deletions.
3 changes: 3 additions & 0 deletions front/app/src/lib/css/app.css
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

/* @import url("./circular/circular.css"); */
/* @import url("./mada/font.css"); */

@import url("./sofia-sans/font.css");
/* @import url("./brandon/font.css"); */

@import url("./azeret-mono/azeret-mono.css");
/* @import url("./chivo-mono/chivo-mono.css"); */

Expand Down
Binary file added front/app/src/lib/css/brandon/black.woff2
Binary file not shown.
Binary file added front/app/src/lib/css/brandon/black_italic.woff2
Binary file not shown.
Binary file added front/app/src/lib/css/brandon/bold.woff2
Binary file not shown.
Binary file not shown.
83 changes: 83 additions & 0 deletions front/app/src/lib/css/brandon/font.css
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
@font-face {
font-family: Brandon;
font-style: normal;
font-weight: 200;
src: url("./thin.woff2") format('woff2');
}

@font-face {
font-family: Brandon;
font-style: italic;
font-weight: 200;
src: url("./thin_italic.woff2") format('woff2');
}

@font-face {
font-family: Brandon;
font-style: normal;
font-weight: 300;
src: url("./light.woff2") format('woff2');
}

@font-face {
font-family: Brandon;
font-style: italic;
font-weight: 300;
src: url("./light_italic.woff2") format('woff2');
}

@font-face {
font-family: Brandon;
font-style: normal;
font-weight: 400;
src: url("./regular.woff2") format('woff2');
}

@font-face {
font-family: Brandon;
font-style: italic;
font-weight: 400;
src: url("./regular_italic.woff2") format('woff2');
}

@font-face {
font-family: Brandon;
font-style: normal;
font-weight: 600;
src: url("./medium.woff2") format('woff2');
}

@font-face {
font-family: Brandon;
font-style: italic;
font-weight: 600;
src: url("./medium_italic.woff2") format('woff2');
}

@font-face {
font-family: Brandon;
font-style: normal;
font-weight: 700;
src: url("./bold.woff2") format('woff2');
}

@font-face {
font-family: Brandon;
font-style: italic;
font-weight: 700;
src: url("./bold_italic.woff2") format('woff2');
}

@font-face {
font-family: Brandon;
font-style: normal;
font-weight: 900;
src: url("./black.woff2") format('woff2');
}

@font-face {
font-family: Brandon;
font-style: italic;
font-weight: 900;
src: url("./black_italic.woff2") format('woff2');
}
Binary file added front/app/src/lib/css/brandon/light.woff2
Binary file not shown.
Binary file added front/app/src/lib/css/brandon/light_italic.woff2
Binary file not shown.
Binary file added front/app/src/lib/css/brandon/medium.woff2
Binary file not shown.
Binary file not shown.
Binary file added front/app/src/lib/css/brandon/regular.woff2
Binary file not shown.
Binary file not shown.
Binary file added front/app/src/lib/css/brandon/thin.woff2
Binary file not shown.
Binary file added front/app/src/lib/css/brandon/thin_italic.woff2
Binary file not shown.
File renamed without changes.
1 change: 1 addition & 0 deletions rs/packages/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2021"

[features]
default = ["analytics-max-concurrent"]
# default = []
analytics-max-concurrent = []

# do not enable this, is for internal benchmarking only
Expand Down
2 changes: 2 additions & 0 deletions rs/packages/media/src/handle/external_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ pub fn run_external_relay_source(
None => break,
Some(Err(_e)) => break,
Some(Ok(bytes)) => {
transfer.fetch_add(bytes.len(), Ordering::Relaxed);

match sender.send(bytes) {
Ok(_) => {
no_listeners_since = None;
Expand Down
27 changes: 21 additions & 6 deletions rs/packages/stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use tokio::time::{sleep, Duration};
use transfer_map::TransferTracer;

mod error;
Expand Down Expand Up @@ -451,23 +452,37 @@ impl StreamHandler {

loop_i += 1;

let mut last_recv = Instant::now();

'recv: loop {
let r = rx.recv().await;
match rx.recv().await {
// Here the channel has been dropped
Err(RecvError::Closed) => break 'recv,

match r {
// if lagged we ignore the error and continue with the oldest message buffered in the channel
// TODO: maybe we should advance to the newest message with stream.resubscribe()
Err(RecvError::Lagged(_)) => continue 'recv,
Err(RecvError::Lagged(_)) => {
// break this receiver if lagged behind more than 1 minute
if last_recv.elapsed().as_millis() > 60_000 {
break 'root;
}

// Here the channel has been dropped
Err(RecvError::Closed) => break 'recv,
continue 'recv;
}

// Receive bytes and pass it to response body
Ok(bytes) => {
rx_had_data = true;
last_recv = Instant::now();

let len = bytes.len();
match body_sender.send_data(bytes).await {

let r = tokio::select! {
_ = sleep(Duration::from_millis(60_000)) => break 'root,
r = body_sender.send_data(bytes) => r,
};

match r {
Err(_) => break 'root,
Ok(()) => {
transfer_map.increment(&station.account_id, len);
Expand Down

0 comments on commit 8ab6ed2

Please sign in to comment.