diff --git a/common/changes/@yuants/protocol/2023-12-10-14-00.json b/common/changes/@yuants/protocol/2023-12-10-14-00.json new file mode 100644 index 00000000..fb9a0bdd --- /dev/null +++ b/common/changes/@yuants/protocol/2023-12-10-14-00.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@yuants/protocol", + "comment": "refactor(protocol): compile method validator only if it changed", + "type": "patch" + } + ], + "packageName": "@yuants/protocol" +} \ No newline at end of file diff --git a/libraries/protocol/src/terminal.ts b/libraries/protocol/src/terminal.ts index 41beae7e..cbcc41c2 100644 --- a/libraries/protocol/src/terminal.ts +++ b/libraries/protocol/src/terminal.ts @@ -12,15 +12,18 @@ import { catchError, combineLatest, concatMap, + debounceTime, defer, delayWhen, distinct, + distinctUntilChanged, filter, first, from, groupBy, interval, map, + mergeAll, mergeMap, of, pairwise, @@ -30,7 +33,6 @@ import { shareReplay, takeWhile, tap, - throttleTime, timeout, timer, toArray, @@ -639,29 +641,57 @@ export class Terminal { ); private _mapTerminalIdAndMethodToValidator$: Observable>> = - this.terminalInfos$.pipe( - // - throttleTime(30_000), - mergeMap((x) => - from(x).pipe( - mergeMap((terminalInfo) => - from(Object.entries(terminalInfo.serviceInfo || {})).pipe( - map(([method, serviceInfo]): [string, ValidateFunction] => [ - method, - new Ajv({ strict: false }).compile(serviceInfo.schema), - ]), - toArray(), - map((arr) => Object.fromEntries(arr)), - map((mapMethodToValidator): [string, Record] => [ - terminalInfo.terminal_id, - mapMethodToValidator, - ]), + new Observable>>((subscriber) => { + const mapTerminalIdAndMethodToValidator: Record> = {}; + const update$ = new Subject(); + const sub1 = this.terminalInfos$ + .pipe( + // + mergeAll(), + groupBy((v) => v.terminal_id), + mergeMap((groupByTerminalId) => + groupByTerminalId.pipe( + // + tap(() => { + const terminal_id = groupByTerminalId.key; + mapTerminalIdAndMethodToValidator[terminal_id] ??= {}; + }), + mergeMap((terminalInfo) => Object.entries(terminalInfo.serviceInfo || {})), + groupBy(([method]) => method), + mergeMap((groupByMethod) => + groupByMethod.pipe( + distinctUntilChanged( + ([, { schema: schema1 }], [, { schema: schema2 }]) => + JSON.stringify(schema1) === JSON.stringify(schema2), + ), + tap(([, { schema }]) => { + const validator = new Ajv({ strict: false }).compile(schema); + const terminal_id = groupByTerminalId.key; + const method = groupByMethod.key; + mapTerminalIdAndMethodToValidator[terminal_id][method] = validator; + update$.next(); + }), + ), + ), ), ), - toArray(), - map((arr) => Object.fromEntries(arr)), - ), - ), + ) + .subscribe(); + + const sub2 = update$ + .pipe( + debounceTime(200), + map(() => mapTerminalIdAndMethodToValidator), + ) + .subscribe(subscriber); + + return () => { + sub1.unsubscribe(); + sub2.unsubscribe(); + update$.complete(); + }; + }).pipe( + // shareReplay(1), );