From 200b476d02deb96764bb43e4e7573855e2af5c92 Mon Sep 17 00:00:00 2001 From: StephanGerbeth Date: Tue, 26 Nov 2024 20:14:48 +0100 Subject: [PATCH] fix(operators): cleanup & optimization --- package-lock.json | 4 +- packages/observables/index.js | 0 packages/observables/package.json | 2 +- packages/observables/src/index.js | 1 + .../fixtures/paginatedFetchFixture.js | 12 --- packages/operators/package.json | 2 +- packages/operators/src/{request => }/cache.js | 0 .../operators/src/{request => }/cache.test.js | 5 +- packages/operators/src/index.js | 2 +- packages/operators/src/json.js | 35 --------- packages/operators/src/json/replacer.js | 10 --- packages/operators/src/json/reviver.js | 26 ------- .../operators/src/json/reworker/default.js | 30 -------- packages/operators/src/json/reworker/token.js | 43 ----------- packages/operators/src/log.js | 75 +++++++++++++------ packages/operators/src/log.test.js | 11 +++ .../operators/src/{request => }/request.http | 0 .../operators/src/{request => }/request.js | 4 +- .../src/{request => }/request.test.js | 45 ++++++++--- .../operators/src/request/autoPagination.js | 2 +- .../src/request/autoPagination.test.js | 9 ++- .../src/request/concurrentRequest.js | 2 +- .../src/request/concurrentRequest.test.js | 10 ++- .../src/request/lazyPagination.test.js | 6 +- packages/operators/src/request/polling.js | 4 +- .../operators/src/request/polling.test.js | 6 +- packages/operators/src/request/retry.js | 49 ------------ .../operators/src/{request => }/response.js | 0 .../src/{request => }/response.test.js | 17 ++++- packages/operators/src/retry.js | 44 +++++++++++ .../operators/src/{request => }/retry.test.js | 9 ++- packages/operators/src/when.js | 6 ++ packages/operators/src/when.test.js | 42 +++++++++++ vitest.config.js | 4 +- 34 files changed, 245 insertions(+), 272 deletions(-) delete mode 100644 packages/observables/index.js create mode 100644 packages/observables/src/index.js delete mode 100644 packages/operators/fixtures/paginatedFetchFixture.js rename packages/operators/src/{request => }/cache.js (100%) rename packages/operators/src/{request => }/cache.test.js (87%) delete mode 100644 packages/operators/src/json.js delete mode 100644 packages/operators/src/json/replacer.js delete mode 100644 packages/operators/src/json/reviver.js delete mode 100644 packages/operators/src/json/reworker/default.js delete mode 100644 packages/operators/src/json/reworker/token.js create mode 100644 packages/operators/src/log.test.js rename packages/operators/src/{request => }/request.http (100%) rename packages/operators/src/{request => }/request.js (90%) rename packages/operators/src/{request => }/request.test.js (74%) delete mode 100644 packages/operators/src/request/retry.js rename packages/operators/src/{request => }/response.js (100%) rename packages/operators/src/{request => }/response.test.js (82%) create mode 100644 packages/operators/src/retry.js rename packages/operators/src/{request => }/retry.test.js (80%) create mode 100644 packages/operators/src/when.js create mode 100644 packages/operators/src/when.test.js diff --git a/package-lock.json b/package-lock.json index 08078d1..5a17c09 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12535,7 +12535,7 @@ }, "packages/observables": { "name": "@rxjs-collection/observables", - "version": "1.0.5", + "version": "1.0.6", "license": "MIT", "dependencies": { "@rxjs-collection/operators": "*", @@ -12544,7 +12544,7 @@ }, "packages/operators": { "name": "@rxjs-collection/operators", - "version": "1.0.6", + "version": "1.0.10", "license": "MIT", "dependencies": { "@rxjs-collection/observables": "*", diff --git a/packages/observables/index.js b/packages/observables/index.js deleted file mode 100644 index e69de29..0000000 diff --git a/packages/observables/package.json b/packages/observables/package.json index 2c225b2..16bc281 100644 --- a/packages/observables/package.json +++ b/packages/observables/package.json @@ -10,7 +10,7 @@ } ], "type": "module", - "main": "index.js", + "main": "src/index.js", "files": [ "./src/*" ], diff --git a/packages/observables/src/index.js b/packages/observables/src/index.js new file mode 100644 index 0000000..9888181 --- /dev/null +++ b/packages/observables/src/index.js @@ -0,0 +1 @@ +export { connectionObservable } from './dom/window'; diff --git a/packages/operators/fixtures/paginatedFetchFixture.js b/packages/operators/fixtures/paginatedFetchFixture.js deleted file mode 100644 index 58c2c81..0000000 --- a/packages/operators/fixtures/paginatedFetchFixture.js +++ /dev/null @@ -1,12 +0,0 @@ -const test = await fetch('https://dummyjson.com/products?limit=10&skip=0&select=title,price'); -console.log(test); -const a = new Blob([test]); -console.log(a); - -const fr = new FileReader(); - -fr.onload = function () { - console.log(JSON.parse(this.result)); -}; - -fr.readAsText(a); diff --git a/packages/operators/package.json b/packages/operators/package.json index 1047abf..9acaa52 100644 --- a/packages/operators/package.json +++ b/packages/operators/package.json @@ -10,7 +10,7 @@ } ], "type": "module", - "main": "index.js", + "main": "src/index.js", "files": [ "./src/*" ], diff --git a/packages/operators/src/request/cache.js b/packages/operators/src/cache.js similarity index 100% rename from packages/operators/src/request/cache.js rename to packages/operators/src/cache.js diff --git a/packages/operators/src/request/cache.test.js b/packages/operators/src/cache.test.js similarity index 87% rename from packages/operators/src/request/cache.test.js rename to packages/operators/src/cache.test.js index 6287682..7332398 100644 --- a/packages/operators/src/request/cache.test.js +++ b/packages/operators/src/cache.test.js @@ -3,6 +3,7 @@ import { TestScheduler } from 'rxjs/testing'; import { beforeEach, describe, expect, test } from 'vitest'; import { cache } from './cache'; +import { log } from './log'; describe('cache', () => { let testScheduler; @@ -22,7 +23,9 @@ describe('cache', () => { testScheduler.run(({ cold, expectObservable }) => { const stream = cold('a', { a: () => triggerVal.shift() }).pipe( map(fn => fn()), - cache({ ttl: 2 }) + log('operators:cache:default:input'), + cache({ ttl: 2 }), + log('operators:cache:default:output') ); const unsubA = '-^!'; diff --git a/packages/operators/src/index.js b/packages/operators/src/index.js index 88e67f7..a2a6701 100644 --- a/packages/operators/src/index.js +++ b/packages/operators/src/index.js @@ -11,4 +11,4 @@ export { resolveBlob, distinctUntilResponseChanged } from './request/response'; -export { retryWhenError } from './request/retry'; +export { retryWhenRequestError } from './retry'; diff --git a/packages/operators/src/json.js b/packages/operators/src/json.js deleted file mode 100644 index c1c9a72..0000000 --- a/packages/operators/src/json.js +++ /dev/null @@ -1,35 +0,0 @@ -import { concatMap, map } from 'rxjs'; - -import { replaceTypes } from './json/replacer.js'; -import { reviveTypes } from './json/reviver.js'; -import { defaultDeserializeReworker, defaultSerializeReworker } from './json/reworker/default.js'; -import { signJSON, verifyJSON } from './sign.js'; - -export const serialize = (reworker, signer) => source => - source.pipe( - rework(reworker || defaultSerializeReworker), - map(data => JSON.stringify(data, replaceTypes)), - signJSON(signer) - ); - -export const deserialize = (reworker, signAddress) => source => - source.pipe( - verifyJSON(signAddress), - map(data => JSON.parse(data, reviveTypes)), - rework(reworker || defaultDeserializeReworker) - ); - -const rework = reworker => source => - source.pipe( - concatMap(data => reworkEntry('data', data, reworker)), - map(data => Object.fromEntries(data).data) - ); - -const reworkEntry = async (key, value, reworker) => { - const { transform } = reworker.find(({ type }) => type === value?.constructor) || {}; - if (transform) { - return await transform(key, value, reworkEntry, reworker); - } else { - return [[key, value]]; - } -}; diff --git a/packages/operators/src/json/replacer.js b/packages/operators/src/json/replacer.js deleted file mode 100644 index 2918342..0000000 --- a/packages/operators/src/json/replacer.js +++ /dev/null @@ -1,10 +0,0 @@ -export const replaceTypes = (key, value) => { - if (value?.constructor && value.constructor === Date) { - return value.toISOString(); - } - - if (value?.constructor && value.constructor === BigInt) { - return value.toString(); - } - return value; -}; diff --git a/packages/operators/src/json/reviver.js b/packages/operators/src/json/reviver.js deleted file mode 100644 index 63b618d..0000000 --- a/packages/operators/src/json/reviver.js +++ /dev/null @@ -1,26 +0,0 @@ -export const reviveTypes = (key, value) => { - if (isValidUrl(value)) { - return new URL(value); - } - if (isValidISODateString(value)) { - return new Date(value); - } - if (isBigInt(value)) { - return BigInt(value); - } - return value; -}; - -const isValidUrl = value => { - return URL.canParse(value) && /^[\w]+:\/\/\S+$/gm.test(value); -}; - -function isValidISODateString(value) { - if (!/\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z/.test(value)) return false; - const d = new Date(value); - return d instanceof Date && !isNaN(d.getTime()) && d.toISOString() === value; // valid date -} - -function isBigInt(value) { - return value?.constructor === String && /^\d+$/.test(value); -} diff --git a/packages/operators/src/json/reworker/default.js b/packages/operators/src/json/reworker/default.js deleted file mode 100644 index 42a0c4a..0000000 --- a/packages/operators/src/json/reworker/default.js +++ /dev/null @@ -1,30 +0,0 @@ -export const defaultReworker = [ - { - type: Array, - transform: async (key, value, rework, reworker) => [ - [ - key, - Array.from( - await Promise.all(value.map((entry, key) => rework(key, entry, reworker))), - ([[, value]]) => value - ) - ] - ] - }, - { - type: Object, - transform: async (key, value, rework, reworker) => [ - [ - key, - Object.fromEntries( - await Object.entries(value).reduce(async (accumulator, [key, value]) => { - return [...(await accumulator), ...(await rework(key, await value, reworker))]; - }, Promise.resolve([])) - ) - ] - ] - } -]; - -export const defaultSerializeReworker = defaultReworker; -export const defaultDeserializeReworker = defaultReworker; diff --git a/packages/operators/src/json/reworker/token.js b/packages/operators/src/json/reworker/token.js deleted file mode 100644 index 16a3f3b..0000000 --- a/packages/operators/src/json/reworker/token.js +++ /dev/null @@ -1,43 +0,0 @@ -import { defaultDeserializeReworker, defaultSerializeReworker } from './default.js'; - -const serializeMapping = { - tokenData: { name: 'tokenURI', preserve: false }, - contractData: { name: 'contractURI', preserve: false } -}; -export const serializeReworker = resolve => - defaultSerializeReworker.concat([ - { - type: Blob, - transform: async (key, value) => transform(key, value, serializeMapping[String(key)], resolve) - } - ]); - -const deserializeMapping = { - tokenURI: { name: 'tokenData', preserve: true }, - contractURI: { name: 'contractData', preserve: true } -}; -export const deserializeReworker = resolve => - defaultDeserializeReworker.concat([ - { - type: URL, - transform: async (key, value) => - transform(key, value, deserializeMapping[String(key)], resolve) - } - ]); - -const transform = async (key, value, config, resolve) => { - const result = await resolve(normalizeUrl(value), key); - if (config?.preserve) { - result.tokenURI = value.toString(); - } - return [ - !config || config.preserve ? [key, value] : undefined, - [config?.name || key, result] - ].filter(Boolean); -}; - -const normalizeUrl = url => { - const regex = /^(?\w+:)\/\/(?\w+)\/*(?[\w.]+)*$/gm; - const { groups } = regex.exec(url); - return url + (groups.filename ? '' : '0'); -}; diff --git a/packages/operators/src/log.js b/packages/operators/src/log.js index e23b698..57a4925 100644 --- a/packages/operators/src/log.js +++ b/packages/operators/src/log.js @@ -1,6 +1,8 @@ import { bgGreen } from 'ansi-colors'; import debug from 'debug'; -import { connectable, finalize, Observable, Subject } from 'rxjs'; +import { connectable, finalize, Subject, tap } from 'rxjs'; + +import { pipeWhen } from './when'; export const enableLog = tag => { debug.enable(tag); @@ -8,31 +10,26 @@ export const enableLog = tag => { export const log = tag => { var logger = debug(tag); - logger.log = console.log.bind(console); + logger.log = global.console.log.bind(console); var error = debug(`${tag}:error`); - if (debug.enabled(tag)) { - return source => { - return new Observable(observer => { - return source.subscribe({ - next: val => { - logger(val); - observer.next(val); - }, - error: err => { - error(err); - observer.error(err); - }, - complete: () => { - logger(bgGreen.bold('Complete!')); - observer.complete(); - } - }); - }); - }; - } else { - return source => source; - } + return source => + source.pipe( + pipeWhen( + () => debug.enabled(tag), + source => + source.pipe( + tap({ + subscribe: () => logger('subscribed'), + unsubscribe: () => logger('unsubscribed'), + finalize: () => logger('finalize'), + next: val => logger(val), + error: err => error(err), + complete: () => logger(bgGreen.bold('complete!')) + }) + ) + ) + ); }; export const logResult = (tag, observable) => { @@ -46,3 +43,33 @@ export const logResult = (tag, observable) => { ).connect(); }); }; + +// export const log = tag => { +// var logger = debug(tag); +// logger.log = global.console.log.bind(console); +// var error = debug(`${tag}:error`); + +// if (debug.enabled(tag)) { +// return source => +// new Observable(observer => { +// source.subscribe({ +// subscribe: () => logger('subscribed'), +// unsubscribe: () => logger('unsubscribed'), +// finalize: () => logger('finalize'), +// next: val => { +// logger(val); +// observer.next(val); +// }, +// error: err => { +// error(err); +// observer.error(err); +// }, +// complete: () => { +// logger(bgGreen.bold('complete!')); +// observer.complete(); +// } +// }); +// }); +// } +// return source => source; +// }; diff --git a/packages/operators/src/log.test.js b/packages/operators/src/log.test.js new file mode 100644 index 0000000..4a2866f --- /dev/null +++ b/packages/operators/src/log.test.js @@ -0,0 +1,11 @@ +import { describe, test } from 'vitest'; + +describe('log', () => { + test('default', () => { + //TODO: add test + }); + + test('logResult', () => { + //TODO: add test + }); +}); diff --git a/packages/operators/src/request/request.http b/packages/operators/src/request.http similarity index 100% rename from packages/operators/src/request/request.http rename to packages/operators/src/request.http diff --git a/packages/operators/src/request/request.js b/packages/operators/src/request.js similarity index 90% rename from packages/operators/src/request/request.js rename to packages/operators/src/request.js index f693a7d..eeca128 100644 --- a/packages/operators/src/request/request.js +++ b/packages/operators/src/request.js @@ -2,7 +2,7 @@ import { concatMap, from, throwError } from 'rxjs'; import { cache } from './cache'; import { resolveBlob, resolveJSON, resolveText } from './response'; -import { retryWhenError } from './retry'; +import { retryWhenRequestError } from './retry'; export const request = ({ retry, cache: cacheOptions } = {}) => { return source => @@ -14,7 +14,7 @@ export const request = ({ retry, cache: cacheOptions } = {}) => { return throwError(() => new Error('Failed to fetch: resource not valid')); } }), - retryWhenError(retry), + retryWhenRequestError(retry), cache(cacheOptions) ); }; diff --git a/packages/operators/src/request/request.test.js b/packages/operators/src/request.test.js similarity index 74% rename from packages/operators/src/request/request.test.js rename to packages/operators/src/request.test.js index 13f16c6..fd4d7d4 100644 --- a/packages/operators/src/request/request.test.js +++ b/packages/operators/src/request.test.js @@ -5,7 +5,7 @@ import { of } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { test, describe, beforeEach, expect, vi, afterAll, beforeAll } from 'vitest'; -import { log, logResult } from '../log.js'; +import { log, logResult } from './log.js'; import { resolveJSON } from './response.js'; describe('request', () => { @@ -30,8 +30,8 @@ describe('request', () => { const expectedVal = { a: new Error('NO CONNECTION'), - b: { ok: false }, - c: { ok: true } + b: { status: 500, ok: false }, + c: { status: 200, ok: true } }; const triggerVal = [ @@ -43,7 +43,11 @@ describe('request', () => { ]; testScheduler.run(({ cold, expectObservable }) => { - const stream = cold('a|', { a: () => triggerVal.shift()() }).pipe(request()); + const stream = cold('a|', { a: () => triggerVal.shift()() }).pipe( + log('operators:request:dynamicTimeout:request'), + request(), + log('operators:request:dynamicTimeout:response') + ); expectObservable(stream).toBe('5000ms c|', expectedVal); }); }); @@ -53,8 +57,8 @@ describe('request', () => { const expectedVal = { a: new Error('NO CONNECTION'), - b: { ok: false }, - c: { ok: true } + b: { status: 500, ok: false }, + c: { status: 200, ok: true } }; const triggerVal = [ @@ -67,7 +71,9 @@ describe('request', () => { testScheduler.run(({ cold, expectObservable }) => { const stream = cold('a|', { a: () => triggerVal.shift()() }).pipe( - request({ retry: { timeout: () => 5 } }) + log('operators:request:staticTimeout:request'), + request({ retryableStatuses: [500], retry: { timeout: () => 5 } }), + log('operators:request:staticTimeout:response') ); expectObservable(stream).toBe('----------c|', expectedVal); }); @@ -84,7 +90,11 @@ describe('request', () => { }; testScheduler.run(({ cold, expectObservable }) => { - const stream = cold('a|', triggerVal).pipe(requestJSON()); + const stream = cold('a|', triggerVal).pipe( + log('operators:request:resolveJSON:request'), + requestJSON(), + log('operators:request:resolveJSON:response') + ); expectObservable(stream).toBe('a|', expectedVal); }); }); @@ -100,7 +110,11 @@ describe('request', () => { }; testScheduler.run(({ cold, expectObservable }) => { - const stream = cold('a|', triggerVal).pipe(requestText()); + const stream = cold('a|', triggerVal).pipe( + log('operators:request:resolveText:request'), + requestText(), + log('operators:request:resolveText:response') + ); expectObservable(stream).toBe('a|', expectedVal); }); }); @@ -117,7 +131,11 @@ describe('request', () => { // TODO: correctly compare blob - currently successful test, while blob content is different testScheduler.run(({ cold, expectObservable }) => { - const stream = cold('a|', triggerVal).pipe(requestBlob()); + const stream = cold('a|', triggerVal).pipe( + log('operators:request:resolveBlob:request'), + requestBlob(), + log('operators:request:resolveBlob:response') + ); expectObservable(stream).toBe('a|', expectedVal); }); }); @@ -148,7 +166,12 @@ describe.skip('request - demo ', () => { await logResult( 'demo', - of(req).pipe(log('request:upload'), request(), log('request:upload:response'), resolveJSON()) + of(req).pipe( + log('operators:request:upload'), + request(), + log('operators:request:upload:response'), + resolveJSON() + ) ); }); }); diff --git a/packages/operators/src/request/autoPagination.js b/packages/operators/src/request/autoPagination.js index 345ac5a..eab9f3d 100644 --- a/packages/operators/src/request/autoPagination.js +++ b/packages/operators/src/request/autoPagination.js @@ -1,6 +1,6 @@ import { concatMap, expand, filter, from, map } from 'rxjs'; -import { request } from './request'; +import { request } from '../request'; export const autoPagination = ({ resolveRoute }) => { return source => diff --git a/packages/operators/src/request/autoPagination.test.js b/packages/operators/src/request/autoPagination.test.js index 1c16ef5..365f63b 100644 --- a/packages/operators/src/request/autoPagination.test.js +++ b/packages/operators/src/request/autoPagination.test.js @@ -5,7 +5,7 @@ import { TestScheduler } from 'rxjs/testing'; import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'; import { log, logResult } from '../log'; -import { resolveJSON } from './response'; +import { resolveJSON } from '../response'; describe('auto pagination', () => { let testScheduler; @@ -45,7 +45,8 @@ describe('auto pagination', () => { testScheduler.run(({ cold, expectObservable }) => { expectObservable( - cold('-a-------------------', { a: 'a' }).pipe( + cold('-a|', { a: 'a' }).pipe( + log('operators:request:autoPagination:input'), autoPagination({ resolveRoute: (url, resp) => { if (resp) { @@ -55,9 +56,9 @@ describe('auto pagination', () => { } }), resolveJSON(), - log('marble:result') + log('operators:request:autoPagination:output') ) - ).toBe('---a----b--cd---e----', expectedVal); + ).toBe('---a----b--cd---(e|)', expectedVal); }); }); }); diff --git a/packages/operators/src/request/concurrentRequest.js b/packages/operators/src/request/concurrentRequest.js index 7fe7d60..5434805 100644 --- a/packages/operators/src/request/concurrentRequest.js +++ b/packages/operators/src/request/concurrentRequest.js @@ -1,6 +1,6 @@ import { mergeMap, of } from 'rxjs'; -import { request } from './request'; +import { request } from '../request'; export const concurrentRequest = (concurrent = 1) => { return source => source.pipe(mergeMap(url => of(url).pipe(request()), concurrent)); diff --git a/packages/operators/src/request/concurrentRequest.test.js b/packages/operators/src/request/concurrentRequest.test.js index 67e46ca..aeb1595 100644 --- a/packages/operators/src/request/concurrentRequest.test.js +++ b/packages/operators/src/request/concurrentRequest.test.js @@ -5,7 +5,7 @@ import { TestScheduler } from 'rxjs/testing'; import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'; import { log, logResult } from '../log'; -import { resolveJSON, resolveText } from './response'; +import { resolveJSON, resolveText } from '../response'; describe('concurrent request', () => { let testScheduler; @@ -37,11 +37,13 @@ describe('concurrent request', () => { testScheduler.run(({ cold, expectObservable }) => { expectObservable( - cold('-a-b-(cd)-e----', triggerVal).pipe( + cold('-a-b-(cd)-e|', triggerVal).pipe( + log('operators:request:concurrent:input'), concurrentRequest(Object.keys(triggerVal).length), - resolveText() + resolveText(), + log('operators:request:concurrent:output') ) - ).toBe('---a--c-(bd)--e'); + ).toBe('---a--c-(bd)--(e|)'); }); }); }); diff --git a/packages/operators/src/request/lazyPagination.test.js b/packages/operators/src/request/lazyPagination.test.js index 1c32ced..3533313 100644 --- a/packages/operators/src/request/lazyPagination.test.js +++ b/packages/operators/src/request/lazyPagination.test.js @@ -5,7 +5,7 @@ import { TestScheduler } from 'rxjs/testing'; import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'; import { log, logResult } from '../log'; -import { resolveJSON } from './response'; +import { resolveJSON } from '../response'; describe('lazy pagination', () => { let testScheduler; @@ -56,12 +56,14 @@ describe('lazy pagination', () => { testScheduler.run(({ cold, expectObservable }) => { expectObservable( of('https://example.com').pipe( + log('operators:request:lazyPagination:input'), lazyPagination({ pager, concurrent: 5, resolveRoute: (url, { value }) => responseVal[String(value)] }), - resolveJSON() + resolveJSON(), + log('operators:request:lazyPagination:output') ) ).toBe('--daceb--------', expectedVal); expectObservable(cold('-(abcde)--------', triggerVal).pipe(tap(fn => fn()))); diff --git a/packages/operators/src/request/polling.js b/packages/operators/src/request/polling.js index a2986a4..364f324 100644 --- a/packages/operators/src/request/polling.js +++ b/packages/operators/src/request/polling.js @@ -1,7 +1,7 @@ import { delay, expand, of } from 'rxjs'; -import { request } from './request'; -import { distinctUntilResponseChanged } from './response'; +import { request } from '../request'; +import { distinctUntilResponseChanged } from '../response'; export const polling = (timeout = 1000) => { return source => diff --git a/packages/operators/src/request/polling.test.js b/packages/operators/src/request/polling.test.js index e1bc614..6cf86ab 100644 --- a/packages/operators/src/request/polling.test.js +++ b/packages/operators/src/request/polling.test.js @@ -4,6 +4,8 @@ import { concatMap } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'; +import { log } from '../log'; + describe('polling', () => { let testScheduler; @@ -46,8 +48,10 @@ describe('polling', () => { testScheduler.run(({ cold, expectObservable }) => { const stream = cold('a------------', { a: 'a' }).pipe( + log('operators:request:polling:input'), polling(2), - concatMap(e => e.arrayBuffer()) + concatMap(e => e.arrayBuffer()), + log('operators:request:polling:output') ); const unsubA = '^------------!'; diff --git a/packages/operators/src/request/retry.js b/packages/operators/src/request/retry.js deleted file mode 100644 index d1d459e..0000000 --- a/packages/operators/src/request/retry.js +++ /dev/null @@ -1,49 +0,0 @@ -import { connectionObservable } from '#observables/dom/window.js'; -import { - combineLatest, - concatMap, - delay, - filter, - map, - merge, - of, - partition, - retry, - tap, - throwError -} from 'rxjs'; - -const defaultTimeout = count => Math.min(60000, Math.pow(count, 2) * 1000); - -export const retryWhenError = ({ timeout = defaultTimeout, count } = {}) => { - let counter = 0; - - return source => { - return source.pipe( - concatMap(resp => { - const [success, error] = partition(of(resp), resp => resp.ok); - return merge( - success, - error.pipe(concatMap(() => throwError(() => new Error('invalid request')))) - ); - }), - retry({ - count, - delay: () => determineDelayWhenOnline(timeout, ++counter) - }) - ); - }; -}; - -const determineDelayWhenOnline = (timeout, counter) => { - return combineLatest([connectionObservable]).pipe( - // all defined observables have to be valid - map(values => values.every(v => v === true)), - // reset counter if one observable is invalid - tap(valid => (counter = counter * valid)), - // continue only if all observables are valid - filter(valid => valid), - tap(() => console.log(`retry: request - next: ${counter} in ${timeout(counter)}ms`)), - delay(timeout(counter)) - ); -}; diff --git a/packages/operators/src/request/response.js b/packages/operators/src/response.js similarity index 100% rename from packages/operators/src/request/response.js rename to packages/operators/src/response.js diff --git a/packages/operators/src/request/response.test.js b/packages/operators/src/response.test.js similarity index 82% rename from packages/operators/src/request/response.test.js rename to packages/operators/src/response.test.js index a825c9f..4e8932b 100644 --- a/packages/operators/src/request/response.test.js +++ b/packages/operators/src/response.test.js @@ -3,7 +3,7 @@ import { concatMap } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { afterEach, test, describe, beforeEach, expect, vi, beforeAll } from 'vitest'; -import { log } from '../log'; +import { log } from './log'; import { distinctUntilResponseChanged, resolveJSON, resolveText } from './response'; describe('response', () => { @@ -30,7 +30,11 @@ describe('response', () => { }; testScheduler.run(({ cold, expectObservable }) => { - const stream = cold('a|', triggerVal).pipe(resolveJSON()); + const stream = cold('a|', triggerVal).pipe( + log('operators:response:resolveJSON:input'), + resolveJSON(), + log('operators:response:resolveJSON:output') + ); expectObservable(stream).toBe('a|', expectedVal); }); }); @@ -44,7 +48,11 @@ describe('response', () => { }; testScheduler.run(({ cold, expectObservable }) => { - const stream = cold('a|', triggerVal).pipe(resolveText()); + const stream = cold('a|', triggerVal).pipe( + log('operators:response:resolveText:input'), + resolveText(), + log('operators:response:resolveText:output') + ); expectObservable(stream).toBe('a|', expectedVal); }); }); @@ -75,9 +83,10 @@ describe('response', () => { testScheduler.run(({ cold, expectObservable }) => { expectObservable( cold('-a-b-c-d-e-f-g-h-|', triggerValues).pipe( + log('operators:response:change:input'), distinctUntilResponseChanged(), concatMap(resp => resp.arrayBuffer()), - log('marble:result') + log('operators:response:change:output') ) ).toBe('-a---c---e-f---h-|', expectedVal); }); diff --git a/packages/operators/src/retry.js b/packages/operators/src/retry.js new file mode 100644 index 0000000..8ff236c --- /dev/null +++ b/packages/operators/src/retry.js @@ -0,0 +1,44 @@ +import { connectionObservable } from '#observables/dom/window.js'; +import { debug } from 'debug'; +import { combineLatest, concatMap, delay, filter, map, retry, tap, throwError } from 'rxjs'; + +import { pipeWhen } from './when'; + +const defaultTimeout = count => Math.min(60000, Math.pow(count, 2) * 1000); + +export const retryWhenRequestError = ({ + retryableStatuses, + timeout = defaultTimeout, + count +} = {}) => { + let counter = 0; + + return source => { + return source.pipe( + pipeWhen( + resp => (retryableStatuses && retryableStatuses.includes(resp.status)) || !resp.ok, + source => source.pipe(concatMap(() => throwError(() => new Error('invalid request')))) + ), + retry({ count, delay: () => determineDelayWhenOnline(timeout, ++counter) }) + ); + }; +}; + +const determineDelayWhenOnline = (timeout, counter) => { + return combineLatest([connectionObservable]) + .pipe( + // all defined observables have to be valid + map(values => values.every(v => v === true)), + // reset counter if one observable is invalid + tap(valid => (counter = counter * valid)), + // continue only if all observables are valid + filter(valid => valid), + tap({ + next: () => { + const logger = debug('retry'); + logger(`request - next: ${counter} in ${timeout(counter)}ms`); + } + }) + ) + .pipe(delay(timeout(counter))); +}; diff --git a/packages/operators/src/request/retry.test.js b/packages/operators/src/retry.test.js similarity index 80% rename from packages/operators/src/request/retry.test.js rename to packages/operators/src/retry.test.js index 1a5a23c..7dbe3cd 100644 --- a/packages/operators/src/request/retry.test.js +++ b/packages/operators/src/retry.test.js @@ -2,8 +2,8 @@ import { map } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { beforeEach, describe, expect, test } from 'vitest'; -import { log } from '../log'; -import { retryWhenError } from './retry'; +import { log } from './log'; +import { retryWhenRequestError } from './retry'; describe('request retry', () => { let testScheduler; @@ -26,8 +26,9 @@ describe('request retry', () => { // if you define a delay, you have to add the delay to the subscribe multiple times (num retries) const stream = cold('a|', { a: () => triggerVal.shift() }).pipe( map(fn => fn()), - retryWhenError({ timeout: () => 5 }), - log('marble:result') + log('operators:retry:default:input'), + retryWhenRequestError({ retryableStatuses: [500], timeout: () => 5 }), + log('operators:retry:default:output') ); const unsubA = '^-----------'; diff --git a/packages/operators/src/when.js b/packages/operators/src/when.js new file mode 100644 index 0000000..26e1398 --- /dev/null +++ b/packages/operators/src/when.js @@ -0,0 +1,6 @@ +import { merge, partition, share } from 'rxjs'; + +export const pipeWhen = (condition, fn) => source => { + const [success, fail] = partition(source.pipe(share()), condition); + return merge(fn(success), fail); +}; diff --git a/packages/operators/src/when.test.js b/packages/operators/src/when.test.js new file mode 100644 index 0000000..0f3e1b5 --- /dev/null +++ b/packages/operators/src/when.test.js @@ -0,0 +1,42 @@ +import { map } from 'rxjs'; +import { TestScheduler } from 'rxjs/testing'; +import { beforeEach, describe, expect, test } from 'vitest'; + +import { log } from './log'; +import { pipeWhen } from './when'; + +describe('when', () => { + let testScheduler; + + beforeEach(() => { + testScheduler = new TestScheduler((actual, expected) => expect(actual).deep.equal(expected)); + }); + + test('default', () => { + const triggerVal = { + a: 1, + b: 2, + c: 3, + d: 4 + }; + + const expectedVal = { + a: 1, + b: 4, + c: 3, + d: 16 + }; + + testScheduler.run(({ cold, expectObservable }) => { + const stream = cold('a-b-c-d|', triggerVal).pipe( + log('operators:when:pipe:input'), + pipeWhen( + v => !(v % 2), + s => s.pipe(map(v => v * v)) + ), + log('operators:when:pipe:output') + ); + expectObservable(stream).toBe('a-b-c-d|', expectedVal); + }); + }); +}); diff --git a/vitest.config.js b/vitest.config.js index ad7d4d7..235745c 100644 --- a/vitest.config.js +++ b/vitest.config.js @@ -10,7 +10,9 @@ export default defineConfig({ ...configDefaults.exclude, 'commitlint.config.js', 'eslint.ignores.js', - 'vitest.workspace.js' + 'vitest.workspace.js', + './packages/observables/src/index.js', + './packages/operators/src/index.js' ] }, include: ['**/packages/**/*.test.js']