2013年6月13日星期四

RabbitMQ与PHP(二)


在上一节中,详细介绍了RabbitMQ的exchange/routingkey/queue等概念,以及示例了如何使用PHP发送和处理消息的代码。这一节,将介绍在项目中如何使用PHP多线程的进行消息实时处理,以及简要介绍一些RabbitMQ的安装相关。熟悉的可以将安装这部分跳过。

一, RabbitMQ的安装:
需要首先安装erlong 
1
2
3
4
5
6
7
wget http://www.erlang.org/download/otp_src_R16B.tar.gz
tar -xzvf otp_src_R16B.tar.gz
cd otp_src_R16B
./configure --help
./configure --prefix=/usr/local/erlong
make
make install
注意: 这里将erlong安装到了指定的目录: /usr/local/erlong,而不是使用默认的路径。这是一个好的习惯,对于版本控制等都会有好处。但是这会导致后面 rabbitMQ报错:找不到erl 执行文件,需要多做一些处理才行。

安装RabbitMQ
1
2
3
4
5
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.0.4/rabbitmq-server-generic-unix-3.0.4.tar.gz
tar -xzvf rabbitmq-server-generic-unix-3.0.4.tar.gz 
mv rabbitmq_server-3.0.4/ /usr/local/
cd /usr/local/rabbitmq_server-3.0.4/sbin
./rabbitmq-server
到这里会出现一个报错信息: ./rabbitmq-server: line 86: erl: command not found  这是因为erlong指定了安装路径,在系统的PATH中找不到。只要export PATH=$PATH:/usr/local/erlong/bin 就可以了。
如果为了rc.local启动方便,可以将 export PATH=$PATH:/usr/local/erlong/bin 这一行写入到 rabbitmq-server 文件中:
执行后,ps -aux 一下,看到进程中有/usr/local/erlong/lib/erlang/erts-5.10.1/bin/epmd -daemon 和 /usr/local/erlong/lib/erlang/erts-5.10.1/bin/beam.smp 就OK了。

sbin目录下还有一个脚本: rabbitmqctl 也很常用,与 rabbitmq-server 一样需要指明erlong的路径才能正确工作。
常用的方法:  
rabbitmqctl start-app 启动后,执行一下这个比较保险
rabbitmqctl list_exchanges 显示当前所有的交换机
rabbitmqctl list_queue 查看当前有效队列情况

二, PHP extension的安装:
PHP操作rabbitmq需要AMQP扩展的支持。下载扩展: http://pecl.php.net/package/amqp ,安装过程与一般扩展一样,/usr/local/php/bin/phpize; ./configure --with-php-config=/usr/local/php/bin/php-config; make && make install
然后编辑php.ini 插入: 
[amqp]
extension = amqp.so
重启apache或nginx,查看phpinfo其中有关于anqp的段落,就OK了。


三,如何使用PHP进行实时后端消息处理
首先,要保证PHP文件可以正确的以堵塞方式处理消息。代码可参见上一节RabbitMQ与PHP(一)
然后,我们来借助Python实现一个多线程的守护进程,由这个守护进程来调用PHP,把PHP作为工作线程。

启动脚本:start_jobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# _*_ coding:utf-8 _*_
'''
yoka at 实现多线程处理任务的守护进程
Created on 2012-4-7
@author: xwarrior
@update: jimmy.dong@gmail.com
'''
#在此引入项目需要的数据包
from MyJobs import MyJobs
from MyThread import MyThread
import logging
import time
     
def main():
    logger = logging.getLogger('main()')
    logger.info('server start!')
    worker_threads = 2 #定义需要启动的线程数量
    timeline = 2 #线程检查时间间隔,秒
    thread_pool = {}
         
    for in range(0, worker_threads ):
        param = 'some param'
        job = MyJobs( param )
        thread = MyThread( job, i )
        thread.setDaemon = True
        thread.start()
        logger.info('start thread %s' %( thread.getName() ))
        thread_pool[i] = thread
         
    #干完就结束模式
    #for eachKey in thread_pool.keys():
    #    thread_pool[eachKey].join()
         
    #保持线程数量模式
    while 1:
        time.sleep(timeline)
        #   检查每一个线程
        for eachKey in thread_pool.keys():
            if thread_pool[eachKey].isAlive():
                print 'thread alive:' + str(i)
            else:
                print 'thread down:' + str(i)
                thread_pool[eachKey].run()
             
    logger.info('main exist!')
    return
     
     
if __name__ == '__main__':
    #init config format
    FORMAT = '%(asctime)-15s  %(name)s %(levelname)s file %(filename)s:lineno %(lineno)s - %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)
     
    main()
    pass
线程脚本: MyThread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# _*_ coding:utf-8 _*_
'''
Created on 2013-03-25
@author: jimmy.dong@gmail.com
'''
from threading import Thread
              
class MyThread(Thread):
    '''
    创建线程
    '''
              
    def __init__(self,job,thread_id):
        '''
        Constructor
        '''
        self.job = job
        Thread.__init__(self, name = 'my_thread_%s' %(thread_id))
                  
    def run(self):
        self.job.run()
                      
    def stop(self):
        self.job.exit()
任务脚本: MyJobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# _*_ coding:utf-8 _*_
'''
Created on 2013-03-25
@author: jimmy.dong@gmail.com
'''
import os
import urllib2
              
class MyJobs(object):
                  
    def __init__(self, param ):
        #do something
        self.param = param
                  
    def __del__(self):
        ''' destruct '''
        self.exit()
                      
    def exit(self):
        ''' 退出'''
        self.quit = True
              
    def run(self):
        ''' 开始处理 '''
        #使用shell模式
        #cmd = '/usr/bin/curl "http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param) + '"'
        cmd = '/usr/local/php/bin/php -c /usr/local/php/lib/nginx.ini /home/jimmy/at/DocumentRoot/try/amqp_consume.php ' + str(self.param)
        re = os.system(cmd)
                      
        #使用web模式
        #req = urllib2.Request('http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param))
        #response = urllib2.urlopen(req)
        #re = response.read()
        #print re
                     
在任务调度(start_jobs.py)中,设计了两种工作模式: 
一种工作模式是一共启动N个线程去干活,适合于尽快完成一个大任务;
另一种是保持进程数量,当发现某个进程完成后,再重新将进程启动起来。显然,用户守护处理消息适合这种模式。

具体工作在MyJob.py中,提供了系统Shell调用和采用URL调用两种方式。推荐使用shell直接调用php的方式,这样可以灵活控制Php.ini,比如增加auto_prepend_file、增长max_execution_time等。

实际项目中,假定有5种类型的消息,可以启动20个线程,将thread_id当作参数传递给PHP。PHP将thread_id%5当作待处理类型,就可以得到每种类型有4个线程工作的场景了。

考虑到PHP的执行时间限制及内存泄露问题,可以将consume.php脚本进行一下改进,让PHP脚本每次处理指定数量的消息后就退出,由Python多线程框架重新启动线程,以保证运行稳定可靠。另外,将应答改为手工应答,确保消息获得正确有效处理。
1
2
3
4
5
6
7
8
9
10
11
12
/$q->consume('processMessage');   //需手动应答
         
/**
 * 消费回调函数
 */
function processMessage($envelope$queue) { 
    global $counter;
    $msg $envelope->getBody(); 
    echo $msg."\n"//处理消息 
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 
    if($counter++ > 5)return FALSE;  //处理5个消息后退出
}
用 两个线程,检查间隔2秒,SHELL模式 测试运行结果:

由上图可见,运行开始后,检查2个线程都处于活跃状态,并对消息进行了正确处理,当处理到一定数量后,PHP程序结束,父进程检查到有进程处于完成状态,重新将其启动(第二个绿色框)。完全与预期相符。


没有评论:

发表评论