diff --git a/projects/observable-webworker/src/lib/from-worker-pool.spec.ts b/projects/observable-webworker/src/lib/from-worker-pool.spec.ts index 3bf840e..6903e19 100644 --- a/projects/observable-webworker/src/lib/from-worker-pool.spec.ts +++ b/projects/observable-webworker/src/lib/from-worker-pool.spec.ts @@ -60,6 +60,17 @@ describe('fromWorkerPool', () => { sub.unsubscribe(); }); + it('does not send input close notification to ensure the workers are kept alive', () => { + const subscriptionSpy = jasmine.createSpy('subscriptionSpy'); + const sub = stubbedWorkerStream.subscribe(subscriptionSpy); + + input$.next(1); + + expect(stubbedWorkers[0].postMessage).not.toHaveBeenCalledWith(jasmine.objectContaining({ kind: 'C' })); + + sub.unsubscribe(); + }); + it('shuts down workers when subscriber unsubscribes', () => { const subscriptionSpy = jasmine.createSpy('subscriptionSpy'); const sub = stubbedWorkerStream.subscribe(subscriptionSpy); diff --git a/projects/observable-webworker/src/lib/from-worker-pool.ts b/projects/observable-webworker/src/lib/from-worker-pool.ts index 93afcf3..b957b5b 100644 --- a/projects/observable-webworker/src/lib/from-worker-pool.ts +++ b/projects/observable-webworker/src/lib/from-worker-pool.ts @@ -1,4 +1,4 @@ -import { Observable, ObservableInput, of, Subject, zip } from 'rxjs'; +import { concat, NEVER, Observable, ObservableInput, of, Subject, zip } from 'rxjs'; import { finalize, map, mergeAll, tap } from 'rxjs/operators'; import { fromWorker } from './from-worker'; @@ -69,7 +69,7 @@ export function fromWorkerPool( }), map( ([worker, unitWork]): Observable => { - return fromWorker(() => worker.factory(), of(unitWork), selectTransferables, { + return fromWorker(() => worker.factory(), concat(of(unitWork), NEVER), selectTransferables, { terminateOnComplete: false, }).pipe( finalize(() => { diff --git a/projects/observable-webworker/src/lib/run-worker.ts b/projects/observable-webworker/src/lib/run-worker.ts index 3ed481d..877d750 100644 --- a/projects/observable-webworker/src/lib/run-worker.ts +++ b/projects/observable-webworker/src/lib/run-worker.ts @@ -30,9 +30,10 @@ export function getWorkerResult( dematerialize(), ); - return workerIsUnitType(worker) - ? input$.pipe(concatMap(input => from(worker.workUnit(input)).pipe(materialize()))) - : worker.work(input$).pipe(materialize()); + return (workerIsUnitType(worker) + ? input$.pipe(concatMap(input => from(worker.workUnit(input)))) + : worker.work(input$) + ).pipe(materialize()); } export function runWorker(workerConstructor: ObservableWorkerConstructor): Subscription {