/**
* CUSTOM LIBS SETUP RASCAL FOR RABBITMQ BY RESTU WAHYU SAPUTRA
**/
import 'dotenv/config'
import { CustomError } from '@helpers/helper.error'
import Broker, { PublicationSession, SubscriberSessionAsPromised } from 'rascal'
import os from 'os'
/**
* Naming inputs used by the RabbitMQ class when it builds its broker configuration.
*/
interface SetterConfig {
/** Identifier embedded in the publication and subscription names. */
key: string
/** Suffix embedded in the exchange, queue, publication and subscription names. */
prefix: string
}
/**
 * Small convenience wrapper around a Rascal broker.
 *
 * Each instance describes one exchange (`rabbitmq_ex:<prefix>`), one queue
 * (`rabbitmq_eq:<prefix>`) bound to it with the routing key `a.b.c`, one
 * publication (`rabbitmq_pub:<key>:<prefix>`) and one subscription
 * (`rabbitmq_sub:<key>:<prefix>`). Connection settings are read from the
 * `RABBITMQ_*` environment variables when the instance is constructed.
 *
 * The underlying broker is created lazily on first use and is then shared by
 * every `publisher`/`subscriber` call made on the same instance, so repeated
 * publishing does not open a new broker each time.
 */
export class RabbitMQ {
  private readonly brokerConfig: Broker.BrokerConfig
  private readonly key: string
  private readonly prefix: string
  /** Pending or established broker; cleared again when creation fails so a later call can retry. */
  private broker?: Promise<Broker.BrokerAsPromised>

  /**
   * @param key Identifier embedded in the publication and subscription names.
   * @param prefix Suffix embedded in the exchange, queue, publication and subscription names.
   */
  constructor(key: string, prefix: string) {
    this.key = key
    this.prefix = prefix
    // Assigned directly in the constructor so the field is definitely initialised.
    this.brokerConfig = this.buildConfig({ key, prefix })
  }

  /** Builds the Rascal broker configuration for the given naming inputs. */
  private buildConfig(config: SetterConfig): Broker.BrokerConfig {
    const exchange = `rabbitmq_ex:${config.prefix}`
    const queue = `rabbitmq_eq:${config.prefix}`
    const publication = `rabbitmq_pub:${config.key}:${config.prefix}`
    const subscription = `rabbitmq_sub:${config.key}:${config.prefix}`

    const pubConfig: Broker.PublicationConfig = {
      vhost: process.env.RABBITMQ_VHOST,
      exchange,
      routingKey: 'a.b.c',
      autoCreated: true,
      options: {
        persistent: true,
        // NOTE(review): the message priority is the CPU count of the publishing host and the
        // queue is declared by name only (no priority arguments) - confirm this is intended.
        priority: os.cpus().length
      }
    }
    const subConfig: Broker.SubscriptionConfig = {
      vhost: process.env.RABBITMQ_VHOST,
      queue,
      autoCreated: true
    }

    return {
      vhosts: {
        // NOTE(review): the vhost entry is always keyed '/', while the publication and
        // subscription above name RABBITMQ_VHOST - confirm both agree in every environment.
        '/': {
          connectionStrategy: 'random',
          connection: {
            hostname: process.env.RABBITMQ_HOST,
            user: process.env.RABBITMQ_USERNAME,
            password: process.env.RABBITMQ_PASSWORD,
            port: process.env.RABBITMQ_PORT,
            protocol: process.env.RABBITMQ_PROTOCOL
          },
          exchanges: [exchange],
          queues: [queue],
          bindings: [`${exchange}[a.b.c] -> ${queue}`],
          publications: { [publication]: pubConfig },
          subscriptions: { [subscription]: subConfig }
        }
      },
      recovery: {
        [publication]: { requeue: true, strategy: 'ack', xDeathFix: true },
        [subscription]: { strategy: 'nack' }
      }
    }
  }

  /** Returns the broker configuration built at construction time. */
  private getConfig(): Broker.BrokerConfig {
    return this.brokerConfig
  }

  /**
   * Returns the shared broker, creating it on first use.
   *
   * Rejects with the original creation error (instead of resolving with an error
   * object) so callers never try to use a failed connection as a broker.
   */
  private connection(): Promise<Broker.BrokerAsPromised> {
    if (this.broker === undefined) {
      this.broker = this.createBroker().catch((err: unknown) => {
        // Forget the failed attempt so the next call retries instead of replaying the failure.
        this.broker = undefined
        throw err
      })
    }
    return this.broker
  }

  /** Creates a broker from the stored configuration and logs its `error` events. */
  private async createBroker(): Promise<Broker.BrokerAsPromised> {
    const broker: Broker.BrokerAsPromised = await Broker.BrokerAsPromised.create(this.getConfig())
    broker.on('error', console.error)
    return broker
  }

  /** Normalises an arbitrary thrown value into an `Error`. */
  private toError(err: unknown): Error {
    return err instanceof Error ? err : new Error(String(err))
  }

  /**
   * Publishes `data` through this instance's publication.
   *
   * @param data Payload (or list of payloads) handed to the broker's `publish`.
   * @returns `true` once the publish call has been accepted, or a `CustomError`
   *   carrying the failure message when connecting or publishing fails. This
   *   method does not reject.
   */
  async publisher(data: Record<string, any> | Record<string, any>[]): Promise<any> {
    try {
      const broker = await this.connection()
      const publication: PublicationSession = await broker.publish(`rabbitmq_pub:${this.key}:${this.prefix}`, data)
      console.info('RabbitMQ publisher is called')
      publication.on('success', (jobId: string) => console.log(`job ${jobId} is success`))
      publication.on('error', (err: Error, jobId: string) => {
        console.log(`job ${jobId} is error`)
        // Report the failure itself. Purging the broker here would empty the queues
        // (discarding unrelated pending messages) because a single publish failed.
        console.error(err)
      })
      return true
    } catch (err: unknown) {
      return new CustomError(this.toError(err).message)
    }
  }

  /**
   * Subscribes to this instance's subscription and forwards every message body to `cb`.
   *
   * A message is acknowledged only after `cb` has finished (its result is awaited);
   * if `cb` throws or rejects, the message is settled with that error instead of
   * being left unacknowledged.
   *
   * @param cb Invoked as `cb(content)` for each message, or as `cb(null, error)`
   *   when connecting or subscribing fails.
   */
  async subscriber(cb: (content: any, error?: Error) => any): Promise<void> {
    try {
      const broker = await this.connection()
      const subscription: SubscriberSessionAsPromised = await broker.subscribe(`rabbitmq_sub:${this.key}:${this.prefix}`)
      console.info('RabbitMQ subscriber is called')
      subscription
        .on('message', (_message: any, content: any, ackOrNack: Broker.AckOrNack): void => {
          void this.settle(cb, content, ackOrNack)
        })
        .on('error', console.error)
    } catch (err: unknown) {
      cb(null, this.toError(err))
    }
  }

  /** Runs the consumer callback for one message, then acks on success or nacks with the error. */
  private async settle(
    cb: (content: any, error?: Error) => any,
    content: any,
    ackOrNack: Broker.AckOrNack
  ): Promise<void> {
    try {
      await cb(content)
    } catch (err: unknown) {
      ackOrNack(this.toError(err))
      return
    }
    ackOrNack()
  }
}
// publisher demo here
import { RabbitMQ } from '@libs/lib.rabbitmq'
;(async () => {
  // Demo: every five seconds, publish a timestamped greeting and print what the publisher returned.
  try {
    const broker = new RabbitMQ('message:text', 'google')
    const publishGreeting = async (): Promise<void> => {
      const payload = { message: `hello wordl from publisher:${new Date().getTime()}` }
      const outcome = await broker.publisher(payload)
      console.log(outcome)
    }
    setInterval(publishGreeting, 5000)
  } catch (err) {
    // Only synchronous setup failures reach this handler.
    console.error(err)
  }
})()
// subscriber demo here
import { RabbitMQ } from '@libs/lib.rabbitmq'
;(async () => {
  // Demo: print every message body received on the subscription, and report failures.
  try {
    const broker = new RabbitMQ('message:text', 'google')
    // Awaited so that a rejection from subscriber() is handled by the surrounding try/catch.
    await broker.subscriber((content: string, error?: Error) => {
      // `error` is optional: the library calls back with the content alone for normal messages.
      if (error) {
        // Surface the failure instead of silently dropping it.
        console.error(error)
        return
      }
      console.log(content)
    })
  } catch (err) {
    console.error(err)
  }
})()