Skip to content

Commit

Permalink
Companion stream upload unknown size files (#5489)
Browse files Browse the repository at this point in the history
* stream upload unknown size files

behind a new option streamingUploadSizeless
COMPANION_STREAMING_UPLOAD_SIZELESS
for tus

* allow for all upload protocols

seems to be working
closes #5305

* refactor

and fix bug where progress was not always emitted

* fix type

* fix progress throttling

only do it on total progress

* Improve progress in UI

- only show progress percent and total bytes for files that we know the size of. (but all files will still be included in number of files)
- use `null` as an unknown value for progress and ETA, allowing us to remove ETA from UI when unknown
- `percentage` make use of `undefined` when progress is not yet known - don't show percentage in UI when unknown
- add a new state field `progress` that's the same as `totalProgress` but can also be `null`

* fix build error

* format

* fix progress when upload complete

* use execa for companion load balancer

if not, then it leaves zombie companion instances running in the background when e2e stops
have to be manually killed before running e2e again

* update docs and tests for new state.progress

* revert progress/totalProgress

* improve doc

* remove option streamingUploadSizeless

we agreed that this can be considered not a breaking change

* change progress the to "of unknown"

* revert

* remove companion doc

* add e2e test
  • Loading branch information
mifi authored Dec 7, 2024
1 parent e07c83b commit 24fd415
Show file tree
Hide file tree
Showing 18 changed files with 402 additions and 196 deletions.
12 changes: 12 additions & 0 deletions e2e/cypress/integration/dashboard-xhr.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
interceptCompanionUrlMetaRequest,
interceptCompanionUrlRequest,
runRemoteUrlImageUploadTest,
runRemoteUnsplashUploadTest,
} from './reusable-tests.ts'
Expand Down Expand Up @@ -57,6 +58,17 @@ describe('Dashboard with XHR', () => {
})
})

it('should upload unknown size files', () => {
cy.get('[data-cy="Url"]').click()
cy.get('.uppy-Url-input').type('http://localhost:4678/unknown-size')
cy.get('.uppy-Url-importButton').click()
interceptCompanionUrlRequest()
cy.get('.uppy-StatusBar-actionBtn--upload').click()
cy.wait('@url').then(() => {
cy.get('.uppy-StatusBar-statusPrimary').should('contain', 'Complete')
})
})

it('should upload remote image with Unsplash plugin', () => {
runRemoteUnsplashUploadTest()
})
Expand Down
2 changes: 1 addition & 1 deletion e2e/cypress/integration/reusable-tests.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* global cy */

const interceptCompanionUrlRequest = () =>
export const interceptCompanionUrlRequest = () =>
cy
.intercept({ method: 'POST', url: 'http://localhost:3020/url/get' })
.as('url')
Expand Down
25 changes: 25 additions & 0 deletions e2e/mock-server.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,31 @@ const requestListener = (req, res) => {
}
case '/file-no-headers':
break

case '/unknown-size': {
res.setHeader('Content-Type', 'text/html; charset=UTF-8');
res.setHeader('Transfer-Encoding', 'chunked');
const chunkSize = 1e5;
if (req.method === 'GET') {
let i = 0;
const interval = setInterval(() => {
if (i >= 10) { // 1MB
clearInterval(interval);
res.end();
return;
}
res.write(Buffer.from(Array.from({ length: chunkSize }, () => '1').join('')));
res.write('\n');
i++;
}, 10);
} else if (req.method === 'HEAD') {
res.end();
} else {
throw new Error('Unhandled method')
}
}
break;

default:
res.writeHead(404).end('Unhandled request')
}
Expand Down
53 changes: 20 additions & 33 deletions e2e/start-companion-with-load-balancer.mjs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#!/usr/bin/env node

import { spawn } from 'node:child_process'
import http from 'node:http'
import httpProxy from 'http-proxy'
import process from 'node:process'
import { execaNode } from 'execa';


const numInstances = 3
const lbPort = 3020
Expand Down Expand Up @@ -49,41 +50,27 @@ function createLoadBalancer (baseUrls) {
const isWindows = process.platform === 'win32'
const isOSX = process.platform === 'darwin'

const startCompanion = ({ name, port }) => {
const cp = spawn(process.execPath, [
const startCompanion = ({ name, port }) => execaNode('packages/@uppy/companion/src/standalone/start-server.js', {
nodeOptions: [
'-r', 'dotenv/config',
// Watch mode support is limited to Windows and macOS at the time of writing.
...(isWindows || isOSX ? ['--watch-path', 'packages/@uppy/companion/src', '--watch'] : []),
'./packages/@uppy/companion/src/standalone/start-server.js',
], {
cwd: new URL('../', import.meta.url),
stdio: 'inherit',
env: {
// Note: these env variables will override anything set in .env
...process.env,
COMPANION_PORT: port,
COMPANION_SECRET: 'development', // multi instance will not work without secret set
COMPANION_PREAUTH_SECRET: 'development', // multi instance will not work without secret set
COMPANION_ALLOW_LOCAL_URLS: 'true',
COMPANION_ENABLE_URL_ENDPOINT: 'true',
COMPANION_LOGGER_PROCESS_NAME: name,
COMPANION_CLIENT_ORIGINS: 'true',
},
})
// Adding a `then` property so the return value is awaitable:
return Object.defineProperty(cp, 'then', {
__proto__: null,
writable: true,
configurable: true,
value: Promise.prototype.then.bind(new Promise((resolve, reject) => {
cp.on('exit', (code) => {
if (code === 0) resolve(cp)
else reject(new Error(`Non-zero exit code: ${code}`))
})
cp.on('error', reject)
})),
})
}
],
cwd: new URL('../', import.meta.url),
stdio: 'inherit',
env: {
// Note: these env variables will override anything set in .env
...process.env,
COMPANION_PORT: port,
COMPANION_SECRET: 'development', // multi instance will not work without secret set
COMPANION_PREAUTH_SECRET: 'development', // multi instance will not work without secret set
COMPANION_ALLOW_LOCAL_URLS: 'true',
COMPANION_ENABLE_URL_ENDPOINT: 'true',
COMPANION_LOGGER_PROCESS_NAME: name,
COMPANION_CLIENT_ORIGINS: 'true',
},
})


const hosts = Array.from({ length: numInstances }, (_, index) => {
const port = companionStartPort + index
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"eslint-plugin-react": "^7.22.0",
"eslint-plugin-react-hooks": "^4.2.0",
"eslint-plugin-unicorn": "^53.0.0",
"execa": "^9.5.1",
"github-contributors-list": "^1.2.4",
"glob": "^8.0.0",
"jsdom": "^24.0.0",
Expand Down
21 changes: 20 additions & 1 deletion packages/@uppy/companion-client/src/RequestClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import pRetry, { AbortError } from 'p-retry'

import fetchWithNetworkError from '@uppy/utils/lib/fetchWithNetworkError'
import ErrorWithCause from '@uppy/utils/lib/ErrorWithCause'
import emitSocketProgress from '@uppy/utils/lib/emitSocketProgress'
import getSocketHost from '@uppy/utils/lib/getSocketHost'

import type Uppy from '@uppy/core'
Expand Down Expand Up @@ -81,6 +80,26 @@ async function handleJSONResponse<ResJson>(res: Response): Promise<ResJson> {
throw new HttpError({ statusCode: res.status, message: errMsg })
}

function emitSocketProgress(
uploader: { uppy: Uppy<any, any> },
progressData: {
progress: string // pre-formatted percentage number as a string
bytesTotal: number
bytesUploaded: number
},
file: UppyFile<any, any>,
): void {
const { progress, bytesUploaded, bytesTotal } = progressData
if (progress) {
uploader.uppy.log(`Upload progress: ${progress}`)
uploader.uppy.emit('upload-progress', file, {
uploadStarted: file.progress.uploadStarted ?? 0,
bytesUploaded,
bytesTotal,
})
}
}

export default class RequestClient<M extends Meta, B extends Body> {
static VERSION = packageJson.version

Expand Down
19 changes: 12 additions & 7 deletions packages/@uppy/companion/src/server/Uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,14 @@ class Uploader {
if (this.readStream) this.readStream.destroy(err)
}

async _uploadByProtocol(req) {
_getUploadProtocol() {
// todo a default protocol should not be set. We should ensure that the user specifies their protocol.
// after we drop old versions of uppy client we can remove this
const protocol = this.options.protocol || PROTOCOLS.multipart
return this.options.protocol || PROTOCOLS.multipart
}

async _uploadByProtocol(req) {
const protocol = this._getUploadProtocol()

switch (protocol) {
case PROTOCOLS.multipart:
Expand Down Expand Up @@ -264,8 +268,8 @@ class Uploader {
this.readStream = fileStream
}

_needDownloadFirst() {
return !this.options.size || !this.options.companionOptions.streamingUpload
_canStream() {
return this.options.companionOptions.streamingUpload
}

/**
Expand All @@ -281,7 +285,8 @@ class Uploader {
this.#uploadState = states.uploading

this.readStream = stream
if (this._needDownloadFirst()) {

if (!this._canStream()) {
logger.debug('need to download the whole file first', 'controller.get.provider.size', this.shortToken)
// Some streams need to be downloaded entirely first, because we don't know their size from the provider
// This is true for zoom and drive (exported files) or some URL downloads.
Expand Down Expand Up @@ -429,7 +434,7 @@ class Uploader {
// If fully downloading before uploading, combine downloaded and uploaded bytes
// This will make sure that the user sees half of the progress before upload starts (while downloading)
let combinedBytes = bytesUploaded
if (this._needDownloadFirst()) {
if (!this._canStream()) {
combinedBytes = Math.floor((combinedBytes + (this.downloadedBytes || 0)) / 2)
}

Expand Down Expand Up @@ -606,7 +611,7 @@ class Uploader {

const response = await runRequest(url, reqOptions)

if (bytesUploaded !== this.size) {
if (this.size != null && bytesUploaded !== this.size) {
const errMsg = `uploaded only ${bytesUploaded} of ${this.size} with status: ${response.statusCode}`
logger.error(errMsg, 'upload.multipart.mismatch.error')
throw new Error(errMsg)
Expand Down
30 changes: 16 additions & 14 deletions packages/@uppy/core/src/Uppy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ describe('src/Core', () => {
core.addUploader((fileIDs) => {
fileIDs.forEach((fileID) => {
const file = core.getFile(fileID)
if (/bar/.test(file.name)) {
if (file.name != null && /bar/.test(file.name)) {
// @ts-ignore
core.emit(
'upload-error',
Expand Down Expand Up @@ -1701,6 +1701,9 @@ describe('src/Core', () => {

const fileId = Object.keys(core.getState().files)[0]
const file = core.getFile(fileId)

core.emit('upload-start', [core.getFile(fileId)])

// @ts-ignore
core.emit('upload-progress', file, {
bytesUploaded: 12345,
Expand All @@ -1711,7 +1714,7 @@ describe('src/Core', () => {
bytesUploaded: 12345,
bytesTotal: 17175,
uploadComplete: false,
uploadStarted: null,
uploadStarted: expect.any(Number),
})

// @ts-ignore
Expand All @@ -1720,14 +1723,12 @@ describe('src/Core', () => {
bytesTotal: 17175,
})

core.calculateProgress.flush()

expect(core.getFile(fileId).progress).toEqual({
percentage: 100,
bytesUploaded: 17175,
bytesTotal: 17175,
uploadComplete: false,
uploadStarted: null,
uploadStarted: expect.any(Number),
})
})

Expand Down Expand Up @@ -1762,7 +1763,8 @@ describe('src/Core', () => {
data: {},
})

core.calculateTotalProgress()
// @ts-ignore
core[Symbol.for('uppy test: updateTotalProgress')]()

const uploadPromise = core.upload()
await Promise.all([
Expand All @@ -1774,7 +1776,6 @@ describe('src/Core', () => {
bytesUploaded: 0,
// null indicates unsized
bytesTotal: null,
percentage: 0,
})

// @ts-ignore
Expand Down Expand Up @@ -1844,10 +1845,11 @@ describe('src/Core', () => {
data: {},
})

core.calculateTotalProgress()
// @ts-ignore
core[Symbol.for('uppy test: updateTotalProgress')]()

// foo.jpg at 35%, bar.jpg at 0%
expect(core.getState().totalProgress).toBe(18)
// foo.jpg at 35%, bar.jpg has unknown size and will not be counted
expect(core.getState().totalProgress).toBe(36)

core.destroy()
})
Expand Down Expand Up @@ -1893,8 +1895,8 @@ describe('src/Core', () => {
bytesTotal: 17175,
})

core.calculateTotalProgress()
core.calculateProgress.flush()
// @ts-ignore
core[Symbol.for('uppy test: updateTotalProgress')]()

expect(core.getState().totalProgress).toEqual(66)
})
Expand Down Expand Up @@ -1937,8 +1939,8 @@ describe('src/Core', () => {
bytesTotal: 17175,
})

core.calculateTotalProgress()
core.calculateProgress.flush()
// @ts-ignore
core[Symbol.for('uppy test: updateTotalProgress')]()

expect(core.getState().totalProgress).toEqual(66)
expect(core.getState().allowNewUpload).toEqual(true)
Expand Down
Loading

0 comments on commit 24fd415

Please sign in to comment.