Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed duplication of listeners #236

Open
wants to merge 2 commits into
base: fix/reduction-flow-halt
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 15 additions & 43 deletions packages/deployment/src/queue/BullQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import {
Closeable,
InstantiatedQueue,
TaskQueue,
SequencerModule,
AbstractTaskQueue,
} from "@proto-kit/sequencer";

import { InstantiatedBullQueue } from "./InstantiatedBullQueue";

export interface BullQueueConfig {
redis: {
host: string;
Expand All @@ -23,7 +25,7 @@ export interface BullQueueConfig {
* TaskQueue implementation for BullMQ
*/
export class BullQueue
extends SequencerModule<BullQueueConfig>
extends AbstractTaskQueue<BullQueueConfig>
implements TaskQueue
{
private activePromise?: Promise<void>;
Expand All @@ -40,6 +42,8 @@ export class BullQueue
// This is by far not optimal - since it still picks up 1 task per queue but waits until
// computing them, so that leads to bad performance over multiple workers.
// For that we need to restructure tasks to be flowing through a single queue however

// TODO Use worker.pause()
while (this.activePromise !== undefined) {
// eslint-disable-next-line no-await-in-loop
await this.activePromise;
Expand Down Expand Up @@ -80,50 +84,18 @@ export class BullQueue
}

public async getQueue(queueName: string): Promise<InstantiatedQueue> {
const { retryAttempts, redis } = this.config;

const queue = new Queue<TaskPayload, TaskPayload>(queueName, {
connection: redis,
});
const events = new QueueEvents(queueName, { connection: redis });
return this.createOrGetQueue(queueName, (name) => {
log.debug(`Creating bull queue ${queueName}`);

await queue.drain();
const { redis } = this.config;

return {
name: queueName,

async addTask(payload: TaskPayload): Promise<{ taskId: string }> {
log.debug("Adding task: ", payload);
const job = await queue.add(queueName, payload, {
attempts: retryAttempts ?? 2,
});
return { taskId: job.id! };
},
const queue = new Queue<TaskPayload, TaskPayload>(queueName, {
connection: redis,
});
const events = new QueueEvents(queueName, { connection: redis });

async onCompleted(listener: (payload: TaskPayload) => Promise<void>) {
events.on("completed", async (result) => {
log.debug("Completed task: ", result);
try {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
await listener(result.returnvalue as unknown as TaskPayload);
} catch (e) {
// Catch error explicitly since this promise is dangling,
// therefore any error will be voided as well
log.error(e);
}
});
events.on("error", async (error) => {
log.error("Error in worker", error);
});
await events.waitUntilReady();
},

async close(): Promise<void> {
await events.close();
await queue.drain();
await queue.close();
},
};
return new InstantiatedBullQueue(name, queue, events, this.config);
});
}

public async start() {
Expand Down
70 changes: 70 additions & 0 deletions packages/deployment/src/queue/InstantiatedBullQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import {
InstantiatedQueue,
ListenerList,
TaskPayload,
} from "@proto-kit/sequencer";
import { log } from "@proto-kit/common";
import { Queue, QueueEvents } from "bullmq";

export class InstantiatedBullQueue implements InstantiatedQueue {
public constructor(
public readonly name: string,
private readonly queue: Queue,
private readonly events: QueueEvents,
private readonly options: {
retryAttempts?: number;
}
) {}

initialized = false;

listeners = new ListenerList<TaskPayload>();

public async initialize() {
await this.queue.drain();
}

public async addTask(payload: TaskPayload): Promise<{ taskId: string }> {
log.debug("Adding task: ", payload);
const job = await this.queue.add(this.name, payload, {
attempts: this.options.retryAttempts ?? 2,
});
return { taskId: job.id! };
}

async onCompleted(listener: (payload: TaskPayload) => Promise<void>) {
if (!this.initialized) {
await this.events.waitUntilReady();

this.events.on("completed", async (result) => {
log.debug("Completed task: ", result);
try {
await this.listeners.executeListeners(
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
result.returnvalue as unknown as TaskPayload
);
} catch (e) {
// Catch error explicitly since this promise is dangling,
// therefore any error will be voided as well
log.error(e);
}
});
this.events.on("error", async (error) => {
log.error("Error in worker", error);
});
this.initialized = true;
}

return this.listeners.pushListener(listener);
}

async offCompleted(listenerId: number) {
this.listeners.removeListener(listenerId);
}

async close(): Promise<void> {
await this.events.close();
await this.queue.drain();
await this.queue.close();
}
}
2 changes: 2 additions & 0 deletions packages/sequencer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export * from "./worker/flow/JSONTaskSerializer";
// export * from "./worker/queue/BullQueue";
export * from "./worker/queue/TaskQueue";
export * from "./worker/queue/LocalTaskQueue";
export * from "./worker/queue/ListenerList";
export * from "./worker/queue/AbstractTaskQueue";
export * from "./worker/worker/FlowTaskWorker";
export * from "./worker/worker/LocalTaskWorkerModule";
export * from "./worker/worker/TaskWorkerModule";
Expand Down
113 changes: 30 additions & 83 deletions packages/sequencer/src/worker/flow/Flow.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { inject, injectable, Lifecycle, scoped } from "tsyringe";
import { log } from "@proto-kit/common";
import { inject, injectable } from "tsyringe";
import { log, mapSequential } from "@proto-kit/common";

import { Closeable, InstantiatedQueue, TaskQueue } from "../queue/TaskQueue";

Expand All @@ -12,68 +12,6 @@ const errors = {
),
};

@injectable()
// ResolutionScoped => We want a new instance every time we resolve it
@scoped(Lifecycle.ResolutionScoped)
export class ConnectionHolder implements Closeable {
private queues: {
[key: string]: InstantiatedQueue;
} = {};

private listeners: {
[key: string]: {
[key: string]: (payload: TaskPayload) => Promise<void>;
};
} = {};

public constructor(
@inject("TaskQueue") private readonly queueImpl: TaskQueue
) {}

public registerListener(
flowId: string,
queue: string,
listener: (payload: TaskPayload) => Promise<void>
) {
if (this.listeners[queue] === undefined) {
this.listeners[queue] = {};
}
this.listeners[queue][flowId] = listener;
}

public unregisterListener(flowId: string, queue: string) {
delete this.listeners[queue][flowId];
}

private async openQueue(name: string): Promise<InstantiatedQueue> {
const queue = await this.queueImpl.getQueue(name);
await queue.onCompleted(async (payload) => {
await this.onCompleted(name, payload);
});
return queue;
}

private async onCompleted(name: string, payload: TaskPayload) {
const listener = this.listeners[name]?.[payload.flowId];
if (listener !== undefined) {
await listener(payload);
}
}

public async getQueue(name: string) {
if (this.queues[name] !== undefined) {
return this.queues[name];
}
const queue = await this.openQueue(name);
this.queues[name] = queue;
return queue;
}

async close() {
// TODO
}
}

interface CompletedCallback<Input, Result> {
(result: Result, originalInput: Input): Promise<any>;
}
Expand All @@ -83,7 +21,10 @@ export class Flow<State> implements Closeable {
// therefore cancelled
private erroredOut = false;

private readonly registeredListeners: string[] = [];
private readonly registeredListeners: {
queueName: string;
listenerId: number;
}[] = [];

private resultsPending: {
[key: string]: (payload: TaskPayload) => Promise<void>;
Expand All @@ -98,28 +39,28 @@ export class Flow<State> implements Closeable {
public tasksInProgress = 0;

public constructor(
private readonly connectionHolder: ConnectionHolder,
private readonly queueImpl: TaskQueue,
public readonly flowId: string,
public state: State
) {}

private waitForResult(
queue: string,
private async waitForResult(
queue: InstantiatedQueue,
taskId: string,
callback: (payload: TaskPayload) => Promise<void>
) {
this.resultsPending[taskId] = callback;

if (!this.registeredListeners.includes(queue)) {
// Open Listener onto Connectionhandler
this.connectionHolder.registerListener(
this.flowId,
queue,
async (payload) => {
if (!this.registeredListeners.find((l) => l.queueName === queue.name)) {
const listenerId = await queue.onCompleted(async (payload) => {
if (payload.flowId === this.flowId) {
await this.resolveResponse(payload);
}
);
this.registeredListeners.push(queue);
});
this.registeredListeners.push({
queueName: queue.name,
listenerId,
});
}
}

Expand Down Expand Up @@ -167,7 +108,7 @@ export class Flow<State> implements Closeable {
): Promise<void> {
const queueName = task.name;
const taskName = overrides?.taskName ?? task.name;
const queue = await this.connectionHolder.getQueue(queueName);
const queue = await this.queueImpl.getQueue(queueName);

const payload = await task.inputSerializer().toJSON(input);

Expand Down Expand Up @@ -197,7 +138,7 @@ export class Flow<State> implements Closeable {
this.tasksInProgress -= 1;
return await completed?.(decoded, input);
};
this.waitForResult(queueName, taskId, callback);
await this.waitForResult(queue, taskId, callback);
}

public async forEach<Type>(
Expand All @@ -222,17 +163,23 @@ export class Flow<State> implements Closeable {
}

public async close() {
this.registeredListeners.forEach((queue) => {
this.connectionHolder.unregisterListener(this.flowId, queue);
});
await mapSequential(
this.registeredListeners,
async ({ queueName, listenerId }) => {
const queue = await this.queueImpl.getQueue(queueName);
queue.offCompleted(listenerId);
}
);
}
}

@injectable()
export class FlowCreator {
public constructor(private readonly connectionHolder: ConnectionHolder) {}
public constructor(
@inject("TaskQueue") private readonly queueImpl: TaskQueue
) {}

public createFlow<State>(flowId: string, state: State): Flow<State> {
return new Flow(this.connectionHolder, flowId, state);
return new Flow(this.queueImpl, flowId, state);
}
}
19 changes: 19 additions & 0 deletions packages/sequencer/src/worker/queue/AbstractTaskQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { SequencerModule } from "../../sequencer/builder/SequencerModule";

import type { InstantiatedQueue } from "./TaskQueue";

export abstract class AbstractTaskQueue<
Config,
> extends SequencerModule<Config> {
protected queues: Record<string, InstantiatedQueue> = {};

protected createOrGetQueue(
name: string,
creator: (name: string) => InstantiatedQueue
): InstantiatedQueue {
if (this.queues[name] === undefined) {
this.queues[name] = creator(name);
}
return this.queues[name];
}
}
Loading