Skip to content

Commit

Permalink
fix: storage quota not being decremented on file deletion (#312)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored May 21, 2024
2 parents b065405 + 0ebbfc4 commit ad6b994
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 3 deletions.
18 changes: 17 additions & 1 deletion rs/bin/openstream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use api::ws_stats::WsStatsServer;
use clap::{Parser, Subcommand};
use config::Config;
use db::access_token::{AccessToken, GeneratedBy};
use db::Model;
use db::account::recalculate_storage_quota;
use db::{run_transaction, Model};
use db::admin::Admin;
use db::registry::Registry;
use drop_tracer::DropTracer;
Expand Down Expand Up @@ -583,6 +584,21 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> {
tokio::spawn(db::station_picture::upgrade_images_if_needed());
tokio::spawn(media::health::health_shutdown_job());
tokio::spawn(db::probe::start_probe_background_job());
tokio::spawn(async {
info!(target: "start", "recalculating storage quota");

let r = async {
run_transaction!(session => {
tx_try!(recalculate_storage_quota(&mut session).await);
Ok::<(), mongodb::error::Error>(())
})
}.await;

match r {
Ok(_) => info!(target: "start", "recalculating storage quota done"),
Err(e) => error!(target: "start", "recalculating storage quota error: {}", e),
};
});

tokio::spawn({
let shutdown = shutdown.clone();
Expand Down
37 changes: 37 additions & 0 deletions rs/packages/db/src/models/account/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::audio_file::AudioFile;
use crate::station::Station;
use crate::stream_connection::lite::StreamConnectionLite;
use crate::Model;
Expand Down Expand Up @@ -224,6 +225,42 @@ pub async fn recalculate_used_listeners_quota(
Ok(())
}

pub async fn recalculate_storage_quota(
session: &mut ClientSession,
) -> Result<(), mongodb::error::Error> {
let mut stations = Station::cl().find_with_session(None, None, session).await?;
let mut station_account_map = HashMap::<String, String>::new();
{
while let Some(station) = stations.next(session).await.transpose()? {
station_account_map.insert(station.id.clone(), station.account_id.clone());
}
}

let mut account_used_map = HashMap::<String, u64>::new();

let mut files = AudioFile::cl()
.find_with_session(None, None, session)
.await?;

while let Some(file) = files.next(session).await.transpose()? {
let account_id = station_account_map.get(&file.station_id);
if let Some(account_id) = account_id {
*account_used_map.entry(account_id.to_string()).or_insert(0) += file.len;
}
}

for (account_id, v) in account_used_map.into_iter() {
const KEY_LIMITS_STORAGE_USED: &str =
crate::key!(Account::KEY_LIMITS, Limits::KEY_STORAGE, Limit::KEY_USED);

let update = doc! { "$set": { KEY_LIMITS_STORAGE_USED: v as f64 } };

Account::update_by_id_with_session(&account_id, update, session).await?;
}

Ok(())
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
12 changes: 10 additions & 2 deletions rs/packages/db/src/models/audio_file/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
account::{Account, Limit, Limits},
audio_chunk::AudioChunk,
run_transaction, Model,
run_transaction, station, Model,
};
use mongodb::{
bson::{doc, Document},
Expand Down Expand Up @@ -147,6 +147,14 @@ impl AudioFile {
}
};

let station =
match station::Station::get_by_id_with_session(&audio_file.station_id, session).await? {
None => return Ok(None),
Some(station) => station,
};

let account_id = station.account_id.clone();

// delete chunks
AudioChunk::delete_by_audio_file_id_with_session(&audio_file.id, session).await?;

Expand All @@ -156,7 +164,7 @@ impl AudioFile {
// update station
const KEY: &str = crate::key!(Account::KEY_LIMITS, Limits::KEY_STORAGE, Limit::KEY_USED);
let update = doc! { "$inc": { KEY: (audio_file.len as f64) * -1.0 } };
Account::update_by_id_with_session(&audio_file.station_id, update, session).await?;
Account::update_by_id_with_session(&account_id, update, session).await?;

Ok(Some(audio_file))
}
Expand Down

0 comments on commit ad6b994

Please sign in to comment.