spring boot:用rocketmq消息订阅实现删除购物车商品功能(spring boot 2.3.3)

一,为什么要使用消息队列实现删除购物车商品功能?

     消息队列主要用来处理不需要立刻返回结果的业务,

     常见的例子:

     用户在下单后,要清除原购物车中的商品,

     这个处理过程不需要马上实现也不需要返回结果给用户,

     所以就适合使用队列来实现

说明:刘宏缔的架构森林是一个专注架构的博客,地址:https://www.cnblogs.com/architectforest

         对应的源码可以访问这里获取: https://github.com/liuhongdi/

说明:作者:刘宏缔 邮箱: 371125307@qq.com

二,演示项目的相关信息

1,项目地址

https://github.com/liuhongdi/mqcart

2,项目功能说明:

           演示了用rocketmq实现消息的发送和接收

3, 项目结构:如图:

三,配置文件说明

1,send/pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <!--fastjson begin-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

2,receive/pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <!--fastjson begin-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

说明:两个模块的pom.xml内容相同

3,receive/application.properties

server.port=8081

说明:两个模块同时运行时,需要把端口区分开,
         send不做修改,使用默认的8080端口
         receive这里指定使用8081端口

四,java代码说明

1,send/HomeController.java

@RestController
@RequestMapping("/home")
public class HomeController {
    @Autowired
    private Producer producer;
    //初始化并发送消息
    @RequestMapping("/send")
    public String send() throws Exception {
        //要删除的购物车的id
        List<Integer> cartList = new ArrayList<Integer>();
        cartList.add(1);
        cartList.add(2);
        cartList.add(3);
        //消息
        CartMsg msg = new CartMsg();
        msg.setUserId(1);
        msg.setCartList(cartList);

        String msgJson = JSON.toJSONString(msg);
        //生成一个信息,标签在这里手动指定
        Message message = new Message(RocketConstants.TOPIC, "carttag", msgJson.getBytes());
        //发送信息
        SendResult sendResult = producer.getProducer().send(message);
        System.out.println("生产者已发送一条信息,内容={"+sendResult+"}");

        return "success";
    }
}

2,send/CartMsg.java

//购物车消息
public class CartMsg {
    //用户id
    private int userId;
    public int getUserId() {
        return this.userId;
    }
    public void setUserId(int userId) {
        this.userId = userId;
    }

    //购物车id
    private List<Integer> cartList;
    public List<Integer> getCartList() {
        return this.cartList;
    }
    public void setCartList(List<Integer> cartList) {
        this.cartList = cartList;
    }
}

发送的消息体,
两个模块中的的CartMsg.java文件相同

3,send/RocketConstants.java

public class RocketConstants {
    //name server,有多个时用分号隔开
    public static final String NAME_SERVER = "127.0.0.1:9876";
    //topic的名字,应该从服务端先创建好,否则会报错
    public static final String TOPIC = "laoliutest";
}

配置name server和topic,

两个模块中的的RocketConstants.java文件相同

4,send/Producer.java

//消息生产者类
@Component
public class Producer {
    private String producerGroup = "cart_producer";
    private DefaultMQProducer producer;
    //构造
    public Producer(){
        //创建生产者
        producer = new DefaultMQProducer(producerGroup);
        //不开启vip通道
        producer.setVipChannelEnabled(false);
        //设定 name server
        producer.setNamesrvAddr(RocketConstants.NAME_SERVER);
        start();
    }

    //使producer启动
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    //返回producer
    public DefaultMQProducer getProducer(){
        return this.producer;
    }

    //进行关闭的方法
    public void shutdown(){
        this.producer.shutdown();
    }
}

5,receive/Consumer.java

@Component
public class Consumer {

    //消费者实体对象
    private DefaultMQPushConsumer consumer;

    //消费者组
    public static final String CONSUMER_GROUP = "cart_consumer";

    //构造函数 用来实例化对象
    public Consumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(RocketConstants.NAME_SERVER);
        //指定消费模式
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //指定订阅主题
        //指定订阅标签,*代表所有标签
        consumer.subscribe(RocketConstants.TOPIC, "*");
        //注册一个消费消息的Listener
        //对消息的消费在这里实现
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            //遍历接收到的消息
            try {
                for (Message msg : msgs) {
                    //得到消息的body
                    String body = new String(msg.getBody(), "utf-8");
                    //用json转成对象
                    CartMsg msgOne = JSON.parseObject(body, CartMsg.class);
                    //打印用户id
                    System.out.println(msgOne.getUserId());
                    //打印消息内容
                    System.out.println("消费者已接收到消息-topic={"+msg.getTopic()+"}, 消息内空={"+body+"}");
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.out.println("消费者 启动成功=======");
    }
}

这个是消息的消费者

6,其他消息的非关键代码可访问github

五,演示效果

1,访问send模块的controller:

http://127.0.0.1:8080/home/send

显示发送成功:

success

查看控制台:

生产者已发送一条信息,内容={SendResult [sendStatus=SEND_OK, msgId=C0A803D5113442A57993512ADA8E0000, 
offsetMsgId=7F00000100002A9F0000000000003AE8, messageQueue=MessageQueue [topic=laoliutest, brokerName=broker-a, queueId=3], queueOffset=0]}

2,查看receive模块的控制台:

1
消费者已接收到消息-topic={laoliutest}, 消息内容={{"cartList":[1,2,3],"userId":1}

收到了消息,消息内容可解析

六,查看spring boot版本:

  .   ____          _            __ _ _
 /\ / ___'_ __ _ _(_)_ __  __ _    
( ( )\___ | '_ | '_| | '_ / _` |    
 \/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.3.3.RELEASE)
原文地址:https://www.cnblogs.com/architectforest/p/13680757.html