From 2aec778e240a202242772d3e6cec3766b62ce1a7 Mon Sep 17 00:00:00 2001 From: j-mendez Date: Tue, 28 Nov 2023 20:03:10 -0500 Subject: [PATCH] feat(subscription): add event subscribe --- __test__/index.spec.ts | 20 +++++++++++++ book/src/crawl.md | 22 +++++++++++++++ index.d.ts | 5 ++++ src/lib.rs | 64 +++++++++++++++++++++++++++++++++++++++++- 4 files changed, 110 insertions(+), 1 deletion(-) diff --git a/__test__/index.spec.ts b/__test__/index.spec.ts index 512ec38..cdd8fb0 100644 --- a/__test__/index.spec.ts +++ b/__test__/index.spec.ts @@ -107,3 +107,23 @@ test.skip("new website native cron", async (t) => { // should be valid unless new pages and routes are created. t.assert(links.length > 1, "should be more than one page"); }); + + +test("new website native with subscriptions", async (t) => { + const website = new Website(TEST_URL); + + const links: NPage[] = []; + + const onPageEvent = (_err: Error | null, value: NPage) => { + links.push(value); + }; + + const id = website.subscribe(onPageEvent); + + await website.crawl(); + + website.unsubscribe(id); + + // should be valid unless new pages and routes are created. + t.assert(links.length > 1, "should be more than one page"); +}); diff --git a/book/src/crawl.md b/book/src/crawl.md index 7ef5dc9..88a6d93 100644 --- a/book/src/crawl.md +++ b/book/src/crawl.md @@ -46,3 +46,25 @@ const onPageEvent = (err, value) => { await website.crawl(onPageEvent, true); // this will run instantly as the crawl is in the background ``` + + +## Subscriptions + +You can set up many subscriptions to run events when a crawl happens. 
+ +```ts +import { Website } from "@spider-rs/spider-rs"; + +const website = new Website("https://rsseau.fr"); + +const onPageEvent = (err, value) => { + console.log(value); +}; + +const subscriptionID = website.subscribe(onPageEvent); + +await website.crawl(onPageEvent); + +website.unsubscribe(subscriptionID); +// the listener is removed only after the awaited crawl above resolves +``` diff --git a/index.d.ts b/index.d.ts index 59fa421..e5f657e 100644 --- a/index.d.ts +++ b/index.d.ts @@ -21,7 +21,12 @@ export class NWebsite { } /** a website holding the inner spider::website::Website from Rust fit for nodejs */ export class Website { + /** a new website */ constructor(url: string) + /** subscribe and add an event listener */ + subscribe(onPageEvent: (err: Error | null, value: NPage) => any): number + /** remove a subscription listener */ + unsubscribe(id?: number | undefined | null): boolean /** crawl a website */ crawl(onPageEvent?: (err: Error | null, value: NPage) => any | undefined | null, background?: boolean | undefined | null): Promise /** run the cron */ diff --git a/src/lib.rs b/src/lib.rs index ee7b4fb..f9012d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,7 +7,7 @@ use std::time::Duration; use compact_str::CompactString; use napi::{bindgen_prelude::Object, tokio::task::JoinHandle}; -use spider::lazy_static::lazy_static; +use spider::{hashbrown::HashMap, lazy_static::lazy_static}; lazy_static! 
{ pub static ref BUFFER: usize = (num_cpus::get() * 20).max(88); @@ -78,16 +78,78 @@ pub async fn crawl(url: String) -> NWebsite { pub struct Website { /// the website from spider inner: spider::website::Website, + /// spawn subscription handles + subscription_handles: HashMap<u32, JoinHandle<()>>, } #[napi] impl Website { #[napi(constructor)] + /// a new website pub fn new(url: String) -> Self { Website { inner: spider::website::Website::new(&url), + subscription_handles: HashMap::new(), } } + + #[napi] + /// subscribe and add an event listener + pub fn subscribe( + &mut self, + on_page_event: napi::threadsafe_function::ThreadsafeFunction<NPage>, + ) -> u32 { + let mut rx2 = self + .inner + .subscribe(*BUFFER / 2) + .expect("sync feature should be enabled"); + + let handle = spider::tokio::spawn(async move { + while let Ok(res) = rx2.recv().await { + on_page_event.call( + Ok(NPage { + url: res.get_url().into(), + content: res.get_html().into(), + }), + napi::threadsafe_function::ThreadsafeFunctionCallMode::NonBlocking, + ); + } + }); + + // len() can equal a key still in use after an unsubscribe; skip to the next free id + let mut id = self.subscription_handles.len() as u32; + while self.subscription_handles.contains_key(&id) { + id += 1; + } + + self.subscription_handles.insert(id, handle); + + id + } + + #[napi] + /// remove a subscription listener + pub fn unsubscribe(&mut self, id: Option<u32>) -> bool { + match id { + // remove() hands back the owned handle, so one lookup both drops the entry and aborts it + Some(id) => match self.subscription_handles.remove(&id) { + Some(handle) => { + handle.abort(); + true + } + None => false, + }, + // we may want to get all subs and remove them + _ => { + let keys = self.subscription_handles.len(); + for k in self.subscription_handles.drain() { + k.1.abort(); + } + keys > 0 + } + } + } + #[napi] /// crawl a website pub async unsafe fn crawl(