使用ActiveMQ Apollo实现即时消息推送

前言

现在大多网站会员系统或云端协作平台上都有即时消息通知功能,即消息推送,这对用户来说是非常贴心的功能。要实现消息推送服务,大致可以采用以下几种方式:

  1. 使用HTTP轮循方式
    • 说明:定时向HTTP服务端接口(Web Service API)获取最新消息,可结合ajax技术实现页面无刷新效果,这是主动拉取消息的机制,严格来说这不属于消息推送。
    • 优点:实现简单、可控性强、部署成本低
    • 缺点:实时性差
  2. 使用XMPP协议
    • 说明:XMPP(可扩展消息处理现场协议)是基于可扩展标记语言(XML)的协议,它用于即时消息(IM)以及在线现场探测。它促进在服务器之间的准即时操作,其前身是Jabber,是一个开源形式组织产生的网络即时通信协议。XMPP目前被IETF国际标准组织完成了标准化工作。
    • 优点:协议成熟、强大、可扩展性强、目前主要应用于众多IM系统
    • 缺点:协议比较复杂、冗余(基于XML)、费流量、部署成本高高。
  3. 使用MQTT协议
    • 说明:MQTT协议是IBM的Andy Stanford-Clark博士以及Arcom公司ArlenNipper博士发明的,是轻量级的、基于代理的“发布/订阅”模式的消息传输协议
    • 优点:MQTT协议简洁、小巧、可扩展性强、流量开销很小、目前已经应用到企业领域
    • 缺点:还不够成熟、实现较复杂、服务端组件不开源,部署成本较高

EasyPM作为一个团队协作的项目管理平台,消息推送功能是少不了的。我们综合众多因素,最终采用MQTT协议方式,在众多MQTT协议实现架构中,我们选择了Apollo的实现架构。下边就介绍下基于Apollo架构如何实现消息推送和接收。

 

Apollo是什么?

Apollo是apache旗下的基金项目,它是以Apache ActiveMQ5.x为基础,采用全新的线程和消息调度架构重新实现的消息中间件,针对多核处理器进行了优化处理,它的速度更快、更可靠、更易于维护。apollo与ActiveQQ一样支持多协议:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介绍MQTT协议的使用。

 

参考资源

  1. MQTT
  2. ActiveMQ5
  3. Apollo官方文档
  4. eclipse paho

 

Apollo的下载和安装(本文仅介绍wendows系统安装)

进入Apollo下载页面 ,下载windows版本的压缩包并解压到自己的工作目录(如:E:\apache-apollo-1.7),并创建系统环境变量APOLLO_HOME=E:\apache-apollo-1.7。

如果操作是系统是Windows Vista或更高版本,则需要安装Microsoft Visual C++ 2010 Redistributable:
64位JVM
32位JVM

 

Apollo实例的创建和服务启动

  1. 创建实例
    进入E:\apache-apollo-1.7之下的bin目录,打开cmd窗口,执行命令:apollo create D:\apollo_broker,命令执行成功后,在D盘下会有apollo_broker目录,这便是apollo的服务实例,apollo之旅便从这里开始。
    在D:\apollo_broker下有个bin目录,其中有两个文件:
    apollo-broker.cmd是通过cmd命令启动apollo服务的
    apollo-broker-service.exe,是用于创建window服务的
  2. 命令行启动服务
    在D:\apollo_broker\bin目录下打开cmd窗口,执行apollo-broker run命令来启动apollo服务,
    启动成功可以在浏览器中查看运行情况,访问地址为 http://127.0.0.1:61680 , 默认用户名/密码:admin/password
  3. 创建windows服务
    找到cmd.exe文件,点击鼠标右键,以管理员身份运行,输入创建windows服务命令,如下图:
    EasyPM 团队协作 敏捷看板
    创建成功后,在windows服务中会有一个apollo_broker服务,设置其自动启动,启动系统时apollo就会自动启动了
    EasyPM 项目管理 迭代规划

 

MQTT协议的应用

MQTT协议有众多客户端实现,相关客户端请参考apollo官方文档
本文采用eclipse的paho客户端实现

  1. javascript客户端实现
    将javascript客户端项目下载下来,并在其项目根目录下执行mvn命令,进行编译,生成target目录,其下生成mqttws31.js、mqttws31-min.js两个js文件,将其拷贝到自己项目相关目录下,并在页面中引用,即可实现javascript客户端的消息订阅和发布,下边时demo代码:
    var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId"); 
    /* 61623是ws连接的默认端口,可以在apollo中间件中进行配置
    (关于apollo的配置请参考:
    http://activemq.apache.org/apollo/documentation/user-manual.html
    ) */
    // set callback handlers 
    client.onConnectionLost = onConnectionLost; 
    client.onMessageArrived = onMessageArrived; 
    // connect the client 
    client.connect({userName:'admin',password:'password',onSuccess:onConnect}); 
    // called when the client connects 
    function onConnect() { // 连接成功后的处理 
         // Once a connection has been made, make a subscription and send a message. 
         console.log("onConnect"); 
         client.subscribe("/topic/event"); // 订阅消息的主题 
         var message = new Paho.MQTT.Message("Hello,this is a test"); 
         message.destinationName = "/topic/event"; 
         client.send(message); // 发送消息 
    } 
    // called when the client loses its connection 
    function onConnectionLost(responseObject) { // 连接丢失后的处理 
         if (responseObject.errorCode !== 0) { 
                 console.log("onConnectionLost:"+responseObject.errorMessage); 
         } 
    } 
    // called when a message arrives 
    function onMessageArrived(message) { // 消息接收成功后的处理 
         console.log("onMessageArrived:"+message.payloadString); 
    }
  2. java客户端实现
    paho java客户端目前只支持J2SE和安卓,提供源码下载和maven库。
    我们采用maven库,其地址如下:
    Official Releases
    Nightly Snapshots
    说明:设定版本为1.0.0或0.9.0时,其jar包根本加载不进来,最后搜到1.0.1版本才可以正常使用。
    maven dependency配置:
    <dependency> 
        <groupId>org.eclipse.paho</groupId> 
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId> 
        <version>1.0.1</version> 
    </dependency>
    java实现代码:
    String topic        = "MQTT Examples"; 
        String content      = "Message from MqttPublishSample";
        int qos             = 2;
        String broker       = "tcp://127.0.0.1:61613";
        String clientId     = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker: "+broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: "+content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.out.println("Message published");
            sampleClient.disconnect();
            System.out.println("Disconnected");
            System.exit(0);
        } catch(MqttException me) {
            System.out.println("reason "+me.getReasonCode());
            System.out.println("msg "+me.getMessage());
            System.out.println("loc "+me.getLocalizedMessage());
            System.out.println("cause "+me.getCause());
            System.out.println("excep "+me);
            me.printStackTrace();
        }