rabbitMQ入门笔记

By AverageJoeWang
 标签:

本文参考文章1

1.安装python等依赖项

sudo apt-get install python-pip git-core
sudo pip install pika==0.10.0

2.必要名词解释

一般提到RabbitMQ和消息,都会用到一些专有名词。

  • 生产(Producing)意思就是发送。发送消息的程序就是一个生产者(producer)。我们一般用P来表示:


  • 队列(queue)就是邮箱的名称。消息通过你的应用程序和RabbitMQ进行传输,它们能够只存储在一个队列queue中。 队列queue没有任何限制,你要存储多少消息都可以——基本上是一个无限的缓冲。多个生产者producers能够把消息发送给同一个队列,同样,多个消费者consumers也能够从同一个队列queue中获取数据。队列可以绘制成这样(图上是队列的名称):


  • 消费Consuming和获取消息是一样的意思。一个消费者consumer就是一个等待获取消息的程序。我们把它绘制为C:


3.HelloWorld例子

RabbitMQ实现了AMQP定义的消息队列。它实现的功能”非常简单“:从Producer接收数据然后传递到Consumer。它能保证多并发,数据安全传递,可扩展。
和任何的Hello world一样,它们都不复杂。我们将会设计两个程序,一个发送Hello world,另一个接收这个数据并且打印到屏幕。
整体的设计如下图:



3.1.send.py发送数据



我们第一个程序send.py会发送一个消息到队列中。首先要做的事情就是建立一个到RabbitMQ服务器的连接。

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               host = 'localhost'))
channel = connection.channel()

现在我们已经连接上服务器了,那么,在发送消息之前我们需要确认队列是存在的。如果我们把消息发送到一个不存在的队列,RabbitMQ会丢弃这条消息。我门先创建一个名为hello的队列,然后把消息发送到这个队列中。

channel.queue_declare(queue='hello')

这时候我们就可以发送消息了,我们第一条消息只包含了 Hello World!字符串,我们打算把它发送到我们的hello队列

在RabbitMQ中,消息是不能直接发送到队列,它需要发送到交换机exchange中。我们不打算在这里深入讨论它——你可以通过教程的第三部分了解更多。现在我们所需要了解的是如何使用默认的交换机exchange,它使用一个空字符串来标识。交换机允许我们指定某条消息需要投递到哪个队列,routing_key参数必须指定为队列的名称:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"

在退出程序之前,我们需要确认网络缓冲已经被刷写、消息已经投递到RabbitMQ。完成这些事情(正确的关闭连接)是很简单的。

connection.close()

3.2.receive.py接收数据



我们的第二个程序receive.py,将会从队列中获取消息并打印消息。

这次我们还是先要连接到RabbitMQ服务器。连接服务器的代码和之前是一样的。

下一步也和之前一样,我们需要确认队列是存在的。使用queue_declare创建一个队列——我们可以运行这个命令很多次,但是只有一个队列会被创建。

channel.queue_declare(queue='hello')

你也许要问: 为什么要重复声明队列呢 —— 我们已经在前面的代码中声明过它了。如果我们确定了队列是已经存在的,那么我们可以不这么做,比如此前预先运行了send.py程序。可是我们并不确定哪个程序会首先运行。这种情况下,在程序中重复将队列重复声明一下是种值得推荐的做法。

列出所有队列
你也许希望查看RabbitMQ中有哪些队列、有多少消息在队列中。此时你可以使用rabbitmqctl工具(使用有权限的用户):

   $ sudo rabbitmqctl list_queues
   Listing queues ...
   hello    0

从队列中获取消息相对来说稍显复杂。需要为队列定义一个回调callback函数。当我们获取到消息的时候,Pika库就会调用此回调函数。这个回调函数会将接收到的消息内容输出到屏幕上。

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

下一步,我们需要告诉RabbitMQ这个回调函数将会从名为hello的队列中接收消息:

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

要成功运行这些命令,我们必须保证队列是存在的,我们的确可以确保它的存在——因为我们之前已经使用queue_declare将其声明过了。

no_ack参数稍后会进行介绍。

最后,我们输入一个用来等待消息数据并且在需要的时候运行回调函数的无限循环。

print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

4.完整实例

4.1.send.py

#!/usr/bin/env python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost'))


#create a channel
channel = connection.channel()

#create a queue named 'hello'
channel.queue_declare(queue = 'hello')

#send message to exchange named ''
channel.basic_publish(exchange = '',
              routing_key = 'hello',
              body = 'Hello World!')

#print sth in order to prove the progarm is correct
print " [x] Sent 'Hello World!' "

#we should close the connection
connection.close()

4.2.receive.py

#!/usr/bin/env python

import pika

#create a connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost'))

# create a channel
channel = connection.channel()

# create a queue,name = hello
channel.queue_declare(queue = 'hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

# this function operates the data received
def callback(ch,method,properties,body):
    print " [x] Receive %r" % (body,)


channel.basic_consume(callback,
              queue = 'hello',
              no_ack = True)


channel.start_consuming()

5.运行例子

#一个终端
python send.py
#此时可以在浏览器使用rabbitmq插件查看queue

#接收信息
python receive.py

发送不成功!

如果这是你第一次使用RabbitMQ,并且没有看到“Sent”消息出现在屏幕上,你可能会抓耳挠腮不知所以。这也许是因为没有足够的磁盘空间给代理使用所造成的(代理默认需要1Gb的空闲空间),所以它才会拒绝接收消息。查看一下代理的日志确定并且减少必要的限制。配置文件文档会告诉你如何更改磁盘空间限制disk_free_limit

成功了!我们已经通过RabbitMQ发送第一条消息。你也许已经注意到了,receive.py程序并没有退出。它一直在准备获取消息,你可以通过Ctrl-C来中止它。

试下在新的终端中再次运行send.py

我们已经学会如何发送消息到一个已知队列中并接收消息。是时候移步到第二部分了,我们将会建立一个简单的工作队列work queue