谷粒商城消息队列(三十九)

248-260 消息队列

代码提交到:https://gitee.com/dalianpai/gulimall

其实只要看过尚硅谷的springboot视频,就会发现和里面讲解的都是一模一样。

也简单学过别的机构的rabbitmq视频:https://www.cnblogs.com/dalianpai/category/1795311.html

测试代码如下:

@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
class GulimallOrderApplicationTests {

    @Autowired
    AmqpAdmin amqpAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void sendMessageTest(){
        OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
        orderReturnApplyEntity.setId(1L);
        orderReturnApplyEntity.setCreateTime(new Date());
        orderReturnApplyEntity.setSkuName("wgr");
        rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnApplyEntity);
    }


    @Test
    public void createExchange(){
        DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
        amqpAdmin.declareExchange(directExchange);
        log.info("交换机创建成功");

    }

    @Test
    public void createQueue(){
        Queue queue = new Queue("hello-java-queue", true, false, false);
        amqpAdmin.declareQueue(queue);
        log.info("队列创建成功");
    }

    @Test
    public void createBing(){
        Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange"
                , "hello-java", null);
        amqpAdmin.declareBinding(binding);
    }

}

接受类:

@RabbitListener(queues={"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {

    @Override
    public PageUtils queryPage(Map<String, Object> params) {
        IPage<OrderItemEntity> page = this.page(
                new Query<OrderItemEntity>().getPage(params),
                new QueryWrapper<OrderItemEntity>()
        );

        return new PageUtils(page);
    }

    @RabbitHandler
    public void recieveMessage(Message message,
                               OrderReturnApplyEntity content,
                               Channel channel){
        System.out.println("接受的消息。。"+content);
        System.out.println(content.getSkuName());
        //channel内按顺序自增的
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag==>"+deliveryTag);
        //签收
        try {
            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

配置类:

/**
 * @author WGR
 * @create 2020/7/27 -- 13:15
 */
@Slf4j
@Configuration
public class MyRabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    /**
     *定制RabbitTemplate
     * 1.服务收到消息就回调
     *    1)spring.rabbitmq.publisher-confirms=true
     *    2)设置确认回调confirmCallback
     * 2.消息正确抵达队列进行回调
     *    1)spring.rabbitmq.publisher-returns=true
     *    2) spring.rabbitmq.template.mandatory=true
     *    设置确认回调ReturnCallback
     * 3.消费端确认(保证每个消费被正确消费,此时才可以broker删除这个消息)
     *    1)默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息
     *       问题:
     *           我们收到很多消息,自动回复服务器ack,只有一个消息处理成功,宕机了,就会发生消息丢失。
     *           消费者手动确认模式,只要我们没有明确告诉MQ,货物被签收,没有ACK
     *           消息就一直是unacked状态,即使Consumer宕机。消息不会丢失,会重新变成ready
     *   2)如何签收:
     *      channel.basicAck(deliveryTag,false);签收获取
     *      channel.basicNack(deliveryTag,false,true);拒签
     *
     */

    @PostConstruct   //再配置类对象创建完成以后,执行这个方法
    public void initRabbitTemplate(){
        RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack){
                    System.out.println("发送成功");
                }else {
                    System.out.println("发送失败");
                }
            }
        };
        rabbitTemplate.setConfirmCallback(confirmCallback);

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
            }
        });
    }


}

发送消息:

/**
 * @author WGR
 * @create 2020/7/27 -- 14:47
 */
@RestController
public class RabbitController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("sendMq")
    public String sendMq(){
        OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
        orderReturnApplyEntity.setId(1L);
        orderReturnApplyEntity.setCreateTime(new Date());
        orderReturnApplyEntity.setSkuName("wgr");
        rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnApplyEntity);
        return  "ok";
    }

}

课件的几个图




原文地址:https://www.cnblogs.com/dalianpai/p/13427764.html