Skip to content

Commit

Permalink
Merge pull request #7 from rustaceanrob/new-db-integration
Browse files Browse the repository at this point in the history
node: update node with new peer db
  • Loading branch information
rustaceanrob authored Jun 19, 2024
2 parents aab4152 + f0a3254 commit 5886ec4
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 275 deletions.
4 changes: 2 additions & 2 deletions example/signet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ async fn main() {
addresses.insert(address);
// Add preferred peers to connect to
let peer = IpAddr::V4(Ipv4Addr::new(95, 217, 198, 121));
let peer_2 = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100));
// let peer_2 = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100));
// Create a new node builder
let builder = NodeBuilder::new(bitcoin::Network::Signet);
// Add node preferences and build the node/client
let (mut node, mut client) = builder
// Add the peers
.add_peers(vec![(peer, 38333), (peer_2, 38333)])
.add_peers(vec![(peer, 38333)])
// The Bitcoin scripts to monitor
.add_scripts(addresses)
// Only scan blocks strictly after an anchor checkpoint
Expand Down
199 changes: 4 additions & 195 deletions src/db/sqlite/peer_db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use bitcoin::p2p::{Address, ServiceFlags};
use bitcoin::p2p::ServiceFlags;
use bitcoin::Network;
use rusqlite::params;
use rusqlite::{Connection, Result};
Expand All @@ -12,31 +12,6 @@ use tokio::sync::Mutex;
use crate::db::error::DatabaseError;
use crate::db::traits::PeerStore;
use crate::db::PersistedPeer;
use crate::prelude::default_port_from_network;

const MAXIMUM_TABLE_SIZE: i64 = 256;

const NEW_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS peers (
ip_addr TEXT PRIMARY KEY,
port INTEGER NOT NULL,
last_seen INTEGER
)";

const TRIED_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS peers (
ip_addr TEXT PRIMARY KEY,
services INTEGER NOT NULL,
port INTEGER NOT NULL
)";

const CPF_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS cpfpeers (
ip_addr TEXT PRIMARY KEY,
port INTEGER NOT NULL
)";

const BAN_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS banned (
ip_addr TEXT PRIMARY KEY,
banned_until INTEGER
)";

const PEER_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS peers (
ip_addr TEXT PRIMARY KEY,
Expand All @@ -48,176 +23,10 @@ const PEER_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS peers (

#[derive(Debug)]
pub(crate) struct SqlitePeerDb {
default_port: u16,
conn_new: Arc<Mutex<Connection>>,
conn_tried: Arc<Mutex<Connection>>,
conn_evict: Arc<Mutex<Connection>>,
conn_cpf: Arc<Mutex<Connection>>,
conn_ban: Arc<Mutex<Connection>>,
}

impl SqlitePeerDb {
pub fn new(network: Network, path: Option<PathBuf>) -> Result<Self> {
let default_port = default_port_from_network(&network);
let mut path = path.unwrap_or_else(|| PathBuf::from("."));
path.push("data");
path.push(network.to_string());
if !path.exists() {
fs::create_dir_all(&path).unwrap();
}
let conn_new = Connection::open(path.join("peers_new.db"))?;
let conn_tried = Connection::open(path.join("peers_tried.db"))?;
let conn_evict = Connection::open(path.join("peers_evict.db"))?;
let conn_cpf = Connection::open(path.join("peers_cpf.db"))?;
let conn_ban = Connection::open(path.join("peers_ban.db"))?;
conn_new.execute(NEW_SCHEMA, [])?;
conn_evict.execute(NEW_SCHEMA, [])?;
conn_tried.execute(TRIED_SCHEMA, [])?;
conn_cpf.execute(CPF_SCHEMA, [])?;
conn_ban.execute(BAN_SCHEMA, [])?;
Ok(Self {
default_port,
conn_new: Arc::new(Mutex::new(conn_new)),
conn_tried: Arc::new(Mutex::new(conn_tried)),
conn_evict: Arc::new(Mutex::new(conn_evict)),
conn_cpf: Arc::new(Mutex::new(conn_cpf)),
conn_ban: Arc::new(Mutex::new(conn_ban)),
})
}

pub async fn add_new(
&mut self,
ip: IpAddr,
port: Option<u16>,
last_seen: Option<u32>,
) -> Result<()> {
let lock = self.conn_new.lock().await;
let new_count: i64 = lock.query_row("SELECT COUNT(*) FROM peers", [], |row| row.get(0))?;
let peer_banned = self.is_banned(&ip).await?;

if new_count < MAXIMUM_TABLE_SIZE && !peer_banned {
lock.execute(
"INSERT OR IGNORE INTO peers (ip_addr, port, last_seen) VALUES (?1, ?2, ?3)",
params![
ip.to_string(),
port.unwrap_or(self.default_port),
last_seen.unwrap_or(0),
],
)?;
} else if !peer_banned {
self.evict_random_peer().await?;
lock.execute(
"INSERT OR IGNORE INTO peers (ip_addr, port, last_seen) VALUES (?1, ?2, ?3)",
params![
ip.to_string(),
port.unwrap_or(self.default_port),
last_seen.unwrap_or(0),
],
)?;
}
Ok(())
}

pub async fn add_cpf_peers(&mut self, peers: Vec<Address>) -> Result<()> {
let mut lock = self.conn_cpf.lock().await;
let tx = lock.transaction()?;
for peer in peers {
let ip = peer
.socket_addr()
.expect("peers should have been screened")
.ip()
.to_string();
tx.execute(
"INSERT OR IGNORE INTO cpfpeers (ip_addr, port) VALUES (?1, ?2)",
params![ip, peer.port],
)?;
}
tx.commit()?;
Ok(())
}

async fn is_banned(&self, ip_addr: &IpAddr) -> Result<bool> {
let lock = self.conn_ban.lock().await;
let mut stmt = lock.prepare("SELECT COUNT(*) FROM banned WHERE ip_addr = ?1")?;
let count: i64 = stmt.query_row(params![ip_addr.to_string()], |row| row.get(0))?;
Ok(count > 0)
}

async fn evict_random_peer(&self) -> Result<()> {
let lock = self.conn_new.lock().await;
let mut stmt = lock.prepare("SELECT ip_addr FROM peers ORDER BY RANDOM() LIMIT 1")?;
let ip_addr: String = stmt.query_row([], |row| row.get(0))?;

lock.execute(
"INSERT OR IGNORE INTO peers (ip_addr, port, last_seen) SELECT ip_addr, port, last_seen FROM peers WHERE ip_addr = ?1",
params![ip_addr],
)?;

lock.execute("DELETE FROM peers WHERE ip_addr = ?1", params![ip_addr])?;

Ok(())
}

pub async fn get_random_new(&mut self) -> Result<Option<(IpAddr, u16)>> {
if let Ok(Some(peer)) = self.get_random_from_evict().await {
return Ok(Some(peer));
}
let lock = self.conn_new.lock().await;
let mut stmt = lock.prepare("SELECT ip_addr, port FROM peers ORDER BY RANDOM() LIMIT 1")?;
let mut rows = stmt.query([])?;
if let Some(row) = rows.next()? {
let ip_addr: String = row.get(0)?;
let port: u16 = row.get(1)?;
let ip = ip_addr
.parse::<IpAddr>()
.map_err(|_| rusqlite::Error::InvalidQuery)?;
Ok(Some((ip, port)))
} else {
Ok(None)
}
}

async fn get_random_from_evict(&mut self) -> Result<Option<(IpAddr, u16)>> {
let lock = self.conn_evict.lock().await;
let mut stmt = lock.prepare("SELECT ip_addr, port FROM peers ORDER BY RANDOM() LIMIT 1")?;
let mut rows = stmt.query([])?;
if let Some(row) = rows.next()? {
let ip_addr: String = row.get(0)?;
let port: u16 = row.get(1)?;
lock.execute("DELETE FROM peers WHERE ip_addr = ?1", [&ip_addr])?;
let ip = ip_addr
.parse::<IpAddr>()
.map_err(|_| rusqlite::Error::InvalidQuery)?;
Ok(Some((ip, port)))
} else {
Ok(None)
}
}

pub async fn get_random_cpf_peer(&mut self) -> Result<Option<(IpAddr, u16)>> {
let lock = self.conn_cpf.lock().await;
let mut stmt =
lock.prepare("SELECT ip_addr, port FROM cpfpeers ORDER BY RANDOM() LIMIT 1")?;
let mut rows = stmt.query([])?;
if let Some(row) = rows.next()? {
let ip_addr: String = row.get(0)?;
let port: u16 = row.get(1)?;
let ip = ip_addr
.parse::<IpAddr>()
.map_err(|_| rusqlite::Error::InvalidQuery)?;
Ok(Some((ip, port)))
} else {
Ok(None)
}
}
}

#[derive(Debug)]
pub(crate) struct SqlPeerDb {
conn: Arc<Mutex<Connection>>,
}

impl SqlPeerDb {
impl SqlitePeerDb {
pub fn new(network: Network, path: Option<PathBuf>) -> Result<Self, DatabaseError> {
let mut path = path.unwrap_or_else(|| PathBuf::from("."));
path.push("data");
Expand All @@ -227,7 +36,7 @@ impl SqlPeerDb {
}
let conn =
Connection::open(path.join("peers.db")).map_err(|_| DatabaseError::WriteError)?;
conn.execute(NEW_SCHEMA, [])
conn.execute(PEER_SCHEMA, [])
.map_err(|_| DatabaseError::WriteError)?;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
Expand All @@ -236,7 +45,7 @@ impl SqlPeerDb {
}

#[async_trait]
impl PeerStore for SqlPeerDb {
impl PeerStore for SqlitePeerDb {
async fn update(&mut self, peer: PersistedPeer) -> Result<(), DatabaseError> {
let lock = self.conn.lock().await;
let stmt = if !peer.tried {
Expand Down
Loading

0 comments on commit 5886ec4

Please sign in to comment.