【ActiveMQ入门-4】ActiveMQ学习-异步接收

总体说明:

1. 一个生产者/发布者:可以向多个目的地发送消息;
2. 每个目的地(destination)可以有多个订阅者或消费者;

如下图所示:



程序结构:

1. Publisher.java  :创建1个生产者和4个主题,遍历4个主题,生产者依次向4个主题中发送Message,共发送5次;
2. Consumer.java  :消费者,创建8个消费者,每两个消费者订阅一个相同的主题,采用异步接收方式;
3. Listener.java   :异步监听


运行结果:

生产者:
消费者:



程序代码:

Publisher.java
  1. package com.ll.activemq;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MapMessage;
  7. import javax.jms.Message;
  8. import javax.jms.MessageProducer;
  9. import javax.jms.Session;
  10. import org.apache.activemq.ActiveMQConnectionFactory;
  11. import org.apache.activemq.command.ActiveMQMapMessage;
  12. public class Publisher {
  13. private ConnectionFactory factory;
  14. private Connection connection = null;
  15. private Session session;
  16. private String brokerURL = "tcp://localhost:61616";
  17. private MessageProducer producer;
  18. private Destination[] destinations;
  19. /**
  20. * 构造函数 创建连接、创建生产者
  21. *
  22. * @throws JMSException
  23. */
  24. public Publisher() throws JMSException {
  25. factory = new ActiveMQConnectionFactory(brokerURL);
  26. connection = factory.createConnection();
  27. try {
  28. connection.start();
  29. } catch (JMSException jmse) {
  30. connection.close();
  31. throw jmse;
  32. }
  33. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  34. producer = session.createProducer(null);
  35. }
  36. /**
  37. * 设置目的地
  38. *
  39. * @param stocks
  40. * :主题名列表
  41. * @throws JMSException
  42. */
  43. protected void setTopics(String[] stocks) throws JMSException {
  44. destinations = new Destination[stocks.length];
  45. for (int i = 0; i < stocks.length; i++) {
  46. destinations[i] = session.createTopic("STOCKS." + stocks[i]);
  47. }
  48. }
  49. /**
  50. * 创建消息
  51. *
  52. * @param stock
  53. * :主题名
  54. * @param session
  55. * @return
  56. * @throws JMSException
  57. */
  58. protected Message createStockMessage(String stock, Session session)
  59. throws JMSException {
  60. MapMessage message = session.createMapMessage();
  61. message.setString("stock", stock);
  62. message.setDouble("price", 1.00);
  63. message.setDouble("offer", 0.01);
  64. message.setBoolean("up", true);
  65. return message;
  66. }
  67. /**
  68. * 发送消息
  69. * 遍历所有主题(目的地),向每个目的地分别发送一个MapMessage
  70. * @param stocks
  71. * :主题名
  72. * @throws JMSException
  73. */
  74. protected void sendMessage(String[] stocks) throws JMSException {
  75. //遍历所有主题
  76. for (int i = 0; i < stocks.length; i++) {
  77. // 创建消息
  78. Message message = createStockMessage(stocks[i], session);
  79. System.out.println("发送: "
  80. + ((ActiveMQMapMessage) message).getContentMap()
  81. + " on destination: " + destinations[i]);
  82. // 往目的地发送消息
  83. producer.send(destinations[i], message);
  84. }
  85. }
  86. public void close() throws JMSException {
  87. try {
  88. if (null != connection)
  89. connection.close();
  90. } catch (Exception e) {
  91. e.printStackTrace();
  92. }
  93. }
  94. public static void main(String[] argss) throws JMSException {
  95. String[] topics = { "MyTopic1", "MyTopic2", "MyTopic3", "MyTopic4" };
  96. Publisher publisher = new Publisher();
  97. publisher.setTopics(topics);
  98. //每隔1s发送一次消息,共发送5次消息
  99. for (int i = 0; i < 5; i++) {
  100. System.out.println("发布者第:" + i + " 次发布消息...");
  101. publisher.sendMessage(topics);
  102. try {
  103. Thread.sleep(2000);
  104. } catch (InterruptedException e) {
  105. e.printStackTrace();
  106. }
  107. }
  108. publisher.close();
  109. }
  110. }

Consumer.java

  1. package com.ll.activemq;
  2. import javax.jms.Connection;
  3. import javax.jms.Destination;
  4. import javax.jms.JMSException;
  5. import javax.jms.MessageConsumer;
  6. import javax.jms.Session;
  7. import org.apache.activemq.ActiveMQConnectionFactory;
  8. public class Consumer {
  9. private ActiveMQConnectionFactory factory;
  10. private String brokerURL = "tcp://localhost:61616";
  11. private Connection connection = null;
  12. private Session session;
  13. public Consumer() throws JMSException {
  14. factory = new ActiveMQConnectionFactory(brokerURL);
  15. connection = factory.createConnection();
  16. connection.start();
  17. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  18. }
  19. public Session getSession() {
  20. return session;
  21. }
  22. public static void main(String[] args) throws JMSException {
  23. String[] topics = { "MyTopic1", "MyTopic2", "MyTopic3", "MyTopic4" };
  24. Consumer consumer = new Consumer();
  25. for (String stock : topics) {
  26. //创建目的地
  27. Destination destination = consumer.getSession().createTopic(
  28. "STOCKS." + stock);
  29. //创建消费者
  30. MessageConsumer messageConsumer = consumer.getSession()
  31. .createConsumer(destination);
  32. MessageConsumer messageConsumer2 = consumer.getSession()
  33. .createConsumer(destination);
  34. //设置监听器
  35. messageConsumer.setMessageListener(new Listener());
  36. messageConsumer2.setMessageListener(new Listener());
  37. }
  38. }
  39. }

Listener.java

  1. package com.ll.activemq;
  2. import java.text.DecimalFormat;
  3. import javax.jms.MapMessage;
  4. import javax.jms.Message;
  5. import javax.jms.MessageListener;
  6. public class Listener implements MessageListener {
  7. /**
  8. * 异步接收
  9. * 当有消息时,就会触发该事件
  10. */
  11. public void onMessage(Message message) {
  12. try {
  13. MapMessage map = (MapMessage)message;
  14. String stock = map.getString("stock");
  15. double price = map.getDouble("price");
  16. double offer = map.getDouble("offer");
  17. boolean up = map.getBoolean("up");
  18. DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );
  19. System.out.println("接收消息:"+stock + " " + df.format(price) + " " + df.format(offer) + " " + (up?"up":"down"));
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }



延伸:
若是将上述程序中的 createTopic 全部替换为 createQueue(即由发布/订阅模式改为点对点模式),则每条消息只会被监听同一队列的两个消费者中的一个接收,而不是像主题那样两个消费者各收到一份。运行结果如下:
生产者输出不变,这里就不截图了。
下面是消费者消费情况截图:
  


















附件列表

    原文地址:https://www.cnblogs.com/ssslinppp/p/4462947.html