Skip to content

Commit

Permalink
Merge pull request #119 from resonatehq/main
Browse files Browse the repository at this point in the history
Release 0.5.3
  • Loading branch information
dfarr authored May 22, 2024
2 parents d4e506e + c3f1166 commit 26afe8a
Show file tree
Hide file tree
Showing 11 changed files with 382 additions and 339 deletions.
13 changes: 7 additions & 6 deletions lib/core/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ export enum ErrorCodes {

// store
STORE = 40,
STORE_PAYLOAD = 41,
STORE_FORBIDDEN = 42,
STORE_NOT_FOUND = 43,
STORE_ALREADY_EXISTS = 44,
STORE_INVALID_STATE = 45,
STORE_ENCODER = 46,
STORE_UNAUTHORIZED = 41,
STORE_PAYLOAD = 42,
STORE_FORBIDDEN = 43,
STORE_NOT_FOUND = 44,
STORE_ALREADY_EXISTS = 45,
STORE_INVALID_STATE = 46,
STORE_ENCODER = 47,
}

export class ResonateError extends Error {
Expand Down
59 changes: 57 additions & 2 deletions lib/core/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,23 @@ import { IStore } from "./store";
* Resonate configuration options.
*/
export type ResonateOptions = {
/**
* Store authentication options.
*/
auth: AuthOptions;

/**
* An encoder instance used for encoding and decoding values
* returned (or thrown) by registered functions. If not provided,
* a default JSON encoder will be used.
*/
encoder: IEncoder<unknown, string | undefined>;

/**
* The frequency in ms to heartbeat locks.
*/
heartbeat: number;

/**
* A process id that can be used to uniquely identify this Resonate
* instance. If not provided a default value will be generated.
Expand Down Expand Up @@ -43,8 +53,8 @@ export type ResonateOptions = {
tags: Record<string, string>;

/**
* A store instance, if provided this will take precedence over a
* remote store.
* A store instance, if provided will take predence over the
* default store.
*/
store: IStore;

Expand Down Expand Up @@ -124,3 +134,48 @@ export type PartialOptions = Partial<Options> & { __resonate: true };
export function isOptions(o: unknown): o is PartialOptions {
return typeof o === "object" && o !== null && (o as PartialOptions).__resonate === true;
}

export type StoreOptions = {
/**
* The store authentication options.
*/
auth: AuthOptions;

/**
* The store encoder, defaults to a base64 encoder.
*/
encoder: IEncoder<string, string>;

/**
* The frequency in ms to heartbeat locks.
*/
heartbeat: number;

/**
* A logger instance, if not provided a default logger will be
* used.
*/
logger: ILogger;

/**
* A process id that can be used to uniquely identify this Resonate
* instance. If not provided a default value will be generated.
*/
pid: string;

/**
* Number of retries to attempt before throwing an error. If not
* provided, a default value will be used.
*/
retries: number;
};

export type AuthOptions = {
/**
* Basic auth credentials.
*/
basic: {
password: string;
username: string;
};
};
10 changes: 7 additions & 3 deletions lib/core/promises/promises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -410,19 +410,23 @@ export class DurablePromise<T> {
await this.poll();

// set timeout promise
let timeoutId: NodeJS.Timeout | undefined;
const timeoutPromise =
timeout === Infinity
? new Promise(() => {}) // wait forever
: new Promise((resolve) => setTimeout(resolve, timeout));
: new Promise((resolve) => (timeoutId = setTimeout(resolve, timeout)));

// await either:
// - completion of the promise
// - timeout
await Promise.any([this.completed, timeoutPromise]);
await Promise.race([this.completed, timeoutPromise]);

// stop polling interval
// clear polling interval
clearInterval(this.interval);

// clear timeout
clearTimeout(timeoutId);

// throw error if timeout occcured
if (this.pending) {
throw new Error("Timeout occured while waiting for promise to complete");
Expand Down
25 changes: 17 additions & 8 deletions lib/core/stores/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as cronParser from "cron-parser";
import { ErrorCodes, ResonateError } from "../errors";
import { ILogger } from "../logger";
import { Logger } from "../loggers/logger";
import { StoreOptions } from "../options";
import {
DurablePromise,
PendingPromise,
Expand All @@ -26,18 +27,22 @@ export class LocalStore implements IStore {
public schedules: LocalScheduleStore;
public locks: LocalLockStore;

public readonly logger: ILogger;

private toSchedule: Schedule[] = [];
private next: number | undefined = undefined;

constructor(
private logger: ILogger = new Logger(),
opts: Partial<StoreOptions> = {},
promiseStorage: IStorage<DurablePromise> = new WithTimeout(new MemoryStorage<DurablePromise>()),
scheduleStorage: IStorage<Schedule> = new MemoryStorage<Schedule>(),
lockStorage: IStorage<{ id: string; eid: string }> = new MemoryStorage<{ id: string; eid: string }>(),
) {
this.promises = new LocalPromiseStore(promiseStorage);
this.schedules = new LocalScheduleStore(scheduleStorage, this);
this.locks = new LocalLockStore(lockStorage);
this.promises = new LocalPromiseStore(this, promiseStorage);
this.schedules = new LocalScheduleStore(this, scheduleStorage);
this.locks = new LocalLockStore(this, lockStorage);

this.logger = opts.logger ?? new Logger();

this.init();
}
Expand Down Expand Up @@ -115,7 +120,10 @@ export class LocalStore implements IStore {
}

export class LocalPromiseStore implements IPromiseStore {
constructor(private storage: IStorage<DurablePromise> = new MemoryStorage<DurablePromise>()) {}
constructor(
private store: LocalStore,
private storage: IStorage<DurablePromise>,
) {}

async create(
id: string,
Expand Down Expand Up @@ -327,8 +335,8 @@ export class LocalPromiseStore implements IPromiseStore {

export class LocalScheduleStore implements IScheduleStore {
constructor(
private storage: IStorage<Schedule> = new MemoryStorage<Schedule>(),
private store: LocalStore | undefined = undefined,
private store: LocalStore,
private storage: IStorage<Schedule>,
) {}

async create(
Expand Down Expand Up @@ -456,7 +464,8 @@ export class LocalScheduleStore implements IScheduleStore {

export class LocalLockStore implements ILockStore {
constructor(
private storage: IStorage<{ id: string; eid: string }> = new MemoryStorage<{ id: string; eid: string }>(),
private store: LocalStore,
private storage: IStorage<{ id: string; eid: string }>,
) {}

async tryAcquire(id: string, eid: string): Promise<boolean> {
Expand Down
Loading

0 comments on commit 26afe8a

Please sign in to comment.