Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

レプリケーション設定時におけるinsertOne()の挙動を調整 #15109

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
- Fix: ユーザーのプロフィール画面をアドレス入力などで直接表示した際に概要タブの描画に失敗する問題の修正( #15032 )
- Fix: 起動前の疎通チェックが機能しなくなっていた問題を修正
(Cherry-picked from https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/737)

- Fix: リードレプリカ設定時にレコードの追加・更新・削除を伴うクエリを発行した際はmasterノードで実行されるように調整( #10897 )

## 2024.11.0

Expand Down
44 changes: 37 additions & 7 deletions packages/backend/src/models/_.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/

import { FindOneOptions, InsertQueryBuilder, ObjectLiteral, Repository, SelectQueryBuilder, TypeORMError } from 'typeorm';
import { DriverUtils } from 'typeorm/driver/DriverUtils.js';
import {
FindOneOptions,
InsertQueryBuilder,
ObjectLiteral,
QueryRunner,
Repository,
SelectQueryBuilder,
} from 'typeorm';
import { RelationCountLoader } from 'typeorm/query-builder/relation-count/RelationCountLoader.js';
import { RelationIdLoader } from 'typeorm/query-builder/relation-id/RelationIdLoader.js';
import { RawSqlResultsToEntityTransformer } from 'typeorm/query-builder/transformer/RawSqlResultsToEntityTransformer.js';
import { ObjectUtils } from 'typeorm/util/ObjectUtils.js';
import { OrmUtils } from 'typeorm/util/OrmUtils.js';
import {
RawSqlResultsToEntityTransformer,
} from 'typeorm/query-builder/transformer/RawSqlResultsToEntityTransformer.js';
import { PostgresConnectionOptions } from 'typeorm/driver/postgres/PostgresConnectionOptions.js';
import { MiAbuseUserReport } from '@/models/AbuseUserReport.js';
import { MiAbuseReportNotificationRecipient } from '@/models/AbuseReportNotificationRecipient.js';
import { MiAccessToken } from '@/models/AccessToken.js';
Expand Down Expand Up @@ -83,7 +90,11 @@ import type { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialE

export interface MiRepository<T extends ObjectLiteral> {
createTableColumnNames(this: Repository<T> & MiRepository<T>): string[];

insertOne(this: Repository<T> & MiRepository<T>, entity: QueryDeepPartialEntity<T>, findOptions?: Pick<FindOneOptions<T>, 'relations'>): Promise<T>;

insertOneImpl(this: Repository<T> & MiRepository<T>, entity: QueryDeepPartialEntity<T>, findOptions?: Pick<FindOneOptions<T>, 'relations'>, queryRunner?: QueryRunner): Promise<T>;

selectAliasColumnNames(this: Repository<T> & MiRepository<T>, queryBuilder: InsertQueryBuilder<T>, builder: SelectQueryBuilder<T>): void;
}

Expand All @@ -92,14 +103,31 @@ export const miRepository = {
return this.metadata.columns.filter(column => column.isSelect && !column.isVirtual).map(column => column.databaseName);
},
async insertOne(entity, findOptions?) {
const opt = this.manager.connection.options as PostgresConnectionOptions;
if (opt.replication) {
Comment on lines +106 to +107
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

configに設定したreplicationの値が巡り巡ってこの値に入っています。
レプリケーションが無効の時はnull(かundefinedかは忘れましたが)になっているので、else(=今までの処理)に流れます。

const queryRunner = this.manager.connection.createQueryRunner('master');
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

これによりmasterノードでクエリを実行してくれるrunnerが取れます。

try {
return this.insertOneImpl(entity, findOptions, queryRunner);
} finally {
await queryRunner.release();
}
} else {
return this.insertOneImpl(entity, findOptions);
}
},
async insertOneImpl(entity, findOptions?, queryRunner?) {
// ---- insert + returningの結果を共通テーブル式(CTE)に保持するクエリを生成 ----

const queryBuilder = this.createQueryBuilder().insert().values(entity);
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const mainAlias = queryBuilder.expressionMap.mainAlias!;
const name = mainAlias.name;
mainAlias.name = 't';
const columnNames = this.createTableColumnNames();
queryBuilder.returning(columnNames.reduce((a, c) => `${a}, ${queryBuilder.escape(c)}`, '').slice(2));
const builder = this.createQueryBuilder().addCommonTableExpression(queryBuilder, 'cte', { columnNames });

// ---- 共通テーブル式(CTE)から結果を取得 ----
const builder = this.createQueryBuilder(undefined, queryRunner).addCommonTableExpression(queryBuilder, 'cte', { columnNames });
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
builder.expressionMap.mainAlias!.tablePath = 'cte';
this.selectAliasColumnNames(queryBuilder, builder);
Expand Down Expand Up @@ -197,7 +225,9 @@ export {
};

export type AbuseUserReportsRepository = Repository<MiAbuseUserReport> & MiRepository<MiAbuseUserReport>;
export type AbuseReportNotificationRecipientRepository = Repository<MiAbuseReportNotificationRecipient> & MiRepository<MiAbuseReportNotificationRecipient>;
export type AbuseReportNotificationRecipientRepository =
Repository<MiAbuseReportNotificationRecipient>
& MiRepository<MiAbuseReportNotificationRecipient>;
export type AccessTokensRepository = Repository<MiAccessToken> & MiRepository<MiAccessToken>;
export type AdsRepository = Repository<MiAd> & MiRepository<MiAd>;
export type AnnouncementsRepository = Repository<MiAnnouncement> & MiRepository<MiAnnouncement>;
Expand Down
32 changes: 24 additions & 8 deletions packages/backend/src/postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

// https://github.com/typeorm/typeorm/issues/2400
import pg from 'pg';
import { DataSource, Logger } from 'typeorm';
import { DataSource, Logger, type QueryRunner } from 'typeorm';
import * as highlight from 'cli-highlight';
import { entities as charts } from '@/core/chart/entities.js';

Expand Down Expand Up @@ -90,6 +90,11 @@ export const dbLogger = new MisskeyLogger('db');
const sqlLogger = dbLogger.createSubLogger('sql', 'gray');

class MyCustomLogger implements Logger {
constructor(
private printReplicationMode?: boolean,
) {
}

@bindThis
private highlight(sql: string) {
return highlight.highlight(sql, {
Expand All @@ -98,18 +103,29 @@ class MyCustomLogger implements Logger {
}

@bindThis
public logQuery(query: string, parameters?: any[]) {
sqlLogger.info(this.highlight(query).substring(0, 100));
private appendPrefixIfNeeded(message: string, opts?: {
queryRunner?: QueryRunner;
}): string {
if (this.printReplicationMode && opts?.queryRunner) {
return `[${opts.queryRunner.getReplicationMode()}] ${message}`;
} else {
return message;
}
}

@bindThis
public logQuery(query: string, parameters?: any[], queryRunner?: QueryRunner) {
sqlLogger.info(this.appendPrefixIfNeeded(this.highlight(query).substring(0, 100), { queryRunner }));
}

@bindThis
public logQueryError(error: string, query: string, parameters?: any[]) {
sqlLogger.error(this.highlight(query));
public logQueryError(error: string, query: string, parameters?: any[], queryRunner?: QueryRunner) {
sqlLogger.error(this.appendPrefixIfNeeded(this.highlight(query), { queryRunner }));
}

@bindThis
public logQuerySlow(time: number, query: string, parameters?: any[]) {
sqlLogger.warn(this.highlight(query));
public logQuerySlow(time: number, query: string, parameters?: any[], queryRunner?: QueryRunner) {
sqlLogger.warn(this.appendPrefixIfNeeded(this.highlight(query), { queryRunner }));
}

@bindThis
Expand Down Expand Up @@ -247,7 +263,7 @@ export function createPostgresDataSource(config: Config) {
},
} : false,
logging: log,
logger: log ? new MyCustomLogger() : undefined,
logger: log ? new MyCustomLogger(config.dbReplications) : undefined,
maxQueryExecutionTime: 300,
entities: entities,
migrations: ['../../migration/*.js'],
Expand Down
Loading