diff --git a/gax/package.json b/gax/package.json index 42bb68c0b..234bbce85 100644 --- a/gax/package.json +++ b/gax/package.json @@ -66,7 +66,6 @@ "webpack": "^4.0.0", "webpack-cli": "^4.0.0" }, - "scripts": { "docs": "jsdoc -c .jsdoc.js", "pretest": "npm run prepare", diff --git a/gax/test/unit/streaming.ts b/gax/test/unit/streaming.ts index 8365421f7..4c291abe9 100644 --- a/gax/test/unit/streaming.ts +++ b/gax/test/unit/streaming.ts @@ -19,7 +19,7 @@ import * as assert from 'assert'; import * as sinon from 'sinon'; import {afterEach, describe, it} from 'mocha'; -import {PassThrough} from 'stream'; +import {PassThrough, Transform, pipeline} from 'stream'; import { GaxCallStream, @@ -297,6 +297,7 @@ describe('streaming', () => { done(); }); }); + it('cancels in the middle when new retries are enabled', done => { function schedulePush(s: CancellableStream, c: number) { const intervalId = setInterval(() => { @@ -441,6 +442,7 @@ describe('streaming', () => { assert.strictEqual(responseCallback.callCount, 1); }); }); + it('emit response when stream received metadata event and new gax retries is enabled', done => { const responseMetadata = {metadata: true}; const expectedStatus = {code: 0, metadata: responseMetadata}; @@ -734,6 +736,7 @@ describe('streaming', () => { done(); }); }); + it('emit parsed GoogleError when new retries are enabled', done => { const errorInfoObj = { reason: 'SERVICE_DISABLED', @@ -898,6 +901,7 @@ describe('streaming', () => { done(); }); }); + it('emit transient error message if neither maxRetries nor totaltimeout are defined when new retries are enabled', done => { const errorInfoObj = { reason: 'SERVICE_DISABLED', @@ -982,6 +986,7 @@ describe('streaming', () => { done(); }); }); + it('emit transient error on second or later error when new retries are enabled', done => { const errorInfoObj = { reason: 'SERVICE_DISABLED', @@ -1103,6 +1108,7 @@ describe('streaming', () => { done(); }); }); + it('emit error and retry once', done => { const firstError = Object.assign(new GoogleError('UNAVAILABLE'), { code: 14, @@ -1258,6 +1264,7 @@ describe('streaming', () => { done(); }); }); + it('retries using resumption request function ', done => { const receivedData: string[] = []; const error = Object.assign(new GoogleError('test error'), { @@ -1371,6 +1378,7 @@ describe('streaming', () => { done(); }); }); + it('errors when there is a resumption request function an gaxStreamingRetries is not enabled', done => { const error = Object.assign(new GoogleError('test error'), { code: 14, @@ -1466,6 +1474,138 @@ describe('streaming', () => { done(); }); }); + + it('properly emits the end event at the end of a pipeline transformation synchronously', done => { + const spy = sinon.spy((...args: Array<{}>) => { + assert.strictEqual(args.length, 3); + const s = new PassThrough({ + objectMode: true, + }); + s.push({resources: [1, 2]}); + s.push(null); + setImmediate(() => { + s.emit('metadata'); + }); + setImmediate(() => { + s.emit('status'); + }); + + return s; + }); + + // Initial stream. + const apiCall = createApiCallStreaming( + spy, + streaming.StreamType.SERVER_STREAMING, + false, + true // new retry behavior enabled + ); + const s1 = apiCall({}, undefined); + + // Transform stream. + const transform = new Transform({ + objectMode: true, + transform: (data, _encoding, callback) => { + callback( + null, + data.resources.map((element: number) => element + 1) + ); + }, + }); + + // Final stream. + const s2 = new PassThrough({ + objectMode: true, + }); + + const finalResults: Array<{resources: Array}> = []; + + s2.on('data', data => { + finalResults.push(data); + }); + s2.on('end', () => { + assert.strictEqual( + JSON.stringify(finalResults), + JSON.stringify([[2, 3]]) + ); + done(); + }); + + pipeline(s1, transform, s2, err => { + if (err) { + throw new Error( + 'pipeline in properly emits the end event at the end of a pipeline transformation test failed' + ); + } + }); + }); + + it('properly emits the end event at the end of a pipeline transformation asynchronously', done => { + const spy = sinon.spy((...args: Array<{}>) => { + assert.strictEqual(args.length, 3); + const s = new PassThrough({ + objectMode: true, + }); + s.push({resources: [1, 2]}); + s.push(null); + setImmediate(() => { + s.emit('metadata'); + }); + setImmediate(() => { + s.emit('status'); + }); + + return s; + }); + + // Initial stream. + const apiCall = createApiCallStreaming( + spy, + streaming.StreamType.SERVER_STREAMING, + false, + true // new retry behavior enabled + ); + const s1 = apiCall({}, undefined); + + // Transform stream. + const transform = new Transform({ + objectMode: true, + transform: (data, _encoding, callback) => { + setTimeout(() => { + callback( + null, + data.resources.map((element: number) => element + 1) + ); + }, 10); + }, + }); + + // Final stream. + const s2 = new PassThrough({ + objectMode: true, + }); + + const finalResults: Array<{resources: Array}> = []; + + s2.on('data', data => { + finalResults.push(data); + }); + s2.on('end', () => { + assert.strictEqual( + JSON.stringify(finalResults), + JSON.stringify([[2, 3]]) + ); + done(); + }); + + pipeline(s1, transform, s2, err => { + if (err) { + throw new Error( + 'pipeline in properly emits the end event at the end of a pipeline transformation test failed' + ); + } + }); + }); }); describe('handles server streaming retries in gax when gaxStreamingRetries is enabled', () => { @@ -1536,6 +1676,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en } }); }); + it('server streaming call retries until exceeding max retries and surfaces underlying error in note', done => { const retrySpy = sinon.spy( streaming.StreamProxy.prototype, @@ -1600,6 +1741,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en } }); }); + it('does not retry when there is no shouldRetryFn and retryCodes is an empty array', done => { // we don't call the timeout/max retry check on non retryable error codes const retrySpy = sinon.spy( @@ -1715,6 +1857,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en } }); }); + it('allows custom CallOptions.retry settings with shouldRetryFn instead of retryCodes and new retry behavior', done => { sinon .stub(streaming.StreamProxy.prototype, 'eventForwardHelper') @@ -1758,6 +1901,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en } ); }); + it('allows custom CallOptions.retry settings with retryCodes and new retry behavior', done => { sinon .stub(streaming.StreamProxy.prototype, 'eventForwardHelper') @@ -1794,6 +1938,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en } ); }); + it('allows the user to pass a custom resumption strategy', done => { sinon // typecasting to any is a workaround for stubbing private functions in sinon @@ -1849,6 +1994,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en } ); }); + it('throws an error when both totalTimeoutMillis and maxRetries are passed at call time when new retry behavior is enabled', done => { const status = {code: 4, message: 'test'}; const error = Object.assign(new GoogleError('test error'), { @@ -1909,6 +2055,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en } }); }); + it('throws an error when both retryRequestoptions and retryOptions are passed at call time when new retry behavior is enabled', done => { //if this is reached, it means the settings merge in createAPICall did not fail properly sinon.stub(StreamingApiCaller.prototype, 'call').callsFake(() => { @@ -1967,6 +2114,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en done(); } }); + it('throws a warning and converts retryRequestOptions for new retry behavior', done => { const warnStub = sinon.stub(warnings, 'warn'); sinon @@ -2065,6 +2213,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en ) ); }); + it('throws a warning and converts retryRequestOptions for new retry behavior - zero/falsiness check', done => { const warnStub = sinon.stub(warnings, 'warn'); sinon @@ -2163,6 +2312,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en ) ); }); + it('throws a warning and converts retryRequestOptions for new retry behavior - no maxRetries', done => { const warnStub = sinon.stub(warnings, 'warn'); sinon @@ -2260,6 +2410,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en ) ); }); + it('throws a warning and converts retryRequestOptions for new retry behavior - no maxRetries zero/falsiness check', done => { const warnStub = sinon.stub(warnings, 'warn'); sinon @@ -2358,6 +2509,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en ); }); }); + describe('warns/errors about server streaming retry behavior when gaxStreamingRetries is disabled', () => { afterEach(() => { // restore 'call' stubs and 'warn' stubs @@ -2398,6 +2550,7 @@ describe('REST streaming apiCall return StreamArrayParser', () => { const UserService = root.lookupService('UserService'); UserService.resolveAll(); const streamMethod = UserService.methods['RunQuery']; + it('forwards data, end event', done => { const spy = sinon.spy((...args: Array<{}>) => { assert.strictEqual(args.length, 3);