2013年6月14日星期五

RabbitMQ in Action: 测试RabbitMQ的高可用性

我在之前的blog里已描述如何搭建RabbitMQ的Cluster(集群),集群里的每个节点都能访问到公共队列的消息。然而,这个集群只完成了高可用的1/3配置,本文剩下的内容讲述如何配置真正的RabbitMQ高可用性。
我们假设有如下环境:
机器A:IP地址为192.168.1.1,运行RabbitMQ的A实例(broker)
机器B:IP地址为192.186.1.2,运行RabbitMQ的B实例(broker)
机器C:IP地址为192.168.1.3,运行RabbitMQ客户端的producer和consumer程序
客户端在程序里指定,连接到机器A或者机器B进行生产和消费。假如客户端的订阅者连接到A,但是A机器突然挂了,那客户端怎么办呢?只能是手工更改程序配置,去重新连接到B,这违背了高可用的透明原则。所以,在A与B的前端,要有一个负载均衡器(LB – Load Balancer)。客户端与LB进行通信,LB将客户端的请求转发到后端RabbitMQ上。LB会对后端Server进行健康检查,如果有一个服务器挂了,它会在选择算法里予以屏蔽。基于LB的架构图如下:
上述图里是3个RabbitMQ运行在同一主机上,分别用不同的服务端口。当然我们的生产实际里,多个RabbitMQ肯定是运行在不同的物理服务器上,否则就失去了高可用的意义。
关于负载均衡器,商业的比如F5的BIG-IP,Radware的AppDirector,是硬件架构的产品,可以实现很高的处理能力。但这些产品昂贵的价格会让人止步,所以我们还有软件负载均衡方案。互联网公司常用的软件LB一般有LVS、HAProxy、Nginx等。LVS是一个内核层的产品,主要在第四层负责数据包转发,使用较复杂。HAProxy和Nginx是应用层的产品,但Nginx主要用于处理HTTP,所以这里选择HAProxy作为RabbitMQ前端的LB。
HAProxy的安装使用非常简单,在Ubuntu下直接apt-get install haproxy,然后更改/etc/haproxy/haproxy.cfg 文件即可,文件内容大概如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
global
       log 127.0.0.1      local0 info
       maxconn 1024
       user haproxy
       group haproxy
       daemon
defaults
       log  global
       mode    tcp
       option   tcplog
       option   dontlognull
       retries   3
       option redispatch
       maxconn     1024
       contimeout  5000
       clitimeout    50000
       srvtimeout  50000
listen
       rabbitmq_cluster 0.0.0.0:5672
       mode tcp
       balance roundrobin
       server   rabbit65 192.168.1.1:5672 check inter 2000 rise 2 fall 3
       server   rabbit66 192.168.1.2:5672 check inter 2000 rise 2 fall 3
上述我们假设HAProxy也运行在客户端本机上,侦听的端口是5672,它的2个后端RabbitMQ服务器是192.168.1.1和192.168.1.2,端口也是5672。
到这一步,完成了高可用性的2/3配置。现在客户端和HAProxy建立连接,HAProxy就和后台RabbitMQ建立连接,并负责两者间数据转发。如果后台正连接的RabbitMQ服务器宕机了,那么已经建立好的通道(AMQP协议里的channel)就断了,客户端会抛出异常而终止。这个时候需要客户端重新连一次。这里就跟浏览器一样,如果一个后台服务器挂了,浏览器刷新一下,负载均衡器就会选择另一个web服务器去获取请求。所以,还剩下1/3的工作,要在客户端程序里完成,客户端程序负责捕获任何channel异常,并重新建立连接。
我们看如下consumer程序(如下所有演示程序都用ruby编写,使用了bunny这个AMQP协议库):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#!/usr/bin/env ruby
# encoding: utf-8
require “rubygems”
require “bunny”
while true
  begin
    conn = Bunny.new
    conn.start
    ch = conn.create_channel
    x  = ch.direct(“buuny.exchange”, :durable => true)
    q  = ch.queue(“bunny.queue”, :durable => true).bind(x, :routing_key => “test”)
    q.subscribe(:block => true) do |delivery_info, metadata, payload|
        puts “Received #{payload}”
    end
  rescue Exception => e
    puts e.message
  end
end
上述begin块里,包裹了整个消费过程。它订阅到本机的RabbitMQ服务(实际是HAProxy),声明了一个持久化的direct交换机,和一个持久化的队列,并且采用阻塞方式订阅到这个队列,一旦broker上有消息推送过来,就简单的打印出消息主体。
在消费过程中有任何异常,比如RabbitMQ服务器挂掉,程序会捕获到,打印出异常消息,并重新创立连接。HAProxy在接受到重连请求后,会试图与其他正常的RabbitMQ服务器重新建立连接,并重新开始消费业务。producer脚本相对简单,包含如下:
1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env ruby
# encoding: utf-8
require “rubygems”
require “bunny”
conn = Bunny.new
conn.start
ch = conn.create_channel
x  = ch.direct(“buuny.exchange”, :durable => true)
data = rand.to_s
x.publish(data, :routing_key => ‘test’, :persistent => true)
分别开2个终端,先运行consumer,再使用shell的for语句循环运行producer,可以看到consumer在屏幕上正常的打印RabbitMQ推送过来的消息。通过netstat命令可以看到HAProxy与RabbitMQ服务器建立连接的情况,consumer与producer可能与同一台服务器建立连接,也可能与不同的服务器建立连接。我们登录其中一台服务器,运行/etc/init.d/rabbitmq-server stop命令停掉RabbitMQ服务,再观察consumer程序的输出。
我们可能看到屏幕上输出如下信息:
NOT_FOUND – home node ‘rabbit@rabbit65′ of durable queue ‘bunny.queue’ in vhost ‘/’ is down or inaccessible
Caught #<Bunny::NotFound: NOT_FOUND – home node ‘rabbit@rabbit65′ of durable queue ‘bunny.queue’ in vhost ‘/’ is down or inaccessible> while redeclaring and registering bunny.queue!
此时consumer并不能继续。这是为什么?在前一篇blog里我讲过,如果拥有持久化队列和消息的那个节点宕机了,那么RabbitMQ集群不会自动恢复,只有等这台服务器起来后,才能重新工作。我们看到上述错误日志提示NOT_FOUND,因为这个存有持久化队列的服务器挂掉了,在其他节点上重新声明和注册该队列失败,所以工作无法继续。对于这种情况,似乎没有什么好办法,唯一处理办法就是尽快恢复服务器。
如果不是持久化队列,那么consumer能自我恢复吗?我们把consumer程序更改如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#!/usr/bin/env ruby
# encoding: utf-8
require “rubygems”
require “bunny”
while true
  begin
    conn = Bunny.new
    conn.start
    ch = conn.create_channel
    x  = ch.direct(“buuny.exchange2″)
    q  = ch.queue(“bunny.queue2″).bind(x, :routing_key => “test”)
    q.subscribe(:block => true) do |delivery_info, metadata, payload|
        puts “Received #{payload}”
    end
  rescue Exception => e
    puts e.message
  end
end
上述主要变更是声明了非持久化的交换机和队列(去掉了:durable=>1参数)。同时producer程序变更如下:
1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env ruby
# encoding: utf-8
require “rubygems”
require “bunny”
conn = Bunny.new
conn.start
ch = conn.create_channel
x  = ch.direct(“buuny.exchange2″)
data = rand.to_s
x.publish(data, :routing_key => ‘test’)
上述主要是去掉了:persistent=>1这个publish参数,还有:durable=>1这个交换机参数,表明非持久化消息。
改完后,同样开2个终端,先后运行consumer程序和producer程序,可以看到producer正常投递消息,consumer正常获取到消息,并打印到屏幕上。
我们登录一个后台RabbitMQ服务器,关掉RabbitMQ服务,此时看到consumer打印一行:
Exception in the main loop: AMQ::Protocol::EmptyResponseError
然后程序继续运行。用netstat命令看到,consumer和producer对应的连接,都自动分配到后台正常的RabbitMQ服务器上,到这一步就真正实现了RabbitMQ的高可用性。

没有评论:

发表评论