import 'dotenv/config'
import { Worker, Queue, WorkerOptions, QueueOptions, Job, QueueEvents, QueueEventsOptions } from 'bullmq'
import { EventEmitter } from 'events'
import IORedis from 'ioredis'
import os from 'os'
import { assert } from 'is-any-type'
import { HydeLivingError } from '@helpers/helper.error'
export type BullJob = Job
export class BullQueue {
private workerOptions: WorkerOptions
private queueOptions: QueueOptions
private eventOptions: QueueEventsOptions
private emitter: InstanceType<typeof EventEmitter> = new EventEmitter({ captureRejections: true })
private redisConnection: InstanceType<typeof IORedis>
constructor(db: number) {
this.redisConnection = new IORedis({
host: process.env.REDIS_HOST as string,
port: parseInt(process.env.REDIS_PORT as any),
password: (process.env.REDIS_PASSWORD as string) || '',
maxRetriesPerRequest: null,
db: db
})
}
async publisher(key: string, value: Record<string, any> | Record<string, any>[]): Promise<Queue | Error> {
try {
if ((assert.isObject(value as any) || assert.isArray(value as any)) != true)
throw new HydeLivingError('value must be an array or object')
this.queueOptions = {
connection: this.redisConnection,
defaultJobOptions: {
removeOnComplete: true,
sizeLimit: 5242880,
timeout: 1000 * 60,
attempts: 5,
backoff: {
type: 'exponential',
delay: 1000 * 3
},
priority: os.cpus().length
}
}
const queue: InstanceType<typeof Queue> = new Queue(key, this.queueOptions)
await queue.add(`hydeliving:${key}:${Date.now()}`, value)
return queue
} catch (e: any) {
return Promise.reject(new HydeLivingError(e.message || 'Publisher crash add value to queue failed'))
}
}
private notification(key: string): InstanceType<typeof QueueEvents> {
this.eventOptions = {
connection: this.redisConnection
}
const events: InstanceType<typeof QueueEvents> = new QueueEvents(key, this.eventOptions)
events.on('waiting', (args: { jobId: string }) => console.info(`jobs ${args.jobId} is waiting`))
events.on('progress', (args: { jobId: string }) => console.info(`jobs ${args.jobId} in progress`))
events.on('completed', (args: { jobId: string }) => console.log(`jobs ${args.jobId}} is completed`))
events.on('removed', (args: { jobId: string }) => console.info(`jobs ${args.jobId} is removed`))
events.on('failed', (args: { jobId: string }) => console.error(`jobs ${args.jobId} is failed`))
events.on('error', (_err: globalThis.Error) => console.error(`jobs ${events.name} is error`))
return events
}
private worker(key: string): InstanceType<typeof Worker> {
this.workerOptions = {
connection: this.redisConnection,
concurrency: os.cpus().length,
skipDelayCheck: true,
runRetryDelay: 1000 * 3,
settings: {
backoffStrategies: {
custom(attemptsMade: number) {
return Math.abs(attemptsMade * 1000)
}
}
}
}
const worker: InstanceType<typeof Worker> = new Worker(
key,
async (job: Job): Promise<any> => {
if (await job.isCompleted()) await job.remove()
if (await job.isFailed()) await job.retry('failed')
if (job.data) this.emitter.emit('data', JSON.stringify(job.data))
return job
},
this.workerOptions
)
worker.on('active', async () => await worker.resume())
worker.on('paused', async () => await worker.resume())
worker.on('progress', async () => await worker.resume())
return worker
}
async subscriber(key: string): Promise<any> {
/**
* @description initalize worker and worker notification
*/
await this.worker(key)
await this.notification(key)
/**
* @description listening data from worker
*/
return new Promise<any>((resolve, _reject) => {
this.emitter.on('data', (data: any) => resolve(JSON.parse(data)))
})
}
}