ActiveMQ教程

Queue与Topic的比较

1、JMS Queue执行load balancer语义:

一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。

2、Topic实现publish和subscribe语义:

一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。

3、分别对应两种消息模式:

Point-to-Point (点对点),Publisher/Subscriber Model (发布/订阅者)

其中在Publicher/Subscriber模式下又有Nondurable subscription(非持久订阅)和durable subscription (持久化订阅)2种消息处理方式。



ProducerTool.Java用于发送消息:

import javax.jms.Connection;     
import javax.jms.DeliveryMode;     
import javax.jms.Destination;     
import javax.jms.JMSException;     
import javax.jms.MessageProducer;     
import javax.jms.Session;     
import javax.jms.TextMessage;     
    
import org.apache.activemq.ActiveMQConnection;     
import org.apache.activemq.ActiveMQConnectionFactory;     
    
public class ProducerTool {     
    
    private String user = ActiveMQConnection.DEFAULT_USER;     
    
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;     
    
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;     
    
    private String subject = "TOOL.DEFAULT";     
    
    private Destination destination = null;     
    
    private Connection connection = null;     
    
    private Session session = null;     
    
    private MessageProducer producer = null;     
    
    // 初始化     
    private void initialize() throws JMSException, Exception {     
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(     
                user, password, url);     
        connection = connectionFactory.createConnection();     
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);     
        destination = session.createQueue(subject);     
        producer = session.createProducer(destination);     
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);     
    }     
    
    // 发送消息     
    public void produceMessage(String message) throws JMSException, Exception {     
        initialize();     
        TextMessage msg = session.createTextMessage(message);     
        connection.start();     
        System.out.println("Producer:->Sending message: " + message);     
        producer.send(msg);     
        System.out.println("Producer:->Message sent complete!");     
    }     
    
    // 关闭连接     
    public void close() throws JMSException {     
        System.out.println("Producer:->Closing connection");     
        if (producer != null)     
            producer.close();     
        if (session != null)     
            session.close();     
        if (connection != null)     
            connection.close();     
    }     
} 

ConsumerTool.java用于接受消息,我用的是基于消息监听的机制,需要实现MessageListener接口,这个接口有个onMessage方法,当接受到消息的时候会自动调用这个函数对消息进行处理。

import javax.jms.Connection;     
import javax.jms.Destination;     
import javax.jms.JMSException;     
import javax.jms.MessageConsumer;     
import javax.jms.Session;     
import javax.jms.MessageListener;     
import javax.jms.Message;     
import javax.jms.TextMessage;     
    
import org.apache.activemq.ActiveMQConnection;     
import org.apache.activemq.ActiveMQConnectionFactory;     
    
public class ConsumerTool implements MessageListener {     
    
    private String user = ActiveMQConnection.DEFAULT_USER;     
    
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;     
    
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;     
    
    private String subject = "TOOL.DEFAULT";     
    
    private Destination destination = null;     
    
    private Connection connection = null;     
    
    private Session session = null;     
    
    private MessageConsumer consumer = null;     
    
    // 初始化     
    private void initialize() throws JMSException, Exception {  
     //连接工厂是用户创建连接的对象,这里使用的是ActiveMQ的ActiveMQConnectionFactory根据url,username和password创建连接工厂。  
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(     
                user, password, url);   
        //连接工厂创建一个jms connection  
        connection = connectionFactory.createConnection();     
        //是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。  
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  //不支持事务   
        //目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象,两种消息传递方式:点对点和发布/订阅  
        destination = session.createQueue(subject);   
        //会话创建消息的生产者将消息发送到目的地  
        consumer = session.createConsumer(destination);     
             
    }     
    
    // 消费消息     
    public void consumeMessage() throws JMSException, Exception {     
        initialize();     
        connection.start();     
             
        System.out.println("Consumer:->Begin listening...");     
        // 开始监听     
        consumer.setMessageListener(this);     
        // Message message = consumer.receive();     
    }     
    
    // 关闭连接     
    public void close() throws JMSException {     
        System.out.println("Consumer:->Closing connection");     
        if (consumer != null)     
            consumer.close();     
        if (session != null)     
            session.close();     
        if (connection != null)     
            connection.close();     
    }     
    
    // 消息处理函数     
    public void onMessage(Message message) {     
        try {     
            if (message instanceof TextMessage) {     
                TextMessage txtMsg = (TextMessage) message;     
                String msg = txtMsg.getText();     
                System.out.println("Consumer:->Received: " + msg);     
            } else {     
                System.out.println("Consumer:->Received: " + message);     
            }     
        } catch (JMSException e) {     
            // TODO Auto-generated catch block     
            e.printStackTrace();     
        }     
    }     
}   


如果想主动的去接受消息,而不用消息监听的话,把consumer.setMessageListener(this)改为Message message = consumer.receive(),手动去调用MessageConsumer的receive方法即可。

下面是测试类Test.java:

import javax.jms.JMSException;     
mport org.apache.activemq.ActiveMQConnection;  
    
public class Test {     
    
    /**   
     * @param args   
     */    
    public static void main(String[] args) throws JMSException, Exception {     
       
     // TODO Auto-generated method stub     
        ConsumerTool consumer = new ConsumerTool();     
        ProducerTool producer = new ProducerTool();    
        System.out.println(ActiveMQConnection.DEFAULT_BROKER_URL+"------------");  
        // 开始监听     
        consumer.consumeMessage();     
             
        // 延时500毫秒之后发送消息     
        Thread.sleep(500);     
        producer.produceMessage("Hello, world!");     
        producer.close();     
             
        // 延时500毫秒之后停止接受消息     
        Thread.sleep(500);     
        consumer.close();     
    }     
}