Skip to content

Commit

Permalink
chore: more experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed Sep 9, 2024
1 parent a1fd96a commit 873d345
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 49 deletions.
124 changes: 88 additions & 36 deletions rust/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#![deny(clippy::all)]

use std::{
net::{IpAddr, SocketAddr}, sync::Arc, vec
cell::UnsafeCell, net::{IpAddr, SocketAddr}, sync::Arc, vec
};

use napi::{bindgen_prelude::*, JsArrayBufferValue, JsBuffer, JsNumber, JsObject, JsTypedArray, JsUndefined, Ref};
use napi::{bindgen_prelude::*, JsArrayBufferValue, JsBuffer, JsBufferValue, JsNumber, JsObject, JsTypedArray, JsUndefined, Ref};
use napi_derive::napi;
use quinn::SendStream;
use quinn::{RecvStream, SendStream};
use tokio::sync::Mutex;

mod config;
Expand Down Expand Up @@ -206,15 +206,15 @@ impl Connection {
#[napi]
pub struct Stream {
send: Arc<Mutex<quinn::SendStream>>,
recv: quinn::RecvStream,
recv: Arc<Mutex<quinn::RecvStream>>,
// send: Arc<quinn::SendStream>,
// recv: Arc<quinn::RecvStream>,
}

#[napi]
impl Stream {
pub fn new(send: quinn::SendStream, recv: quinn::RecvStream) -> Self {
Self { send: Arc::new(Mutex::new(send)), recv }
Self { send: Arc::new(Mutex::new(send)), recv: Arc::new(Mutex::new(recv)) }
}

#[napi]
Expand All @@ -230,7 +230,7 @@ impl Stream {

#[napi]
pub async unsafe fn read(&mut self, mut buf: Uint8Array) -> Result<Option<u32>> {
let chunk = self.recv.read(buf.as_mut()).await.map_err(to_err)?;
let chunk = self.recv.lock().await.read(buf.as_mut()).await.map_err(to_err)?;
match chunk {
Some(len) => Ok(Some(len as u32)),
None => Ok(None),
Expand All @@ -240,13 +240,54 @@ impl Stream {
#[napi]
pub async unsafe fn read2(&mut self) -> Result<Option<Uint8Array>> {
let mut buf = vec![0u8; 1024];
let chunk = self.recv.read(buf.as_mut()).await.map_err(to_err)?;
let chunk = self.recv.lock().await.read(buf.as_mut()).await.map_err(to_err)?;
match chunk {
Some(len) => Ok(Some(Uint8Array::with_data_copied(&buf[..len as usize])),),
None => Ok(None),
}
}

#[napi(ts_return_type = "Promise<number | undefined>")]
pub fn read3(&mut self, env: Env, #[napi(ts_arg_type = "Buffer")] data: JsBuffer) -> Result<JsObject> {
let data = data.into_ref()?;
let recv = self.recv.clone();

env.execute_tokio_future(async move {
// unsafe, but we know the data is not going to be modified by JS
let d = data.as_ref();
let data_mut = unsafe {
let ptr = d.as_ptr() as *mut u8;
std::slice::from_raw_parts_mut(ptr, d.len())
};
let mut recv = recv.lock().await;
let chunk = recv.read(
data_mut
).await.map_err(to_err)?;
match chunk {
Some(len) => Ok((Some(len as u32), data)),
None => Ok((None, data)),
}
}, move |env, output| {
let (output, mut data) = output;

println!("{:?}", data.unref(*env)?);
if let Some(output) = output {
env.create_uint32(output).and_then(|n| Ok(n.into_unknown()))
} else {
env.get_undefined().and_then(|u| Ok(u.into_unknown()))
}
})
}

#[napi(ts_return_type = "Promise<number | undefined>")]
pub fn read4(&mut self, data: JsBuffer) -> AsyncTask<Read> {
let data = data.into_ref().unwrap();
AsyncTask::new(Read {
buf: data,
recv: self.recv.clone(),
})
}

#[napi]
pub fn write2(&mut self, #[napi(ts_arg_type = "Uint8Array")] data: JsTypedArray) -> Result<AsyncTask<Write>> {
let data = data.into_value()?;
Expand Down Expand Up @@ -289,8 +330,8 @@ impl Stream {
}

#[napi]
pub fn stop_read(&mut self) {
let _ = self.recv.stop(0u8.into());
pub async unsafe fn stop_read(&mut self) {
let _ = self.recv.lock().await.stop(0u8.into());
}
}

Expand Down Expand Up @@ -326,32 +367,43 @@ impl Task for Write {
}
}

// struct Read {
// buf: Uint8Array,
// stream: Stream,
// }

// impl Task for Read {
// type Output = Option<u32>;
// type JsValue = Either<JsNumber, JsUndefined>;

// fn compute(&mut self) -> Result<Self::Output> {
// block_on(async move {
// let chunk = self.stream.recv.read(self.buf.as_mut()).await.map_err(to_err)?;
// match chunk {
// Some(len) => Ok(Some(len as u32)),
// None => Ok(None),
// }
// })
// }

// fn resolve(&mut self, env: Env, output: Self::Output) -> Result<Self::JsValue> {
// if let Some(output) = output {
// env.create_uint32(output).map(Either::A)
// } else {
// env.get_undefined().map(Either::B)
// }
// }
// }
pub struct Read {
buf: Ref<JsBufferValue>,
recv: Arc<Mutex<RecvStream>>,
}

impl Task for Read {
type Output = Option<u32>;
type JsValue = Either<JsNumber, JsUndefined>;

fn compute(&mut self) -> Result<Self::Output> {
block_on(async move {
// unsafe, but we know the data is not going to be modified by JS
let d = self.buf.as_ref();
let data_mut = unsafe {
let ptr = d.as_ptr() as *mut u8;
std::slice::from_raw_parts_mut(ptr, d.len())
};
let chunk = self.recv.lock().await.read(data_mut).await.map_err(to_err)?;
match chunk {
Some(len) => Ok(Some(len as u32)),
None => Ok(None),
}
})
}

fn resolve(&mut self, env: Env, output: Self::Output) -> Result<Self::JsValue> {
if let Some(output) = output {
env.create_uint32(output).map(Either::A)
} else {
env.get_undefined().map(Either::B)
}
}

fn finally(&mut self, env: Env) -> Result<()> {
self.buf.unref(env)?;
Ok(())
}
}

// mod out;
4 changes: 3 additions & 1 deletion src/napi.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,11 @@ export declare class Stream {
write(data: Uint8Array): Promise<void>
read(buf: Uint8Array): Promise<number | null>
read2(): Promise<Uint8Array | null>
read3(data: Buffer): Promise<number | undefined>
read4(data: Buffer): Promise<number | undefined>
write2(data: Uint8Array): Promise<unknown>
write3(data: Uint8Array): Promise<undefined>
finishWrite(): Promise<void>
resetWrite(): Promise<void>
stopRead(): void
stopRead(): Promise<void>
}
24 changes: 13 additions & 11 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,22 @@ export class QuicStream implements Stream {
try {
while (true) {
this.log.trace('', this.id, 'reading')
// const chunk = Buffer.allocUnsafe(CHUNK_SIZE)
// const length = await this.#stream.read(chunk)
// if (length == null) {
// this.log.trace('', this.id, 'no more data')
// break
// }
// yield new Uint8ArrayList(chunk.subarray(0, length))
const chunk = await this.#stream.read2()
if (chunk == null) {
const chunk = Buffer.allocUnsafe(CHUNK_SIZE)
const length = await this.#stream.read4(chunk)
if (length == null) {
this.log.trace('', this.id, 'no more data')
break
}
yield new Uint8ArrayList(chunk)
this.log.trace('', this.id, 'read', chunk.length, 'bytes')
yield new Uint8ArrayList(chunk.subarray(0, length))
this.log.trace('', this.id, 'read', length, 'bytes')

// const chunk = await this.#stream.read2()
// if (chunk == null) {
// this.log.trace('', this.id, 'no more data')
// break
// }
// yield new Uint8ArrayList(chunk)
// this.log.trace('', this.id, 'read', chunk.length, 'bytes')
}
} catch (e) {
this.log.error('source error', this.id, e)
Expand Down
2 changes: 1 addition & 1 deletion test/compliance.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { quic } from '../src/index.js'
import { QuicTransport } from '../src/transport.js'
import { createComponents } from './util.js'

describe('Interface compliance tests', () => {
describe.only('Interface compliance tests', () => {
// Fails these listener tests:
// - close listener with connections, through timeout
// - should not handle connection if upgradeInbound throws
Expand Down

0 comments on commit 873d345

Please sign in to comment.