activeMQ消息的事务

JMS消息的事务
1、创建事务createSession(paramA,paramB);
paramA是设置事务的,paramB设置acknowledgment mode(应答模式)
paramA设置为false时: paramB的值可为Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。

2、事务的应答确认
 A) paramA 设置为false时:
Session.AUTO_ACKNOWLEDGE为自动确认,当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。(默认是批量确认)
DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收,而且允许重复确认。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。

消费者的消费方式
下两种方法之一:
同步消费。通过调用消费者的receive方法从目的地中显式提取消息。
receive方法可以一直阻塞到消息到达。
异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。
实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。
JAVA 消息队列学习笔记(五) - 漂动的影子 - 精彩的世界需要你来添彩
   activeMQ支持多种通讯协议TCP/UDP等,我们选取最常用的TCP来分析activeMQ的通讯机制。首先我们来明确一个概念:
   客户(Client):消息的生产者、消费者对activeMQ来说都叫作客户。
  消息中转器(Message broker):它是activeMQ的核心,它接收信息并进行相关处理后分发给消息消费者。
  为了能清楚的描述出activeMQ的核心通讯机制,我们选择3个部分来进行说明,它们分别是建立链接、关闭链接、心跳。
一、Client跟activeMQ的TCP通讯初始化过程分析如下:
      1、activeMQ初始化时,通过TcpTransportServer类根据配置打开TCP侦听端口,客户通过该端口发起建立链接的动作。
      2、把accept的Socket放入阻塞队列中。
      3、另外一个线程Scoket handler 阻塞着等待队列中是否有新的Socket,如果有则取出来。
      4、生成一个TransportConnection的实例。TransportConnection类的主要作用是处理链路的状态信息,并实现CommandVisitor接口来完成各类消息的处理。
     5、TransportConnection 会使用一个由多个TransportFilter实例组成的消息处理链条,负责对接收到的各类消息进行处理并发送相应的应答。这个链条的典型组成顺序;
MutexTransport -> WireFormatNegotiator -> InactivityMonitor->TcpTransport。在这条链条中最后的一个环就是TcpTransport类,它是实际和Client获取和发送数据的地方,该类的重要
     6、 建链完成,可以进行通讯操作。方法有run()和oneway(),一个负责读取,一个负责发送。

二。关闭链接
activeMQ 发现TCP链接的关闭,最关键的代码在TcpBufferedInputStream类中的 int n = in.read(buffer,position, buffer.length - position);

三、心跳
     为了更好的维护TCP链路的使用,activeMQ采用了心中机制作为判断双方链路的健康情况。activeMQ使用的是双向心中,也就是activeMQ的Broker和Client双方都进行相互心跳,但不管是Broker或Client心跳的具体处理情况是完全一样的,都在InactivityMonitor类中实现,下面具体介绍。
    心跳会产生两个线程“InactivityMonitor ReadCheck” 和 “InactivityMonitor WriteCheck”, 它们都是Timer类型,都会隔一段固定时间被调用一次。ReadCheck线程主要的调用的方法是readCheck(), 当在等待时间内,有消息接收到,则该方法会返回true。WriteCheck线程主要调用的方法是writeCkheck(),这有个小技巧,大家可能参考一下,那就是当WriteCheck纯种休眠时,有任何数据发送成功,则该线程被唤醒后,不用通过TCP向对方真的发送心中消息,这样可以从一定程度上减少网络传输的数据量。