Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AP-5015 update bullmq metrics plugin to use redis configs instead of redis instances #189

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ This plugin depends on the following peer-installed packages:

Add the plugin to your Fastify instance by registering it with the following possible options:

- `redisClients`, a Redis client instances which are used by the BullMQ: plugin uses it to discover the queues.
- `redisConfigs`, Redis configurations used for BullMQ. Plugin uses them to discover the queues.
- `bullMqPrefix` (optional, default: `bull`). The prefix used by BullMQ to store the queues in Redis;
- `metricsPrefix` (optional, default: `bullmq`). The prefix for the metrics in Prometheus;
- `queueDiscoverer` (optional, default: `BackgroundJobsBasedQueueDiscoverer`). The queue discoverer to use. The default one relies on the logic implemented by `@lokalise/background-jobs-common` where queue names are registered by the background job processors; If you are not using `@lokalise/background-jobs-common`, you can use your own queue discoverer by instantiating a `RedisBasedQueueDiscoverer` or implementing a `QueueDiscoverer` interface;
Expand Down
2 changes: 1 addition & 1 deletion lib/plugins/bull-mq-metrics/MetricsCollector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class MetricsCollector {
.filter((queue) => !this.options.excludedQueues.includes(queue.queueName))
.map(
(queue) =>
new ObservableQueue(queue.queueName, queue.redisInstance, this.metrics, this.logger),
new ObservableQueue(queue.queueName, queue.redisConfig, this.metrics, this.logger),
)
}

Expand Down
8 changes: 4 additions & 4 deletions lib/plugins/bull-mq-metrics/ObservableQueue.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Queue, QueueEvents } from 'bullmq'
import type { FinishedStatus } from 'bullmq'
import type { FastifyBaseLogger } from 'fastify'
import type { Redis } from 'ioredis'

import type { RedisConfig } from '@lokalise/node-core'
import type { Metrics } from './MetricsCollector'

export class ObservableQueue {
Expand Down Expand Up @@ -33,12 +33,12 @@ export class ObservableQueue {

constructor(
readonly name: string,
private readonly redis: Redis,
readonly redisConfig: RedisConfig,
private readonly metrics: Metrics,
private readonly logger: FastifyBaseLogger,
) {
this.queue = new Queue(name, { connection: redis })
this.events = new QueueEvents(name, { connection: redis, autorun: true })
this.queue = new Queue(name, { connection: redisConfig })
this.events = new QueueEvents(name, { connection: redisConfig, autorun: true })

this.events.on('failed', async ({ jobId }) => {
await this.collectDurationMetric(jobId, 'failed')
Expand Down
32 changes: 18 additions & 14 deletions lib/plugins/bull-mq-metrics/queueDiscoverers.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
import { backgroundJobProcessorGetActiveQueueIds } from '@lokalise/background-jobs-common'
import {
backgroundJobProcessorGetActiveQueueIds,
createSanitizedRedisClient,
} from '@lokalise/background-jobs-common'
import type { RedisConfig } from '@lokalise/node-core'
import { PromisePool } from '@supercharge/promise-pool'
import type { Redis } from 'ioredis'

export type QueueDiscoverer = {
discoverQueues: () => Promise<RedisQueue[]>
}

type RedisQueue = {
redisInstance: Redis
redisConfig: RedisConfig
queueName: string
}

const QUEUE_DISCOVERY_CONCURRENCY = 3

export abstract class AbstractRedisBasedQueueDiscoverer implements QueueDiscoverer {
constructor(protected readonly redisInstances: Redis[]) {}
constructor(protected readonly redisConfigs: RedisConfig[]) {}

async discoverQueues(): Promise<RedisQueue[]> {
const { results, errors } = await PromisePool.withConcurrency(QUEUE_DISCOVERY_CONCURRENCY)
.for(this.redisInstances)
.for(this.redisConfigs)
.process((redisInstance) => this.discoverQueuesForInstance(redisInstance))

if (errors.length > 0) {
Expand All @@ -29,19 +32,20 @@ export abstract class AbstractRedisBasedQueueDiscoverer implements QueueDiscover
return results.flat()
}

protected abstract discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]>
protected abstract discoverQueuesForInstance(redisConfig: RedisConfig): Promise<RedisQueue[]>
}

export class RedisBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer {
constructor(
redisInstances: Redis[],
redisConfigs: RedisConfig[],
private readonly queuesPrefix: string,
) {
super(redisInstances)
super(redisConfigs)
}

protected async discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]> {
const scanStream = redisInstance.scanStream({
protected async discoverQueuesForInstance(redisConfig: RedisConfig): Promise<RedisQueue[]> {
const redis = createSanitizedRedisClient(redisConfig)
const scanStream = redis.scanStream({
match: `${this.queuesPrefix}:*:meta`,
})

Expand All @@ -57,17 +61,17 @@ export class RedisBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer
return Array.from(queues)
.sort()
.map((queueName) => ({
redisInstance: redisInstance,
redisConfig,
queueName,
}))
}
}

export class BackgroundJobsBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer {
protected async discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]> {
return backgroundJobProcessorGetActiveQueueIds(redisInstance).then((queueNames) =>
protected async discoverQueuesForInstance(redisConfig: RedisConfig): Promise<RedisQueue[]> {
return backgroundJobProcessorGetActiveQueueIds(redisConfig).then((queueNames) =>
queueNames.map((queueName) => ({
redisInstance,
redisConfig,
queueName,
})),
)
Expand Down
25 changes: 15 additions & 10 deletions lib/plugins/bullMqMetricsPlugin.spec.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import { setTimeout } from 'node:timers/promises'

import { buildClient, sendGet } from '@lokalise/backend-http-client'
import type {
AbstractBackgroundJobProcessor,
BaseJobPayload,
import {
type AbstractBackgroundJobProcessor,
type BaseJobPayload,
createSanitizedRedisClient,
} from '@lokalise/background-jobs-common'
import type { FastifyInstance } from 'fastify'
import fastify from 'fastify'
import type { Redis } from 'ioredis'
import type Redis from 'ioredis'

import { TestBackgroundJobProcessor } from '../../test/mocks/TestBackgroundJobProcessor'
import { TestDepedendencies } from '../../test/mocks/TestDepedendencies'

import type { RedisConfig } from '@lokalise/node-core'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { z } from 'zod'
import { RedisBasedQueueDiscoverer } from './bull-mq-metrics/queueDiscoverers'
Expand Down Expand Up @@ -42,7 +44,7 @@ async function initAppWithBullMqMetrics(
}

await app.register(bullMqMetricsPlugin, {
queueDiscoverer: new RedisBasedQueueDiscoverer(pluginOptions.redisClients, 'bull'),
queueDiscoverer: new RedisBasedQueueDiscoverer(pluginOptions.redisConfigs, 'bull'),
collectionOptions: {
type: 'interval',
intervalInMs: 50,
Expand All @@ -69,11 +71,14 @@ describe('bullMqMetricsPlugin', () => {
let app: FastifyInstance
let dependencies: TestDepedendencies
let processor: AbstractBackgroundJobProcessor<BaseJobPayload, JobReturn>
let redisConfig: RedisConfig
let redis: Redis

beforeEach(async () => {
dependencies = new TestDepedendencies()
redis = dependencies.startRedis()
redisConfig = dependencies.getRedisConfig()

redis = createSanitizedRedisClient(redisConfig)
await redis.flushall()

processor = new TestBackgroundJobProcessor<BaseJobPayload, JobReturn>(
Expand All @@ -88,15 +93,15 @@ describe('bullMqMetricsPlugin', () => {
if (app) {
await app.close()
}
await dependencies.dispose()
await redis.quit()
await processor.dispose()
})

it('throws if fastify-metrics was not initialized', async () => {
await expect(() => {
return initAppWithBullMqMetrics(
{
redisClients: [redis],
redisConfigs: [redisConfig],
},
{
enableMetricsPlugin: false,
Expand All @@ -109,7 +114,7 @@ describe('bullMqMetricsPlugin', () => {

it('exposes metrics collect() function', async () => {
app = await initAppWithBullMqMetrics({
redisClients: [redis],
redisConfigs: [redisConfig],
collectionOptions: {
type: 'manual',
},
Expand Down Expand Up @@ -141,7 +146,7 @@ describe('bullMqMetricsPlugin', () => {

it('works with multiple redis clients', async () => {
app = await initAppWithBullMqMetrics({
redisClients: [redis, redis],
redisConfigs: [redisConfig, redisConfig],
collectionOptions: {
type: 'manual',
},
Expand Down
6 changes: 3 additions & 3 deletions lib/plugins/bullMqMetricsPlugin.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { FastifyInstance } from 'fastify'
import 'fastify-metrics'
import fp from 'fastify-plugin'
import type { Redis } from 'ioredis'

import type { RedisConfig } from '@lokalise/node-core'
import type { CollectionScheduler } from './bull-mq-metrics/CollectionScheduler'
import { PromiseBasedCollectionScheduler } from './bull-mq-metrics/CollectionScheduler'
import type { MetricCollectorOptions } from './bull-mq-metrics/MetricsCollector'
Expand All @@ -19,7 +19,7 @@ declare module 'fastify' {
}

export type BullMqMetricsPluginOptions = {
redisClients: Redis[]
redisConfigs: RedisConfig[]
collectionOptions?:
| {
type: 'interval'
Expand All @@ -46,7 +46,7 @@ function plugin(
const options = {
bullMqPrefix: 'bull',
metricsPrefix: 'bullmq',
queueDiscoverer: new BackgroundJobsBasedQueueDiscoverer(pluginOptions.redisClients),
queueDiscoverer: new BackgroundJobsBasedQueueDiscoverer(pluginOptions.redisConfigs),
excludedQueues: [],
histogramBuckets: [20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000],
collectionOptions: {
Expand Down
20 changes: 5 additions & 15 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,9 @@
"type": "git",
"url": "git://github.com/lokalise/fastify-extras.git"
},
"keywords": [
"fastify",
"newrelic",
"bugsnag",
"request-context",
"request-id",
"split-io"
],
"keywords": ["fastify", "newrelic", "bugsnag", "request-context", "request-id", "split-io"],
"homepage": "https://github.com/lokalise/fastify-extras",
"files": [
kibertoad marked this conversation as resolved.
Show resolved Hide resolved
"dist/**",
"LICENSE",
"README.md"
],
"files": ["dist/**", "LICENSE", "README.md"],
"main": "dist/index.js",
"types": "dist/index.d.ts",
"type": "commonjs",
Expand All @@ -40,13 +29,14 @@
"lint:fix": "biome check --write",
"docker:start": "docker compose -f docker-compose.yml up --build -d redis && docker compose -f docker-compose.yml up --build -d wait_for_redis",
"docker:stop": "docker compose -f docker-compose.yml down",
"version": "auto-changelog -p && git add CHANGELOG.md"
"version": "auto-changelog -p && git add CHANGELOG.md",
"postversion": "biome check --write package.json"
},
"dependencies": {
"@bugsnag/js": "^7.25.0",
"@supercharge/promise-pool": "^3.2.0",
"@lokalise/error-utils": "^2.0.0",
"@lokalise/background-jobs-common": "^7.1.0",
"@lokalise/background-jobs-common": "^7.6.0",
"@splitsoftware/splitio": "^10.27.0",
"@amplitude/analytics-node": "^1.3.6",
"fastify-metrics": "^11.0.0",
Expand Down
29 changes: 15 additions & 14 deletions test/mocks/TestDepedendencies.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { BackgroundJobProcessorDependencies } from '@lokalise/background-jobs-common'
import { CommonBullmqFactory } from '@lokalise/background-jobs-common'
import { globalLogger } from '@lokalise/node-core'
import { Redis } from 'ioredis'
import { type RedisConfig, globalLogger } from '@lokalise/node-core'
import type { MockInstance } from 'vitest'
import { vi, vitest } from 'vitest'

Expand All @@ -10,8 +9,6 @@ export let lastInfoSpy: MockInstance
export let lastErrorSpy: MockInstance

export class TestDepedendencies {
private client?: Redis

// eslint-disable-next-line @typescript-eslint/no-explicit-any
createMocksForBackgroundJobProcessor(): BackgroundJobProcessorDependencies<any, any> {
const originalChildFn = testLogger.child.bind(testLogger)
Expand All @@ -37,11 +34,7 @@ export class TestDepedendencies {
}
}

async dispose(): Promise<void> {
await this.client?.quit()
}

startRedis(): Redis {
getRedisConfig(): RedisConfig {
const db = process.env.REDIS_DB ? Number.parseInt(process.env.REDIS_DB) : undefined
const host = process.env.REDIS_HOST
const port = process.env.REDIS_PORT ? Number(process.env.REDIS_PORT) : undefined
Expand All @@ -53,18 +46,26 @@ export class TestDepedendencies {
const commandTimeout = process.env.REDIS_COMMAND_TIMEOUT
? Number.parseInt(process.env.REDIS_COMMAND_TIMEOUT, 10)
: undefined
this.client = new Redis({

if (!host) {
throw new Error('Missing REDIS_HOST env')
}

if (!port) {
throw new Error('Missing REDIS_PORT env')
}

return {
host,
db,
port,
username,
password,
connectTimeout,
commandTimeout,
maxRetriesPerRequest: null,
enableReadyCheck: false,
})

return this.client
maxRetriesPerRequest: null,
useTls: false,
}
}
}
Loading