From 0ebbfc46ad6918564265f2c25ac273a77de75d85 Mon Sep 17 00:00:00 2001 From: ramiroaisen <52116153+ramiroaisen@users.noreply.github.com> Date: Tue, 21 May 2024 16:30:22 -0300 Subject: [PATCH] fix: storage quota not being decremented on file deletion --- rs/bin/openstream/src/main.rs | 18 +++++++++- rs/packages/db/src/models/account/mod.rs | 37 +++++++++++++++++++++ rs/packages/db/src/models/audio_file/mod.rs | 12 +++++-- 3 files changed, 64 insertions(+), 3 deletions(-) diff --git a/rs/bin/openstream/src/main.rs b/rs/bin/openstream/src/main.rs index 08e2c963..06645b5c 100644 --- a/rs/bin/openstream/src/main.rs +++ b/rs/bin/openstream/src/main.rs @@ -6,7 +6,8 @@ use api::ws_stats::WsStatsServer; use clap::{Parser, Subcommand}; use config::Config; use db::access_token::{AccessToken, GeneratedBy}; -use db::Model; +use db::account::recalculate_storage_quota; +use db::{run_transaction, Model}; use db::admin::Admin; use db::registry::Registry; use drop_tracer::DropTracer; @@ -583,6 +584,21 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> { tokio::spawn(db::station_picture::upgrade_images_if_needed()); tokio::spawn(media::health::health_shutdown_job()); tokio::spawn(db::probe::start_probe_background_job()); + tokio::spawn(async { + info!(target: "start", "recalculating storage quota"); + + let r = async { + run_transaction!(session => { + tx_try!(recalculate_storage_quota(&mut session).await); + Ok::<(), mongodb::error::Error>(()) + }) + }.await; + + match r { + Ok(_) => info!(target: "start", "recalculating storage quota done"), + Err(e) => error!(target: "start", "recalculating storage quota error: {}", e), + }; + }); tokio::spawn({ let shutdown = shutdown.clone(); diff --git a/rs/packages/db/src/models/account/mod.rs b/rs/packages/db/src/models/account/mod.rs index d480ebde..b1f148c8 100644 --- a/rs/packages/db/src/models/account/mod.rs +++ b/rs/packages/db/src/models/account/mod.rs @@ -1,3 +1,4 @@ +use crate::audio_file::AudioFile; use crate::station::Station; use crate::stream_connection::lite::StreamConnectionLite; use crate::Model; @@ -224,6 +225,42 @@ pub async fn recalculate_used_listeners_quota( Ok(()) } +pub async fn recalculate_storage_quota( + session: &mut ClientSession, +) -> Result<(), mongodb::error::Error> { + let mut stations = Station::cl().find_with_session(None, None, session).await?; + let mut station_account_map = HashMap::::new(); + { + while let Some(station) = stations.next(session).await.transpose()? { + station_account_map.insert(station.id.clone(), station.account_id.clone()); + } + } + + let mut account_used_map = HashMap::::new(); + + let mut files = AudioFile::cl() + .find_with_session(None, None, session) + .await?; + + while let Some(file) = files.next(session).await.transpose()? { + let account_id = station_account_map.get(&file.station_id); + if let Some(account_id) = account_id { + *account_used_map.entry(account_id.to_string()).or_insert(0) += file.len; + } + } + + for (account_id, v) in account_used_map.into_iter() { + const KEY_LIMITS_STORAGE_USED: &str = + crate::key!(Account::KEY_LIMITS, Limits::KEY_STORAGE, Limit::KEY_USED); + + let update = doc! { "$set": { KEY_LIMITS_STORAGE_USED: v as f64 } }; + + Account::update_by_id_with_session(&account_id, update, session).await?; + } + + Ok(()) +} + #[cfg(test)] mod test { use super::*; diff --git a/rs/packages/db/src/models/audio_file/mod.rs b/rs/packages/db/src/models/audio_file/mod.rs index 77a3462c..7c665f8b 100644 --- a/rs/packages/db/src/models/audio_file/mod.rs +++ b/rs/packages/db/src/models/audio_file/mod.rs @@ -1,7 +1,7 @@ use crate::{ account::{Account, Limit, Limits}, audio_chunk::AudioChunk, - run_transaction, Model, + run_transaction, station, Model, }; use mongodb::{ bson::{doc, Document}, @@ -147,6 +147,14 @@ impl AudioFile { } }; + let station = + match station::Station::get_by_id_with_session(&audio_file.station_id, session).await? { + None => return Ok(None), + Some(station) => station, + }; + + let account_id = station.account_id.clone(); + // delete chunks AudioChunk::delete_by_audio_file_id_with_session(&audio_file.id, session).await?; @@ -156,7 +164,7 @@ impl AudioFile { // update station const KEY: &str = crate::key!(Account::KEY_LIMITS, Limits::KEY_STORAGE, Limit::KEY_USED); let update = doc! { "$inc": { KEY: (audio_file.len as f64) * -1.0 } }; - Account::update_by_id_with_session(&audio_file.station_id, update, session).await?; + Account::update_by_id_with_session(&account_id, update, session).await?; Ok(Some(audio_file)) }