Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Companion stream upload unknown size files #5489

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 20 additions & 33 deletions e2e/start-companion-with-load-balancer.mjs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#!/usr/bin/env node

import { spawn } from 'node:child_process'
import http from 'node:http'
import httpProxy from 'http-proxy'
import process from 'node:process'
import { execaNode } from 'execa';


const numInstances = 3
const lbPort = 3020
Expand Down Expand Up @@ -49,41 +50,27 @@ function createLoadBalancer (baseUrls) {
const isWindows = process.platform === 'win32'
const isOSX = process.platform === 'darwin'

const startCompanion = ({ name, port }) => {
const cp = spawn(process.execPath, [
const startCompanion = ({ name, port }) => execaNode('packages/@uppy/companion/src/standalone/start-server.js', {
nodeOptions: [
'-r', 'dotenv/config',
// Watch mode support is limited to Windows and macOS at the time of writing.
...(isWindows || isOSX ? ['--watch-path', 'packages/@uppy/companion/src', '--watch'] : []),
'./packages/@uppy/companion/src/standalone/start-server.js',
], {
cwd: new URL('../', import.meta.url),
stdio: 'inherit',
env: {
// Note: these env variables will override anything set in .env
...process.env,
COMPANION_PORT: port,
COMPANION_SECRET: 'development', // multi instance will not work without secret set
COMPANION_PREAUTH_SECRET: 'development', // multi instance will not work without secret set
COMPANION_ALLOW_LOCAL_URLS: 'true',
COMPANION_ENABLE_URL_ENDPOINT: 'true',
COMPANION_LOGGER_PROCESS_NAME: name,
COMPANION_CLIENT_ORIGINS: 'true',
},
})
// Adding a `then` property so the return value is awaitable:
return Object.defineProperty(cp, 'then', {
__proto__: null,
writable: true,
configurable: true,
value: Promise.prototype.then.bind(new Promise((resolve, reject) => {
cp.on('exit', (code) => {
if (code === 0) resolve(cp)
else reject(new Error(`Non-zero exit code: ${code}`))
})
cp.on('error', reject)
})),
})
}
],
cwd: new URL('../', import.meta.url),
stdio: 'inherit',
env: {
// Note: these env variables will override anything set in .env
...process.env,
COMPANION_PORT: port,
COMPANION_SECRET: 'development', // multi instance will not work without secret set
COMPANION_PREAUTH_SECRET: 'development', // multi instance will not work without secret set
COMPANION_ALLOW_LOCAL_URLS: 'true',
COMPANION_ENABLE_URL_ENDPOINT: 'true',
COMPANION_LOGGER_PROCESS_NAME: name,
COMPANION_CLIENT_ORIGINS: 'true',
},
})


const hosts = Array.from({ length: numInstances }, (_, index) => {
const port = companionStartPort + index
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"eslint-plugin-react": "^7.22.0",
"eslint-plugin-react-hooks": "^4.2.0",
"eslint-plugin-unicorn": "^53.0.0",
"execa": "^9.5.1",
"github-contributors-list": "^1.2.4",
"glob": "^8.0.0",
"jsdom": "^24.0.0",
Expand Down
21 changes: 20 additions & 1 deletion packages/@uppy/companion-client/src/RequestClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import pRetry, { AbortError } from 'p-retry'

import fetchWithNetworkError from '@uppy/utils/lib/fetchWithNetworkError'
import ErrorWithCause from '@uppy/utils/lib/ErrorWithCause'
import emitSocketProgress from '@uppy/utils/lib/emitSocketProgress'
import getSocketHost from '@uppy/utils/lib/getSocketHost'

import type Uppy from '@uppy/core'
Expand Down Expand Up @@ -81,6 +80,26 @@ async function handleJSONResponse<ResJson>(res: Response): Promise<ResJson> {
throw new HttpError({ statusCode: res.status, message: errMsg })
}

function emitSocketProgress(
uploader: { uppy: Uppy<any, any> },
progressData: {
progress: string // pre-formatted percentage number as a string
bytesTotal: number
bytesUploaded: number
},
file: UppyFile<any, any>,
): void {
const { progress, bytesUploaded, bytesTotal } = progressData
if (progress) {
uploader.uppy.log(`Upload progress: ${progress}`)
uploader.uppy.emit('upload-progress', file, {
uploadStarted: file.progress.uploadStarted ?? 0,
bytesUploaded,
bytesTotal,
})
}
}

export default class RequestClient<M extends Meta, B extends Body> {
static VERSION = packageJson.version

Expand Down
19 changes: 12 additions & 7 deletions packages/@uppy/companion/src/server/Uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,14 @@ class Uploader {
if (this.readStream) this.readStream.destroy(err)
}

async _uploadByProtocol(req) {
_getUploadProtocol() {
// todo a default protocol should not be set. We should ensure that the user specifies their protocol.
// after we drop old versions of uppy client we can remove this
const protocol = this.options.protocol || PROTOCOLS.multipart
return this.options.protocol || PROTOCOLS.multipart
}

async _uploadByProtocol(req) {
const protocol = this._getUploadProtocol()

switch (protocol) {
case PROTOCOLS.multipart:
Expand Down Expand Up @@ -264,8 +268,8 @@ class Uploader {
this.readStream = fileStream
}

_needDownloadFirst() {
return !this.options.size || !this.options.companionOptions.streamingUpload
_canStream() {
return this.options.companionOptions.streamingUpload
}

/**
Expand All @@ -281,7 +285,8 @@ class Uploader {
this.#uploadState = states.uploading

this.readStream = stream
if (this._needDownloadFirst()) {

if (!this._canStream()) {
logger.debug('need to download the whole file first', 'controller.get.provider.size', this.shortToken)
// Some streams need to be downloaded entirely first, because we don't know their size from the provider
// This is true for zoom and drive (exported files) or some URL downloads.
Expand Down Expand Up @@ -429,7 +434,7 @@ class Uploader {
// If fully downloading before uploading, combine downloaded and uploaded bytes
// This will make sure that the user sees half of the progress before upload starts (while downloading)
let combinedBytes = bytesUploaded
if (this._needDownloadFirst()) {
if (!this._canStream()) {
combinedBytes = Math.floor((combinedBytes + (this.downloadedBytes || 0)) / 2)
}

Expand Down Expand Up @@ -606,7 +611,7 @@ class Uploader {

const response = await runRequest(url, reqOptions)

if (bytesUploaded !== this.size) {
if (this.size != null && bytesUploaded !== this.size) {
const errMsg = `uploaded only ${bytesUploaded} of ${this.size} with status: ${response.statusCode}`
logger.error(errMsg, 'upload.multipart.mismatch.error')
throw new Error(errMsg)
Expand Down
30 changes: 16 additions & 14 deletions packages/@uppy/core/src/Uppy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ describe('src/Core', () => {
core.addUploader((fileIDs) => {
fileIDs.forEach((fileID) => {
const file = core.getFile(fileID)
if (/bar/.test(file.name)) {
if (file.name != null && /bar/.test(file.name)) {
// @ts-ignore
core.emit(
'upload-error',
Expand Down Expand Up @@ -1701,6 +1701,9 @@ describe('src/Core', () => {

const fileId = Object.keys(core.getState().files)[0]
const file = core.getFile(fileId)

core.emit('upload-start', [core.getFile(fileId)])

// @ts-ignore
core.emit('upload-progress', file, {
bytesUploaded: 12345,
Expand All @@ -1711,7 +1714,7 @@ describe('src/Core', () => {
bytesUploaded: 12345,
bytesTotal: 17175,
uploadComplete: false,
uploadStarted: null,
uploadStarted: expect.any(Number),
})

// @ts-ignore
Expand All @@ -1720,14 +1723,12 @@ describe('src/Core', () => {
bytesTotal: 17175,
})

core.calculateProgress.flush()

expect(core.getFile(fileId).progress).toEqual({
percentage: 100,
bytesUploaded: 17175,
bytesTotal: 17175,
uploadComplete: false,
uploadStarted: null,
uploadStarted: expect.any(Number),
})
})

Expand Down Expand Up @@ -1762,7 +1763,8 @@ describe('src/Core', () => {
data: {},
})

core.calculateTotalProgress()
// @ts-ignore
core[Symbol.for('uppy test: updateTotalProgress')]()

const uploadPromise = core.upload()
await Promise.all([
Expand All @@ -1774,7 +1776,6 @@ describe('src/Core', () => {
bytesUploaded: 0,
// null indicates unsized
bytesTotal: null,
percentage: 0,
})

// @ts-ignore
Expand Down Expand Up @@ -1844,10 +1845,11 @@ describe('src/Core', () => {
data: {},
})

core.calculateTotalProgress()
// @ts-ignore
core[Symbol.for('uppy test: updateTotalProgress')]()

// foo.jpg at 35%, bar.jpg at 0%
expect(core.getState().totalProgress).toBe(18)
// foo.jpg at 35%, bar.jpg has unknown size and will not be counted
expect(core.getState().totalProgress).toBe(36)

core.destroy()
})
Expand Down Expand Up @@ -1893,8 +1895,8 @@ describe('src/Core', () => {
bytesTotal: 17175,
})

core.calculateTotalProgress()
core.calculateProgress.flush()
// @ts-ignore
core[Symbol.for('uppy test: updateTotalProgress')]()

expect(core.getState().totalProgress).toEqual(66)
})
Expand Down Expand Up @@ -1937,8 +1939,8 @@ describe('src/Core', () => {
bytesTotal: 17175,
})

core.calculateTotalProgress()
core.calculateProgress.flush()
// @ts-ignore
core[Symbol.for('uppy test: updateTotalProgress')]()

expect(core.getState().totalProgress).toEqual(66)
expect(core.getState().allowNewUpload).toEqual(true)
Expand Down
Loading
Loading