Skip to content

Commit

Permalink
Stop swaps on "stop" RPC call. Add swaps file locks. #626 (#628)
Browse files Browse the repository at this point in the history
* Stop swaps when "stop" RPC is invoked WIP

* Wait for swaps to stop before returning from lp_init.

* WIP.

* Implemented FileLock.

* Adding FileLock to run_maker_swap and run_taker_swap WIP.

* WIP.

* Check whether the swap was finished on kick start when lock is acquired.

* Treat empty or broken file lock as expired. Switch to float for time comparisons.
Move swaps file lock related docker tests to separate module.
  • Loading branch information
artemii235 authored May 4, 2020
1 parent aee8d84 commit 0ef0212
Show file tree
Hide file tree
Showing 13 changed files with 1,047 additions and 141 deletions.
201 changes: 188 additions & 13 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ debug = true
debug = false

[dependencies]
async-std = {version = "1.5", features = ["unstable"]}
atomic = "0.4"
bigdecimal = { version = "0.1", features = ["serde"] }
bitcrypto = { git = "https://github.com/artemii235/parity-bitcoin.git" }
Expand Down
2 changes: 1 addition & 1 deletion etomic_build/client/buy_ONE_ANOTHER
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash
source userpass
curl --url "http://127.0.0.1:7783" --data "{\"userpass\":\"$userpass\",\"method\":\"buy\",\"base\":\"$1\",\"rel\":\"$2\",\"volume\":\"0.1\",\"price\":\"2\"}"
curl --url "http://127.0.0.1:7783" --data "{\"userpass\":\"$userpass\",\"method\":\"buy\",\"base\":\"$1\",\"rel\":\"$2\",\"volume\":\"0.1\",\"price\":\"3\"}"
1 change: 1 addition & 0 deletions mm2src/common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub mod jsonrpc_client;
#[macro_use]
pub mod log;

pub mod file_lock;
#[cfg(feature = "native")]
pub mod for_c;
pub mod custom_futures;
Expand Down
123 changes: 123 additions & 0 deletions mm2src/common/file_lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use crate::{now_ms, now_float};
use std::path::Path;

pub struct FileLock<T: AsRef<Path>> {
/// Filesystem path of the lock file.
lock_path: T,
/// The time in seconds after which an outdated lock file can be removed.
ttl_sec: f64,
}

/// Records timestamp to a file contents.
fn touch(path: &dyn AsRef<Path>, timestamp: u64) -> Result<(), String> {
std::fs::write(path.as_ref(), timestamp.to_string()).map_err(|e| ERRL!("{:?}", e))
}

/// Attempts to read timestamp recorded to a file
fn read_timestamp(path: &dyn AsRef<Path>) -> Result<Option<u64>, String> {
match std::fs::read_to_string(path) {
Ok(content) => Ok(content.parse().ok()),
Err(e) => ERR!("{:?}", e)
}
}

impl<T: AsRef<Path>> FileLock<T> {
pub fn lock(lock_path: T, ttl_sec: f64) -> Result<Option<FileLock<T>>, String> {
match std::fs::OpenOptions::new().write(true).create_new(true).open(lock_path.as_ref()) {
Ok(_) => {
let file_lock = FileLock { lock_path, ttl_sec };
try_s!(file_lock.touch());
Ok(Some(file_lock))
},
Err(ref ie) if ie.kind() == std::io::ErrorKind::AlreadyExists => {
// See if the existing lock is old enough to be discarded.
match read_timestamp(&lock_path) {
Ok(Some(lm)) => if now_float() - lm as f64 > ttl_sec {
let file_lock = FileLock { lock_path, ttl_sec };
try_s!(file_lock.touch());
Ok(Some(file_lock))
} else {
Ok(None)
},
Ok(None) => {
let file_lock = FileLock { lock_path, ttl_sec };
try_s!(file_lock.touch());
Ok(Some(file_lock))
},
Err(ie) => ERR!("Error checking {:?}: {}", lock_path.as_ref(), ie)
}
},
Err(ie) => ERR!("Error creating {:?}: {}", lock_path.as_ref(), ie)
}
}

pub fn touch(&self) -> Result<(), String> {
touch(&self.lock_path, now_ms() / 1000)
}
}

impl<T: AsRef<Path>> Drop for FileLock<T> {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.lock_path);
}
}

#[cfg(test)]
mod file_lock_tests {
use std::{
thread::sleep,
time::Duration,
};
use super::*;

#[test]
fn test_file_lock_should_create_file_and_record_timestamp_and_then_delete_on_drop() {
let now = now_ms() / 1000;
let path = Path::new("test1.lock");
let lock = FileLock::lock(&path, 1000.).unwrap().unwrap();
assert!(path.exists());
let timestamp = read_timestamp(&path).unwrap().unwrap();
assert!(timestamp >= now);
drop(lock);
assert!(!path.exists());
}

#[test]
fn test_file_lock_should_return_none_if_lock_acquired() {
let path = Path::new("test2.lock");
let _lock = FileLock::lock(&path, 1000.).unwrap().unwrap();
let new_lock = FileLock::lock(&path, 1000.).unwrap();
assert!(new_lock.is_none());
}

#[test]
fn test_file_lock_should_acquire_if_ttl_expired_and_update_timestamp() {
let path = Path::new("test3.lock");
let _lock = FileLock::lock(&path, 1.).unwrap().unwrap();
sleep(Duration::from_secs(2));
let old_timestamp = read_timestamp(&path).unwrap();
let _new_lock = FileLock::lock(&path, 1.).unwrap().unwrap();
let new_timestamp = read_timestamp(&path).unwrap();
assert!(new_timestamp > old_timestamp);
}

#[test]
fn test_file_lock_should_acquire_if_file_is_empty() {
let now = now_ms() / 1000;
let path = Path::new("test4.lock");
std::fs::write(&path, &[]).unwrap();
let _new_lock = FileLock::lock(&path, 1.).unwrap().unwrap();
let timestamp = read_timestamp(&path).unwrap().unwrap();
assert!(timestamp >= now);
}

#[test]
fn test_file_lock_should_acquire_if_file_does_not_contain_parsable_timestamp() {
let now = now_ms() / 1000;
let path = Path::new("test5.lock");
std::fs::write(&path, &[12, 13]).unwrap();
let _new_lock = FileLock::lock(&path, 1.).unwrap().unwrap();
let timestamp = read_timestamp(&path).unwrap().unwrap();
assert!(timestamp >= now);
}
}
64 changes: 52 additions & 12 deletions mm2src/common/for_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,15 @@ impl MarketMakerIt {
ip
};

// Use a separate (unique) temporary folder for each MM.
// (We could also remove the old folders after some time in order not to spam the temporary folder.
// Though we don't always want to remove them right away, allowing developers to check the files).
let now = super::now_ms();
let now = Local.timestamp ((now / 1000) as i64, (now % 1000) as u32 * 1000000);
let folder = format! ("mm2_{}_{}", now.format ("%Y-%m-%d_%H-%M-%S-%3f"), ip);
let folder = super::temp_dir().join (folder);
let db_dir = folder.join ("DB");
conf["dbdir"] = unwrap! (db_dir.to_str()) .into();
let folder = new_mm2_temp_folder_path(Some(ip));
let db_dir = match conf["dbdir"].as_str() {
Some(path) => path.into(),
None => {
let dir = folder.join("DB");
conf["dbdir"] = unwrap!(dir.to_str()).into();
dir
}
};

#[cfg(not(feature = "native"))] {
let ctx = MmCtxBuilder::new().with_conf (conf) .into_mm_arc();
Expand All @@ -192,9 +192,19 @@ impl MarketMakerIt {

#[cfg(feature = "native")] {
try_s! (fs::create_dir (&folder));
try_s! (fs::create_dir (db_dir));
let log_path = folder.join ("mm2.log");
conf["log"] = unwrap! (log_path.to_str()) .into();
match fs::create_dir (db_dir) {
Ok(_) => (),
Err(ref ie) if ie.kind() == std::io::ErrorKind::AlreadyExists => (),
Err(e) => return ERR!("{}", e),
};
let log_path = match conf["log"].as_str() {
Some(path) => path.into(),
None => {
let path = folder.join("mm2.log");
conf["log"] = unwrap!(path.to_str()).into();
path
}
};

// If `local` is provided
// then instead of spawning a process we start the MarketMaker in a local thread,
Expand Down Expand Up @@ -244,6 +254,22 @@ impl MarketMakerIt {
}
}

/// Busy-wait on the log until the `pred` returns `true` or `timeout_sec` expires.
/// The difference from standard wait_for_log is this function keeps working
/// after process is stopped
#[cfg(feature = "native")]
pub async fn wait_for_log_after_stop<F> (&mut self, timeout_sec: f64, pred: F) -> Result<(), String>
where F: Fn (&str) -> bool {
let start = now_float();
let ms = 50 .min ((timeout_sec * 1000.) as u64 / 20 + 10);
loop {
let mm_log = try_s! (self.log_as_utf8());
if pred (&mm_log) {return Ok(())}
if now_float() - start > timeout_sec {return ERR! ("Timeout expired waiting for a log condition")}
Timer::sleep (ms as f64 / 1000.) .await
}
}

/// Busy-wait on the instance in-memory log until the `pred` returns `true` or `timeout_sec` expires.
#[cfg(not(feature = "native"))]
pub async fn wait_for_log<F> (&mut self, timeout_sec: f64, pred: F) -> Result<(), String>
Expand Down Expand Up @@ -500,3 +526,17 @@ pub async fn enable_native(mm: &MarketMakerIt, coin: &str, urls: Vec<&str>) -> J
assert_eq! (native.0, StatusCode::OK, "'enable' failed: {}", native.1);
unwrap!(json::from_str(&native.1))
}

/// Use a separate (unique) temporary folder for each MM.
/// We could also remove the old folders after some time in order not to spam the temporary folder.
/// Though we don't always want to remove them right away, allowing developers to check the files).
/// Appends IpAddr if it is pre-known
pub fn new_mm2_temp_folder_path(ip: Option<IpAddr>) -> PathBuf {
let now = super::now_ms();
let now = Local.timestamp ((now / 1000) as i64, (now % 1000) as u32 * 1000000);
let folder = match ip {
Some(ip) => format! ("mm2_{}_{}", now.format ("%Y-%m-%d_%H-%M-%S-%3f"), ip),
None => format! ("mm2_{}", now.format ("%Y-%m-%d_%H-%M-%S-%3f")),
};
super::temp_dir().join (folder)
}
93 changes: 92 additions & 1 deletion mm2src/docker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@ fn main() {

#[cfg(all(test, feature = "native"))]
mod docker_tests {
mod swaps_file_lock_tests;

use bitcrypto::ChecksumType;
use common::block_on;
use common::for_tests::{enable_native, MarketMakerIt, mm_dump};
use common::{
file_lock::FileLock,
for_tests::{enable_native, MarketMakerIt, new_mm2_temp_folder_path, mm_dump}
};
use coins::{FoundSwapTxSpend, MarketCoinOps, SwapOps};
use coins::utxo::{coin_daemon_data_dir, dhash160, utxo_coin_from_conf_and_request, zcash_params_path, UtxoCoin};
use coins::utxo::rpc_clients::{UtxoRpcClientEnum, UtxoRpcClientOps};
use futures01::Future;
use gstuff::now_ms;
use keys::{KeyPair, Private};
use secp256k1::SecretKey;
use serde_json::{self as json, Value as Json};
use std::env;
Expand Down Expand Up @@ -535,4 +542,88 @@ mod docker_tests {
unwrap!(block_on(mm_bob.stop()));
unwrap!(block_on(mm_alice.stop()));
}

#[test]
fn swaps_should_stop_on_stop_rpc() {
let (_, bob_priv_key) = generate_coin_with_random_privkey("MYCOIN", 1000);
let (_, alice_priv_key) = generate_coin_with_random_privkey("MYCOIN1", 2000);
let coins = json! ([
{"coin":"MYCOIN","asset":"MYCOIN","txversion":4,"overwintered":1,"txfee":1000},
{"coin":"MYCOIN1","asset":"MYCOIN1","txversion":4,"overwintered":1,"txfee":1000},
]);
let mut mm_bob = unwrap! (MarketMakerIt::start (
json! ({
"gui": "nogui",
"netid": 9000,
"dht": "on", // Enable DHT without delay.
"passphrase": format!("0x{}", hex::encode(bob_priv_key)),
"coins": coins,
"rpc_password": "pass",
"i_am_seed": true,
}),
"pass".to_string(),
None,
));
let (_bob_dump_log, _bob_dump_dashboard) = mm_dump (&mm_bob.log_path);
unwrap! (block_on (mm_bob.wait_for_log (22., |log| log.contains (">>>>>>>>> DEX stats "))));

let mut mm_alice = unwrap! (MarketMakerIt::start (
json! ({
"gui": "nogui",
"netid": 9000,
"dht": "on", // Enable DHT without delay.
"passphrase": format!("0x{}", hex::encode(alice_priv_key)),
"coins": coins,
"rpc_password": "pass",
"seednodes": vec![format!("{}", mm_bob.ip)],
}),
"pass".to_string(),
None,
));
let (_alice_dump_log, _alice_dump_dashboard) = mm_dump (&mm_alice.log_path);
unwrap! (block_on (mm_alice.wait_for_log (22., |log| log.contains (">>>>>>>>> DEX stats "))));

log!([block_on(enable_native(&mm_bob, "MYCOIN", vec![]))]);
log!([block_on(enable_native(&mm_bob, "MYCOIN1", vec![]))]);
log!([block_on(enable_native(&mm_alice, "MYCOIN", vec![]))]);
log!([block_on(enable_native(&mm_alice, "MYCOIN1", vec![]))]);
let rc = unwrap! (block_on (mm_bob.rpc (json! ({
"userpass": mm_bob.userpass,
"method": "setprice",
"base": "MYCOIN",
"rel": "MYCOIN1",
"price": 1,
"max": true,
}))));
assert! (rc.0.is_success(), "!setprice: {}", rc.1);
let mut uuids = Vec::with_capacity(3);

for _ in 0..3 {
let rc = unwrap!(block_on (mm_alice.rpc (json! ({
"userpass": mm_alice.userpass,
"method": "buy",
"base": "MYCOIN",
"rel": "MYCOIN1",
"price": 1,
"volume": "1",
}))));
assert!(rc.0.is_success(), "!buy: {}", rc.1);
let buy: Json = json::from_str(&rc.1).unwrap();
uuids.push(buy["result"]["uuid"].as_str().unwrap().to_owned());
}
for uuid in uuids.iter() {
unwrap!(block_on (mm_bob.wait_for_log (22.,
|log| log.contains (&format!("Entering the maker_swap_loop MYCOIN/MYCOIN1 with uuid: {}", uuid))
)));
unwrap!(block_on (mm_alice.wait_for_log (22.,
|log| log.contains (&format!("Entering the taker_swap_loop MYCOIN/MYCOIN1 with uuid: {}", uuid))
)));
}
unwrap!(block_on(mm_bob.stop()));
unwrap!(block_on(mm_alice.stop()));
for uuid in uuids {
unwrap!(block_on (mm_bob.wait_for_log_after_stop (22., |log| log.contains (&format!("swap {} stopped", uuid)))));
unwrap!(block_on (mm_alice.wait_for_log_after_stop (22., |log| log.contains (&format!("swap {} stopped", uuid)))));
}
}
}
Loading

0 comments on commit 0ef0212

Please sign in to comment.