Search
 
SCRIPT & CODE EXAMPLE
 
CODE EXAMPLE FOR PYTHON

rabbitmq python

import pika
import time

class consume_engine:

    def __init__(self):
        self._messages = 10
        self._message_interval = 1
        self._queue_name = "some.queue"
        self._connection = None
        self._channel = None

    def connection(self):
        credentials = pika.PlainCredentials('rabbit', 'password')
        parameters = pika.ConnectionParameters('rabbitmq-hostname', 5672, 'vhost_1', credentials, socket_timeout=300)
        print(parameters)
        self._connection = pika.BlockingConnection(parameters)
        print("Connected Successfully !!!")
        return self._connection

    def channel(self):
        self._channel = self._connection.channel()
        print("Channel opened...")

    def declare_queue(self):
        try:
            self._channel.queue_declare(queue=self._queue_name, durable=True)
        except pika.exceptions.ChannelWrongStateError:
            self._channel = self._connection.channel()
            self._channel.queue_unbind(exchange='service.request.exchange',
                                     queue=self._queue_name,
                                     routing_key=self._queue_name)
            self._channel.queue_delete(self._queue_name)
            self._channel.queue_declare(queue=self._queue_name, durable=True, auto_delete=True)
        print("Queue declared....")
        print(' [*] Waiting for messages. To exit press CTRL+C')

    def declare_exchange(self):
        try:
            self._channel.exchange_declare(exchange='some.exchange',
                                           exchange_type='direct')
        except pika.exceptions.ChannelClosedByBroker:
            pass

    def bind_exchange_queue(self):
        self._channel.queue_bind(exchange='some.exchange',
                   queue=self._queue_name,
                   routing_key=self._queue_name)

    def on_message(self, channel, method, properties, body):
        print(" [x] working on %r" % body)
        time.sleep(3)
        print(" [x] Done")
        self._channel.basic_ack(delivery_tag = method.delivery_tag)

    def consume_messages(self):
        self._channel.basic_qos(prefetch_count=1)
        self._channel.basic_consume(self._queue_name,
                                    self.on_message)
        self._channel.start_consuming()

    def run(self):
        self.connection()
        self.channel()
        self.declare_exchange()
        self.declare_queue()
        self.bind_exchange_queue()
        self.consume_messages()

class publish_engine:

    def __init__(self):

        self._messages = 5
        self._message_interval = 1
        self._queue_name = "16999403m1.request.queue"
        self._connection = None
        self._channel = None

    def make_connection(self):
        credentials = pika.PlainCredentials('rabbitProd', 'dangerous')
        parameters = pika.ConnectionParameters('at-rabbit-hop-1.cec.lab.emc.com', 5672, '/', credentials, socket_timeout=300)
        self._connection = pika.BlockingConnection(parameters)
        print("Connected Successfully !!!")
        return self._connection

    def channel(self):
        self._channel = self._connection.channel()
        print("Channel opened...")

    def declare_queue(self):
        self._channel.queue_declare(queue=self._queue_name, durable=True)
        print("Queue declared....")

    def declare_exchange(self):
        self._channel.exchange_declare(exchange='new.exchange',
                                       exchange_type='direct')

    def bind_exchange_queue(self):
        self._channel.queue_bind(exchange='new.exchange',
                   queue=self._queue_name,
                   routing_key=self._queue_name)

    def publish_message(self):
        message_count = 0
        while message_count < self._messages:
            message_count += 1
            message_body = "task number %i" %(message_count)
            self._channel.basic_publish(exchange='new.exchange',
                                  routing_key=self._queue_name,
                                  body=message_body,
                                  properties=pika.BasicProperties(
                                      delivery_mode=2,  # make message persistent
                                  ))
            print("Published message %i" %(message_count))
            time.sleep(self._message_interval)

    def close_connection(self):
        self._connection.close()
        print("Closed connection....")

    def run(self):
        self.make_connection()
        self.channel()
        self.declare_exchange()
        self.declare_queue()
        self.bind_exchange_queue()
        self.publish_message()
        self.close_connection()


Source by www.rabbitmq.com #
 
PREVIOUS NEXT
Tagged: #rabbitmq #python
ADD COMMENT
Topic
Name
7+2 =