Search
 
SCRIPT & CODE EXAMPLE
 
CODE EXAMPLE FOR TYPESCRIPT

amqplib

import 'dotenv/config'
import amqplib, { Options, Connection, Channel, ConsumeMessage } from 'amqplib'
import os from 'os'
import EventEmitter from 'events'

import { HydeLivingError } from '@helpers/helper.error'

/**
 * Small helper around amqplib that publishes JSON payloads to a queue named
 * `${prefix}_${key}` and lets callers wait for the next message on such a queue.
 *
 * Error contract: the methods never reject. On failure they resolve with a
 * `HydeLivingError` instance, so callers must check the resolved value.
 */
export class Rabbitmq {
	private connectionOptions: Options.Connect = {}
	private optionsPublish: Options.Publish = {}
	/**
	 * Queue arguments shared by every declaration of a queue.
	 *
	 * `messageTtl` and `expires` are durations in milliseconds (and must be
	 * below 2^32), not absolute timestamps. They also have to be identical on
	 * every `assertQueue` call for the same queue, otherwise the broker rejects
	 * the redeclaration, which is why they are fixed constants here.
	 */
	private readonly assertQueueOptions: Options.AssertQueue = {
		maxPriority: 10,
		autoDelete: true,
		durable: true,
		messageTtl: 3 * 60 * 1000, // 3 minutes
		expires: 60 * 60 * 1000 // 1 hour
	}
	private emitter: InstanceType<typeof EventEmitter> = new EventEmitter({ captureRejections: true })

	/** Converts anything caught in a `catch` clause into the error type this class returns. */
	private toError(e: unknown): HydeLivingError {
		return new HydeLivingError(e instanceof Error ? e.message : String(e))
	}

	/** Internal emitter event name for a queue, so messages from different queues do not cross-talk. */
	private eventName(queue: string): string {
		return `data:${queue}`
	}

	/**
	 * Opens a new broker connection and makes sure the topic exchange exists.
	 *
	 * @returns the open connection (the caller owns it and must close it), or a
	 * `HydeLivingError` when connecting or declaring the exchange fails.
	 */
	private async connection(): Promise<Connection | HydeLivingError> {
		let connect: Connection | undefined
		try {
			this.connectionOptions = {
				protocol: process.env.RABBITMQ_PROTOCOL,
				// NOTE(review): RABBITMQ_HOST is used as the *vhost*; no hostname is set, so
				// amqplib falls back to its default host. Confirm this is intended.
				vhost: process.env.RABBITMQ_HOST,
				username: process.env.RABBITMQ_USERNAME,
				password: process.env.RABBITMQ_PASSWORD
			}

			connect = await amqplib.connect(this.connectionOptions)

			// Use a short-lived channel only for the exchange declaration and close it
			// afterwards, so that every connection does not carry an extra idle channel.
			// NOTE(review): 'exchage' looks like a typo of 'exchange', but it is the
			// broker-side name, so it is kept unchanged.
			const setupChannel: Channel = await connect.createChannel()
			await setupChannel.assertExchange('exchage', 'topic', { autoDelete: false, durable: true })
			await setupChannel.close()

			return connect
		} catch (e: unknown) {
			// Do not leak a half-initialised connection when the setup fails.
			if (connect !== undefined) await connect.close().catch(() => undefined)
			return this.toError(e)
		}
	}

	/**
	 * Publishes `data` (serialised as JSON) to the queue `${prefix}_${key}`,
	 * declaring the queue first.
	 *
	 * @param prefix first part of the queue name
	 * @param key second part of the queue name
	 * @param data payload; must be JSON-serialisable
	 * @returns `true` on success, otherwise a `HydeLivingError`
	 */
	async publisher(prefix: string, key: string, data: Record<string, any> | Record<string, any>[]): Promise<any> {
		let broker: Connection | undefined
		try {
			const connected = await this.connection()
			if (connected instanceof HydeLivingError) throw new HydeLivingError('rabbitmq is not connected')
			broker = connected

			this.optionsPublish = {
				persistent: true,
				// NOTE(review): priority is derived from the CPU count; the queue is declared
				// with maxPriority 10. Confirm this is the intended priority source.
				priority: os.cpus().length,
				timestamp: Date.now()
			}

			const queue = `${prefix}_${key}`
			const channel: Channel = await broker.createChannel()
			await channel.assertQueue(queue, this.assertQueueOptions)

			// NOTE(review): amqplib returns false here to signal a full write buffer
			// (back-pressure). The original contract of reporting it as a failure is kept.
			const published: boolean = channel.sendToQueue(queue, Buffer.from(JSON.stringify(data)), this.optionsPublish)
			if (!published) throw new HydeLivingError('Send message to queue failed')

			// Closing the channel first lets the buffered publish go out before the
			// connection is torn down in `finally`.
			await channel.close()
			return true
		} catch (e: unknown) {
			return this.toError(e)
		} finally {
			// One connection is opened per call, so it must always be released;
			// closing the connection also closes any channel still open on it.
			if (broker !== undefined) await broker.close().catch(() => undefined)
		}
	}

	/**
	 * Starts a consumer on the queue `${prefix}_${key}`. Every received message
	 * is emitted as a string on the internal emitter and then acknowledged.
	 * The connection is intentionally left open so the consumer keeps running.
	 *
	 * @returns `undefined` once the consumer is registered, otherwise a `HydeLivingError`
	 */
	private async consumer(prefix: string, key: string): Promise<HydeLivingError | undefined> {
		let broker: Connection | undefined
		try {
			const connected = await this.connection()
			if (connected instanceof HydeLivingError) throw new HydeLivingError('rabbitmq is not connected')
			broker = connected

			const queue = `${prefix}_${key}`
			const channel: Channel = await broker.createChannel()

			// Awaited so that a failing registration (for example a missing queue) is
			// reported through the returned error instead of an unhandled rejection.
			await channel.consume(queue, (msg: ConsumeMessage | null): void => {
				// amqplib delivers null when the broker cancels the consumer.
				if (msg === null) return
				this.emitter.emit(this.eventName(queue), msg.content.toString())
				channel.ack(msg)
			})
			return undefined
		} catch (e: unknown) {
			if (broker !== undefined) await broker.close().catch(() => undefined)
			return this.toError(e)
		}
	}

	/**
	 * Waits for the next message on the queue `${prefix}_${key}`.
	 *
	 * NOTE(review): every call starts a new long-lived consumer (and connection)
	 * that is never stopped, and messages that arrive while no call is pending
	 * are acknowledged and dropped. This matches the previous behaviour.
	 *
	 * @returns the raw message body as a string (not parsed), or a `HydeLivingError`
	 */
	async subscriber(prefix: string, key: string): Promise<any> {
		const event = this.eventName(`${prefix}_${key}`)

		// Register the one-shot listener BEFORE the consumer starts, so a message
		// delivered right after registration cannot be emitted with nobody listening.
		// `once` removes the listener after the first message instead of piling up
		// a new permanent listener on every call.
		let onData: (data: string) => void = () => undefined
		const nextMessage = new Promise<string>((resolve) => {
			onData = resolve
			this.emitter.once(event, onData)
		})

		try {
			const consumer = await this.consumer(prefix, key)
			if (consumer instanceof HydeLivingError) throw new HydeLivingError('rabbitmq is not connected')

			return await nextMessage
		} catch (e: unknown) {
			this.emitter.off(event, onData)
			return this.toError(e)
		}
	}
}
 
PREVIOUS NEXT
Tagged: #amqplib
ADD COMMENT
Topic
Name
1+7 =