From 02ef5d5e3ebff621ff155cd393f0bf67d260d5d3 Mon Sep 17 00:00:00 2001 From: Siyuan Wang Date: Tue, 12 Dec 2023 19:54:37 +0800 Subject: [PATCH] fix(market-data-collector): task retry subscription complete (#357) --- apps/market-data-collector/src/index.ts | 43 +++++++++++-------- .../2023-12-12-11-44.json | 10 +++++ 2 files changed, 34 insertions(+), 19 deletions(-) create mode 100644 common/changes/@yuants/app-market-data-collector/2023-12-12-11-44.json diff --git a/apps/market-data-collector/src/index.ts b/apps/market-data-collector/src/index.ts index bddd68ac..123ec73b 100644 --- a/apps/market-data-collector/src/index.ts +++ b/apps/market-data-collector/src/index.ts @@ -221,7 +221,6 @@ const runTask = (psr: IPullSourceRelation) => }; // Metrics State - // TODO: optimize the performance of Metric.set subs.push(interval(10_000).subscribe(reportStatus)); // Wait for current_back_off_time to start @@ -296,25 +295,30 @@ const runTask = (psr: IPullSourceRelation) => copyDataAction$ .pipe( mergeMap(() => - term.copyDataRecords({ - type: 'period', - tags: { - datasource_id: psr.datasource_id, - product_id: psr.product_id, - period_in_sec: '' + psr.period_in_sec, - }, - time_range: [lastTime, Date.now()], - receiver_terminal_id: STORAGE_TERMINAL_ID, - }), + term + .copyDataRecords({ + type: 'period', + tags: { + datasource_id: psr.datasource_id, + product_id: psr.product_id, + period_in_sec: '' + psr.period_in_sec, + }, + time_range: [lastTime, Date.now()], + receiver_terminal_id: STORAGE_TERMINAL_ID, + }) + .pipe( + tap(() => { + taskComplete$.next(); + }), + // ISSUE: catch error will replace the whole stream with EMPTY, therefore it must be placed inside mergeMap + // so that the outer stream subscription will not be affected + catchError((e) => { + err = e; + taskError$.next(); + return EMPTY; + }), + ), ), - tap(() => { - taskComplete$.next(); - }), - catchError((e, caught$) => { - err = e; - taskError$.next(); - return EMPTY; - }), ) .subscribe(), ); @@ -355,6 +359,7 @@ const runTask = (psr: IPullSourceRelation) => subs.push( taskFinalize$.subscribe(() => { if (status === 'error') { + console.info(formatTime(Date.now()), `TaskRetry`, title); timer(current_back_off_time).subscribe(() => { taskScheduled$.next(); }); diff --git a/common/changes/@yuants/app-market-data-collector/2023-12-12-11-44.json b/common/changes/@yuants/app-market-data-collector/2023-12-12-11-44.json new file mode 100644 index 00000000..d06ce8ee --- /dev/null +++ b/common/changes/@yuants/app-market-data-collector/2023-12-12-11-44.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@yuants/app-market-data-collector", + "comment": "fix: task error retry", + "type": "patch" + } + ], + "packageName": "@yuants/app-market-data-collector" +} \ No newline at end of file