springboot+redis实现消息队列

文章参考

SpringBoot(9) 基于Redis消息队列实现异步操作

https://blog.csdn.net/wilsonsong1024/article/details/80573611

所做的改进

  • 博客中实用的是jedis操作,在springboot的年代,我们不需要去写redis的操作工具类了。
  • 直接上redisTemplate的使用。
  • handler的处理需要根据业务需求改造。
  • 增加了测试部分

觉得后期的改进

  1. 消费redis的时候,看看有没有阻塞的策略(我的代码中是一直查询,感觉不太好)
  2. 消费线程,直接使用的是new thread。这个不太好管理(后期用线程池优化)

 涉及spring和springboot使用的部分

  • RedisTemplate序列化的配置,以及api相关的应用
  • fastjson的JSONObject等相关使用
  • InitializingBean,ApplicationContextAware是使用,以及其实现接口的作用
  • 线程池相关的问题学习
  • 模板模式的抽象能力


代码

pom.xml文件

    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.pig</groupId>
    <artifactId>about-redis</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>about-redis</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
AboutRedisApplication(这种redis序列化,可以在redis客户端中看到字符串)
package com.pig.aboutredis;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@SpringBootApplication
public class AboutRedisApplication {

    public static void main(String[] args) {
        try{

            SpringApplication.run(AboutRedisApplication.class, args);
        }catch (Exception e){
            e.printStackTrace();
        }
    }



    @Bean
    public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
        RedisTemplate<String,Object> template=new RedisTemplate<>();
        template.setConnectionFactory(factory);

        Jackson2JsonRedisSerializer serializer=new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om=new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(om);

        template.setValueSerializer(serializer);
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        // 如果这里不设置的话,hash的类型如果是自定义类型,就会报错,序列化问题
        template.setHashValueSerializer(serializer);
        template.afterPropertiesSet();
        return template;
    }

}
EventModel (return this 值得思考学习)
package com.pig.aboutredis.messagequeue.common;


import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;

import java.util.HashMap;
import java.util.Map;

public class EventModel {


    private EventType type;
    private int actorId;
    private int entityType;
    private int entityId;
    private int entityOwerId;
    public Map<String,String> exts=new HashMap<>();

    public EventModel() {
    }

    public EventModel(EventType type) {
        this.type = type;
    }

    public EventType getType() {
        return type;
    }

    public EventModel setType(EventType type) {
        this.type = type;
        return this;
    }

    public int getActorId() {
        return actorId;
    }

    public EventModel setActorId(int actorId) {
        this.actorId = actorId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public EventModel setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public EventModel setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityOwerId() {
        return entityOwerId;
    }

    public EventModel setEntityOwerId(int entityOwerId) {
        this.entityOwerId = entityOwerId;
        return this;
    }

    public String getExts(String key) {
        return exts.get(key);
    }

    public EventModel setExts(String key,String value) {
        exts.put(key,value);
        return this;
    }

    @Override
    public String toString() {
        return "EventModel{" +
                "type=" + type +
                ", actorId=" + actorId +
                ", entityType=" + entityType +
                ", entityId=" + entityId +
                ", entityOwerId=" + entityOwerId +
                '}';
    }
}
EventType
package com.pig.aboutredis.messagequeue.common;

//事件的各种类型
public enum EventType {
    LIKE(0),COMMENT(1),LOGIN(2),MAIL(3);

    private int value;
    EventType(int value){
        this.value=value;
    }

    public int getValue(){
        return value;
    }

}
EventProducer
package com.pig.aboutredis.messagequeue.producer;

import com.alibaba.fastjson.JSONObject;
import com.pig.aboutredis.messagequeue.common.EventModel;
import com.pig.aboutredis.messagequeue.utils.RedisKeyUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.security.Key;

@Component
@Slf4j
public class EventProducer {

    @Autowired
    private RedisTemplate redisTemplate;


    // 把事件分发出去
    // 就是存储到redis中的list数据类型中
    public boolean fireEvent(EventModel model){
        try{
            String jsonString = JSONObject.toJSONString(model);
            String queueKey = RedisKeyUtil.getEventQueueKey();
            redisTemplate.opsForList().leftPush(queueKey,jsonString);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

}
RedisKeyUtil 这个在我的代码没有任何意义,只需要一个共同的key(消费者和生产者)
package com.pig.aboutredis.messagequeue.utils;

public class RedisKeyUtil {


    private static final String SPLIT=":";
    private static final String BIZ_LIKE="LIKE";
    private static final String BIZ_DISLIKE="DISLIKE";
    private static final String BIZ_EVENTQUEUE="EVENTQUEUE";


    public static String getLikeKey(int entityType,int entityId){
        return BIZ_LIKE+SPLIT+String.valueOf(entityType)+SPLIT+String.valueOf(entityId);
    }

    public static String getDisLikeKey(int entityType,int entityId){
        return BIZ_DISLIKE+SPLIT+String.valueOf(entityType)+SPLIT+String.valueOf(entityId);
    }
    public static String getEventQueueKey(){
        return BIZ_EVENTQUEUE;
    }

}
EventHandler 这里用到了模板设计模式,很多抽象接口,使用到了模板模式
package com.pig.aboutredis.messagequeue.consumer;

import com.pig.aboutredis.messagequeue.common.EventModel;
import com.pig.aboutredis.messagequeue.common.EventType;

import java.util.List;

// 模板设计模式
public interface EventHandler {

    void doHandler(EventModel model);

    List<EventType> getSupportEventTypes();
}
EventConsumer 这种方法也可以自动注入所有EventHandler类型的bean
@Autowired
private Map<String,EventHandler> beans;
package com.pig.aboutredis.messagequeue.consumer;

import com.alibaba.fastjson.JSON;
import com.pig.aboutredis.messagequeue.common.EventModel;
import com.pig.aboutredis.messagequeue.common.EventType;
import com.pig.aboutredis.messagequeue.utils.RedisKeyUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Handler;

@Component
@Slf4j
public class EventConsumer implements InitializingBean,ApplicationContextAware {

    private Map<EventType,List<EventHandler>> config=new HashMap<>();

    // 获取spring的context
    private ApplicationContext context;

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context=applicationContext;
    }


    public void afterPropertiesSet() throws Exception {
        Map<String,EventHandler> beans=context.getBeansOfType(EventHandler.class);

        if(!CollectionUtils.isEmpty(beans)){
            for (Map.Entry<String,EventHandler> entry:beans.entrySet()){
                // 当前handler能够处理的类型
                // 例如:LikeHandler 能够处理 EventType.LIKE
                List<EventType> eventTypes = entry.getValue().getSupportEventTypes();
                // 初始化 config
                // key: EventType.LIKE类型 value: LikeHandler能够处理type的handler
                for (EventType type : eventTypes) {
                    // 如果不包含,就创建一个可以
                    // 如果包含,就add进入list
                    if(!config.containsKey(type)){
                        config.put(type,new ArrayList<EventHandler>());
                    }
                    config.get(type).add(entry.getValue());
                }
            }
        }

        // 消费策略
        // 有机会,优化为线程池类型。
        // 真正的线上环境,是不可能这样创建线程的,这样不好控制,没有高可用,不方便管理
        Thread thread=new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    String key = RedisKeyUtil.getEventQueueKey();
                    // producer是从left进入,consumer从right消费
                    String rightPop = (String) redisTemplate.opsForList().rightPop(key);
                    // 需要优化,redis api是否可以阻塞,如果有阻塞的可能,那么这里就不用判断了
                    if(StringUtils.isEmpty(rightPop)){
                        continue;
                    }
                    EventModel eventModel = JSON.parseObject(rightPop, EventModel.class);
                    if(!config.containsKey(eventModel.getType())){
                        log.error("不能识别的事件");
                        continue;
                    }
                    // 一个类型,多个handler可以消费
                    // 这里的消费策略,需要根据业务情况设置
                    for (EventHandler handler : config.get(eventModel.getType())) {
                        handler.doHandler(eventModel);
                    }

                }
            }
        });
        thread.start();

    }
}
LikeHandler
package com.pig.aboutredis.messagequeue.consumer.handler;

import com.pig.aboutredis.messagequeue.common.EventModel;
import com.pig.aboutredis.messagequeue.common.EventType;
import com.pig.aboutredis.messagequeue.consumer.EventHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;


@Component
@Slf4j
public class LikeHandler implements EventHandler {


    @Override
    public void doHandler(EventModel model) {
        log.info("LikeHandler 消费了你的数据,开始消费");
        log.info("LikeHandler 消费了你的数据,消费{}",model);
        log.info("LikeHandler 消费了你的数据,结束消费");

    }

    @Override
    public List<EventType> getSupportEventTypes() {
        return Arrays.asList(EventType.LIKE);
    }
}

测试

ProducerController
package com.pig.aboutredis.messagequeue.controller;


import com.pig.aboutredis.messagequeue.common.EventModel;
import com.pig.aboutredis.messagequeue.common.EventType;
import com.pig.aboutredis.messagequeue.producer.EventProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {


    @Autowired
    private EventProducer producer;


    @GetMapping("/queue")
    public String redisQueue(){
        EventModel model=new EventModel();
        model.setType(EventType.LIKE);
        model.setActorId(11);
        producer.fireEvent(model);
        model.setActorId(22);
        producer.fireEvent(model);
        model.setActorId(33);
        producer.fireEvent(model);
        model.setActorId(44);
        producer.fireEvent(model);
        model.setActorId(55);
        producer.fireEvent(model);
        return "i am your father";
    }
}

http://localhost:8080/aboutredis/queue

日志结果

2020-04-02 22:28:53.254 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
2020-04-02 22:28:53.254 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=11, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.254 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
2020-04-02 22:28:53.351 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
2020-04-02 22:28:53.351 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=22, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.351 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
2020-04-02 22:28:53.454 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
2020-04-02 22:28:53.454 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=33, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.454 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
2020-04-02 22:28:53.538 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
2020-04-02 22:28:53.539 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=44, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.539 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费
2020-04-02 22:28:53.623 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,开始消费
2020-04-02 22:28:53.626 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,消费EventModel{type=LIKE, actorId=55, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.626 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消费了你的数据,结束消费



目的

写这个博客的目的是:熟悉redis和springboot
熟悉redis和springboot不能只是停留在api的使用阶段。
而是要使用起来得心应手,就是想用redis和springboot能干的事情,都能够很轻松的写出来。
一句话:可以用它来展现自己所想。

后期有机会研究一下下面专题:(百度“redis实现”出现的关键词)

  1. redis实现session共享
  2. redis实现分布式锁原理
  3. redis实现分布式事务
  4. redis实现布隆过滤器
原文地址:https://www.cnblogs.com/windy13/p/12623609.html