SpringBoot Rabbitmq发送消息

官方文档:https://docs.spring.io/spring-boot/docs/2.1.3.RELEASE/reference/htmlsingle/#boot-features-amqp

引入依赖:

  <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

发送消息代码:

@RestController
@RequestMapping("/")
public class SenderMsgController {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @RequestMapping(value = "/{str}")
    public void testSend(@RequestParam("str") String str) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            int millis = 500;
            Thread.sleep(new Long(millis));
            if (i%2 == 1) {
                String isr = "{
" + "	"dd":" + i + "}";

                amqpTemplate.convertAndSend("DirectExchange","test.1",isr,new MyMessageConverter());

            }else{
                String isr = "{
" + "	"dd":" + i + "}";
                amqpTemplate.convertAndSend("DirectExchange","test.2",isr,new MyMessageConverter());
            }
            System.out.println("第"+i+"次发送");
        }
    }
}
MyMessageConverter:
//MessagePostProcessor 接口可以对发送请求之前的Message 进行操作,这里我设置了contenttype为json格式
public class MyMessageConverter implements MessagePostProcessor {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
        return message;
    }
}

创建交换机、队列并互相绑定设置路由key

@Component
public class AmqpAdminConfig {

    @Autowired
    public AmqpAdmin amqpAdmin;


    @Bean
    public DirectExchange createDirectExchange(){
        DirectExchange directExchange = new DirectExchange("DirectExchange", false, false);
        amqpAdmin.declareExchange(directExchange);
        return directExchange;
    }

//    @Bean
//    public void createFanoutExchange(){
//        amqpAdmin.declareExchange(new FanoutExchange("FanoutExchange",false,false));
//    }

    @Bean
    public Queue createQueue1(){
        Queue queue = new Queue("queue-1", false, false, false);
        amqpAdmin.declareQueue(queue);
        return queue;
    }

    @Bean
    public Queue createQueue2(){
        Queue queue = new Queue("queue-2", false, false, false);
        amqpAdmin.declareQueue(queue);
        return queue;
    }

    @Bean
    public void createBinding1(){
        Binding bind = BindingBuilder.bind(createQueue1()).to(createDirectExchange()).with("test.1");
        amqpAdmin.declareBinding(bind);
    }
    @Bean
    public void createBinding2(){
        Binding bind = BindingBuilder.bind(createQueue2()).to(createDirectExchange()).with("test.2");
        amqpAdmin.declareBinding(bind);
    }

}

根据官方文档知道AmqpTemplate 和AmqpAdmin  已经自动配置,可直接注入使用,AmqpTemplate 封装了发送与接收的各种操作,AmqpAdmin  封装了针对交换机和消息队列的各种操作

原文地址:https://www.cnblogs.com/shiguotao-com/p/10445130.html