Skip to content

Commit

Permalink
fix: prettier
Browse files Browse the repository at this point in the history
  • Loading branch information
dmerrill6 committed May 29, 2019
1 parent 6ce83e9 commit 55dfa99
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 79 deletions.
87 changes: 30 additions & 57 deletions lib/src/entities/saga-step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ export interface ISagaStep {
workerName: string;
status?: SagaStepStatus;
compensatorId?: string;
result?: any,
dependencyArgs?: any[],
result?: any;
dependencyArgs?: any[];
}

class SagaStep extends Entity<ISagaStep> {
Expand Down Expand Up @@ -85,15 +85,9 @@ class SagaStep extends Entity<ISagaStep> {
});

// Update the current step so that it points to the compensator
await update(
this.client,
this.namespace,
getSagaStepTableName(this.sagaId),
id,
{
compensatorId: newStep.getId(),
},
);
await update(this.client, this.namespace, getSagaStepTableName(this.sagaId), id, {
compensatorId: newStep.getId(),
});

return newStep;
}
Expand All @@ -111,29 +105,23 @@ class SagaStep extends Entity<ISagaStep> {
}

const stepValues = await this.getValues();
const dependencySteps = (stepValues.dependsOn && stepValues.dependsOn.length > 0)
? await getMultiple<ISagaStep>(
this.client, this.namespace, getSagaStepTableName(sagaId), stepValues.dependsOn
) : [];
const dependencySteps =
stepValues.dependsOn && stepValues.dependsOn.length > 0
? await getMultiple<ISagaStep>(this.client, this.namespace, getSagaStepTableName(sagaId), stepValues.dependsOn)
: [];

const errorDependency = dependencySteps.find(
dep => (dep.status !== SagaStepStatus.Finished && dep.status !== SagaStepStatus.RolledBack)
dep => dep.status !== SagaStepStatus.Finished && dep.status !== SagaStepStatus.RolledBack,
);
if (errorDependency) {
// This shouldn't happen as we make sure all deps are finished in tick()
throw Error('Unexpected error: Enqueueing saga step with an unfinished dependency');
}

await update(
this.client,
this.namespace,
getSagaStepTableName(sagaId),
id,
{
dependencyArgs: dependencySteps.map(dep => dep.result),
status: SagaStepStatus.Queued,
},
);
await update(this.client, this.namespace, getSagaStepTableName(sagaId), id, {
dependencyArgs: dependencySteps.map(dep => dep.result),
status: SagaStepStatus.Queued,
});
}

/**
Expand All @@ -147,16 +135,10 @@ class SagaStep extends Entity<ISagaStep> {
if (!sagaId || !id) {
throw Error('Finishing saga step for an uninitialized step');
}
await update(
this.client,
this.namespace,
getSagaStepTableName(sagaId),
id,
{
result,
status: SagaStepStatus.Finished,
},
);
await update(this.client, this.namespace, getSagaStepTableName(sagaId), id, {
result,
status: SagaStepStatus.Finished,
});
}

/**
Expand All @@ -170,15 +152,9 @@ class SagaStep extends Entity<ISagaStep> {
throw Error('Failing saga step for an uninitialized step');
}

await update(
this.client,
this.namespace,
getSagaStepTableName(sagaId),
id,
{
status: SagaStepStatus.Failed,
},
);
await update(this.client, this.namespace, getSagaStepTableName(sagaId), id, {
status: SagaStepStatus.Failed,
});
}

/**
Expand All @@ -192,22 +168,19 @@ class SagaStep extends Entity<ISagaStep> {
if (!sagaId || !id) {
throw Error('Rolling back saga step for an uninitialized step');
}
const {compensatorId} = await this.getValues();

await update(
this.client,
this.namespace,
getSagaStepTableName(sagaId),
id,
{
status: SagaStepStatus.RolledBack,
},
);
const { compensatorId } = await this.getValues();

await update(this.client, this.namespace, getSagaStepTableName(sagaId), id, {
status: SagaStepStatus.RolledBack,
});

// Get the compensator if any
if (compensatorId) {
const compensatorValues = await get<ISagaStep>(
this.client, this.namespace, getSagaStepTableName(sagaId), compensatorId
this.client,
this.namespace,
getSagaStepTableName(sagaId),
compensatorId,
);

// Enqueue the compensator
Expand Down
32 changes: 11 additions & 21 deletions lib/src/entities/saga.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ class Saga extends Entity<ISaga> {
* @param args The arguments to pass to the worker.
* @param dependsOnSteps Defaults to []. If included, will not enqueue the step until all dependencies are complete. Also, will send the results of the dependencies to the worker when executing it.
*/
public async addStep(
workerName: string,
args: any[],
dependsOnSteps: string[] = [],
) {
public async addStep(workerName: string, args: any[], dependsOnSteps: string[] = []) {
const id = this.getId();
if (!id) {
throw Error('Cannot add a step for an uninitialized Saga');
Expand All @@ -63,9 +59,11 @@ class Saga extends Entity<ISaga> {
* @param stepId The id of the step that finished
* @param result The result of the execution of this step
*/
public async stepFinished(stepId: string, result?: any){
public async stepFinished(stepId: string, result?: any) {
try {
if(result) { JSON.parse(JSON.stringify(result)); }
if (result) {
JSON.parse(JSON.stringify(result));
}
} catch (error) {
throw Error('Error in stepFinished: `result` must be JSON encodable.');
}
Expand All @@ -75,9 +73,7 @@ class Saga extends Entity<ISaga> {
throw Error('Error in stepFinished: Saga is not initialized');
}

const stepValues = await get<ISagaStep>(
this.client, this.namespace, getSagaStepTableName(id), stepId
);
const stepValues = await get<ISagaStep>(this.client, this.namespace, getSagaStepTableName(id), stepId);
let step = new SagaStep(this.client, this.namespace);
step = step.instantiateFromSaga(id, stepValues);
await step.finished(result);
Expand All @@ -92,7 +88,7 @@ class Saga extends Entity<ISaga> {
* (using their compensators).
* @param stepId The id of the step that failed
*/
public async stepFailed(stepId: string){
public async stepFailed(stepId: string) {
const id = this.getId();
if (!id) {
throw Error('Error in stepFailed: Saga is not initialized');
Expand All @@ -108,9 +104,7 @@ class Saga extends Entity<ISaga> {
const stepsToRollback = allSteps.filter(currStep => currStep.status === SagaStepStatus.Finished);

// Mark current step as failed
const failedStepValues = await get<ISagaStep>(
this.client, this.namespace, getSagaStepTableName(id), stepId
);
const failedStepValues = await get<ISagaStep>(this.client, this.namespace, getSagaStepTableName(id), stepId);
let failedStep = new SagaStep(this.client, this.namespace);
failedStep = failedStep.instantiateFromSaga(id, failedStepValues);
await failedStep.fail();
Expand Down Expand Up @@ -152,9 +146,7 @@ class Saga extends Entity<ISaga> {
}

const allStepIds = await getIds(this.client, this.namespace, getSagaStepTableName(id));
const allSteps = await getMultiple<ISagaStep>(
this.client, this.namespace, getSagaStepTableName(id), allStepIds
);
const allSteps = await getMultiple<ISagaStep>(this.client, this.namespace, getSagaStepTableName(id), allStepIds);

return allSteps;
}
Expand All @@ -175,15 +167,13 @@ class Saga extends Entity<ISaga> {
const unqueuedSteps = allSteps.filter(step => step.status === SagaStepStatus.Created);

if (unqueuedSteps.length === 0) {
await update<ISaga>(
this.client, this.namespace, this.tableName, id, { status: SagaStatus.Finished }
);
await update<ISaga>(this.client, this.namespace, this.tableName, id, { status: SagaStatus.Finished });
return;
}

const stepsToExecute: ISagaStep[] = [];
unqueuedSteps.forEach(unqueuedStep => {
const dependencies = unqueuedStep.dependsOn || [];;
const dependencies = unqueuedStep.dependsOn || [];
const currDependentSteps = allSteps.filter(step => step.id && dependencies.includes(step.id));
let allFinished = false;
if (!currDependentSteps || currDependentSteps.length === 0) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "brokkr",
"version": "0.1.0-prerelease-1",
"version": "0.1.0-prerelease-2",
"description": "Background job processing and orchestrator for Node",
"main": "lib/dist/index.js",
"directories": {
Expand Down

0 comments on commit 55dfa99

Please sign in to comment.