From d5524e947f2f05bf2d6b541ba2e01a7875d3176d Mon Sep 17 00:00:00 2001 From: KOBA789 Date: Mon, 16 Sep 2024 19:37:21 +0900 Subject: [PATCH 1/9] Defer instance metadata update --- .../backend/src/core/NoteCreateService.ts | 18 ++++++++- packages/backend/src/misc/collapsed-queue.ts | 24 ++++++++++++ .../queue/processors/InboxProcessorService.ts | 37 +++++++++++++++++-- 3 files changed, 73 insertions(+), 6 deletions(-) create mode 100644 packages/backend/src/misc/collapsed-queue.ts diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 1d8d2483228a..bd9d82835751 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -60,6 +60,7 @@ import { UserBlockingService } from '@/core/UserBlockingService.js'; import { isReply } from '@/misc/is-reply.js'; import { trackPromise } from '@/misc/promise-tracker.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; type NotificationType = 'reply' | 'renote' | 'quote' | 'mention'; @@ -151,6 +152,7 @@ type Option = { @Injectable() export class NoteCreateService implements OnApplicationShutdown { #shutdownController = new AbortController(); + private updateNotesCountQueue: CollapsedQueue; constructor( @Inject(DI.config) @@ -218,7 +220,9 @@ export class NoteCreateService implements OnApplicationShutdown { private instanceChart: InstanceChart, private utilityService: UtilityService, private userBlockingService: UserBlockingService, - ) { } + ) { + this.updateNotesCountQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseNotesCount, this.performUpdateNotesCount); + } @bindThis public async create(user: { @@ -516,7 +520,7 @@ export class NoteCreateService implements OnApplicationShutdown { // Register host if (this.userEntityService.isRemoteUser(user)) { this.federatedInstanceService.fetch(user.host).then(async i => { - this.instancesRepository.increment({ id: i.id }, 'notesCount', 1); + this.updateNotesCountQueue.enqueue(i.id, 1); if ((await this.metaService.fetch()).enableChartsForFederatedInstances) { this.instanceChart.updateNote(i.host, note, true); } @@ -1035,6 +1039,16 @@ export class NoteCreateService implements OnApplicationShutdown { return false; } + @bindThis + private collapseNotesCount(oldValue: number, newValue: number) { + return oldValue + newValue; + } + + @bindThis + private performUpdateNotesCount(id: MiNote['id'], incrBy: number) { + this.instancesRepository.increment({ id: id }, 'notesCount', incrBy); + } + @bindThis public dispose(): void { this.#shutdownController.abort(); diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts new file mode 100644 index 000000000000..c263f0b4f869 --- /dev/null +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -0,0 +1,24 @@ +export class CollapsedQueue { + private jobs: Map = new Map(); + + constructor( + private timeout: number, + private collapse: (oldValue: V, newValue: V) => V, + private doJob: (key: K, value: V) => void, + ) { } + + enqueue(key: K, value: V) { + if (this.jobs.has(key)) { + const old = this.jobs.get(key)!; + const merged = this.collapse(old, value); + this.jobs.set(key, merged); + } else { + this.jobs.set(key, value); + setTimeout(() => { + const value = this.jobs.get(key)!; + this.jobs.delete(key); + this.doJob(key, value); + }, this.timeout); + } + } +} \ No newline at end of file diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index fa7009f8f5d9..c1b88aace8e4 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -28,10 +28,18 @@ import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; +import { CollapsedQueue } from "@/misc/collapsed-queue.js" +import { MiNote } from '@/models/Note.js'; + +type UpdateInstanceJob = { + latestRequestReceivedAt: Date, + shouldUnsuspend: boolean, +}; @Injectable() export class InboxProcessorService { private logger: Logger; + private updateInstanceQueue: CollapsedQueue; constructor( private utilityService: UtilityService, @@ -48,6 +56,7 @@ export class InboxProcessorService { private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); + this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis @@ -185,11 +194,9 @@ export class InboxProcessorService { // Update stats this.federatedInstanceService.fetch(authUser.user.host).then(i => { - this.federatedInstanceService.update(i.id, { + this.updateInstanceQueue.enqueue(i.id, { latestRequestReceivedAt: new Date(), - isNotResponding: false, - // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる - suspensionState: i.suspensionState === 'autoSuspendedForNotResponding' ? 'none' : undefined, + shouldUnsuspend: job.suspensionState === 'autoSuspendedForNotResponding', }); this.fetchInstanceMetadataService.fetchInstanceMetadata(i); @@ -225,4 +232,26 @@ export class InboxProcessorService { } return 'ok'; } + + @bindThis + public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) { + const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt + ? newJob.latestRequestReceivedAt + : oldJob.latestRequestReceivedAt; + const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend; + return { + latestRequestReceivedAt, + shouldUnsuspend, + }; + } + + @bindThis + public performUpdateInstance(id: string, job: UpdateInstanceJob) { + this.federatedInstanceService.update(id, { + latestRequestReceivedAt: new Date(), + isNotResponding: false, + // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる + suspensionState: job.shouldUnsuspend ? 'none' : undefined, + }); + } } From 9c1164db846d5cf086a621f24223a2f766eae292 Mon Sep 17 00:00:00 2001 From: KOBA789 Date: Mon, 16 Sep 2024 19:42:48 +0900 Subject: [PATCH 2/9] Fix last new line --- packages/backend/src/misc/collapsed-queue.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts index c263f0b4f869..731ff58d2840 100644 --- a/packages/backend/src/misc/collapsed-queue.ts +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -21,4 +21,4 @@ export class CollapsedQueue { }, this.timeout); } } -} \ No newline at end of file +} From 4c2404c4268c88a91b611e99261ec2ba7a2197d2 Mon Sep 17 00:00:00 2001 From: KOBA789 Date: Mon, 16 Sep 2024 19:53:25 +0900 Subject: [PATCH 3/9] Fix typo --- packages/backend/src/queue/processors/InboxProcessorService.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index c1b88aace8e4..25ec7d3628a4 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -196,7 +196,7 @@ export class InboxProcessorService { this.federatedInstanceService.fetch(authUser.user.host).then(i => { this.updateInstanceQueue.enqueue(i.id, { latestRequestReceivedAt: new Date(), - shouldUnsuspend: job.suspensionState === 'autoSuspendedForNotResponding', + shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', }); this.fetchInstanceMetadataService.fetchInstanceMetadata(i); From cee59662154b4f2d0653b515458498095ec5590b Mon Sep 17 00:00:00 2001 From: KOBA789 Date: Mon, 16 Sep 2024 19:55:52 +0900 Subject: [PATCH 4/9] Add license notice --- packages/backend/src/misc/collapsed-queue.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts index 731ff58d2840..1d2750b19c12 100644 --- a/packages/backend/src/misc/collapsed-queue.ts +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -1,3 +1,8 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + export class CollapsedQueue { private jobs: Map = new Map(); From 7466ec47771b0f3be2d2e68f936d40d49d6d60d0 Mon Sep 17 00:00:00 2001 From: KOBA789 Date: Mon, 16 Sep 2024 19:59:46 +0900 Subject: [PATCH 5/9] Fix syntax --- packages/backend/src/queue/processors/InboxProcessorService.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 25ec7d3628a4..69f868fcc873 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -28,7 +28,7 @@ import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; -import { CollapsedQueue } from "@/misc/collapsed-queue.js" +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; import { MiNote } from '@/models/Note.js'; type UpdateInstanceJob = { From ae0b237e3b0cc897cf993aca6b19cd90caeee287 Mon Sep 17 00:00:00 2001 From: Hidekazu Kobayashi Date: Mon, 16 Sep 2024 11:35:22 +0000 Subject: [PATCH 6/9] Perform deferred jobs on shutdown --- .../backend/src/core/NoteCreateService.ts | 7 ++-- packages/backend/src/misc/collapsed-queue.ts | 32 +++++++++++++------ .../queue/processors/InboxProcessorService.ts | 18 ++++++++--- 3 files changed, 41 insertions(+), 16 deletions(-) diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index bd9d82835751..31070e5f0906 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -1050,12 +1050,13 @@ export class NoteCreateService implements OnApplicationShutdown { } @bindThis - public dispose(): void { + public async dispose(): Promise { this.#shutdownController.abort(); + await this.updateNotesCountQueue.performAllNow(); } @bindThis - public onApplicationShutdown(signal?: string | undefined): void { - this.dispose(); + public async onApplicationShutdown(signal?: string | undefined): Promise { + await this.dispose(); } } diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts index 1d2750b19c12..dee6df32bd2e 100644 --- a/packages/backend/src/misc/collapsed-queue.ts +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -3,27 +3,41 @@ * SPDX-License-Identifier: AGPL-3.0-only */ +type Job = { + value: V; + timer: NodeJS.Timeout; +}; + export class CollapsedQueue { - private jobs: Map = new Map(); + private jobs: Map> = new Map(); constructor( private timeout: number, private collapse: (oldValue: V, newValue: V) => V, - private doJob: (key: K, value: V) => void, - ) { } + private perform: (key: K, value: V) => Promise, + ) {} enqueue(key: K, value: V) { if (this.jobs.has(key)) { const old = this.jobs.get(key)!; - const merged = this.collapse(old, value); - this.jobs.set(key, merged); + const merged = this.collapse(old.value, value); + this.jobs.set(key, { ...old, value: merged }); } else { - this.jobs.set(key, value); - setTimeout(() => { - const value = this.jobs.get(key)!; + const timer = setTimeout(() => { + const job = this.jobs.get(key)!; this.jobs.delete(key); - this.doJob(key, value); + this.perform(key, job.value); }, this.timeout); + this.jobs.set(key, { value, timer }); + } + } + + async performAllNow() { + const entries = [...this.jobs.entries()]; + this.jobs.clear(); + for (const [_key, job] of entries) { + clearTimeout(job.timer); } + await Promise.allSettled(entries.map(([key, job]) => this.perform(key, job.value))); } } diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 69f868fcc873..fa929a4d7796 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,7 +4,7 @@ */ import { URL } from 'node:url'; -import { Injectable } from '@nestjs/common'; +import { Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; import type Logger from '@/logger.js'; @@ -26,10 +26,10 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type { InboxJobData } from '../types.js'; import { CollapsedQueue } from '@/misc/collapsed-queue.js'; import { MiNote } from '@/models/Note.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type { InboxJobData } from '../types.js'; type UpdateInstanceJob = { latestRequestReceivedAt: Date, @@ -37,7 +37,7 @@ type UpdateInstanceJob = { }; @Injectable() -export class InboxProcessorService { +export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; private updateInstanceQueue: CollapsedQueue; @@ -254,4 +254,14 @@ export class InboxProcessorService { suspensionState: job.shouldUnsuspend ? 'none' : undefined, }); } + + @bindThis + public async dispose(): Promise { + await this.updateInstanceQueue.performAllNow(); + } + + @bindThis + async onApplicationShutdown(signal?: string) { + await this.dispose(); + } } From 51d76115a4d1917306a158a1502c62d2bfb32700 Mon Sep 17 00:00:00 2001 From: Hidekazu Kobayashi Date: Mon, 16 Sep 2024 11:43:06 +0000 Subject: [PATCH 7/9] Fix missing async/await --- packages/backend/src/core/NoteCreateService.ts | 4 ++-- .../backend/src/queue/processors/InboxProcessorService.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 31070e5f0906..5221b1e2c429 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -1045,8 +1045,8 @@ export class NoteCreateService implements OnApplicationShutdown { } @bindThis - private performUpdateNotesCount(id: MiNote['id'], incrBy: number) { - this.instancesRepository.increment({ id: id }, 'notesCount', incrBy); + private async async performUpdateNotesCount(id: MiNote['id'], incrBy: number) { + await this.instancesRepository.increment({ id: id }, 'notesCount', incrBy); } @bindThis diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index fa929a4d7796..d5d0f83bfcc2 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -246,8 +246,8 @@ export class InboxProcessorService implements OnApplicationShutdown { } @bindThis - public performUpdateInstance(id: string, job: UpdateInstanceJob) { - this.federatedInstanceService.update(id, { + public async performUpdateInstance(id: string, job: UpdateInstanceJob) { + await this.federatedInstanceService.update(id, { latestRequestReceivedAt: new Date(), isNotResponding: false, // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる From 0de1e59b982d4f58944ae11a093f1094bdfdfada Mon Sep 17 00:00:00 2001 From: Hidekazu Kobayashi Date: Mon, 16 Sep 2024 11:48:25 +0000 Subject: [PATCH 8/9] Fix typo :) --- packages/backend/src/core/NoteCreateService.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 5221b1e2c429..b310f31cd429 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -1045,7 +1045,7 @@ export class NoteCreateService implements OnApplicationShutdown { } @bindThis - private async async performUpdateNotesCount(id: MiNote['id'], incrBy: number) { + private async performUpdateNotesCount(id: MiNote['id'], incrBy: number) { await this.instancesRepository.increment({ id: id }, 'notesCount', incrBy); } From 3114897a55e422eb6460e7fd56e313446ff88e4c Mon Sep 17 00:00:00 2001 From: syuilo <4439005+syuilo@users.noreply.github.com> Date: Thu, 26 Sep 2024 10:22:07 +0900 Subject: [PATCH 9/9] Update collapsed-queue.ts --- packages/backend/src/misc/collapsed-queue.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts index dee6df32bd2e..5bc20a78ae29 100644 --- a/packages/backend/src/misc/collapsed-queue.ts +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -8,6 +8,7 @@ type Job = { timer: NodeJS.Timeout; }; +// TODO: redis使えるようにする export class CollapsedQueue { private jobs: Map> = new Map();