Skip to content

Commit

Permalink
chore(drip): fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
cor committed Oct 18, 2024
1 parent a34b9f6 commit a39e82b
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 93 deletions.
3 changes: 3 additions & 0 deletions dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,7 @@ qypy
rabby
randao
rapide
rarray
ratelimit
ratelimited
rawfile
Expand Down Expand Up @@ -1069,6 +1070,7 @@ relpath
remappings
repellendus
repr
reqs
reqwest
resizer
restaking
Expand Down Expand Up @@ -1406,6 +1408,7 @@ viem
virtualisation
vjgsyknnhmr
vmdebug
vtab
vuex
wagmi
wakey
Expand Down
9 changes: 7 additions & 2 deletions drip/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,34 @@

Faucet for Cosmos chains: [app.union.build/faucet]. Supports multiple chains and multiple denoms per chains.

[app.union.build/faucet]: https://app.union.build/faucet

## Example usage

Commands are ran from repo root

Tab 1, Union Devnet:

```sh
nix run .#devnet-union -L

```

Tab 2, Stargaze Devnet (optional, multi-chain demo):

```sh
nix run .#devnet-stargaze -L
```

Tab 3, Drip:

```sh
nix run .#drip -- -c ./drip/config.json
```

Tab 4, Request:

```sh
cat ./drip/example-requests/union-devnet.json | http POST localhost:8000
cat ./drip/example-requests/stargaze-devnet.json | http POST localhost:8000
```

[app.union.build/faucet]: https://app.union.build/faucet
200 changes: 109 additions & 91 deletions drip/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,106 +119,124 @@ async fn main() {
info!("spawning worker thread");
let chain = chain.clone();
let handle = tokio::spawn(async move {
// recreate each time so that if this task panics, the keyring gets rebuilt
// make sure to panic *here* so that the tokio task will catch the panic!
info!(chain_id=chain.id, "creating chain client");
let chain_client = ChainClient::new(&chain).await;
info!(chain_id=chain.id, "entering worker poll loop");
loop {
let chain = chain.clone();
let requests: Vec<SendRequest> = pool
.conn(move |conn| {
// recreate each time so that if this task panics, the keyring gets rebuilt
// make sure to panic *here* so that the tokio task will catch the panic!
info!(chain_id = chain.id, "creating chain client");
let chain_client = ChainClient::new(&chain).await;
info!(chain_id = chain.id, "entering worker poll loop");
loop {
let chain = chain.clone();
let requests: Vec<SendRequest> = pool
.conn(move |conn| {
let mut stmt = conn
.prepare_cached(
"SELECT id, denom, address FROM requests
WHERE tx_hash IS NULL AND chain_id IS ?1 LIMIT ?2",
)
.expect("???");

let mut rows = stmt
.query((&chain.id, batch_size as i64))
.expect("can't query rows");

let mut requests = vec![];

while let Some(row) = rows.next().expect("could not read row") {
let id: i64 = row.get(0).expect("could not read id");
let denom: String = row.get(1).expect("could not read denom");
let receiver: String =
row.get(2).expect("could not read address");

let Some(coin) =
chain.coins.iter().find(|coin| coin.denom == denom)
else {
error!(
%denom,
chain_id=&chain.id,
"dropping request for unknown denom");
break;
};

requests.push(SendRequest {
id,
receiver,
denom,
amount: coin.amount,
}); //todo fix amount
}

Ok(requests)
})
.await
.expect("pool error");

if requests.is_empty() {
debug!("no requests in queue");
tokio::time::sleep(Duration::from_millis(1000)).await;
continue;
}
let mut i = 0;

// try sending batch 5 times
let result = loop {
let send_res = chain_client.send(&requests).await;

match send_res {
Err(err) => {
if i >= 5 {
break format!("ERROR: {}", ErrorReporter(err));
}
warn!(
err = %ErrorReporter(err),
attempt = i,
"unable to submit transaction"
);
i += 1;
}
// this will be displayed to users, print the hash in the same way that cosmos sdk does
Ok(tx_hash) => {
break tx_hash
.into_encoding::<HexUnprefixed>()
.to_string()
.to_uppercase()
}
};
};

pool.conn(move |conn| {
debug!("loading vtab array module required for `IN (1,42,76,...)`");
rusqlite::vtab::array::load_module(conn)
.expect("error loading vtab array module");

let mut stmt = conn
.prepare_cached(
"SELECT id, denom, address FROM requests WHERE tx_hash IS NULL AND chain_id IS ?1 LIMIT ?2",
"UPDATE requests SET tx_hash = ?1 WHERE id IN rarray(?2)",
)
.expect("???");

let mut rows = stmt.query((&chain.id, batch_size as i64)).expect("can't query rows");

let mut requests = vec![];


while let Some(row) = rows.next().expect("could not read row") {
let id: i64 = row.get(0).expect("could not read id");
let denom: String = row.get(1).expect("could not read denom");
let receiver: String = row.get(2).expect("could not read address");

let Some(coin) = chain.coins.iter().find(|coin| coin.denom == denom) else {
error!(%denom, chain_id=&chain.id, "dropping request for unknown denom");
break;
};


requests.push(SendRequest { id, receiver, denom, amount: coin.amount }); //todo fix amount
}

Ok(requests)
// https://docs.rs/rusqlite/latest/rusqlite/vtab/array/index.html
let rows_modified = stmt
.execute((
&result,
Rc::new(
requests
.iter()
.map(|req| req.id)
.map(rusqlite::types::Value::from)
.collect::<Vec<rusqlite::types::Value>>(),
),
))
.expect("can't query rows");

info!(rows_modified, "updated requests");

Ok(())
})
.await
.expect("pool error");

if requests.is_empty() {
debug!("no requests in queue");
tokio::time::sleep(Duration::from_millis(1000)).await;
continue;
}
let mut i = 0;

// try sending batch 5 times
let result = loop {
let send_res = chain_client
.send(&requests).await;

match send_res {
Err(err) => {
if i >= 5 {
break format!("ERROR: {}", ErrorReporter(err));
}
warn!(
err = %ErrorReporter(err),
attempt = i,
"unable to submit transaction"
);
i += 1;
}
// this will be displayed to users, print the hash in the same way that cosmos sdk does
Ok(tx_hash) => break tx_hash.into_encoding::<HexUnprefixed>().to_string().to_uppercase(),
};
};

pool.conn(move |conn| {
debug!("loading vtab array module required for `IN (1,42,76,...)`");
rusqlite::vtab::array::load_module(conn).expect("error loading vtab array module");

let mut stmt = conn
.prepare_cached(
"UPDATE requests SET tx_hash = ?1 WHERE id IN rarray(?2)",
)
.expect("???");

// https://docs.rs/rusqlite/latest/rusqlite/vtab/array/index.html
let rows_modified = stmt
.execute((
&result,
Rc::new(requests.iter().map(|req| req.id)
.map(rusqlite::types::Value::from)
.collect::<Vec<rusqlite::types::Value>>())
))



.expect("can't query rows");

info!(rows_modified, "updated requests");

Ok(())
})
.await
.expect("pool error");
}
})
.await;
})
.await;

match handle {
Ok(()) => {}
Expand Down

0 comments on commit a39e82b

Please sign in to comment.