Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
Merge #2053 #2074
Browse files Browse the repository at this point in the history
2053: Steal connection r=bonomat a=bonomat


Resolves #2007

If the same taker tries to connect twice we prioritize the second connection. This can be useful if the taker thinks he is disconnected from the maker but the maker does not


2074: Revert changes to `prepare_db.sh` r=luckysori a=luckysori

With patch b899831 we modified the `prepare_db.sh` script in order to be able to define `sqlx` queries under `cfg(test)`.

This was convenient, but it turns out that the changes cause the script to trigger _complete_ recompilation of the project every time we run it.

There may be an alternative, but for the time being we revert this change. The effect is very minor: we move some test queries to a
`sqlx_test_utils` annotated with `#[allow(dead_code)]`.

Co-authored-by: Philipp Hoenisch <[email protected]>
Co-authored-by: Lucas Soriano del Pino <[email protected]>
  • Loading branch information
3 people authored May 20, 2022
3 parents 01aaac2 + c79f973 + 117a360 commit 7da1254
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 64 deletions.
2 changes: 1 addition & 1 deletion daemon/prepare_db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ trap 'rm -f $TEMPDB' EXIT
DATABASE_URL=sqlite:$TEMPDB cargo sqlx migrate run

# prepare the sqlx-data.json rust mappings
DATABASE_URL=sqlite:./$DAEMON_DIR/$TEMPDB SQLX_OFFLINE=true cargo sqlx prepare --merged -- --all-targets
DATABASE_URL=sqlite:./$DAEMON_DIR/$TEMPDB SQLX_OFFLINE=true cargo sqlx prepare
129 changes: 69 additions & 60 deletions daemon/src/db/time_to_first_position.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,79 @@ impl Connection {
}
}

// We cannot hide this under the `test` compilation flag because it
// makes it much less convenient to call `cargo sqlx prepare`.
#[allow(dead_code)]
mod sqlx_test_utils {
use super::*;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;

pub(crate) async fn load_first_seen_timestamp(
conn: &mut PoolConnection<Sqlite>,
taker_id: Identity,
) -> Result<Option<OffsetDateTime>> {
let row = sqlx::query!(
r#"
SELECT
first_seen_timestamp
FROM
time_to_first_position
WHERE
taker_id = $1
"#,
taker_id
)
.fetch_optional(&mut *conn)
.await?;

let timestamp = row
.map(|row| {
row.first_seen_timestamp
.map(OffsetDateTime::from_unix_timestamp)
})
.unwrap_or(None)
.transpose()?;

Ok(timestamp)
}

pub(crate) async fn load_first_position_timestamp(
conn: &mut PoolConnection<Sqlite>,
taker_id: Identity,
) -> Result<Option<OffsetDateTime>> {
let row = sqlx::query!(
r#"
SELECT
first_position_timestamp
FROM
time_to_first_position
WHERE
taker_id = $1
"#,
taker_id
)
.fetch_optional(&mut *conn)
.await?;

let timestamp = row
.map(|row| {
row.first_position_timestamp
.map(OffsetDateTime::from_unix_timestamp)
})
.unwrap_or(None)
.transpose()?;

Ok(timestamp)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::db::memory;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use sqlx_test_utils::load_first_position_timestamp;
use sqlx_test_utils::load_first_seen_timestamp;

#[tokio::test]
async fn given_inserted_first_seen_when_trying_to_insert_second_seen_then_timestamp_does_not_change(
Expand Down Expand Up @@ -141,64 +208,6 @@ mod tests {
assert_ne!(Some(second_inserted_timestamp), second_loaded_timestamp);
}

async fn load_first_seen_timestamp(
conn: &mut PoolConnection<Sqlite>,
taker_id: Identity,
) -> Result<Option<OffsetDateTime>> {
let row = sqlx::query!(
r#"
SELECT
first_seen_timestamp
FROM
time_to_first_position
WHERE
taker_id = $1
"#,
taker_id
)
.fetch_optional(&mut *conn)
.await?;

let timestamp = row
.map(|row| {
row.first_seen_timestamp
.map(OffsetDateTime::from_unix_timestamp)
})
.unwrap_or(None)
.transpose()?;

Ok(timestamp)
}

async fn load_first_position_timestamp(
conn: &mut PoolConnection<Sqlite>,
taker_id: Identity,
) -> Result<Option<OffsetDateTime>> {
let row = sqlx::query!(
r#"
SELECT
first_position_timestamp
FROM
time_to_first_position
WHERE
taker_id = $1
"#,
taker_id
)
.fetch_optional(&mut *conn)
.await?;

let timestamp = row
.map(|row| {
row.first_position_timestamp
.map(OffsetDateTime::from_unix_timestamp)
})
.unwrap_or(None)
.transpose()?;

Ok(timestamp)
}

fn dummy_identity() -> Identity {
Identity::new(x25519_dalek::PublicKey::from(
*b"hello world, oh what a beautiful",
Expand Down
9 changes: 6 additions & 3 deletions maker/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ struct Connection {
write: wire::Write<wire::TakerToMaker, wire::MakerToTaker>,
wire_version: wire::Version,
daemon_version: String,
address: SocketAddr,
_tasks: Tasks,
}

Expand Down Expand Up @@ -421,12 +422,13 @@ impl Actor {
) {
let this = ctx.address().expect("we are alive");

if self.connections.contains_key(&identity) {
if let Some(connection) = self.connections.get(&identity) {
tracing::debug!(
taker_id = %identity,
"Refusing to accept 2nd connection from already connected taker!"
new_address = %address,
old_address = %connection.address,
"Received second connection from taker: overwriting existing connection with new!"
);
return;
}

let _: Result<(), xtra::Error> = self
Expand Down Expand Up @@ -478,6 +480,7 @@ impl Actor {
identity,
Connection {
taker: identity,
address,
write,
wire_version: wire_version.clone(),
daemon_version: daemon_version.clone(),
Expand Down

0 comments on commit 7da1254

Please sign in to comment.