Search
 
SCRIPT & CODE EXAMPLE
 

PYTHON

RabbitMQ python

#!/usr/bin/env python
import pika

# Open a blocking (synchronous) connection to the broker on this machine.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
# All queue/exchange operations go through a channel of that connection.
channel = connection.channel()
Comment

rabbitmq python

import pika
import time

class consume_engine:
    """Blocking RabbitMQ consumer built on pika.

    ``run()`` opens a connection and a channel, declares a direct exchange
    and a durable queue, binds them using the queue name as routing key,
    and then consumes messages one at a time, acknowledging each message
    after it has been processed. The connection is closed when ``run()``
    exits for any reason.
    """

    def __init__(self):
        # NOTE: _messages and _message_interval are not read by any method
        # of this class; they are kept so existing attribute access works.
        self._messages = 10
        self._message_interval = 1
        self._queue_name = "some.queue"
        # Single exchange name shared by declare/bind/unbind so that the
        # recovery path in declare_queue() targets the exchange that
        # bind_exchange_queue() actually binds to.
        self._exchange_name = "some.exchange"
        self._connection = None
        self._channel = None

    def connection(self):
        """Open a blocking connection to the broker and return it."""
        # NOTE(review): host, vhost and credentials are hard-coded here.
        credentials = pika.PlainCredentials('rabbit', 'password')
        parameters = pika.ConnectionParameters('rabbitmq-hostname', 5672, 'vhost_1', credentials, socket_timeout=300)
        print(parameters)
        self._connection = pika.BlockingConnection(parameters)
        print("Connected Successfully !!!")
        return self._connection

    def channel(self):
        """Open a channel on the current connection."""
        self._channel = self._connection.channel()
        print("Channel opened...")

    def declare_queue(self):
        """Declare the durable queue, recreating it if the channel is unusable.

        If the declare raises ``ChannelWrongStateError`` a fresh channel is
        opened, the queue is unbound from the exchange, deleted, and then
        declared again.
        """
        try:
            self._channel.queue_declare(queue=self._queue_name, durable=True)
        except pika.exceptions.ChannelWrongStateError:
            self._channel = self._connection.channel()
            # Unbind from the same exchange this class binds to; the
            # previous code named a different, never-declared exchange.
            self._channel.queue_unbind(exchange=self._exchange_name,
                                       queue=self._queue_name,
                                       routing_key=self._queue_name)
            self._channel.queue_delete(self._queue_name)
            # NOTE(review): the recreated queue is auto_delete while the
            # first attempt above is not - confirm this is intended.
            self._channel.queue_declare(queue=self._queue_name, durable=True, auto_delete=True)
        print("Queue declared....")
        print(' [*] Waiting for messages. To exit press CTRL+C')

    def declare_exchange(self):
        """Declare the direct exchange; a broker rejection is reported, not raised."""
        try:
            self._channel.exchange_declare(exchange=self._exchange_name,
                                           exchange_type='direct')
        except pika.exceptions.ChannelClosedByBroker as err:
            # Best effort: report instead of silently ignoring the failure.
            # declare_queue() opens a new channel if this one is unusable.
            print("Exchange declare rejected by broker: %s" % (err,))

    def bind_exchange_queue(self):
        """Bind the queue to the exchange, using the queue name as routing key."""
        self._channel.queue_bind(exchange=self._exchange_name,
                                 queue=self._queue_name,
                                 routing_key=self._queue_name)

    def on_message(self, channel, method, properties, body):
        """Consumer callback: simulate work on ``body``, then acknowledge it."""
        print(" [x] working on %r" % body)
        time.sleep(3)  # stand-in for real work
        print(" [x] Done")
        # Ack only after the work is finished.
        self._channel.basic_ack(delivery_tag=method.delivery_tag)

    def consume_messages(self):
        """Start consuming from the queue; blocks until consuming stops."""
        # prefetch_count=1: request at most one unacknowledged message at a time.
        self._channel.basic_qos(prefetch_count=1)
        self._channel.basic_consume(self._queue_name,
                                    self.on_message)
        self._channel.start_consuming()

    def close_connection(self):
        """Close the connection if one is open; safe to call more than once."""
        if self._connection is not None and self._connection.is_open:
            self._connection.close()
            print("Closed connection....")

    def run(self):
        """Connect, set up exchange/queue/binding, and consume until interrupted.

        Any exception (including ``KeyboardInterrupt`` from CTRL+C) still
        propagates to the caller, but the connection is closed first.
        """
        self.connection()
        try:
            self.channel()
            self.declare_exchange()
            self.declare_queue()
            self.bind_exchange_queue()
            self.consume_messages()
        finally:
            self.close_connection()

class publish_engine:
    """Blocking RabbitMQ publisher built on pika.

    ``run()`` opens a connection and a channel, declares a direct exchange
    and a durable queue, binds them using the queue name as routing key,
    publishes ``_messages`` persistent messages with ``_message_interval``
    seconds between them, and closes the connection - also when one of
    the steps fails.
    """

    def __init__(self):
        self._messages = 5            # number of messages to publish
        self._message_interval = 1    # seconds slept after each publish
        self._queue_name = "16999403m1.request.queue"
        # Single exchange name shared by declare/bind/publish.
        self._exchange_name = "new.exchange"
        self._connection = None
        self._channel = None

    def make_connection(self):
        """Open a blocking connection to the broker and return it."""
        # NOTE(review): host and credentials are hard-coded in source;
        # consider loading them from configuration instead.
        credentials = pika.PlainCredentials('rabbitProd', 'dangerous')
        parameters = pika.ConnectionParameters('at-rabbit-hop-1.cec.lab.emc.com', 5672, '/', credentials, socket_timeout=300)
        self._connection = pika.BlockingConnection(parameters)
        print("Connected Successfully !!!")
        return self._connection

    def channel(self):
        """Open a channel on the current connection."""
        self._channel = self._connection.channel()
        print("Channel opened...")

    def declare_queue(self):
        """Declare the durable queue messages are routed to."""
        self._channel.queue_declare(queue=self._queue_name, durable=True)
        print("Queue declared....")

    def declare_exchange(self):
        """Declare the direct exchange messages are published to."""
        self._channel.exchange_declare(exchange=self._exchange_name,
                                       exchange_type='direct')

    def bind_exchange_queue(self):
        """Bind the queue to the exchange, using the queue name as routing key."""
        self._channel.queue_bind(exchange=self._exchange_name,
                                 queue=self._queue_name,
                                 routing_key=self._queue_name)

    def publish_message(self):
        """Publish ``_messages`` numbered task messages, pausing after each."""
        for message_count in range(1, self._messages + 1):
            message_body = "task number %i" % (message_count)
            self._channel.basic_publish(exchange=self._exchange_name,
                                        routing_key=self._queue_name,
                                        body=message_body,
                                        properties=pika.BasicProperties(
                                            delivery_mode=2,  # make message persistent
                                        ))
            print("Published message %i" % (message_count))
            time.sleep(self._message_interval)

    def close_connection(self):
        """Close the broker connection."""
        self._connection.close()
        print("Closed connection....")

    def run(self):
        """Connect, set up exchange/queue/binding, publish, then disconnect.

        Exceptions from any step propagate to the caller, but the
        connection is closed first so it is not leaked.
        """
        self.make_connection()
        try:
            self.channel()
            self.declare_exchange()
            self.declare_queue()
            self.bind_exchange_queue()
            self.publish_message()
        finally:
            # Skip close() on an already-closed connection so a cleanup
            # error does not mask the original exception.
            if self._connection.is_open:
                self.close_connection()


Comment

python rabbitmq

# Check out https://www.rabbitmq.com/tutorials/tutorial-one-python.html 
Comment

PREVIOUS NEXT
Code Example
Python :: how to transcode a video in python using ffmpeg 
Python :: matplotlib cheat sheet 
Python :: how to convert uppercase to lowercase and vice versa in python 
Python :: python how to get rid of spaces in print 
Python :: part of a flower 
Python :: diccionario python 
Python :: printing hello world in python 
Python :: turtle write function in turtle package python 
Python :: python 3.4 release date 
Python :: 1024x768 
Python :: adding in python 
Python :: remove last element in list python 
Python :: error aesthetics must be either length 1 or the same as the data (3) fill 
Python :: how to block an IP address 
Python :: python convert integer to signed base 2 complement 
Python :: ValueError: tuple.index(x): x not in tuple 
Python :: inverse matrix gauss python 
Python :: unicode error python 
Python :: gnuplot sum over a column 
Python :: python togli spazio 
Python :: how to add two segment time series in a single plot 
Python :: Kernel Ridge et Hyperparameter cross validation sklearn 
Shell :: copy ssh key mac 
Shell :: what is --use-feature=2020-resolver 
Shell :: debian disable ipv6 
Shell :: react router v5 install 
Shell :: remove all the containers docker 
Shell :: ssh restart ubuntu 
Shell :: ubuntu play on linux install 
Shell :: ubuntu 20.04 install skype 
ADD CONTENT
Topic
Content
Source link
Name
8+6 =