7. Topic

Topics

  • In the previous tutorial we improved our logging system. Instead of using a fanout exchange, which is only capable of dummy broadcasting, we used a direct one and gained the ability to receive logs selectively.
  • Although using the direct exchange improved our system, it still has limitations: it can't route based on multiple criteria.
  • In our logging system we might want to subscribe to logs based not only on severity but also on the source that emitted them. You might know this concept from the syslog Unix tool, which routes logs based on both severity and facility.
  • That would give us a lot of flexibility: we may want to listen only to critical errors coming from 'cron', but to all logs from 'kern'.
  • To implement that in our logging system we need to learn about a more complex exchange type, the topic exchange.

Topic exchange

  • Messages sent to a topic exchange can't have an arbitrary routing_key: it must be a list of words delimited by dots. The words can be anything, but usually they specify some features connected to the message. A few valid routing key examples: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit". There can be as many words in the routing key as you like, up to the limit of 255 bytes.

  • The binding key must be in the same form. The logic behind the topic exchange is similar to that of a direct one: a message sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key. However, there are two important special cases for binding keys:

    • * (star) can substitute for exactly one word.
    • # (hash) can substitute for zero or more words.
  • In this example, we're going to send messages that all describe animals. The messages will be sent with a routing key that consists of three words (two dots). The first word in the routing key will describe speed, the second a colour and the third a species: "<speed>.<colour>.<species>".

  • We created three bindings: Q1 is bound with binding key "*.orange.*" and Q2 with "*.*.rabbit" and "lazy.#".

  • These bindings can be summarised as:

    • Q1 is interested in all the orange animals.
    • Q2 wants to hear everything about rabbits, and everything about lazy animals.
  • A message with a routing key set to "quick.orange.rabbit" will be delivered to both queues. A message with the routing key "lazy.orange.elephant" will also go to both of them. On the other hand, "quick.orange.fox" will only go to the first queue, and "lazy.brown.fox" only to the second. "lazy.pink.rabbit" will be delivered to the second queue only once, even though it matches two bindings. "quick.brown.fox" doesn't match any binding, so it will be discarded.

  • What happens if we break our contract and send a message with one or four words, like "orange" or "quick.orange.male.rabbit"? Well, these messages won't match any bindings and will be lost.

  • On the other hand "lazy.orange.male.rabbit", even though it has four words, will match the last binding and will be delivered to the second queue.
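
The matching rules above can be checked without a broker. The following is a minimal sketch, not RabbitMQ's actual implementation: the class `TopicMatchSketch` and its methods `matches` and `route` are hypothetical names introduced here, and the bindings for Q1 and Q2 are hard-coded from the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the broker's topic matching; for illustration only.
public class TopicMatchSketch {

    // True if the dot-delimited routing key matches the binding key, where
    // "*" stands for exactly one word and "#" for zero or more words.
    public static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.", -1);
        String[] words = routingKey.split("\\.", -1);
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: it is a match only if every word was consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // "#" may absorb zero words, one word, ... or all remaining words.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // the pattern still expects a word, but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    // Queues from the example that would receive a message with this routing key.
    // A queue is listed at most once, even when several of its bindings match.
    public static List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        if (matches("*.orange.*", routingKey)) {
            queues.add("Q1");
        }
        if (matches("*.*.rabbit", routingKey) || matches("lazy.#", routingKey)) {
            queues.add("Q2");
        }
        return queues;
    }

    public static void main(String[] args) {
        String[] keys = {
            "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox",
            "orange", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"
        };
        for (String key : keys) {
            System.out.println(key + " -> " + route(key));
        }
    }
}
```

Running `main` lists, for each routing key discussed above, the queues that would receive it. An empty list corresponds to a message that is discarded.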

Code

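  •   // EmitLogTopic.java
      import com.rabbitmq.client.BuiltinExchangeType;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      import org.apache.commons.logging.Log;        // assumes Apache Commons Logging is the logging library in use
      import org.apache.commons.logging.LogFactory;
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;

      public class EmitLogTopic {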
          private static Log log = LogFactory.getLog(EmitLogTopic.class);
          private static final String EXCHANGE_NAME="topic_logs";
          public static void main(String[] argv)
          {
              ConnectionFactory connFactory = new ConnectionFactory();
              connFactory.setHost("localhost");
              Connection conn=null;
              Channel channel=null;
              try {
                  conn=connFactory.newConnection();
                  channel=conn.createChannel();
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                  //String[] severity={"info.#","warning.*","*.error.*"};
                  String message="info.hello,world";
                  channel.basicPublish(EXCHANGE_NAME,"info.fasfa.fasfas.fasfa",null,message.getBytes());// the routing key starts with "info"
                  System.out.println("sent:"+message);
                  /*for(String s:severity)
                  {
                      channel.basicPublish(EXCHANGE_NAME,s,null,message.getBytes());
                      System.out.println("sent:"+s+" "+message);
                  }*/
    
              } catch (IOException e) {
                  log.error(e);
              } catch (TimeoutException e) {
                  log.error(e);
              } finally {
                  if (channel!=null)
                  {
                      try {
                          channel.close();
                      } catch (IOException e) {
                          log.error(e);
                      } catch (TimeoutException e) {
                          log.error(e);
                      }
                  }
                  if (conn!=null)
                  {
                      try {
                          conn.close();
                      } catch (IOException e) {
                          log.error(e);
                      }
                  }
              }
    
          }
      }
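
      // ReceiveLogTopic.java
      import com.rabbitmq.client.AMQP;
      import com.rabbitmq.client.BuiltinExchangeType;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      import com.rabbitmq.client.Consumer;
      import com.rabbitmq.client.DefaultConsumer;
      import com.rabbitmq.client.Envelope;
      import org.apache.commons.logging.Log;        // assumes Apache Commons Logging is the logging library in use
      import org.apache.commons.logging.LogFactory;
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;

      public class ReceiveLogTopic {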
          private static Log log= LogFactory.getLog(ReceiveLogTopic.class);
          private static final String EXCHANGE_NAME="topic_logs";
          public static void main(String[] argv)
          {
              ConnectionFactory connFactory = new ConnectionFactory();
              connFactory.setHost("localhost");
              /*Connection conn=null;
              Channel channel=null;*/
              try {
                  final Connection conn=connFactory.newConnection();
                  final Channel channel=conn.createChannel();
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                  String queueName=channel.queueDeclare().getQueue();
    
                  String[] severity={"info.#","warning.*","*.error.*"};
                  /*for (String s:severity)
                  {
                      channel.queueBind(queueName,EXCHANGE_NAME,s);
                  }*/
                  channel.queueBind(queueName,EXCHANGE_NAME,"info.#");// the consumer subscribes to info messages, i.e. routing keys that start with "info"
                  Consumer consumer=new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String message=new String(body,"UTF-8");
                          System.out.println("receive:"+envelope.getRoutingKey()+" "+message);
                          try {
                              Thread.sleep(1000);
                          } catch (InterruptedException e) {
                              log.error(e);
                          }finally {
                              channel.basicAck(envelope.getDeliveryTag(),false);
                          }
                      }
                  };
                  channel.basicConsume(queueName,false,consumer);
              } catch (IOException e) {
                  log.error(e);
              } catch (TimeoutException e) {
                  log.error(e);
              } finally {
                  // Nothing is closed here: the connection and channel stay open so the consumer keeps receiving.
              }
          }
      }
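
To see which of the binding keys in ReceiveLogTopic would accept the routing key published by EmitLogTopic, the check can be done offline. This is a rough sketch under stated assumptions: `BindingKeyCheck` and its `matches` method are hypothetical helpers written for this note, not part of the RabbitMQ client, and the extra key "info.*" is added only to contrast `*` with `#`.

```java
// Hypothetical offline check of topic binding keys; not the broker's own code.
public class BindingKeyCheck {

    // Word-level dynamic programming over the dot-delimited keys:
    // "*" consumes exactly one word, "#" consumes zero or more words.
    public static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.", -1);
        String[] words = routingKey.split("\\.", -1);
        // ok[i][j] is true when the first i pattern words match the first j routing-key words.
        boolean[][] ok = new boolean[pattern.length + 1][words.length + 1];
        ok[0][0] = true;
        for (int i = 1; i <= pattern.length; i++) {
            String p = pattern[i - 1];
            for (int j = 0; j <= words.length; j++) {
                if (p.equals("#")) {
                    // Either "#" absorbs nothing, or it absorbs one more word.
                    ok[i][j] = ok[i - 1][j] || (j > 0 && ok[i][j - 1]);
                } else if (j > 0 && (p.equals("*") || p.equals(words[j - 1]))) {
                    ok[i][j] = ok[i - 1][j - 1];
                }
            }
        }
        return ok[pattern.length][words.length];
    }

    public static void main(String[] args) {
        // Routing key used by EmitLogTopic.
        String routingKey = "info.fasfa.fasfas.fasfa";
        // The three keys from the receiver's severity array, plus "info.*" for contrast.
        String[] bindingKeys = {"info.#", "warning.*", "*.error.*", "info.*"};
        for (String bindingKey : bindingKeys) {
            System.out.println(bindingKey + " matches " + routingKey + ": "
                    + matches(bindingKey, routingKey));
        }
    }
}
```

Only "info.#" accepts the four-word routing key. "info.*" does not, because `*` stands for exactly one word, which is why the receiver binds with `#`.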
    

Summary

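  • A consumer subscribes to a topic of interest by using wildcards in its binding key. When a message whose routing key matches that pattern reaches the exchange, it is routed to the bound queue and delivered to the consumer.
  • For example, if a consumer subscribes to info messages (info.#), every message whose routing key starts with "info" is delivered to it.
  • All of this is defined from the consumer's point of view.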
Original article: https://www.cnblogs.com/Black-Cobra/p/8926252.html