From 0c86dc82c3c20cf5074afcdbf9febc68476cd25b Mon Sep 17 00:00:00 2001 From: Hidekazu Kobayashi Date: Mon, 16 Sep 2024 11:42:13 +0000 Subject: [PATCH] Perform deferred jobs on shutdown --- .../backend/src/core/NoteCreateService.ts | 11 +++--- packages/backend/src/misc/collapsed-queue.ts | 37 ++++++++++++++----- .../queue/processors/InboxProcessorService.ts | 22 ++++++++--- 3 files changed, 50 insertions(+), 20 deletions(-) diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index d16ebac2a0bf..613620840811 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -1101,17 +1101,18 @@ export class NoteCreateService implements OnApplicationShutdown { } @bindThis - private performUpdateNotesCount(id: MiNote['id'], incrBy: number) { - this.instancesRepository.increment({ id: id }, 'notesCount', incrBy); + private async performUpdateNotesCount(id: MiNote['id'], incrBy: number) { + await this.instancesRepository.increment({ id: id }, 'notesCount', incrBy); } @bindThis - public dispose(): void { + public async dispose(): Promise { this.#shutdownController.abort(); + await this.updateNotesCountQueue.performAllNow(); } @bindThis - public onApplicationShutdown(signal?: string | undefined): void { - this.dispose(); + public async onApplicationShutdown(signal?: string | undefined): Promise { + await this.dispose(); } } diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts index 731ff58d2840..dee6df32bd2e 100644 --- a/packages/backend/src/misc/collapsed-queue.ts +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -1,24 +1,43 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +type Job = { + value: V; + timer: NodeJS.Timeout; +}; + export class CollapsedQueue { - private jobs: Map = new Map(); + private jobs: Map> = new Map(); constructor( private timeout: number, private collapse: (oldValue: V, newValue: V) => V, - private doJob: (key: K, value: V) => void, - ) { } + private perform: (key: K, value: V) => Promise, + ) {} enqueue(key: K, value: V) { if (this.jobs.has(key)) { const old = this.jobs.get(key)!; - const merged = this.collapse(old, value); - this.jobs.set(key, merged); + const merged = this.collapse(old.value, value); + this.jobs.set(key, { ...old, value: merged }); } else { - this.jobs.set(key, value); - setTimeout(() => { - const value = this.jobs.get(key)!; + const timer = setTimeout(() => { + const job = this.jobs.get(key)!; this.jobs.delete(key); - this.doJob(key, value); + this.perform(key, job.value); }, this.timeout); + this.jobs.set(key, { value, timer }); + } + } + + async performAllNow() { + const entries = [...this.jobs.entries()]; + this.jobs.clear(); + for (const [_key, job] of entries) { + clearTimeout(job.timer); } + await Promise.allSettled(entries.map(([key, job]) => this.perform(key, job.value))); } } diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index e87a29b54d43..3b9a479a64ce 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,7 +4,7 @@ */ import { URL } from 'node:url'; -import { Injectable } from '@nestjs/common'; +import { Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; import type Logger from '@/logger.js'; @@ -26,13 +26,13 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type { InboxJobData } from '../types.js'; import { CollapsedQueue } from '@/misc/collapsed-queue.js'; import { MiNote } from '@/models/Note.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type { InboxJobData } from '../types.js'; @Injectable() -export class InboxProcessorService { +export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; private updateInstanceQueue: CollapsedQueue; @@ -219,10 +219,20 @@ export class InboxProcessorService { } @bindThis - public performUpdateInstance(id: string, value: Date) { - this.federatedInstanceService.update(id, { + public async performUpdateInstance(id: string, value: Date) { + await this.federatedInstanceService.update(id, { latestRequestReceivedAt: value, isNotResponding: false, }); } + + @bindThis + public async dispose(): Promise { + await this.updateInstanceQueue.performAllNow(); + } + + @bindThis + async onApplicationShutdown(signal?: string) { + await this.dispose(); + } }