Skip to content

Commit

Permalink
fix running_swap memory leak
Browse files Browse the repository at this point in the history
this field was convereted to a hashmap instead of a vector for easy access to the swap.
also we now manually delete the swap from running swaps when the swap is finished/inturepted (memory leak fix). as a consequence to manuallly deleting the swap from running_swaps, we can now store them as arcs instead of weakrefs, which simplifies a lot of .upgrade calls.
  • Loading branch information
mariocynicys committed Dec 23, 2024
1 parent e5c5f34 commit b02fced
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 55 deletions.
58 changes: 26 additions & 32 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ use std::convert::TryFrom;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use uuid::Uuid;

Expand Down Expand Up @@ -529,8 +529,11 @@ struct LockedAmountInfo {
locked_amount: LockedAmount,
}

/// A running swap is the swap accompanied by the abort handle of the thread the swap is running on.
type RunningSwap = (Arc<dyn AtomicSwap>, AbortOnDropHandle);

struct SwapsContext {
running_swaps: Mutex<Vec<(Weak<dyn AtomicSwap>, AbortOnDropHandle)>>,
running_swaps: Mutex<HashMap<Uuid, RunningSwap>>,
active_swaps_v2_infos: Mutex<HashMap<Uuid, ActiveSwapV2Info>>,
banned_pubkeys: Mutex<HashMap<H256Json, BanReason>>,
swap_msgs: Mutex<HashMap<Uuid, SwapMsgStore>>,
Expand All @@ -546,7 +549,7 @@ impl SwapsContext {
fn from_ctx(ctx: &MmArc) -> Result<Arc<SwapsContext>, String> {
Ok(try_s!(from_ctx(&ctx.swaps_ctx, move || {
Ok(SwapsContext {
running_swaps: Mutex::new(vec![]),
running_swaps: Mutex::new(HashMap::new()),
active_swaps_v2_infos: Mutex::new(HashMap::new()),
banned_pubkeys: Mutex::new(HashMap::new()),
swap_msgs: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -631,11 +634,9 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
let swap_ctx = SwapsContext::from_ctx(ctx).unwrap();
let swap_lock = swap_ctx.running_swaps.lock().unwrap();

let mut locked = swap_lock
.iter()
.filter_map(|(swap, _)| swap.upgrade())
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
let mut locked = swap_lock.values().flat_map(|(swap, _)| swap.locked_amount()).fold(
MmNumber::from(0),
|mut total_amount, locked| {
if locked.coin == coin {
total_amount += locked.amount;
}
Expand All @@ -645,7 +646,8 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
}
}
total_amount
});
},
);
drop(swap_lock);

let locked_amounts = swap_ctx.locked_amounts.lock().unwrap();
Expand All @@ -669,11 +671,8 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
/// Get number of currently running swaps
pub fn running_swaps_num(ctx: &MmArc) -> u64 {
let swap_ctx = SwapsContext::from_ctx(ctx).unwrap();
let swaps = swap_ctx.running_swaps.lock().unwrap();
swaps.iter().fold(0, |total, (swap, _)| match swap.upgrade() {
Some(_) => total + 1,
None => total,
})
let count = swap_ctx.running_swaps.lock().unwrap().len();
count as u64
}

/// Get total amount of selected coin locked by all currently ongoing swaps except the one with selected uuid
Expand All @@ -682,10 +681,9 @@ fn get_locked_amount_by_other_swaps(ctx: &MmArc, except_uuid: &Uuid, coin: &str)
let swap_lock = swap_ctx.running_swaps.lock().unwrap();

swap_lock
.iter()
.filter_map(|(swap, _)| swap.upgrade())
.filter(|swap| swap.uuid() != except_uuid)
.flat_map(|swap| swap.locked_amount())
.values()
.filter(|(swap, _)| swap.uuid() != except_uuid)
.flat_map(|(swap, _)| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
if locked.coin == coin {
total_amount += locked.amount;
Expand All @@ -703,11 +701,9 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet<String>) -> Result<
let swap_ctx = try_s!(SwapsContext::from_ctx(ctx));
let swaps = try_s!(swap_ctx.running_swaps.lock());
let mut uuids = vec![];
for (swap, _) in swaps.iter() {
if let Some(swap) = swap.upgrade() {
if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) {
uuids.push(*swap.uuid())
}
for (swap, _) in swaps.values() {
if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) {
uuids.push(*swap.uuid())
}
}
drop(swaps);
Expand All @@ -723,15 +719,13 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet<String>) -> Result<

pub fn active_swaps(ctx: &MmArc) -> Result<Vec<(Uuid, u8)>, String> {
let swap_ctx = try_s!(SwapsContext::from_ctx(ctx));
let swaps = swap_ctx.running_swaps.lock().unwrap();
let mut uuids = vec![];
for (swap, _) in swaps.iter() {
if let Some(swap) = swap.upgrade() {
uuids.push((*swap.uuid(), LEGACY_SWAP_TYPE))
}
}

drop(swaps);
let mut uuids: Vec<_> = swap_ctx
.running_swaps
.lock()
.unwrap()
.keys()
.map(|uuid| (*uuid, LEGACY_SWAP_TYPE))
.collect();

let swaps_v2 = swap_ctx.active_swaps_v2_infos.lock().unwrap();
uuids.extend(swaps_v2.iter().map(|(uuid, info)| (*uuid, info.swap_type)));
Expand Down
16 changes: 11 additions & 5 deletions mm2src/mm2_main/src/lp_swap/maker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2090,10 +2090,10 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
};
}
let running_swap = Arc::new(swap);
let weak_ref = Arc::downgrade(&running_swap);
let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap();
swap_ctx.init_msg_store(running_swap.uuid, running_swap.taker);
let mut swap_fut = Box::pin(
let mut swap_fut = Box::pin({
let running_swap = running_swap.clone();
async move {
let mut events;
loop {
Expand Down Expand Up @@ -2150,8 +2150,8 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
}
}
}
.fuse(),
);
.fuse()
});
// Run the swap in an abortable task and wait for it to finish.
let (swap_ended_notifier, swap_ended_notification) = oneshot::channel();
let abortable_swap = spawn_abortable(async move {
Expand All @@ -2163,9 +2163,15 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
error!("Swap listener stopped listening!");
}
});
swap_ctx.running_swaps.lock().unwrap().push((weak_ref, abortable_swap));
let uuid = running_swap.uuid;
swap_ctx
.running_swaps
.lock()
.unwrap()
.insert(uuid, (running_swap, abortable_swap));
// Halt this function until the swap has finished (or interrupted, i.e. aborted/panic).
swap_ended_notification.await.error_log_with_msg("Swap interrupted!");
swap_ctx.running_swaps.lock().unwrap().remove(&uuid);
}

pub struct MakerSwapPreparedParams {
Expand Down
17 changes: 6 additions & 11 deletions mm2src/mm2_main/src/lp_swap/swap_v2_rpcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,10 @@ pub(crate) struct StopSwapResponse {

pub(crate) async fn stop_swap_rpc(ctx: MmArc, req: StopSwapRequest) -> MmResult<StopSwapResponse, StopSwapErr> {
let swap_ctx = SwapsContext::from_ctx(&ctx).map_err(StopSwapErr::Internal)?;
let mut running_swaps = swap_ctx.running_swaps.lock().unwrap();
let Some(position) = running_swaps.iter().position(|(swap, _)| swap.upgrade().map_or(true, |swap| swap.uuid() == &req.uuid)) else {
// By just removing the swap's abort handle from the running swaps map, the swap will terminate.
if swap_ctx.running_swaps.lock().unwrap().remove(&req.uuid).is_none() {
return MmError::err(StopSwapErr::NotRunning);
};
let (_swap, _abort_handle) = running_swaps.swap_remove(position);
}
Ok(StopSwapResponse {
result: "Success".to_string(),
})
Expand Down Expand Up @@ -582,12 +581,8 @@ pub(crate) async fn kickstart_swap_rpc(
// up with the same swap being kickstarted twice, but we have filesystem swap locks for that. This check is
// rather for convenience.
let swap_ctx = SwapsContext::from_ctx(&ctx).map_err(KickStartSwapErr::Internal)?;
for (swap, _) in swap_ctx.running_swaps.lock().unwrap().iter() {
if let Some(swap) = swap.upgrade() {
if swap.uuid() == &req.uuid {
return MmError::err(KickStartSwapErr::AlreadyRunning);
}
}
if swap_ctx.running_swaps.lock().unwrap().contains_key(&req.uuid) {
return MmError::err(KickStartSwapErr::AlreadyRunning);
}
// Load the swap from the DB.
let swap = match SavedSwap::load_my_swap_from_db(&ctx, req.uuid).await {
Expand Down Expand Up @@ -647,7 +642,7 @@ pub(crate) async fn kickstart_swap_rpc(
)));
},
};
// Kickstart the swap. A new aborthandle will show up shortly for the swap.
// Kickstart the swap. A new abort handle will show up shortly for the swap.
match swap {
SavedSwap::Maker(saved_swap) => ctx.spawner().spawn(run_maker_swap(
RunMakerSwapInput::KickStart {
Expand Down
23 changes: 16 additions & 7 deletions mm2src/mm2_main/src/lp_swap/taker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,10 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
let uuid = swap.uuid.to_string();
let to_broadcast = !(swap.maker_coin.is_privacy() || swap.taker_coin.is_privacy());
let running_swap = Arc::new(swap);
let weak_ref = Arc::downgrade(&running_swap);
let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap();
swap_ctx.init_msg_store(running_swap.uuid, running_swap.maker);
let mut swap_fut = Box::pin(
let mut swap_fut = Box::pin({
let running_swap = running_swap.clone();
async move {
let mut events;
loop {
Expand Down Expand Up @@ -516,8 +516,8 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
}
}
}
.fuse(),
);
.fuse()
});
// Run the swap in an abortable task and wait for it to finish.
let (swap_ended_notifier, swap_ended_notification) = oneshot::channel();
let abortable_swap = spawn_abortable(async move {
Expand All @@ -529,9 +529,15 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
error!("Swap listener stopped listening!");
}
});
swap_ctx.running_swaps.lock().unwrap().push((weak_ref, abortable_swap));
let uuid = running_swap.uuid;
swap_ctx
.running_swaps
.lock()
.unwrap()
.insert(uuid, (running_swap, abortable_swap));
// Halt this function until the swap has finished (or interrupted, i.e. aborted/panic).
swap_ended_notification.await.error_log_with_msg("Swap interrupted!");
swap_ctx.running_swaps.lock().unwrap().remove(&uuid);
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
Expand Down Expand Up @@ -3217,10 +3223,13 @@ mod taker_swap_tests {
.unwrap();
let swaps_ctx = SwapsContext::from_ctx(&ctx).unwrap();
let arc = Arc::new(swap);
let weak_ref = Arc::downgrade(&arc);
// Create a dummy abort handle as if it was a running swap.
let abortable_swap = spawn_abortable(async move {});
swaps_ctx.running_swaps.lock().unwrap().push((weak_ref, abortable_swap));
swaps_ctx
.running_swaps
.lock()
.unwrap()
.insert(arc.uuid, (arc, abortable_swap));

let actual = get_locked_amount(&ctx, "RICK");
assert_eq!(actual, MmNumber::from(0));
Expand Down

0 comments on commit b02fced

Please sign in to comment.