ActiveMQ使用线程池实现消息的生产与消费


1。  首先先引入相关的lib包,重点需引用activemq-client-5.8.0.jar,activemq-core-5.7.0.jar,activemq-pool-5.8.0.jar,activemq-protobuf-1.1.jar等包,其他包

自行配置。


2。  一些公共工具类的代码:

JMSProducer.Java

package com.ffcs.icity.jms;  
  
import java.util.Map;  
import java.util.Set;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
  
import javax.jms.Connection;  
import javax.jms.DeliveryMode;  
import javax.jms.Destination;  
import javax.jms.ExceptionListener;  
import javax.jms.JMSException;  
import javax.jms.MapMessage;  
import javax.jms.Message;  
import javax.jms.MessageProducer;  
import javax.jms.Session;  
  
import org.apache.activemq.ActiveMQConnectionFactory;  
import org.apache.activemq.pool.PooledConnectionFactory;  
  
/** 
 * JMS消息生产者 
 * @author linwei 
 * 
 */  
public class JMSProducer implements ExceptionListener{  
      
    //设置连接的最大连接数  
    public final static int DEFAULT_MAX_CONNECTIONS=5;  
    private int maxConnections = DEFAULT_MAX_CONNECTIONS;  
    //设置每个连接中使用的最大活动会话数  
    private int maximumActiveSessionPerConnection = DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION;  
    public final static int DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION=300;  
    //线程池数量  
    private int threadPoolSize = DEFAULT_THREAD_POOL_SIZE;  
    public final static int DEFAULT_THREAD_POOL_SIZE=50;  
    //强制使用同步返回数据的格式  
    private boolean useAsyncSendForJMS = DEFAULT_USE_ASYNC_SEND_FOR_JMS;  
    public final static boolean DEFAULT_USE_ASYNC_SEND_FOR_JMS=true;  
    //是否持久化消息  
    private boolean isPersistent = DEFAULT_IS_PERSISTENT;  
    public final static boolean DEFAULT_IS_PERSISTENT=true;   
      
    //连接地址  
    private String brokerUrl;  
  
    private String userName;  
  
    private String password;  
  
    private ExecutorService threadPool;  
  
    private PooledConnectionFactory connectionFactory;  
  
    public JMSProducer(String brokerUrl, String userName, String password) {  
        this(brokerUrl, userName, password, DEFAULT_MAX_CONNECTIONS, DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION, DEFAULT_THREAD_POOL_SIZE, DEFAULT_USE_ASYNC_SEND_FOR_JMS, DEFAULT_IS_PERSISTENT);  
    }  
      
    public JMSProducer(String brokerUrl, String userName, String password, int maxConnections, int maximumActiveSessionPerConnection, int threadPoolSize,boolean useAsyncSendForJMS, boolean isPersistent) {  
        this.useAsyncSendForJMS = useAsyncSendForJMS;  
        this.isPersistent = isPersistent;  
        this.brokerUrl = brokerUrl;  
        this.userName = userName;  
        this.password = password;  
        this.maxConnections = maxConnections;  
        this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;  
        this.threadPoolSize = threadPoolSize;  
        init();  
    }  
        
    private void init() {  
        //设置JAVA线程池  
        this.threadPool = Executors.newFixedThreadPool(this.threadPoolSize);  
        //ActiveMQ的连接工厂  
        ActiveMQConnectionFactory actualConnectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerUrl);  
        actualConnectionFactory.setUseAsyncSend(this.useAsyncSendForJMS);  
        //Active中的连接池工厂  
        this.connectionFactory = new PooledConnectionFactory(actualConnectionFactory);  
        this.connectionFactory.setCreateConnectionOnStartup(true);  
        this.connectionFactory.setMaxConnections(this.maxConnections);  
        this.connectionFactory.setMaximumActiveSessionPerConnection(this.maximumActiveSessionPerConnection);  
    }  
      
      
    /** 
     * 执行发送消息的具体方法 
     * @param queue 
     * @param map 
     */  
    public void send(final String queue, final Map<String, Object> map) {  
        //直接使用线程池来执行具体的调用  
        this.threadPool.execute(new Runnable(){  
            @Override  
            public void run() {  
                try {  
                    sendMsg(queue,map);  
                } catch (Exception e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
    }  
      
    /** 
     * 真正的执行消息发送 
     * @param queue 
     * @param map 
     * @throws Exception 
     */  
    private void sendMsg(String queue, Map<String, Object> map) throws Exception {  
          
        Connection connection = null;  
        Session session = null;  
        try {  
            //从连接池工厂中获取一个连接  
            connection = this.connectionFactory.createConnection();  
            /*createSession(boolean transacted,int acknowledgeMode) 
              transacted - indicates whether the session is transacted acknowledgeMode - indicates whether the consumer or the client  
              will acknowledge any messages it receives; ignored if the session is transacted.  
              Legal values are Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, and Session.DUPS_OK_ACKNOWLEDGE. 
            */  
            //false 参数表示 为非事务型消息,后面的参数表示消息的确认类型  
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
            //Destination is superinterface of Queue  
            //PTP消息方式       
            Destination destination = session.createQueue(queue);  
            //Creates a MessageProducer to send messages to the specified destination  
            MessageProducer producer = session.createProducer(destination);  
            //set delevery mode  
            producer.setDeliveryMode(this.isPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);  
            //map convert to javax message  
            Message message = getMessage(session, map);  
            producer.send(message);  
        } finally {  
            closeSession(session);  
            closeConnection(connection);  
        }  
    }  
      
    private Message getMessage(Session session, Map<String, Object> map) throws JMSException {  
        MapMessage message = session.createMapMessage();  
        if (map != null && !map.isEmpty()) {  
            Set<String> keys = map.keySet();  
            for (String key : keys) {  
                message.setObject(key, map.get(key));  
            }  
        }  
        return message;  
    }  
      
    private void closeSession(Session session) {  
        try {  
            if (session != null) {  
                session.close();  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
    private void closeConnection(Connection connection) {  
        try {  
            if (connection != null) {  
                connection.close();  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
      
    @Override  
    public void onException(JMSException e) {  
        e.printStackTrace();  
    }  
  
}  
JMSConsumer.java



package com.ffcs.icity.jms;  
  
import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.Destination;  
import javax.jms.ExceptionListener;  
import javax.jms.JMSException;  
import javax.jms.MessageConsumer;  
import javax.jms.MessageListener;  
import javax.jms.Session;  
  
import org.apache.activemq.ActiveMQConnection;  
import org.apache.activemq.ActiveMQConnectionFactory;  
import org.apache.activemq.ActiveMQPrefetchPolicy;  
  
  
/** 
 * JMS消息消费者 
 * @author linwei 
 * 
 */  
public class JMSConsumer implements ExceptionListener {  
  
    //队列预取策略  
    private int queuePrefetch=DEFAULT_QUEUE_PREFETCH;  
    public final static int DEFAULT_QUEUE_PREFETCH=10;  
      
    private String brokerUrl;  
      
    private String userName;  
  
    private String password;  
  
    private MessageListener messageListener;  
      
    private Connection connection;  
      
    private Session session;  
    //队列名  
    private String queue;  
      
      
    /** 
     * 执行消息获取的操作 
     * @throws Exception 
     */  
    public void start() throws Exception {  
        //ActiveMQ的连接工厂  
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerUrl);  
        connection = connectionFactory.createConnection();  
        //activeMQ预取策略  
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();  
        prefetchPolicy.setQueuePrefetch(queuePrefetch);  
        ((ActiveMQConnection) connection).setPrefetchPolicy(prefetchPolicy);  
        connection.setExceptionListener(this);  
        connection.start();  
        //会话采用非事务级别,消息到达机制使用自动通知机制  
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        Destination destination = session.createQueue(this.queue);  
        MessageConsumer consumer = session.createConsumer(destination);  
        consumer.setMessageListener(this.messageListener);  
    }  
      
      
    /** 
     * 关闭连接 
     */  
    public void shutdown(){  
        try {  
            if (session != null) {  
                session.close();  
                session=null;  
            }  
            if (connection != null) {  
                connection.close();  
                connection=null;  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
      
    @Override  
    public void onException(JMSException e) {  
        e.printStackTrace();  
    }  
  
  
    public String getBrokerUrl() {  
        return brokerUrl;  
    }  
  
  
    public void setBrokerUrl(String brokerUrl) {  
        this.brokerUrl = brokerUrl;  
    }  
  
  
    public String getUserName() {  
        return userName;  
    }  
  
  
    public void setUserName(String userName) {  
        this.userName = userName;  
    }  
  
  
    public String getPassword() {  
        return password;  
    }  
  
  
    public void setPassword(String password) {  
        this.password = password;  
    }  
  
  
    public String getQueue() {  
        return queue;  
    }  
  
  
    public void setQueue(String queue) {  
        this.queue = queue;  
    }  
  
  
    public MessageListener getMessageListener() {  
        return messageListener;  
    }  
  
  
    public void setMessageListener(MessageListener messageListener) {  
        this.messageListener = messageListener;  
    }  
  
  
    public int getQueuePrefetch() {  
        return queuePrefetch;  
    }  
  
  
    public void setQueuePrefetch(int queuePrefetch) {  
        this.queuePrefetch = queuePrefetch;  
    }  
      
      
}  
MessageHandler.java



package com.ffcs.icity.jms;  
  
import javax.jms.Message;  
  
  
/** 
 * 提供消息操作的回调接口 
 * @author linwei 
 * 
 */  
public interface MessageHandler {  
  
      
    /** 
     * 消息回调提供的调用方法 
     * @param message 
     */  
    public void handle(Message message);  
}  

MultiThreadMessageListener.java
[java] view plain copy 在CODE上查看代码片派生到我的代码片
package com.ffcs.icity.jms;  
  
import java.util.concurrent.ExecutorService;  
  
import javax.jms.Message;  
import javax.jms.MessageListener;  
  
  
/** 
 * 消息消费者中使用的多线程消息监听服务 
 * @author linwei 
 * 
 */  
public class MultiThreadMessageListener implements MessageListener {  
  
    //默认线程池数量  
    public final static int DEFAULT_HANDLE_THREAD_POOL=10;  
    //最大的处理线程数.  
    private int maxHandleThreads;  
    //提供消息回调调用接口  
    private MessageHandler messageHandler;  
  
    private ExecutorService handleThreadPool;  
      
      
    public MultiThreadMessageListener(MessageHandler messageHandler){  
        this(DEFAULT_HANDLE_THREAD_POOL, messageHandler);  
    }  
      
    public MultiThreadMessageListener(int maxHandleThreads,MessageHandler messageHandler){  
        this.maxHandleThreads=maxHandleThreads;  
        this.messageHandler=messageHandler;  
        //支持阻塞的固定大小的线程池(自行手动创建的)  
        this.handleThreadPool = new FixedAndBlockedThreadPoolExecutor(this.maxHandleThreads);  
    }  
      
      
    /** 
     * 监听程序中自动调用的方法 
     */  
    @Override  
    public void onMessage(final Message message) {  
        //使用支持阻塞的固定大小的线程池来执行操作  
        this.handleThreadPool.execute(new Runnable() {  
            public void run() {  
                try {  
                    MultiThreadMessageListener.this.messageHandler.handle(message);  
                } catch (Exception e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
    }  
  
}  
FixedAndBlockedThreadPoolExecutor.java



package com.ffcs.icity.jms;  
  
import java.util.concurrent.BlockingQueue;  
import java.util.concurrent.LinkedBlockingQueue;  
import java.util.concurrent.ThreadPoolExecutor;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.ReentrantLock;  
  
  
/** 
 * 支持阻塞的固定大小的线程池 
 * @author linwei 
 * 
 */  
public class FixedAndBlockedThreadPoolExecutor extends ThreadPoolExecutor {  
  
      
    //一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。  
    //使用 lock 块来调用 try,在之前/之后的构造中  
    private ReentrantLock lock = new ReentrantLock();  
      
    private Condition condition = this.lock.newCondition();  
      
    public FixedAndBlockedThreadPoolExecutor(int size) {  
        super(size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());  
    }  
  
      
    /** 
     * 当线程池中没有空闲线程时,会挂起此方法的调用线程.直到线程池中有线程有空闲线程. 
     */  
    @Override  
    public void execute(Runnable command) {  
        //进行同步锁定  
        this.lock.lock();  
        super.execute(command);  
        try {  
            //如果线程池的数量已经达到最大线程池的数量,则进行挂起操作  
            if (getPoolSize() == getMaximumPoolSize()) {  
                this.condition.await();  
            }  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } finally {  
            this.lock.unlock();  
        }  
    }  
      
    @Override  
    protected void afterExecute(Runnable r, Throwable t) {  
        super.afterExecute(r, t);  
        try {  
            this.lock.lock();  
            this.condition.signal();  
        } finally {  
            this.lock.unlock();  
        }  
    }  
      
      
}  


3. 调用例子说明:

生产者调用代码,JMSProducerTest.java

package com.ffcs.icity.test;  
  
import java.util.HashMap;  
import java.util.Map;  
  
import com.ffcs.icity.jms.JMSProducer;  
  
public class JMSProducerTest {  
  
      
    public static void main(String[] args) {  
          
        locationTest();  
        System.out.println("over.");  
    }  
      
    private static void locationTest() {  
        //**  JMSProducer 可以设置成全局的静态变量,只需实例化一次即可使用,禁止循环重复实例化JMSProducer(因为其内部存在一个线程池)  
          
        //支持openwire协议的默认连接为 tcp://localhost:61616,支持 stomp协议的默认连接为tcp://localhost:61613。   
        //tcp和nio的区别  
        //nio://localhost:61617 以及 tcp://localhost:61616均可在 activemq.xml配置文件中进行配置  
        JMSProducer producer = new JMSProducer("nio://localhost:61617", "system", "manager");  
        Map<String, Object> map = new HashMap<String, Object>();  
        map.put("id", "1");  
        map.put("name", "sss1113333");  
        map.put("password", "password");  
        producer.send("test", map);  
    }  
      
}  

消费者调用代码,JMSConsumerTest.java

package com.ffcs.icity.test;  
  
import javax.jms.MapMessage;  
import javax.jms.Message;  
  
import com.ffcs.icity.jms.JMSConsumer;  
import com.ffcs.icity.jms.MessageHandler;  
import com.ffcs.icity.jms.MultiThreadMessageListener;  
  
public class JMSConsumerTest {  
  
      
    public static void main(String[] args) throws Exception {  
          
        //**  JMSConsumer 可以设置成全局的静态变量,只需实例化一次即可使用,禁止循环重复实例化JMSConsumer(因为其内部存在一个线程池)  
  
        JMSConsumer consumer = new JMSConsumer();  
        consumer.setBrokerUrl("tcp://localhost:61616");  
        consumer.setQueue("test");  
        consumer.setUserName("system");  
        consumer.setPassword("manager");  
        consumer.setQueuePrefetch(500);  
        consumer.setMessageListener(new MultiThreadMessageListener(50,new MessageHandler() {  
            public void handle(Message message) {  
                try {  
                    System.out.println("name is " + ((MapMessage)message).getString("name"));  
                    Thread.sleep(5000);  
                } catch (Exception e) {  
                    e.printStackTrace();  
                }  
            }  
        }));  
        consumer.start();  
          
//      Thread.sleep(5000);  
//      consumer.shutdown();  
          
    }  
      
      
}  


转自:http://blog.csdn.net/linwei_1029/article/details/16964943