feat(subscription): add event subscribe
j-mendez committed Nov 29, 2023
1 parent 302a328 commit 2aec778
Showing 4 changed files with 110 additions and 1 deletion.
20 changes: 20 additions & 0 deletions __test__/index.spec.ts
@@ -107,3 +107,23 @@ test.skip("new website native cron", async (t) => {
// should be valid unless new pages and routes are created.
t.assert(links.length > 1, "should be more than one page");
});


test("new website native with subscriptions", async (t) => {
const website = new Website(TEST_URL);

const links: NPage[] = [];

const onPageEvent = (_err: Error | null, value: NPage) => {
links.push(value);
};

const id = website.subscribe(onPageEvent);

await website.crawl();

website.unsubscribe(id);

// should be valid unless new pages and routes are created.
t.assert(links.length > 1, "should be more than one page");
});
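
A complementary check, not part of this commit, could verify that `unsubscribe` actually stops event delivery; the sketch below assumes the same ava setup, `TEST_URL` fixture, and `NPage` type used above.

```ts
test("new website native unsubscribe before crawl", async (t) => {
  const website = new Website(TEST_URL);

  const links: NPage[] = [];

  // register a listener, then remove it before the crawl starts
  const id = website.subscribe((_err: Error | null, value: NPage) => {
    links.push(value);
  });

  website.unsubscribe(id);

  await website.crawl();

  // the removed listener should never have been called
  t.assert(links.length === 0, "should not receive pages after unsubscribe");
});
```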
22 changes: 22 additions & 0 deletions book/src/crawl.md
@@ -46,3 +46,25 @@ const onPageEvent = (err, value) => {
await website.crawl(onPageEvent, true);
// this will run instantly as the crawl is in the background
```


## Subscriptions

You can set up multiple subscriptions to receive page events while a crawl runs; a sketch with several listeners follows the example below.

```ts
import { Website } from "@spider-rs/spider-rs";

const website = new Website("https://rsseau.fr");

const onPageEvent = (err, value) => {
console.log(value);
};

const subscriptionID = website.subscribe(onPageEvent);

await website.crawl(onPageEvent);

website.unsubscribe(subscriptionID);
// the listener is removed once the awaited crawl has completed
```
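
Since `subscribe` returns a numeric id per listener, several listeners can be attached to the same crawl; the following is an illustrative sketch (not part of the committed docs) using the same API.

```ts
import { Website } from "@spider-rs/spider-rs";

const website = new Website("https://rsseau.fr");

// each subscribe call registers an independent listener and returns its id
const logId = website.subscribe((err, value) => {
  console.log(value.url);
});
const collected = [];
const collectId = website.subscribe((err, value) => {
  collected.push(value);
});

await website.crawl();

website.unsubscribe(logId);
website.unsubscribe(collectId);
```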
5 changes: 5 additions & 0 deletions index.d.ts
@@ -21,7 +21,12 @@ export class NWebsite {
}
/** a website holding the inner spider::website::Website from Rust fit for nodejs */
export class Website {
/** a new website */
constructor(url: string)
/** subscribe and add an event listener */
subscribe(onPageEvent: (err: Error | null, value: NPage) => any): number
/** remove a subscription listener */
unsubscribe(id?: number | undefined | null): boolean
/** crawl a website */
crawl(onPageEvent?: (err: Error | null, value: NPage) => any | undefined | null, background?: boolean | undefined | null): Promise<void>
/** run the cron */
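
Per the declarations above, `unsubscribe` takes an optional id and returns a boolean; judging from the Rust implementation below, calling it without an id drains every handle, so the return value indicates whether any listeners were removed. A minimal sketch of that assumed usage:

```ts
import { Website } from "@spider-rs/spider-rs";

const website = new Website("https://rsseau.fr");

website.subscribe((err, page) => console.log(page.url));
website.subscribe((err, page) => console.log(page.content.length));

await website.crawl();

// no id: removes every registered listener, true when at least one was removed
const removedAll = website.unsubscribe();
console.log(removedAll); // true
```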
64 changes: 63 additions & 1 deletion src/lib.rs
@@ -7,7 +7,7 @@ use std::time::Duration;

use compact_str::CompactString;
use napi::{bindgen_prelude::Object, tokio::task::JoinHandle};
use spider::lazy_static::lazy_static;
use spider::{hashbrown::HashMap, lazy_static::lazy_static};

lazy_static! {
pub static ref BUFFER: usize = (num_cpus::get() * 20).max(88);
@@ -78,16 +78,78 @@ pub async fn crawl(url: String) -> NWebsite {
pub struct Website {
/// the website from spider
inner: spider::website::Website,
/// spawn subscription handles
subscription_handles: HashMap<u32, JoinHandle<()>>,
}

#[napi]
impl Website {
#[napi(constructor)]
/// a new website
pub fn new(url: String) -> Self {
Website {
inner: spider::website::Website::new(&url),
subscription_handles: HashMap::new(),
}
}

#[napi]
/// subscribe and add an event listener
pub fn subscribe(
&mut self,
on_page_event: napi::threadsafe_function::ThreadsafeFunction<NPage>,
) -> u32 {
let mut rx2 = self
.inner
.subscribe(*BUFFER / 2)
.expect("sync feature should be enabled");

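// forward each page received from the crawl to the JS callback until the channel closes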
let handle = spider::tokio::spawn(async move {
while let Ok(res) = rx2.recv().await {
on_page_event.call(
Ok(NPage {
url: res.get_url().into(),
content: res.get_html().into(),
}),
napi::threadsafe_function::ThreadsafeFunctionCallMode::NonBlocking,
);
}
});

let id = self.subscription_handles.len() as u32;

self.subscription_handles.insert(id, handle);

id
}

#[napi]
/// remove a subscription listener
pub fn unsubscribe(&mut self, id: Option<u32>) -> bool {
match id {
Some(id) => {
let handle = self.subscription_handles.get(&id);

match handle {
Some(h) => {
h.abort();
self.subscription_handles.remove_entry(&id);
true
}
_ => false,
}
}
// no id given: remove every subscription listener
_ => {
let keys = self.subscription_handles.len();
for k in self.subscription_handles.drain() {
k.1.abort();
}
keys > 0
}
}
}

#[napi]
/// crawl a website
pub async unsafe fn crawl(
