Skip to content

Commit

Permalink
fix(market-data-collector): task retry subscription complete (#357)
Browse files Browse the repository at this point in the history
  • Loading branch information
Thrimbda authored Dec 12, 2023
1 parent 5077d80 commit 02ef5d5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 19 deletions.
43 changes: 24 additions & 19 deletions apps/market-data-collector/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ const runTask = (psr: IPullSourceRelation) =>
};

// Metrics State
// TODO: optimize the performance of Metric.set
subs.push(interval(10_000).subscribe(reportStatus));

// Wait for current_back_off_time to start
Expand Down Expand Up @@ -296,25 +295,30 @@ const runTask = (psr: IPullSourceRelation) =>
copyDataAction$
.pipe(
mergeMap(() =>
term.copyDataRecords({
type: 'period',
tags: {
datasource_id: psr.datasource_id,
product_id: psr.product_id,
period_in_sec: '' + psr.period_in_sec,
},
time_range: [lastTime, Date.now()],
receiver_terminal_id: STORAGE_TERMINAL_ID,
}),
term
.copyDataRecords({
type: 'period',
tags: {
datasource_id: psr.datasource_id,
product_id: psr.product_id,
period_in_sec: '' + psr.period_in_sec,
},
time_range: [lastTime, Date.now()],
receiver_terminal_id: STORAGE_TERMINAL_ID,
})
.pipe(
tap(() => {
taskComplete$.next();
}),
// ISSUE: catch error will replace the whole stream with EMPTY, therefore it must be placed inside mergeMap
// so that the outer stream subscription will not be affected
catchError((e) => {
err = e;
taskError$.next();
return EMPTY;
}),
),
),
tap(() => {
taskComplete$.next();
}),
catchError((e, caught$) => {
err = e;
taskError$.next();
return EMPTY;
}),
)
.subscribe(),
);
Expand Down Expand Up @@ -355,6 +359,7 @@ const runTask = (psr: IPullSourceRelation) =>
subs.push(
taskFinalize$.subscribe(() => {
if (status === 'error') {
console.info(formatTime(Date.now()), `TaskRetry`, title);
timer(current_back_off_time).subscribe(() => {
taskScheduled$.next();
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@yuants/app-market-data-collector",
"comment": "fix: task error retry",
"type": "patch"
}
],
"packageName": "@yuants/app-market-data-collector"
}

0 comments on commit 02ef5d5

Please sign in to comment.