Search
 
SCRIPT & CODE EXAMPLE
 

TYPESCRIPT

bullmq

Example BullMQ todo app + custom BullMQ pub/sub

https://github.com/restuwahyu13/express-todo-bullmq
Comment

bullmq

import 'dotenv/config'
import { Worker, Queue, WorkerOptions, QueueOptions, Job, QueueEvents, QueueEventsOptions } from 'bullmq'
import { EventEmitter } from 'events'
import IORedis from 'ioredis'
import os from 'os'
import { assert } from 'is-any-type'

import { HydeLivingError } from '@helpers/helper.error'

/** Alias of BullMQ's `Job`, exported so callers can type jobs without importing `bullmq` themselves. */
export type BullJob = Job

/**
 * Small publish/subscribe facade over BullMQ.
 *
 * `publisher` pushes a payload onto the queue named `key`; `subscriber` starts
 * a worker plus a `QueueEvents` logger for that queue and resolves with the
 * first payload the worker processes. All BullMQ objects created by one
 * instance share the single ioredis connection opened in the constructor.
 */
export class BullQueue {
	// The most recently used option objects are kept on the instance; they are
	// (re)built on every publisher/worker/notification call.
	private workerOptions?: WorkerOptions
	private queueOptions?: QueueOptions
	private eventOptions?: QueueEventsOptions
	// Bridges job payloads from the worker processor to `subscriber()` callers.
	private readonly emitter: EventEmitter = new EventEmitter({ captureRejections: true })
	private readonly redisConnection: InstanceType<typeof IORedis>

	/**
	 * Opens the Redis connection from `REDIS_HOST`, `REDIS_PORT` and
	 * `REDIS_PASSWORD` environment variables.
	 *
	 * @param db Redis logical database index to select.
	 */
	constructor(db: number) {
		this.redisConnection = new IORedis({
			host: process.env.REDIS_HOST as string,
			// Explicit radix so the port is always parsed as decimal.
			port: Number.parseInt(process.env.REDIS_PORT as string, 10),
			password: (process.env.REDIS_PASSWORD as string) || '',
			maxRetriesPerRequest: null,
			db
		})
	}

	/**
	 * Adds `value` as a job on the queue named `key`.
	 *
	 * A new `Queue` object is created on every call and handed back to the
	 * caller; this method never closes it.
	 *
	 * @param key Queue name.
	 * @param value Job payload; must be an object or an array.
	 * @returns The `Queue` the job was added to.
	 * @throws HydeLivingError (as a rejected promise) when `value` has the wrong
	 * type or the job could not be added.
	 */
	async publisher(key: string, value: Record<string, any> | Record<string, any>[]): Promise<Queue | Error> {
		try {
			const isObjectOrArray = assert.isObject(value as any) || assert.isArray(value as any)
			if (!isObjectOrArray) throw new HydeLivingError('value must be an array or object')

			const queueOptions: QueueOptions = {
				connection: this.redisConnection,
				defaultJobOptions: {
					removeOnComplete: true,
					sizeLimit: 5242880, // 5 * 1024 * 1024
					timeout: 1000 * 60,
					attempts: 5,
					backoff: {
						type: 'exponential',
						delay: 1000 * 3
					},
					priority: os.cpus().length
				}
			}
			this.queueOptions = queueOptions

			const queue = new Queue(key, queueOptions)
			// Timestamp in the job name keeps names distinct across calls.
			await queue.add(`hydeliving:${key}:${Date.now()}`, value)

			return queue
		} catch (e: unknown) {
			// Normalise every failure to the project's error type; inside an async
			// function `throw` rejects the returned promise.
			const message = e instanceof Error && e.message ? e.message : 'Publisher crash add value to queue failed'
			throw new HydeLivingError(message)
		}
	}

	/**
	 * Creates a `QueueEvents` listener for the queue named `key` that logs job
	 * lifecycle events to the console.
	 *
	 * @param key Queue name.
	 * @returns The `QueueEvents` instance (never closed by this class).
	 */
	private notification(key: string): QueueEvents {
		const eventOptions: QueueEventsOptions = {
			connection: this.redisConnection
		}
		this.eventOptions = eventOptions
		const events = new QueueEvents(key, eventOptions)

		events.on('waiting', (args: { jobId: string }) => console.info(`jobs ${args.jobId} is waiting`))
		events.on('progress', (args: { jobId: string }) => console.info(`jobs ${args.jobId} in progress`))
		events.on('completed', (args: { jobId: string }) => console.log(`jobs ${args.jobId} is completed`))
		events.on('removed', (args: { jobId: string }) => console.info(`jobs ${args.jobId} is removed`))
		events.on('failed', (args: { jobId: string }) => console.error(`jobs ${args.jobId} is failed`))
		events.on('error', (_err: globalThis.Error) => console.error(`jobs ${events.name} is error`))

		return events
	}

	/**
	 * Creates a `Worker` for the queue named `key` whose processor forwards each
	 * job's payload to the internal emitter as a JSON string.
	 *
	 * @param key Queue name.
	 * @returns The `Worker` instance (never closed by this class).
	 */
	private worker(key: string): Worker {
		const workerOptions: WorkerOptions = {
			connection: this.redisConnection,
			concurrency: os.cpus().length,
			skipDelayCheck: true,
			runRetryDelay: 1000 * 3,
			settings: {
				backoffStrategies: {
					// Linear backoff: one second per attempt made. Jobs added by
					// `publisher` request the 'exponential' type, so this only
					// applies to jobs that ask for the 'custom' strategy.
					custom(attemptsMade: number) {
						return Math.abs(attemptsMade * 1000)
					}
				}
			}
		}
		this.workerOptions = workerOptions

		const worker = new Worker(
			key,
			async (job: Job): Promise<any> => {
				// NOTE(review): these state checks run while the job is being
				// processed; confirm they can ever be true at this point.
				if (await job.isCompleted()) await job.remove()
				if (await job.isFailed()) await job.retry('failed')
				if (job.data) this.emitter.emit('data', JSON.stringify(job.data))
				return job
			},
			workerOptions
		)

		// NOTE(review): the worker is resumed on every active/paused/progress
		// event, presumably to keep it from staying paused - confirm intent.
		worker.on('active', async () => await worker.resume())
		worker.on('paused', async () => await worker.resume())
		worker.on('progress', async () => await worker.resume())

		return worker
	}

	/**
	 * Starts a worker and an event logger for the queue named `key` and waits
	 * for the first job payload the worker emits.
	 *
	 * @param key Queue name.
	 * @returns The parsed payload of the first job processed after this call.
	 * The promise stays pending until a job with truthy `data` is processed.
	 */
	async subscriber(key: string): Promise<any> {
		/**
		 * @description initialize worker and worker notification
		 */
		this.worker(key)
		this.notification(key)

		/**
		 * @description listening data from worker
		 */
		return new Promise<any>((resolve) => {
			// `once`: the promise can only settle a single time, so the listener
			// is dropped after the first payload instead of piling up on the
			// shared emitter with every subscriber() call.
			this.emitter.once('data', (data: string) => resolve(JSON.parse(data)))
		})
	}
}
Comment

PREVIOUS NEXT
Code Example
Typescript :: How to reuse parts of Eloquent builder in Laravel 
Typescript :: typescript get a number param 
Typescript :: dots are displaying only when trying to fetch records html template 
Typescript :: searching filtering libraries in angular 
Typescript :: keep footer after all elements react 
Typescript :: Exclude value from array typescript type 
Typescript :: how to execute the same test case for multiple time using testng? 
Typescript :: check if all elements in array can be divided by python 
Typescript :: Use AuthGuard with gRPC Metadata 
Typescript :: array of objects create common key as a property and create array of objects 
Typescript :: accout 
Typescript :: struts 2 rest api example 
Typescript :: why touchable opacity to take width of its child 
Typescript :: that asks for a two digit number and then prints the english word for the number 
Typescript :: CUSTOM_ELEMENTS_SCHEMA error occur while unit testing with jasmine and karma 
Typescript :: java objects cannot change? 
Typescript :: js 
Typescript :: Get the Post Categories From Outside the Loop 
Typescript :: typescript enum includes value 
Typescript :: ioredis 
Typescript :: 10 elements of gothic literature 
Typescript :: error: postfix operator toArray needs to be enabled 
Cpp :: how to hide the console c++ 
Cpp :: c++ get filename from path 
Cpp :: c++ typedef array 
Cpp :: std::string to qstring 
Cpp :: sfml delta time 
Cpp :: hello world c++ visual studio 
Cpp :: find max value in image c++ 
Cpp :: every number is coming thrice except one 
ADD CONTENT
Topic
Content
Source link
Name
8+8 =