Skip to content

Commit

Permalink
feat: add auto-upgrade for station images at startup, increase webp q…
Browse files Browse the repository at this point in the history
…uality, add more png sizes (#114)
  • Loading branch information
ramiroaisen authored Jul 7, 2023
2 parents aea5b69 + 79a23f7 commit bd8e3db
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 9 deletions.
2 changes: 2 additions & 0 deletions defs/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ export const REAL_IP_HEADER = "x-real-ip";

export const RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS = 10;

export const STATION_PICTURES_VERSION = 2.0;

export const STREAM_BURST_LENGTH = 12;

export const STREAM_CHANNEL_CAPACITY = 16;
Expand Down
3 changes: 2 additions & 1 deletion front/app/src/service-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { CacheFirst, NetworkOnly } from "workbox-strategies";
import { CacheableResponsePlugin } from 'workbox-cacheable-response';
import { matchPrecache, precacheAndRoute } from "workbox-precaching";
import { build, version } from "$service-worker";
import { STATION_PICTURES_VERSION } from "../../../defs/constants"

precacheAndRoute([
{ url: "/offline", revision: version },
Expand Down Expand Up @@ -45,7 +46,7 @@ registerRoute(
registerRoute(
({ request, url }) => request.destination === "image" && url.pathname.startsWith(`/station-pictures`),
new CacheFirst({
cacheName: "station-pictures",
cacheName: `station-pictures-v${STATION_PICTURES_VERSION}`,
plugins: [
new CacheableResponsePlugin({
statuses: [0, 200]
Expand Down
2 changes: 2 additions & 0 deletions rs/bin/openstream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,8 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> {
let deployment_id = deployment.id.clone();
Deployment::insert(&deployment).await?;

tokio::spawn(db::station_picture::upgrade_images_if_needed());

tokio::spawn({
let shutdown = shutdown.clone();
let deployment_id = deployment_id.clone();
Expand Down
6 changes: 6 additions & 0 deletions rs/config/constants/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ pub const REAL_IP_HEADER: &str = "x-real-ip";
#[const_register]
pub const PAYMENTS_ACCESS_TOKEN_HEADER: &str = "x-access-token";

/// we need to update this value after making code changes to the station pictures logic or sizes
/// changing to this value will make startup check and recreation of outdated images
/// and invalidation of service workers station pictures caches
#[const_register]
pub const STATION_PICTURES_VERSION: f64 = 2.0;

#[cfg(test)]
pub mod test {
use std::path::Path;
Expand Down
197 changes: 189 additions & 8 deletions rs/packages/db/src/models/station_picture/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{run_transaction, Model};
use bytes::Bytes;
use mongodb::bson::doc;
use mongodb::{ClientSession, IndexModel};
use ril::{Paste, Rgba};
use ril::{Encoder, Paste, Rgba};
use serde::{Deserialize, Serialize};
use serde_util::DateTime;
use ts_rs::TS;
Expand Down Expand Up @@ -45,9 +45,182 @@ impl Model for StationPicture {
}
}

pub async fn upgrade_images_if_needed() -> Result<(), CreateStationPictureError> {
let r: Result<(), CreateStationPictureError> = async {

log::info!(
target: "station-pictures",
"starting images upgrade process for version {}",
StationPicture::VERSION
);

let ids: Vec<String> = {
let filter = doc! { StationPicture::KEY_VERSION: { "$ne": StationPicture::VERSION } };
use futures_util::TryStreamExt;
StationPicture::cl()
.find(filter, None)
.await?
.try_collect::<Vec<StationPicture>>()
.await?
.into_iter()
.map(|item| item.id)
.collect()
};

log::info!(
target: "station-pictures",
"found {} outdated images",
ids.len()
);

async fn upgrade_one(i: usize, total: usize, id: String) -> Result<(), CreateStationPictureError> {

run_transaction!(session => {
let picture = match tx_try!(StationPicture::get_by_id_with_session(&id, &mut session).await) {
Some(picture) => picture,
None => {
log::warn!(
target: "station-pictures",
"picture {} of {} with id {} not found",
i + 1,
total,
id
);

return Ok(());
}
};

if picture.version == StationPicture::VERSION {
log::info!(
target: "station-pictures",
"picture {} of {} with id {} already converted, skipping",
i + 1,
total,
id
)
}

log::info!(
target: "station-pictures",
"processing picture {} of {} with id {} => current_version={} target_version={}",
i + 1,
total,
id,
picture.version,
StationPicture::VERSION,
);


let source = {
let filter = doc! {
StationPictureVariant::KEY_PICTURE_ID: &picture.id,
StationPictureVariant::KEY_FORMAT: StationPictureVariantFormat::KEY_ENUM_VARIANT_SOURCE,
};

match tx_try!(StationPictureVariant::get_with_session(filter, &mut session).await) {
Some(source) => source,
None => {
log::warn!(
target: "station-pictures",
"picture {} of {} with id {} => source variant not found, skipping",
i + 1,
total,
id,
);

return Ok(());
}
}
};

let (new_picture, variants) = match StationPicture::create_variants(id.clone(), picture.account_id, picture.src_filename, picture.src_content_type, source.data).await {
Ok((new_picture, variants)) => (new_picture, variants),
Err(e) => {
log::warn!(
target: "station-pictures",
"picture {} of {} with id {} error generating variants: {} => {:?}",
i + 1,
total,
id,
e,
e,
);

return Ok(());
}
};

log::info!(
target: "station-pictures",
"picture {} of {} with id {} generated {} variants",
i + 1,
total,
id,
variants.len()
);

{
let filter = doc!{ StationPictureVariant::KEY_PICTURE_ID: &id };
tx_try!(StationPictureVariant::cl().delete_many_with_session(filter, None, &mut session).await);
}

tx_try!(StationPictureVariant::insert_many_with_session(&variants, &mut session).await);
tx_try!(StationPicture::replace_with_session(&id, &new_picture, &mut session).await);

Ok(())
})
}

let total = ids.len();
for (i, id) in ids.into_iter().enumerate() {
let r = upgrade_one(i, total, id.clone()).await;
match r {
Ok(_) => {

},

Err(e) => {
log::warn!(
target: "station-pictures",
"upgrage of picture {} of {} => {} failed with error {} => {:?}",
i,
total,
id,
e,
e,
)
}
}
}

Ok(())
}.await;

match &r {
Ok(_) => {
log::info!(
target: "station-pictures",
"station pictures upgrade ended with Ok"
)
}

Err(e) => {
log::warn!(
target: "station-pictures",
"station pictures upgrade ended with Error => {} => {:?}",
e,
e,
)
}
}

r
}

impl StationPicture {
pub const VERSION: f64 = 1.0;
pub const PNG_SIZES: [f64; 2] = [192.0, 512.0];
pub const VERSION: f64 = constants::STATION_PICTURES_VERSION;
pub const PNG_SIZES: [f64; 6] = [32.0, 64.0, 128.0, 192.0, 256.0, 512.0];
pub const WEBP_SIZES: [f64; 5] = [32.0, 64.0, 128.0, 256.0, 512.0];

pub async fn delete_with_session(
Expand All @@ -69,6 +242,7 @@ impl StationPicture {
}

pub async fn create_variants(
id: String,
account_id: String,
filename: String,
content_type: String,
Expand All @@ -80,7 +254,7 @@ impl StationPicture {
// use image::GenericImageView;
// use std::io::Cursor;

use ril::{Image, ImageFormat};
use ril::Image;

let img = Image::<Rgba>::from_bytes_inferred(&data)?;

Expand All @@ -105,7 +279,7 @@ impl StationPicture {
let now = DateTime::now();

let doc = StationPicture {
id: StationPicture::uid(),
id,
account_id: account_id.clone(),
version: StationPicture::VERSION,
src_filename: filename.clone(),
Expand Down Expand Up @@ -161,8 +335,12 @@ impl StationPicture {
let img = bg;

let mut buf = vec![];
img.encode(ImageFormat::Png, &mut buf)?;
let mut encoder = ril::encodings::png::PngEncoder::default()
.with_compression(ril::encodings::png::Compression::Best);

encoder.encode(&img, &mut buf)?;

// img.encode(ImageFormat::Png, &mut buf)?;
// match img.write_to(&mut Cursor::new(&mut buf), image::ImageOutputFormat::Png) {
// Ok(()) => {}
// Err(e) => return Err(e.into()),
Expand Down Expand Up @@ -194,7 +372,9 @@ impl StationPicture {
// );

let mut buf = vec![];
img.encode(ImageFormat::WebP, &mut buf)?;
let mut encoder = ril::encodings::webp::WebPEncoder::new().with_quality(100.0);
encoder.encode(&img, &mut buf)?;
// img.encode(ImageFormat::WebP, &mut buf)?;

// let encoder = match Encoder::from_image(&img) {
// Err(s) => return Err(CreateStationPictureError::Webp(String::from(s))),
Expand Down Expand Up @@ -234,8 +414,9 @@ impl StationPicture {
content_type: String,
data: Bytes,
) -> Result<(StationPicture, Vec<StationPictureVariant>), CreateStationPictureError> {
let id = StationPicture::uid();
let (doc, variants) =
Self::create_variants(account_id.clone(), filename, content_type, data).await?;
Self::create_variants(id, account_id.clone(), filename, content_type, data).await?;

run_transaction!(session => {
match tx_try!(Account::exists_with_session(&*account_id, &mut session).await) {
Expand Down
1 change: 1 addition & 0 deletions rs/packages/db/src/models/station_picture_variant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl Model for StationPictureVariant {
#[derive(Debug, Clone, Copy, Serialize, Deserialize, TS)]
#[ts(export, export_to = "../../../defs/db/")]
#[serde(rename_all = "kebab-case")]
#[macros::keys]
pub enum StationPictureVariantFormat {
Webp,
Png,
Expand Down

0 comments on commit bd8e3db

Please sign in to comment.