forked from input-output-hk/cardano-js-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
withRolledBackEvents.ts
99 lines (93 loc) · 3.72 KB
/
withRolledBackEvents.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import { ChainSyncEventType } from '@cardano-sdk/core';
import { CustomError } from 'ts-custom-error';
import { EMPTY, Observable, map, scan, toArray } from 'rxjs';
import { ProjectorOperator, RollBackwardEvent, RollForwardEvent } from '../types';
import { WithStabilityWindow } from './withStabilityWindow';
import { blockingWithLatestFrom } from '@cardano-sdk/util-rxjs';
export type WithRolledBackEvents<ExtraRollForwardProps = {}> = {
/**
* In reverse order of that they were applied
*/
rolledBackEvents: RollForwardEvent<ExtraRollForwardProps>[];
};
type WithRolledBackEventsScan<TRollForwardEvent> = {
eventCache?: TRollForwardEvent[];
evt: TRollForwardEvent | (RollBackwardEvent & WithRolledBackEvents<TRollForwardEvent>);
};
export class InsufficientEventCacheError extends CustomError {}
const rollForward = <ExtraRollForwardProps extends WithStabilityWindow>(
evt: RollForwardEvent<ExtraRollForwardProps>,
eventCache: RollForwardEvent<ExtraRollForwardProps>[]
) => {
// clear blocks that are past stability window
const slotThreshold = evt.tip.slot - evt.stabilityWindowSlotsCount;
while (eventCache.length > 0 && eventCache[0].block.header.slot < slotThreshold) eventCache.shift();
// add current block to cache and return the event unchanged
eventCache.push(evt);
return { eventCache, evt };
};
const rollBackward = <ExtraRollForwardProps, ExtraRollBackwardProps>(
evt: RollBackwardEvent<ExtraRollBackwardProps>,
eventCache: RollForwardEvent<ExtraRollForwardProps>[]
) => {
const rollbackTo = evt.tip;
if (rollbackTo === 'origin') {
return {
eventCache: [],
evt: {
...evt,
rolledBackEvents: eventCache.reverse()
}
};
}
const rolledBackEvents = [] as RollForwardEvent<ExtraRollForwardProps>[];
while (eventCache.length > 0 && eventCache[eventCache.length - 1].block.header.hash !== rollbackTo.hash)
rolledBackEvents.push(eventCache.pop()!);
if (
rolledBackEvents.length > 0 &&
rolledBackEvents.length < rolledBackEvents[0].block.header.blockNo - rollbackTo.blockNo
) {
throw new InsufficientEventCacheError();
}
return { eventCache, evt: { ...evt, rolledBackEvents } };
};
/**
* Adds `rolledBackEvents` to RollBackward events.
* `rolledBackEvents` are in descending order (starting from tip going down to origin).
*
* @param evtCache$ observable that emits events up to first event emitted by source evt$ observable.
* It is used to build cache of events to be used in case a rollback happens.
* If syncing from origin, there's no need to pass it.
* Otherwise, it should emit all events up to source start within stability window.
*/
export const withRolledBackEvents =
<ExtraRollForwardPropsIn extends WithStabilityWindow, ExtraRollBackwardPropsIn>(
evtCache$: Observable<RollForwardEvent<ExtraRollForwardPropsIn>> = EMPTY
): ProjectorOperator<
ExtraRollForwardPropsIn,
ExtraRollBackwardPropsIn,
{},
WithRolledBackEvents<RollForwardEvent<ExtraRollForwardPropsIn>>
> =>
(evt$) =>
evt$.pipe(
blockingWithLatestFrom(evtCache$.pipe(toArray())),
scan(
(
{ eventCache },
[evt, initialEvtCache]
): WithRolledBackEventsScan<RollForwardEvent<ExtraRollForwardPropsIn>> => {
eventCache ||= initialEvtCache;
switch (evt.eventType) {
case ChainSyncEventType.RollForward:
return rollForward(evt, eventCache);
case ChainSyncEventType.RollBackward:
return rollBackward(evt, eventCache);
}
},
{
evt: {} as WithRolledBackEventsScan<RollForwardEvent<ExtraRollForwardPropsIn>>['evt']
} as WithRolledBackEventsScan<RollForwardEvent<ExtraRollForwardPropsIn>>
),
map(({ evt }) => evt)
);