Skip to content

Commit

Permalink
Merge pull request #1189 from OneSignal/fix/externalIdHydration
Browse files Browse the repository at this point in the history
[Fix] External Id hydration
  • Loading branch information
shepherd-l authored Sep 30, 2024
2 parents 3cdf8a1 + 67b4d5b commit a49e8e7
Show file tree
Hide file tree
Showing 19 changed files with 204 additions and 46 deletions.
19 changes: 4 additions & 15 deletions __test__/unit/user/login.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,13 @@ jest.mock('../../../src/shared/libraries/Log');
describe('Login tests', () => {
beforeEach(() => {
jest.useFakeTimers();
test.stub(
PropertiesExecutor.prototype,
'getOperationsFromCache',
Promise.resolve([]),
);
test.stub(
IdentityExecutor.prototype,
'getOperationsFromCache',
Promise.resolve([]),
);
test.stub(
SubscriptionExecutor.prototype,
'getOperationsFromCache',
Promise.resolve([]),
);
test.stub(PropertiesExecutor.prototype, 'getOperationsFromCache', []);
test.stub(IdentityExecutor.prototype, 'getOperationsFromCache', []);
test.stub(SubscriptionExecutor.prototype, 'getOperationsFromCache', []);
});

afterEach(() => {
jest.runOnlyPendingTimers();
jest.resetAllMocks();
});

Expand Down
8 changes: 7 additions & 1 deletion src/core/CoreModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import { OSModelStoreFactory } from './modelRepo/OSModelStoreFactory';
import Log from '../shared/libraries/Log';
import { logMethodCall } from '../shared/utils/utils';
import { SupportedModel } from './models/SupportedModels';
import { NewRecordsState } from '../shared/models/NewRecordsState';

export default class CoreModule {
public modelRepo?: ModelRepo;
public operationRepo?: OperationRepo;
public initPromise: Promise<void>;
public newRecordsState?: NewRecordsState;

private modelCache: ModelCache;
private initResolver: () => void = () => null;
Expand All @@ -25,7 +27,11 @@ export default class CoreModule {
.then((allCachedOSModels) => {
const modelStores = OSModelStoreFactory.build(allCachedOSModels);
this.modelRepo = new ModelRepo(this.modelCache, modelStores);
this.operationRepo = new OperationRepo(this.modelRepo);
this.newRecordsState = new NewRecordsState();
this.operationRepo = new OperationRepo(
this.modelRepo,
this.newRecordsState,
);
this.initResolver();
})
.catch((e) => {
Expand Down
13 changes: 9 additions & 4 deletions src/core/CoreModuleDirector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,13 @@ export class CoreModuleDirector {
await this.core.resetModelRepoAndCache();
}

public hydrateUser(user: UserData): void {
logMethodCall('CoreModuleDirector.hydrateUser', { user });
public hydrateUser(user: UserData, externalId?: string): void {
logMethodCall('CoreModuleDirector.hydrateUser', { user, externalId });
try {
const identity = this.getIdentityModel();
const properties = this.getPropertiesModel();

const { onesignal_id: onesignalId, external_id: externalId } =
user.identity;
const { onesignal_id: onesignalId } = user.identity;

if (!onesignalId) {
throw new OneSignalError('OneSignal ID is missing from user data');
Expand All @@ -83,6 +82,7 @@ export class CoreModuleDirector {
if (externalId) {
identity?.setExternalId(externalId);
properties?.setExternalId(externalId);
user.identity.external_id = externalId;
}

// identity and properties models are always single, so we hydrate immediately (i.e. replace existing data)
Expand All @@ -109,6 +109,8 @@ export class CoreModuleDirector {
): void {
logMethodCall('CoreModuleDirector._hydrateSubscriptions', {
subscriptions,
onesignalId,
externalId,
});

if (!subscriptions) {
Expand Down Expand Up @@ -175,6 +177,9 @@ export class CoreModuleDirector {
}

/* G E T T E R S */
public getNewRecordsState(): NewRecordsState | undefined {
return this.core.newRecordsState;
}

public getModelByTypeAndId(
modelName: ModelName,
Expand Down
25 changes: 23 additions & 2 deletions src/core/executors/ExecutorBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import { ExecutorResult } from './ExecutorResult';
import Log from '../../shared/libraries/Log';
import Database from '../../shared/services/Database';
import LocalStorage from '../../shared/utils/LocalStorage';
import { NewRecordsState } from '../../shared/models/NewRecordsState';

const RETRY_AFTER = 5_000;

export default abstract class ExecutorBase {
protected _deltaQueue: CoreDelta<SupportedModel>[] = [];
protected _operationQueue: Operation<SupportedModel>[] = [];
protected _newRecordsState: NewRecordsState;

protected _executeAdd?: (
operation: Operation<SupportedModel>,
Expand All @@ -31,7 +33,10 @@ export default abstract class ExecutorBase {
static OPERATIONS_BATCH_PROCESSING_TIME = 5;
static RETRY_COUNT = 5;

constructor(executorConfig: ExecutorConfig<SupportedModel>) {
constructor(
executorConfig: ExecutorConfig<SupportedModel>,
newRecordsState: NewRecordsState,
) {
setInterval(() => {
Log.debug('OneSignal: checking for operations to process from cache');
const cachedOperations = this.getOperationsFromCache();
Expand All @@ -48,6 +53,8 @@ export default abstract class ExecutorBase {
this._executeAdd = executorConfig.add;
this._executeUpdate = executorConfig.update;
this._executeRemove = executorConfig.remove;

this._newRecordsState = newRecordsState;
}

abstract processDeltaQueue(): void;
Expand Down Expand Up @@ -124,7 +131,7 @@ export default abstract class ExecutorBase {
if (operation) {
OperationCache.enqueue(operation);

if (this.onlineStatus) {
if (this._canExecute(operation)) {
this._processOperation(operation, ExecutorBase.RETRY_COUNT).catch(
(err) => {
Log.error(err);
Expand Down Expand Up @@ -187,4 +194,18 @@ export default abstract class ExecutorBase {
this._processOperationQueue.call(this);
}
}

private _canExecute(operation: Operation<SupportedModel>): boolean {
if (!this.onlineStatus) {
return false;
}

if (operation.applyToRecordId) {
if (!this._newRecordsState.canAccess(operation.applyToRecordId)) {
return false;
}
}

return true;
}
}
12 changes: 8 additions & 4 deletions src/core/executors/ExecutorFactory.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { NewRecordsState } from '../../shared/models/NewRecordsState';
import { Executor } from '../models/Executor';
import { ExecutorConfig } from '../models/ExecutorConfig';
import { ModelName, SupportedModel } from '../models/SupportedModels';
Expand All @@ -6,14 +7,17 @@ import { PropertiesExecutor } from './PropertiesExecutor';
import { SubscriptionExecutor } from './SubscriptionExecutor';

export class ExecutorFactory {
static build(executorConfig: ExecutorConfig<SupportedModel>): Executor {
static build(
executorConfig: ExecutorConfig<SupportedModel>,
newRecordsState: NewRecordsState,
): Executor {
switch (executorConfig.modelName) {
case ModelName.Identity:
return new IdentityExecutor(executorConfig);
return new IdentityExecutor(executorConfig, newRecordsState);
case ModelName.Properties:
return new PropertiesExecutor(executorConfig);
return new PropertiesExecutor(executorConfig, newRecordsState);
case ModelName.Subscriptions:
return new SubscriptionExecutor(executorConfig);
return new SubscriptionExecutor(executorConfig, newRecordsState);
}
}
}
5 changes: 3 additions & 2 deletions src/core/executors/ExecutorStore.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { NewRecordsState } from '../../shared/models/NewRecordsState';
import { ModelName } from '../models/SupportedModels';
import OSExecutor from './ExecutorBase';
import { EXECUTOR_CONFIG_MAP } from './ExecutorConfigMap';
Expand All @@ -10,10 +11,10 @@ type ExecutorStoreInterface = {
export class ExecutorStore {
store: ExecutorStoreInterface = {};

constructor() {
constructor(newRecordsState: NewRecordsState) {
Object.values(ModelName).forEach((modelName) => {
const config = EXECUTOR_CONFIG_MAP[modelName as ModelName];
this.store[modelName] = ExecutorFactory.build(config);
this.store[modelName] = ExecutorFactory.build(config, newRecordsState);
});
}

Expand Down
8 changes: 6 additions & 2 deletions src/core/executors/IdentityExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { NewRecordsState } from '../../shared/models/NewRecordsState';
import OperationCache from '../caching/OperationCache';
import { CoreChangeType } from '../models/CoreChangeType';
import { PropertyDelta } from '../models/CoreDeltas';
Expand All @@ -8,8 +9,11 @@ import { isPropertyDelta } from '../utils/typePredicates';
import ExecutorBase from './ExecutorBase';

export class IdentityExecutor extends ExecutorBase {
constructor(executorConfig: ExecutorConfig<SupportedModel>) {
super(executorConfig);
constructor(
executorConfig: ExecutorConfig<SupportedModel>,
newRecordsState: NewRecordsState,
) {
super(executorConfig, newRecordsState);
}

processDeltaQueue(): void {
Expand Down
8 changes: 6 additions & 2 deletions src/core/executors/PropertiesExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { NewRecordsState } from '../../shared/models/NewRecordsState';
import ExecutorBase from './ExecutorBase';
import { Operation } from '../operationRepo/Operation';
import { CoreChangeType } from '../models/CoreChangeType';
Expand All @@ -6,8 +7,11 @@ import { ModelName, SupportedModel } from '../models/SupportedModels';
import OperationCache from '../caching/OperationCache';

export class PropertiesExecutor extends ExecutorBase {
constructor(executorConfig: ExecutorConfig<SupportedModel>) {
super(executorConfig);
constructor(
executorConfig: ExecutorConfig<SupportedModel>,
newRecordsState: NewRecordsState,
) {
super(executorConfig, newRecordsState);
}

processDeltaQueue(): void {
Expand Down
8 changes: 6 additions & 2 deletions src/core/executors/SubscriptionExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { NewRecordsState } from '../../shared/models/NewRecordsState';
import OperationCache from '../caching/OperationCache';
import { CoreChangeType } from '../models/CoreChangeType';
import { CoreDelta } from '../models/CoreDeltas';
Expand All @@ -7,8 +8,11 @@ import { Operation } from '../operationRepo/Operation';
import ExecutorBase from './ExecutorBase';

export class SubscriptionExecutor extends ExecutorBase {
constructor(executorConfig: ExecutorConfig<SupportedModel>) {
super(executorConfig);
constructor(
executorConfig: ExecutorConfig<SupportedModel>,
newRecordsState: NewRecordsState,
) {
super(executorConfig, newRecordsState);
}

processDeltaQueue(): void {
Expand Down
3 changes: 3 additions & 0 deletions src/core/modelRepo/ModelRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export class ModelRepo extends Subscribable<CoreDelta<SupportedModel>> {
this.broadcast({
model: payload,
changeType: CoreChangeType.Add,
applyToRecordId: payload?.applyToRecordId,
});
}

Expand All @@ -77,6 +78,7 @@ export class ModelRepo extends Subscribable<CoreDelta<SupportedModel>> {
this.broadcast({
model: payload,
changeType: CoreChangeType.Remove,
applyToRecordId: payload?.applyToRecordId,
});
}

Expand All @@ -103,6 +105,7 @@ export class ModelRepo extends Subscribable<CoreDelta<SupportedModel>> {
property: payload.property,
oldValue: payload.oldValue,
newValue: payload.newValue,
applyToRecordId: payload.model?.applyToRecordId,
};
this.broadcast(delta);
}
Expand Down
7 changes: 6 additions & 1 deletion src/core/modelRepo/OSModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import { logMethodCall } from '../../shared/utils/utils';
export class OSModel<Model> extends Subscribable<ModelStoreChange<Model>> {
data: Model;
modelId: string;

onesignalId?: string;
awaitOneSignalIdAvailable: Promise<string>;
onesignalIdAvailableCallback?: (onesignalId: string) => void;
externalId?: string;
applyToRecordId?: string;

constructor(
readonly modelName: ModelName,
Expand Down Expand Up @@ -55,6 +55,11 @@ export class OSModel<Model> extends Subscribable<ModelStoreChange<Model>> {
this.externalId = externalId;
}

public setApplyToRecordId(applyToRecordId: string): void {
logMethodCall('setapplyToRecordId', { applyToRecordId });
this.applyToRecordId = applyToRecordId;
}

/**
* We use this method to update the model data.
* Results in a broadcasted update event.
Expand Down
1 change: 1 addition & 0 deletions src/core/models/CoreDeltas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { StringKeys } from './StringKeys';
export type ModelDelta<Model> = {
model: OSModel<Model>;
changeType: CoreChangeType;
applyToRecordId?: string;
};

export interface PropertyDelta<Model> extends ModelDelta<Model> {
Expand Down
2 changes: 2 additions & 0 deletions src/core/operationRepo/Operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class Operation<Model> {
timestamp: number;
payload?: Partial<SupportedModel>;
model?: OSModel<Model>;
applyToRecordId?: string;
jwtTokenAvailable: Promise<void>;
jwtToken?: string | null;

Expand All @@ -22,6 +23,7 @@ export class Operation<Model> {
this.operationId = Math.random().toString(36).substring(2);
this.payload = deltas ? this.getPayload(deltas) : undefined;
this.model = deltas ? deltas[deltas.length - 1].model : undefined;
this.applyToRecordId = deltas?.[deltas.length - 1]?.applyToRecordId;
this.timestamp = Date.now();
// eslint-disable-next-line no-async-promise-executor
this.jwtTokenAvailable = new Promise<void>(async (resolve) => {
Expand Down
10 changes: 8 additions & 2 deletions src/core/operationRepo/OperationRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ import { ExecutorStore } from '../executors/ExecutorStore';
import { CoreDelta } from '../models/CoreDeltas';
import { SupportedModel } from '../models/SupportedModels';
import { logMethodCall } from '../../shared/utils/utils';
import { NewRecordsState } from '../../shared/models/NewRecordsState';

export class OperationRepo {
public executorStore: ExecutorStore;
public newRecordsState: NewRecordsState;
private _unsubscribeFromModelRepo: () => void;
private _deltaQueue: CoreDelta<SupportedModel>[] = [];
static DELTAS_BATCH_PROCESSING_TIME = 1;

constructor(private modelRepo: ModelRepo) {
this.executorStore = new ExecutorStore();
constructor(
private modelRepo: ModelRepo,
newRecordsState: NewRecordsState,
) {
this.newRecordsState = newRecordsState;
this.executorStore = new ExecutorStore(this.newRecordsState);

this._unsubscribeFromModelRepo = this.modelRepo.subscribe(
(delta: CoreDelta<SupportedModel>) => {
Expand Down
9 changes: 8 additions & 1 deletion src/core/requestService/SubscriptionRequests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@ export default class SubscriptionRequests {
logMethodCall('SubscriptionRequests.addSubscription', operation);

const appId = await MainHelper.getAppId();
const { subscription, aliasPair } = processSubscriptionOperation(operation);
const { subscription, aliasPair, subscriptionId } =
processSubscriptionOperation(operation);

const response = await RequestService.createSubscription(
{ appId },
aliasPair,
{ subscription },
);

const status = response.status;
if (status >= 200 && status < 300) {
OneSignal.coreDirector.getNewRecordsState().add(subscriptionId);
}

return SubscriptionRequests._processSubscriptionResponse(response);
}

Expand Down
Loading

0 comments on commit a49e8e7

Please sign in to comment.