import pika
import time
class consume_engine:
    """Blocking pika consumer for a single queue bound to a direct exchange.

    ``run()`` connects, opens a channel, declares the exchange and queue,
    binds them (routing key == queue name) and then consumes messages one
    at a time until interrupted, closing the connection on the way out.
    """

    def __init__(self):
        """Set the default configuration; no network I/O happens here."""
        # Not read by any method of this class; kept because external code
        # may still access them.
        self._messages = 10
        self._message_interval = 1
        self._queue_name = "some.queue"
        # Single source of truth for the exchange used by declare, bind and
        # the unbind performed during queue recovery.
        self._exchange_name = 'some.exchange'
        # Seconds on_message() sleeps between receiving and acking a message.
        self._work_seconds = 3
        self._connection = None
        self._channel = None

    def connection(self):
        """Open the blocking connection, store it and return it."""
        # NOTE(review): host, vhost and credentials are hard-coded here;
        # consider loading them from configuration or the environment.
        credentials = pika.PlainCredentials('rabbit', 'password')
        parameters = pika.ConnectionParameters(
            'rabbitmq-hostname', 5672, 'vhost_1', credentials,
            socket_timeout=300)
        print(parameters)
        self._connection = pika.BlockingConnection(parameters)
        print("Connected Successfully !!!")
        return self._connection

    def channel(self):
        """Open a channel on the current connection and store it."""
        self._channel = self._connection.channel()
        print("Channel opened...")

    def declare_queue(self):
        """Declare the durable queue, rebuilding it if the channel is closed.

        When the stored channel is no longer usable, a new channel is
        opened and the queue is unbound, deleted and declared again (this
        time with ``auto_delete=True``, as the original recovery path did).
        """
        try:
            self._channel.queue_declare(queue=self._queue_name, durable=True)
        except pika.exceptions.ChannelWrongStateError:
            # The stored channel was already closed, e.g. because
            # declare_exchange() had its declaration rejected by the broker.
            # NOTE(review): a declaration rejected on *this* call (arguments
            # conflicting with an existing queue) presumably surfaces as
            # ChannelClosedByBroker, which is not handled here - confirm
            # whether it should take the same recovery path.
            self._channel = self._connection.channel()
            # Unbind from the same exchange bind_exchange_queue() uses; the
            # original named a different exchange here.
            self._channel.queue_unbind(exchange=self._exchange_name,
                                       queue=self._queue_name,
                                       routing_key=self._queue_name)
            self._channel.queue_delete(self._queue_name)
            self._channel.queue_declare(queue=self._queue_name,
                                        durable=True,
                                        auto_delete=True)
        print("Queue declared....")
        print(' [*] Waiting for messages. To exit press CTRL+C')

    def declare_exchange(self):
        """Declare the direct exchange, tolerating a broker rejection.

        A rejection is reported instead of being silently dropped. The
        channel is left as the broker left it; declare_queue() opens a new
        one when it finds the channel unusable.
        """
        try:
            self._channel.exchange_declare(exchange=self._exchange_name,
                                           exchange_type='direct')
        except pika.exceptions.ChannelClosedByBroker as err:
            print("Exchange declaration rejected by broker: %r" % (err,))

    def bind_exchange_queue(self):
        """Bind the queue to the exchange using the queue name as key."""
        self._channel.queue_bind(exchange=self._exchange_name,
                                 queue=self._queue_name,
                                 routing_key=self._queue_name)

    def on_message(self, channel, method, properties, body):
        """Handle one delivery: report it, sleep, then acknowledge it.

        Args:
            channel: channel the message was delivered on.
            method: delivery metadata; ``method.delivery_tag`` is acked.
            properties: message properties (unused).
            body: message payload, printed with ``%r``.
        """
        print(" [x] working on %r" % (body,))
        time.sleep(self._work_seconds)
        print(" [x] Done")
        # Ack on the delivering channel: delivery tags are only meaningful
        # on the channel that produced them, and self._channel may have been
        # replaced since consumption was set up.
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def consume_messages(self):
        """Consume from the queue one unacked message at a time.

        Blocks until consumption stops. CTRL+C (KeyboardInterrupt) cancels
        the consumer and returns normally instead of propagating.
        """
        self._channel.basic_qos(prefetch_count=1)
        self._channel.basic_consume(queue=self._queue_name,
                                    on_message_callback=self.on_message)
        try:
            self._channel.start_consuming()
        except KeyboardInterrupt:
            self._channel.stop_consuming()

    def close_connection(self):
        """Close the connection if one is open; otherwise do nothing."""
        if self._connection is None or not self._connection.is_open:
            return
        self._connection.close()
        print("Closed connection....")

    def run(self):
        """Connect, set up exchange/queue/binding and consume until stopped.

        The connection is always closed on exit, whether consumption ended
        normally, via CTRL+C, or because a setup step raised.
        """
        self.connection()
        try:
            self.channel()
            self.declare_exchange()
            self.declare_queue()
            self.bind_exchange_queue()
            self.consume_messages()
        finally:
            self.close_connection()
class publish_engine:
    """Blocking pika publisher that sends numbered task messages.

    ``run()`` connects, declares a direct exchange and a durable queue,
    binds them (routing key == queue name), publishes ``_messages``
    persistent messages spaced ``_message_interval`` seconds apart, and
    always closes the connection afterwards.
    """

    def __init__(self):
        """Set the default configuration; no network I/O happens here."""
        self._messages = 5            # number of messages to publish
        self._message_interval = 1    # seconds slept after each publish
        self._queue_name = "16999403m1.request.queue"
        # Single source of truth for the exchange used by declare, bind
        # and publish.
        self._exchange_name = 'new.exchange'
        self._connection = None
        self._channel = None

    def make_connection(self):
        """Open the blocking connection, store it and return it."""
        # NOTE(review): host and credentials are hard-coded here; consider
        # loading them from configuration or the environment.
        credentials = pika.PlainCredentials('rabbitProd', 'dangerous')
        parameters = pika.ConnectionParameters(
            'at-rabbit-hop-1.cec.lab.emc.com', 5672, '/', credentials,
            socket_timeout=300)
        self._connection = pika.BlockingConnection(parameters)
        print("Connected Successfully !!!")
        return self._connection

    def channel(self):
        """Open a channel on the current connection and store it."""
        self._channel = self._connection.channel()
        print("Channel opened...")

    def declare_queue(self):
        """Declare the durable target queue."""
        self._channel.queue_declare(queue=self._queue_name, durable=True)
        print("Queue declared....")

    def declare_exchange(self):
        """Declare the direct exchange messages are published to."""
        self._channel.exchange_declare(exchange=self._exchange_name,
                                       exchange_type='direct')

    def bind_exchange_queue(self):
        """Bind the queue to the exchange using the queue name as key."""
        self._channel.queue_bind(exchange=self._exchange_name,
                                 queue=self._queue_name,
                                 routing_key=self._queue_name)

    def publish_message(self):
        """Publish ``_messages`` messages numbered from 1.

        Each body is ``"task number N"``; the method sleeps
        ``_message_interval`` seconds after every publish.
        """
        # Same properties for every message, so build them once.
        persistent = pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        )
        for message_count in range(1, self._messages + 1):
            message_body = "task number %i" % (message_count)
            self._channel.basic_publish(exchange=self._exchange_name,
                                        routing_key=self._queue_name,
                                        body=message_body,
                                        properties=persistent)
            print("Published message %i" % (message_count))
            time.sleep(self._message_interval)

    def close_connection(self):
        """Close the connection if one is open; otherwise do nothing.

        Safe to call when no connection was ever opened or when it has
        already been closed, so it can be used from a ``finally`` block
        without masking the original error.
        """
        if self._connection is None or not self._connection.is_open:
            return
        self._connection.close()
        print("Closed connection....")

    def run(self):
        """Connect, set up exchange/queue/binding, publish, then close.

        The connection is closed even when a setup or publish step raises.
        """
        self.make_connection()
        try:
            self.channel()
            self.declare_exchange()
            self.declare_queue()
            self.bind_exchange_queue()
            self.publish_message()
        finally:
            self.close_connection()