Search
 
SCRIPT & CODE EXAMPLE
 

TYPESCRIPT

rascal npm

/**
* CUSTOM LIBS SETUP RASCAL FOR RABBITMQ BY RESTU WAHYU SAPUTRA
**/

import 'dotenv/config'
import { CustomError } from '@helpers/helper.error'
import Broker, { PublicationSession, SubscriberSessionAsPromised } from 'rascal'
import os from 'os'

/**
 * Naming inputs used to derive the RabbitMQ resource names for one broker
 * configuration.
 */
interface SetterConfig {
	/** Interpolated into the exchange (`rabbitmq_ex:<prefix>`) and queue (`rabbitmq_eq:<prefix>`) names. */
	prefix: string
	/** Combined with `prefix` to name the publication and subscription entries. */
	key: string
}

/**
 * Thin wrapper around a Rascal broker that wires one exchange, one queue, one
 * publication and one subscription whose names are derived from `key` and
 * `prefix`.
 *
 * A single broker is created lazily on first use and then shared by every
 * `publisher()` / `subscriber()` call made on the same instance.
 */
export class RabbitMQ {
	private readonly brokerConfig: Broker.BrokerConfig
	private readonly key: string
	private readonly prefix: string
	/** Pending or resolved broker; cleared again if creation fails so a later call can retry. */
	private broker?: Promise<Broker.BrokerAsPromised>

	/**
	 * @param key - Middle segment of the publication/subscription names.
	 * @param prefix - Suffix of the exchange, queue, publication and subscription names.
	 */
	constructor(key: string, prefix: string) {
		this.key = key
		this.prefix = prefix
		// Assigned here (not inside a helper) so the field is definitely initialised under `strict`.
		this.brokerConfig = this.buildConfig({ key, prefix })
	}

	/** Name of the publication entry registered in the broker configuration. */
	private get publicationName(): string {
		return `rabbitmq_pub:${this.key}:${this.prefix}`
	}

	/** Name of the subscription entry registered in the broker configuration. */
	private get subscriptionName(): string {
		return `rabbitmq_sub:${this.key}:${this.prefix}`
	}

	/** Extracts a printable message from an arbitrary thrown value. */
	private static messageOf(err: unknown): string {
		return err instanceof Error ? err.message : String(err)
	}

	/** Normalises an arbitrary thrown value to an `Error` instance. */
	private static toError(err: unknown): Error {
		return err instanceof Error ? err : new Error(String(err))
	}

	/**
	 * Builds the Rascal broker configuration: connection settings are read from
	 * the `RABBITMQ_*` environment variables, resource names from `config`.
	 */
	private buildConfig(config: SetterConfig): Broker.BrokerConfig {
		const exchangeName = `rabbitmq_ex:${config.prefix}`
		const queueName = `rabbitmq_eq:${config.prefix}`
		const pubName = `rabbitmq_pub:${config.key}:${config.prefix}`
		const subName = `rabbitmq_sub:${config.key}:${config.prefix}`

		const pubConfig: Broker.PublicationConfig = {
			vhost: process.env.RABBITMQ_VHOST,
			exchange: exchangeName,
			routingKey: 'a.b.c',
			autoCreated: true,
			options: {
				persistent: true,
				// NOTE(review): message priority is set to the host's logical CPU count — confirm this is intended.
				priority: os.cpus().length
			}
		}

		const subConfig: Broker.SubscriptionConfig = {
			vhost: process.env.RABBITMQ_VHOST,
			queue: queueName,
			autoCreated: true
		}

		const publications: Record<string, Broker.PublicationConfig> = { [pubName]: pubConfig }
		const subscriptions: Record<string, Broker.SubscriptionConfig> = { [subName]: subConfig }

		return {
			vhosts: {
				// NOTE(review): the vhost key is hard-coded to '/', while the publication and
				// subscription above reference RABBITMQ_VHOST — confirm the two agree.
				'/': {
					connectionStrategy: 'random',
					connection: {
						hostname: process.env.RABBITMQ_HOST,
						user: process.env.RABBITMQ_USERNAME,
						password: process.env.RABBITMQ_PASSWORD,
						port: process.env.RABBITMQ_PORT,
						protocol: process.env.RABBITMQ_PROTOCOL
					},
					exchanges: [exchangeName],
					queues: [queueName],
					bindings: [`${exchangeName}[a.b.c] -> ${queueName}`],
					publications,
					subscriptions
				}
			},
			recovery: {
				[pubName]: { requeue: true, strategy: 'ack', xDeathFix: true },
				[subName]: { strategy: 'nack' }
			}
		}
	}

	private getConfig(): Broker.BrokerConfig {
		return this.brokerConfig
	}

	/**
	 * Returns the shared broker, creating it on first use.
	 *
	 * @throws Whatever `BrokerAsPromised.create` rejects with. The failure is
	 * propagated (not returned as a value) so callers never try to use an error
	 * object as a broker.
	 */
	private connection(): Promise<Broker.BrokerAsPromised> {
		if (this.broker === undefined) {
			this.broker = this.createBroker().catch((err: unknown) => {
				// Forget the failed attempt so the next call retries instead of
				// replaying the same rejection forever.
				this.broker = undefined
				throw err
			})
		}
		return this.broker
	}

	/** Creates a broker from the stored configuration and logs its `error` events. */
	private async createBroker(): Promise<Broker.BrokerAsPromised> {
		const broker: Broker.BrokerAsPromised = await Broker.BrokerAsPromised.create(this.getConfig())
		broker.on('error', console.error)
		return broker
	}

	/**
	 * Publishes `data` through this instance's publication.
	 *
	 * @param data - Message payload (a record or an array of records).
	 * @returns `true` once the message has been handed to the broker, or a
	 * `CustomError` carrying the failure message. This method does not throw
	 * for connection or publish failures.
	 */
	async publisher(data: Record<string, any> | Record<string, any>[]): Promise<any> {
		try {
			const broker = await this.connection()
			const publication: PublicationSession = await broker.publish(this.publicationName, data)

			console.info('RabbitMQ publisher is called')

			publication.on('success', (jobId: string) => console.log(`job ${jobId} is success`))
			publication.on('error', (_err: Error, jobId: string) => {
				console.log(`job ${jobId} is error`)
				// NOTE(review): purge() is kept for backward compatibility, but purging on a
				// publish error discards queued messages — confirm this is really wanted.
				// Its rejection is handled so it cannot surface as an unhandled rejection.
				broker.purge().catch(console.error)
			})

			return true
		} catch (err: unknown) {
			return new CustomError(RabbitMQ.messageOf(err))
		}
	}

	/**
	 * Starts consuming from this instance's subscription.
	 *
	 * @param cb - Invoked as `cb(content)` for every message. If connecting or
	 * subscribing fails it is invoked once as `cb(null, error)`. The callback
	 * may be asynchronous; the message is acknowledged only after it settles.
	 */
	async subscriber(cb: (content: any, error?: Error) => any): Promise<void> {
		try {
			const broker = await this.connection()
			const subscription: SubscriberSessionAsPromised = await broker.subscribe(this.subscriptionName)

			console.info('RabbitMQ subscriber is called')

			subscription
				.on('message', (_message: any, content: any, ackOrNack: Broker.AckOrNack): void => {
					this.dispatch(cb, content, ackOrNack).catch(console.error)
				})
				.on('error', console.error)
		} catch (err: unknown) {
			cb(null, RabbitMQ.toError(err))
		}
	}

	/**
	 * Runs the consumer callback for one message, then acknowledges it. If the
	 * callback throws or rejects, the error is logged and passed to `ackOrNack`
	 * so the message is negatively acknowledged instead of being left un-acked.
	 */
	private async dispatch(
		cb: (content: any, error?: Error) => any,
		content: any,
		ackOrNack: Broker.AckOrNack
	): Promise<void> {
		try {
			await cb(content)
		} catch (err: unknown) {
			console.error(err)
			ackOrNack(RabbitMQ.toError(err))
			return
		}
		ackOrNack()
	}
}


// publisher demo here
import { RabbitMQ } from '@libs/lib.rabbitmq'
// Publishes a timestamped greeting every five seconds and logs each outcome.
;(async () => {
	const PUBLISH_INTERVAL_MS = 5000

	// Sends one message and prints whatever publisher() resolves with.
	const publishOnce = async (mq: RabbitMQ): Promise<void> => {
		const payload = { message: `hello wordl from publisher:${Date.now()}` }
		const outcome = await mq.publisher(payload)
		console.log(outcome)
	}

	try {
		const mq = new RabbitMQ('message:text', 'google')
		setInterval(() => publishOnce(mq), PUBLISH_INTERVAL_MS)
	} catch (failure) {
		console.error(failure)
	}
})()

// subscriber demo here
import { RabbitMQ } from '@libs/lib.rabbitmq'
// Consumes messages from the 'message:text' / 'google' subscription and prints
// each payload; failures reported through the callback are logged as errors.
;(async () => {
	try {
		const broker = new RabbitMQ('message:text', 'google')
		// Awaited so that a rejection from subscriber() reaches the catch below
		// instead of becoming an unhandled rejection.
		await broker.subscriber((content: string, error?: Error) => {
			if (error) {
				// Previously dropped silently; surface it so failures are visible.
				console.error(error)
				return
			}
			console.log(content)
		})
	} catch (err) {
		console.error(err)
	}
})()
Comment

PREVIOUS NEXT
Code Example
Typescript :: Cave Generator 
Typescript :: how to set value to readonly property in typescript 
Typescript :: addObjects giving a fatal error when pushing data to algolia 
Typescript :: algorithm that prints if one of the numbers is multiple of the other 
Typescript :: wergensherts meaning 
Typescript :: The velocity of light in vacuum is 
Typescript :: three requirements for laser action 
Typescript :: welsh cup electrodes have 
Typescript :: c# ienumerable wrap to know when it's complete 
Typescript :: Route.component does not have any construct or call signatures - React Router with TypeScript 
Typescript :: Will Tenet come in plate format 
Typescript :: get database num hits django 
Typescript :: What do HTTP requests and responses look like? 
Typescript :: c++ program to separate unique elements of array 
Typescript :: how-to-pass-data-between-middleware 
Typescript :: missing return type on function @typescript-eslint/explicit-function-return-type 
Typescript :: find unique elements in pandas and their connection with other column 
Typescript :: hardness of water is due to the presence of salts of 
Typescript :: No query results for model 
Typescript :: delete the last string from file in typescript 
Typescript :: typescript timeout browser 
Typescript :: how many bits are there in a hexadecimal digit 
Typescript :: Named types just give a name to a type 
Typescript :: typescript maybe type 
Typescript :: how to set up vuex with typescript 
Typescript :: why my res.data returns array of objects ? (AngularJs) 
Typescript :: how to delete a struct in an array of structs in c 
Typescript :: engineering adding requirements to password 
Typescript :: code to check if a triangle is valid or not 
Typescript :: typescript declare "userLanguage" 
ADD CONTENT
Topic
Content
Source link
Name
8+2 =