From 4eaab42baf2be1bd53fde1ad0ccc2a6ce36ccc20 Mon Sep 17 00:00:00 2001 From: Robert Long Date: Wed, 4 Dec 2024 13:18:48 -0800 Subject: [PATCH 1/3] Add support for HTTP Streaming --- Cargo.lock | 1 + plugins/http/Cargo.toml | 1 + plugins/http/api-iife.js | 2 +- plugins/http/guest-js/index.ts | 49 ++++++++++++++++++++++------------ plugins/http/src/commands.rs | 26 ++++++++++++++++-- 5 files changed, 59 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 144ee6ab6..675a5e7b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6621,6 +6621,7 @@ name = "tauri-plugin-http" version = "2.0.4" dependencies = [ "data-url", + "futures-util", "http", "regex", "reqwest", diff --git a/plugins/http/Cargo.toml b/plugins/http/Cargo.toml index e4303f196..76c2cca20 100644 --- a/plugins/http/Cargo.toml +++ b/plugins/http/Cargo.toml @@ -42,6 +42,7 @@ reqwest = { version = "0.12", default-features = false } url = { workspace = true } data-url = "0.3" tracing = { workspace = true, optional = true } +futures-util = "0.3.31" [features] default = [ diff --git a/plugins/http/api-iife.js b/plugins/http/api-iife.js index 0cfeb063a..8d6dbfbbc 100644 --- a/plugins/http/api-iife.js +++ b/plugins/http/api-iife.js @@ -1 +1 @@ -if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";async function t(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}"function"==typeof SuppressedError&&SuppressedError;const r="Request canceled";return e.fetch=async function(e,n){const a=n?.signal;if(a?.aborted)throw new Error(r);const o=n?.maxRedirections,s=n?.connectTimeout,i=n?.proxy;n&&(delete n.maxRedirections,delete n.connectTimeout,delete n.proxy);const d=n?.headers?n.headers instanceof Headers?n.headers:new Headers(n.headers):new Headers,c=new Request(e,n),u=await c.arrayBuffer(),f=0!==u.byteLength?Array.from(new Uint8Array(u)):null;for(const[e,t]of c.headers)d.get(e)||d.set(e,t);const _=(d instanceof Headers?Array.from(d.entries()):Array.isArray(d)?d:Object.entries(d)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(a?.aborted)throw new Error(r);const h=await t("plugin:http|fetch",{clientConfig:{method:c.method,url:c.url,headers:_,data:f,maxRedirections:o,connectTimeout:s,proxy:i}}),l=()=>t("plugin:http|fetch_cancel",{rid:h});if(a?.aborted)throw l(),new Error(r);a?.addEventListener("abort",(()=>{l()}));const{status:p,statusText:w,url:y,headers:T,rid:A}=await t("plugin:http|fetch_send",{rid:h}),g=await t("plugin:http|fetch_read_body",{rid:A}),R=new Response(g instanceof ArrayBuffer&&0!==g.byteLength?g:g instanceof Array&&g.length>0?new Uint8Array(g):null,{status:p,statusText:w});return Object.defineProperty(R,"url",{value:y}),Object.defineProperty(R,"headers",{value:new Headers(T)}),R},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} +if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";function t(e,t,r,n){if("a"===r&&!n)throw new TypeError("Private accessor was defined without a getter");if("function"==typeof t?e!==t||!n:!t.has(e))throw new TypeError("Cannot read private member from an object whose class did not declare it");return"m"===r?n:"a"===r?n.call(e):n?n.value:t.get(e)}function r(e,t,r,n,s){if("function"==typeof t?e!==t||!s:!t.has(e))throw new TypeError("Cannot write private member to an object whose class did not declare it");return t.set(e,r),r}var n,s,o;"function"==typeof SuppressedError&&SuppressedError;const a="__TAURI_TO_IPC_KEY__";class i{constructor(){this.__TAURI_CHANNEL_MARKER__=!0,n.set(this,(()=>{})),s.set(this,0),o.set(this,{}),this.id=function(e,t=!1){return window.__TAURI_INTERNALS__.transformCallback(e,t)}((({message:e,id:a})=>{if(a===t(this,s,"f")){r(this,s,a+1),t(this,n,"f").call(this,e);const i=Object.keys(t(this,o,"f"));if(i.length>0){let e=a+1;for(const r of i.sort()){if(parseInt(r)!==e)break;{const s=t(this,o,"f")[r];delete t(this,o,"f")[r],t(this,n,"f").call(this,s),e+=1}}r(this,s,e)}}else t(this,o,"f")[a.toString()]=e}))}set onmessage(e){r(this,n,e)}get onmessage(){return t(this,n,"f")}[(n=new WeakMap,s=new WeakMap,o=new WeakMap,a)](){return`__CHANNEL__:${this.id}`}toJSON(){return this[a]()}}async function c(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}const d="Request canceled";return e.fetch=async function(e,t){const r=t?.signal;if(r?.aborted)throw new Error(d);const n=t?.maxRedirections,s=t?.connectTimeout,o=t?.proxy;t&&(delete t.maxRedirections,delete t.connectTimeout,delete t.proxy);const a=t?.headers?t.headers instanceof Headers?t.headers:new Headers(t.headers):new Headers,h=new Request(e,t),f=await h.arrayBuffer(),_=0!==f.byteLength?Array.from(new Uint8Array(f)):null;for(const[e,t]of h.headers)a.get(e)||a.set(e,t);const u=(a instanceof Headers?Array.from(a.entries()):Array.isArray(a)?a:Object.entries(a)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(r?.aborted)throw new Error(d);const l=await c("plugin:http|fetch",{clientConfig:{method:h.method,url:h.url,headers:u,data:_,maxRedirections:n,connectTimeout:s,proxy:o}}),w=()=>c("plugin:http|fetch_cancel",{rid:l});if(r?.aborted)throw w(),new Error(d);r?.addEventListener("abort",(()=>{w()}));const{status:p,statusText:y,url:m,headers:T,rid:g}=await c("plugin:http|fetch_send",{rid:l}),b=new ReadableStream({start(e){const t=new i;t.onmessage=t=>{const r=new Uint8Array(t);0===r.length?e.close():e.enqueue(r)};c("plugin:http|fetch_read_body",{rid:g,channel:t}).catch((t=>{console.error("error reading body",t),e.error(t)}))}}),A=new Response(b,{status:p,statusText:y});return Object.defineProperty(A,"url",{value:m}),Object.defineProperty(A,"headers",{value:new Headers(T)}),A},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} diff --git a/plugins/http/guest-js/index.ts b/plugins/http/guest-js/index.ts index 4362e893a..7d918883c 100644 --- a/plugins/http/guest-js/index.ts +++ b/plugins/http/guest-js/index.ts @@ -26,7 +26,7 @@ * @module */ -import { invoke } from '@tauri-apps/api/core' +import { invoke, Channel } from '@tauri-apps/api/core' /** * Configuration of a proxy that a Client should pass requests to. @@ -206,24 +206,39 @@ export async function fetch( rid }) - const body = await invoke( - 'plugin:http|fetch_read_body', - { - rid: responseRid + // Create ReadableStream from channel messages + const stream = new ReadableStream({ + start(controller) { + const channel = new Channel() + channel.onmessage = (arr) => { + const chunk = new Uint8Array(arr) + + // End the stream if the chunk is empty + if (chunk.length === 0) { + controller.close() + } else { + controller.enqueue(chunk) + } + } + + // Start reading body in background + const readPromise = invoke('plugin:http|fetch_read_body', { + rid: responseRid, + channel + }) + + // If the promise fails, make sure the stream is closed + readPromise.catch((e) => { + console.error('error reading body', e) + controller.error(e) + }) } - ) + }) - const res = new Response( - body instanceof ArrayBuffer && body.byteLength !== 0 - ? body - : body instanceof Array && body.length > 0 - ? new Uint8Array(body) - : null, - { - status, - statusText - } - ) + const res = new Response(stream, { + status, + statusText + }) // url and headers are read only properties // but seems like we can set them like this diff --git a/plugins/http/src/commands.rs b/plugins/http/src/commands.rs index 03c84adf5..b72dabe9b 100644 --- a/plugins/http/src/commands.rs +++ b/plugins/http/src/commands.rs @@ -4,6 +4,7 @@ use std::{future::Future, pin::Pin, str::FromStr, sync::Arc, time::Duration}; +use futures_util::StreamExt; use http::{header, HeaderMap, HeaderName, HeaderValue, Method, StatusCode}; use reqwest::{redirect::Policy, NoProxy}; use serde::{Deserialize, Serialize}; @@ -386,13 +387,34 @@ pub async fn fetch_send( pub(crate) async fn fetch_read_body( webview: Webview, rid: ResourceId, -) -> crate::Result { + channel: tauri::ipc::Channel<&[u8]>, +) -> crate::Result<()> { let res = { let mut resources_table = webview.resources_table(); resources_table.take::(rid)? }; + let res = Arc::into_inner(res).unwrap().0; - Ok(tauri::ipc::Response::new(res.bytes().await?.to_vec())) + let mut stream = res.bytes_stream(); + + while let Some(chunk) = stream.next().await { + match chunk { + Ok(bytes) => { + // Skip empty chunks + if bytes.len() > 0 { + channel.send(&bytes)?; + } + } + Err(e) => { + return Err(e.into()); + } + } + } + + // Send an empty chunk to signal the end of the stream + channel.send(&[])?; + + Ok(()) } // forbidden headers per fetch spec https://fetch.spec.whatwg.org/#terminology-headers From e503a77cfb6e36ece8342e02b8bd157693fb2972 Mon Sep 17 00:00:00 2001 From: Robert Long Date: Wed, 4 Dec 2024 14:02:49 -0800 Subject: [PATCH 2/3] Fix linting errors --- plugins/http/src/commands.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/http/src/commands.rs b/plugins/http/src/commands.rs index b72dabe9b..965ddd09c 100644 --- a/plugins/http/src/commands.rs +++ b/plugins/http/src/commands.rs @@ -401,7 +401,7 @@ pub(crate) async fn fetch_read_body( match chunk { Ok(bytes) => { // Skip empty chunks - if bytes.len() > 0 { + if !bytes.is_empty() { channel.send(&bytes)?; } } From 3f9f3363596b5817308bfb9f3261fee07a71ee8f Mon Sep 17 00:00:00 2001 From: Robert Long Date: Thu, 5 Dec 2024 09:51:15 -0800 Subject: [PATCH 3/3] Clean up channel.onmessage when stream terminates --- plugins/http/api-iife.js | 2 +- plugins/http/guest-js/index.ts | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/plugins/http/api-iife.js b/plugins/http/api-iife.js index 8d6dbfbbc..bc80ea21b 100644 --- a/plugins/http/api-iife.js +++ b/plugins/http/api-iife.js @@ -1 +1 @@ -if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";function t(e,t,r,n){if("a"===r&&!n)throw new TypeError("Private accessor was defined without a getter");if("function"==typeof t?e!==t||!n:!t.has(e))throw new TypeError("Cannot read private member from an object whose class did not declare it");return"m"===r?n:"a"===r?n.call(e):n?n.value:t.get(e)}function r(e,t,r,n,s){if("function"==typeof t?e!==t||!s:!t.has(e))throw new TypeError("Cannot write private member to an object whose class did not declare it");return t.set(e,r),r}var n,s,o;"function"==typeof SuppressedError&&SuppressedError;const a="__TAURI_TO_IPC_KEY__";class i{constructor(){this.__TAURI_CHANNEL_MARKER__=!0,n.set(this,(()=>{})),s.set(this,0),o.set(this,{}),this.id=function(e,t=!1){return window.__TAURI_INTERNALS__.transformCallback(e,t)}((({message:e,id:a})=>{if(a===t(this,s,"f")){r(this,s,a+1),t(this,n,"f").call(this,e);const i=Object.keys(t(this,o,"f"));if(i.length>0){let e=a+1;for(const r of i.sort()){if(parseInt(r)!==e)break;{const s=t(this,o,"f")[r];delete t(this,o,"f")[r],t(this,n,"f").call(this,s),e+=1}}r(this,s,e)}}else t(this,o,"f")[a.toString()]=e}))}set onmessage(e){r(this,n,e)}get onmessage(){return t(this,n,"f")}[(n=new WeakMap,s=new WeakMap,o=new WeakMap,a)](){return`__CHANNEL__:${this.id}`}toJSON(){return this[a]()}}async function c(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}const d="Request canceled";return e.fetch=async function(e,t){const r=t?.signal;if(r?.aborted)throw new Error(d);const n=t?.maxRedirections,s=t?.connectTimeout,o=t?.proxy;t&&(delete t.maxRedirections,delete t.connectTimeout,delete t.proxy);const a=t?.headers?t.headers instanceof Headers?t.headers:new Headers(t.headers):new Headers,h=new Request(e,t),f=await h.arrayBuffer(),_=0!==f.byteLength?Array.from(new Uint8Array(f)):null;for(const[e,t]of h.headers)a.get(e)||a.set(e,t);const u=(a instanceof Headers?Array.from(a.entries()):Array.isArray(a)?a:Object.entries(a)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(r?.aborted)throw new Error(d);const l=await c("plugin:http|fetch",{clientConfig:{method:h.method,url:h.url,headers:u,data:_,maxRedirections:n,connectTimeout:s,proxy:o}}),w=()=>c("plugin:http|fetch_cancel",{rid:l});if(r?.aborted)throw w(),new Error(d);r?.addEventListener("abort",(()=>{w()}));const{status:p,statusText:y,url:m,headers:T,rid:g}=await c("plugin:http|fetch_send",{rid:l}),b=new ReadableStream({start(e){const t=new i;t.onmessage=t=>{const r=new Uint8Array(t);0===r.length?e.close():e.enqueue(r)};c("plugin:http|fetch_read_body",{rid:g,channel:t}).catch((t=>{console.error("error reading body",t),e.error(t)}))}}),A=new Response(b,{status:p,statusText:y});return Object.defineProperty(A,"url",{value:m}),Object.defineProperty(A,"headers",{value:new Headers(T)}),A},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} +if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";function t(e,t,r,n){if("a"===r&&!n)throw new TypeError("Private accessor was defined without a getter");if("function"==typeof t?e!==t||!n:!t.has(e))throw new TypeError("Cannot read private member from an object whose class did not declare it");return"m"===r?n:"a"===r?n.call(e):n?n.value:t.get(e)}function r(e,t,r,n,s){if("function"==typeof t?e!==t||!s:!t.has(e))throw new TypeError("Cannot write private member to an object whose class did not declare it");return t.set(e,r),r}var n,s,o;"function"==typeof SuppressedError&&SuppressedError;const a="__TAURI_TO_IPC_KEY__";class i{constructor(){this.__TAURI_CHANNEL_MARKER__=!0,n.set(this,(()=>{})),s.set(this,0),o.set(this,{}),this.id=function(e,t=!1){return window.__TAURI_INTERNALS__.transformCallback(e,t)}((({message:e,id:a})=>{if(a===t(this,s,"f")){r(this,s,a+1),t(this,n,"f").call(this,e);const i=Object.keys(t(this,o,"f"));if(i.length>0){let e=a+1;for(const r of i.sort()){if(parseInt(r)!==e)break;{const s=t(this,o,"f")[r];delete t(this,o,"f")[r],t(this,n,"f").call(this,s),e+=1}}r(this,s,e)}}else t(this,o,"f")[a.toString()]=e}))}set onmessage(e){r(this,n,e)}get onmessage(){return t(this,n,"f")}[(n=new WeakMap,s=new WeakMap,o=new WeakMap,a)](){return`__CHANNEL__:${this.id}`}toJSON(){return this[a]()}}async function c(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}const d="Request canceled";return e.fetch=async function(e,t){const r=t?.signal;if(r?.aborted)throw new Error(d);const n=t?.maxRedirections,s=t?.connectTimeout,o=t?.proxy;t&&(delete t.maxRedirections,delete t.connectTimeout,delete t.proxy);const a=t?.headers?t.headers instanceof Headers?t.headers:new Headers(t.headers):new Headers,h=new Request(e,t),f=await h.arrayBuffer(),_=0!==f.byteLength?Array.from(new Uint8Array(f)):null;for(const[e,t]of h.headers)a.get(e)||a.set(e,t);const u=(a instanceof Headers?Array.from(a.entries()):Array.isArray(a)?a:Object.entries(a)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(r?.aborted)throw new Error(d);const l=await c("plugin:http|fetch",{clientConfig:{method:h.method,url:h.url,headers:u,data:_,maxRedirections:n,connectTimeout:s,proxy:o}}),w=()=>c("plugin:http|fetch_cancel",{rid:l});if(r?.aborted)throw w(),new Error(d);r?.addEventListener("abort",(()=>{w()}));const{status:p,statusText:y,url:m,headers:g,rid:T}=await c("plugin:http|fetch_send",{rid:l}),b=new i,A=new ReadableStream({start(e){b.onmessage=t=>{const r=new Uint8Array(t);0===r.length?e.close():e.enqueue(r)};c("plugin:http|fetch_read_body",{rid:T,channel:b}).catch((t=>{console.error("error reading body",t),b.onmessage=()=>{},e.error(t)}))},cancel(){b.onmessage=()=>{}}}),R=new Response(A,{status:p,statusText:y});return Object.defineProperty(R,"url",{value:m}),Object.defineProperty(R,"headers",{value:new Headers(g)}),R},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} diff --git a/plugins/http/guest-js/index.ts b/plugins/http/guest-js/index.ts index 7d918883c..b0be6435d 100644 --- a/plugins/http/guest-js/index.ts +++ b/plugins/http/guest-js/index.ts @@ -206,10 +206,11 @@ export async function fetch( rid }) + const channel = new Channel() + // Create ReadableStream from channel messages const stream = new ReadableStream({ start(controller) { - const channel = new Channel() channel.onmessage = (arr) => { const chunk = new Uint8Array(arr) @@ -230,8 +231,12 @@ export async function fetch( // If the promise fails, make sure the stream is closed readPromise.catch((e) => { console.error('error reading body', e) + channel.onmessage = () => {} controller.error(e) }) + }, + cancel() { + channel.onmessage = () => {} } })