diff --git a/.nvmrc b/.nvmrc deleted file mode 120000 index b711bfe9..00000000 --- a/.nvmrc +++ /dev/null @@ -1 +0,0 @@ -./.node-version \ No newline at end of file diff --git a/packages/core/src/Structures/Client.ts b/packages/core/src/Structures/Client.ts index c57d4d1a..7ea7c79b 100644 --- a/packages/core/src/Structures/Client.ts +++ b/packages/core/src/Structures/Client.ts @@ -61,12 +61,19 @@ export class Client extends EventEmitter { public connect(): void { this.amqp = createAmqpChannel(this.options.amqpUrl, { setup: async (channel: Channel) => this.setupAmqp(channel) - }); + }, { connectionOptions: { timeout: 360_000 } }); this.rest.setToken(this.options.token!); } public async setupAmqp(channel: Channel): Promise { + channel.on("close", () => { + if ("sendMessage" in channel && "sendOrEnqueue" in channel && channel.sendMessage === channel.sendOrEnqueue) { + // @ts-expect-error Reconnect workaround + this.amqp._onConnect({ connection: this.amqp._connectionManager.connection }); + } + }); + await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false }); const routing = new RoutedQueue(GatewayExchangeRoutes.DISPATCH, this.clientId, this.options.instanceName); diff --git a/services/kanao-cache/src/Structures/KanaoCache.ts b/services/kanao-cache/src/Structures/KanaoCache.ts index 79be84a5..4807a115 100644 --- a/services/kanao-cache/src/Structures/KanaoCache.ts +++ b/services/kanao-cache/src/Structures/KanaoCache.ts @@ -16,15 +16,15 @@ import { clientId, storeLogs, lokiHost, databaseUrl, amqp, databaseConnectionLim export class KanaoCache extends EventEmitter { public cacheQueue = createAmqpChannel(amqp, { setup: async (channel: Channel) => this.setupCacheQueue(channel) - }); + }, { connectionOptions: { timeout: 360_000 } }); public rpcQueue = createAmqpChannel(amqp, { setup: async (channel: Channel) => this.setupRpc(channel) - }); + }, { connectionOptions: { timeout: 360_000 } }); public queryRpcQueue = createAmqpChannel(amqp, { setup: async (channel: Channel) => this.setupQueryRpc(channel) - }); + }, { connectionOptions: { timeout: 360_000 } }); public logger = createLogger("kanao-cache", clientId, storeLogs, lokiHost); @@ -57,6 +57,13 @@ export class KanaoCache extends EventEmitter { } public async setupCacheQueue(channel: Channel): Promise { + channel.on("close", () => { + if ("sendMessage" in channel && "sendOrEnqueue" in channel && channel.sendMessage === channel.sendOrEnqueue) { + // @ts-expect-error Reconnect workaround + this.cacheQueue._onConnect({ connection: this.cacheQueue._connectionManager.connection }); + } + }); + await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false }); // Used for receiving receive events from the gateway @@ -78,6 +85,13 @@ export class KanaoCache extends EventEmitter { } public async setupQueryRpc(channel: Channel): Promise { + channel.on("close", () => { + if ("sendMessage" in channel && "sendOrEnqueue" in channel && channel.sendMessage === channel.sendOrEnqueue) { + // @ts-expect-error Reconnect workaround + this.queryRpcQueue._onConnect({ connection: this.queryRpcQueue._connectionManager.connection }); + } + }); + const rpc = new RoutedQueue(`${GatewayExchangeRoutes.REQUEST}.query`, clientId, "cache-query"); await channel.assertQueue(rpc.queue, { durable: false, autoDelete: true }); await channel.bindQueue(rpc.queue, RabbitMQ.GATEWAY_EXCHANGE, rpc.key); @@ -117,6 +131,13 @@ export class KanaoCache extends EventEmitter { } public async setupRpc(channel: Channel): Promise { + channel.on("close", () => { + if ("sendMessage" in channel && "sendOrEnqueue" in channel && channel.sendMessage === channel.sendOrEnqueue) { + // @ts-expect-error Reconnect workaround + this.rpcQueue._onConnect({ connection: this.rpcQueue._connectionManager.connection }); + } + }); + // Used for Counts RPC const rpc = new RoutedQueue(`${GatewayExchangeRoutes.REQUEST}.counts`, clientId, "cache-rpc"); await channel.assertQueue(rpc.queue, { durable: false }); diff --git a/services/kanao-gateway/src/Structures/KanaoGateway.ts b/services/kanao-gateway/src/Structures/KanaoGateway.ts index 6c6d723e..ba2dff02 100644 --- a/services/kanao-gateway/src/Structures/KanaoGateway.ts +++ b/services/kanao-gateway/src/Structures/KanaoGateway.ts @@ -129,6 +129,13 @@ export class NezuGateway extends EventEmitter { public setupAmqp(): void { const amqpChannel = createAmqpChannel(amqp, { setup: async (channel: Channel) => { + channel.on("close", () => { + if ("sendMessage" in channel && "sendOrEnqueue" in channel && channel.sendMessage === channel.sendOrEnqueue) { + // @ts-expect-error Reconnect workaround + amqpChannel._onConnect({ connection: amqpChannel._connectionManager.connection }); + } + }); + await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false }); // Used for Stats RPC @@ -163,7 +170,7 @@ export class NezuGateway extends EventEmitter { } }); } - }); + }, { connectionOptions: { timeout: 360_000 } }); amqpChannel.on("error", err => this.logger.error(err, "AMQP Channel on main process Error")); amqpChannel.on("close", () => this.logger.warn("AMQP Channel on main process Closed")); diff --git a/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts b/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts index d5d771df..9a9dfe37 100644 --- a/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts +++ b/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts @@ -101,6 +101,13 @@ export class ProcessBootstrapper { const amqpChannel = createAmqpChannel(amqp, { setup: async (channel: Channel) => { + channel.on("close", () => { + if ("sendMessage" in channel && "sendOrEnqueue" in channel && channel.sendMessage === channel.sendOrEnqueue) { + // @ts-expect-error Reconnect workaround + amqpChannel._onConnect({ connection: amqpChannel._connectionManager.connection }); + } + }); + await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false }); await channel.assertQueue(routing.queue, { durable: false, autoDelete: true }); await channel.consume(routing.queue, async m => this.onConsumeMessage(channel, m)); @@ -109,7 +116,7 @@ export class ProcessBootstrapper { this.data.shardIds.map(async shardId => channel.bindQueue(routing.queue, RabbitMQ.GATEWAY_EXCHANGE, routing.shard(shardId).key)) ); } - }); + }, { connectionOptions: { timeout: 360_000 } }); amqpChannel.on("error", err => this.logger.error(err, `AMQP Channel on process ${this.data.processId} Error`)); amqpChannel.on("close", () => this.logger.warn(`AMQP Channel on process ${this.data.processId} Closed`));