RabbitMQ基础组件封装实践1

1、基础组件实现的关键点

基础组件封装设计-迅速消息发送

基础组件封装设计-确认消息发送

基础组件封装设计-延迟消息发送

2、基础组件需要实现的功能

迅速、延迟、可靠

消息异步化、序列化

链接池化、高性能

完备的补偿机制

3、创建工程

rabbit-common : 公共模块

rabbit-api: 提供给第三方使用

rabbit-core-producer: 用于发送消息(核心)

rabbit-task: 用于做可靠性处理 

首先创建rabbit-parent工程

pom.xml 如下

<?xml version="1.0" encoding="UTF-8"?>
<!-- Parent (aggregator) POM: declares the four modules and the dependencies shared by all of them. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <modules>
        <module>rabbit-common</module>
        <module>rabbit-api</module>
        <module>rabbit-core-producer</module>
        <module>rabbit-task</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.16.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>rabbit-parent</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>rabbit-parent</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>8</java.version>
        <fasterxml.uuid.version>3.1.4</fasterxml.uuid.version>
        <org.codehaus.jackson.version>1.9.13</org.codehaus.jackson.version>
        <druid.version>1.0.24</druid.version>
        <!-- Dangdang elastic-job: distributed scheduled tasks -->
        <elastic-job.version>2.1.4</elastic-job.version>
        <guava.version>20.0</guava.version>
        <!-- NOTE(review): this property is not referenced anywhere in this POM; the commons-lang3
             dependency below declares no version of its own. -->
        <commons-langs3.version>3.3.1</commons-langs3.version>
        <commons-io.version>2.4</commons-io.version>
        <commons-collections.version>3.2.2</commons-collections.version>
        <curator.version>2.11.0</curator.version>
        <!-- NOTE(review): very old fastjson release; consider upgrading before production use. -->
        <fastjson.version>1.1.26</fastjson.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>${commons-io.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
       <!-- JSON support -->
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>${org.codehaus.jackson.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.uuid</groupId>
            <artifactId>java-uuid-generator</artifactId>
            <version>${fasterxml.uuid.version}</version>
        </dependency>

    </dependencies>
</project>

  

二、rabbit-api: 提供给第三方使用 

1、创建Message类

@Data
public class Message implements Serializable {

    private static final long serialVersionUID = -6142443586615984421L;

    /** Unique id of the message. */
    private String messageId;

    /** Topic of the message. */
    private String topic;

    /** Routing key of the message; defaults to the empty string. */
    private String routingKey = "";

    /** Additional attributes carried with the message; defaults to an empty map. */
    private Map<String,Object> attributes = new HashMap<>();

    /** Delay setting for delayed messages. */
    private int delayMills;

    /** Message type; defaults to {@link MessageType#CONFIRM}. */
    private String messageType = MessageType.CONFIRM;

    /** Creates a message that keeps every field at its default value. */
    public Message() {
    }

    /**
     * Creates a message with every field given explicitly; the arguments are stored as-is.
     *
     * @param messageId   unique id of the message
     * @param topic       topic of the message
     * @param routingKey  routing key
     * @param attributes  additional attributes
     * @param delayMills  delay setting for delayed messages
     * @param messageType one of the {@link MessageType} constants
     */
    public Message(String messageId, String topic, String routingKey,
                   Map<String, Object> attributes, int delayMills, String messageType) {
        this.messageType = messageType;
        this.delayMills = delayMills;
        this.attributes = attributes;
        this.routingKey = routingKey;
        this.topic = topic;
        this.messageId = messageId;
    }
}

  

2、创建消息类型类

/**
 * String constants identifying the supported kinds of message.
 *
 * <p>This is a constants holder and is not meant to be instantiated.
 */
public final class MessageType {

    /**
     * Rapid message: no reliability guarantee and no publisher confirm.
     */
    public static final String RAPID = "0";

    /**
     * Confirm message: no reliability guarantee, but a publisher confirm is performed.
     */
    public static final String CONFIRM = "1";

    /**
     * Reliant message: must be delivered, no message loss is tolerated.
     * PS: keeps the database record and the sent message atomic (eventually consistent).
     */
    public static final String RELIANT = "2";

    /** Prevents instantiation of this constants holder. */
    private MessageType() {
    }
}

  

3、构建消息类,采用构造者模式

/**
 * Fluent builder for {@link Message}.
 *
 * <p>Obtain an instance with {@link #create()}, chain the {@code withXxx} setters and finish with
 * {@link #build()}.
 */
public class MessageBuilder {

    /** Unique id of the message; generated in {@link #build()} when left unset. */
    private String messageId;

    /** Topic of the message; required. */
    private String topic;

    /** Routing key of the message. */
    private String routingKey = "";

    /** Additional attributes carried with the message. */
    private Map<String,Object> attributes = new HashMap<>();

    /** Delay setting for delayed messages. */
    private int delayMills;

    /** Message type; defaults to {@link MessageType#CONFIRM}. */
    private String messageType = MessageType.CONFIRM;

    private MessageBuilder(){
    }

    /**
     * @return a new builder with default values
     */
    public static MessageBuilder create(){
        return new MessageBuilder();
    }

    public MessageBuilder withMessageId(String messageId){
        this.messageId = messageId;
        return this;
    }

    public MessageBuilder withTopic(String topic){
        this.topic = topic;
        return this;
    }

    public MessageBuilder withRoutingKey(String routingKey){
        this.routingKey = routingKey;
        return this;
    }

    /**
     * Replaces the attribute map.
     *
     * @param attributes the new attributes; {@code null} is treated as "no attributes" so that a
     *                   later {@link #withAttribute(String, Object)} call cannot fail with a
     *                   {@link NullPointerException}
     * @return this builder
     */
    public MessageBuilder withAttributes(Map<String,Object> attributes){
        this.attributes = attributes == null ? new HashMap<>() : attributes;
        return this;
    }

    /**
     * Adds a single attribute to the current attribute map.
     *
     * @return this builder
     */
    public MessageBuilder withAttribute(String key, Object object){
        this.attributes.put(key,object);
        return this;
    }

    /**
     * Sets the delay for a delayed message.
     *
     * @param delayMills delay setting passed through to the built {@link Message}
     * @return this builder
     */
    public MessageBuilder withDelayMills(int delayMills){
        this.delayMills = delayMills;
        return this;
    }

    /**
     * Sets the delay for a delayed message.
     *
     * @deprecated misnamed; despite its name this sets the delay, not the message id.
     *             Use {@link #withDelayMills(int)} instead. Kept for backward compatibility.
     */
    @Deprecated
    public MessageBuilder withMessageId(int delayMills){
        return withDelayMills(delayMills);
    }

    public MessageBuilder withMessageType(String messageType){
        this.messageType = messageType;
        return this;
    }

    /**
     * Builds the message.
     *
     * @return the new message; a random UUID is used as message id when none was set
     * @throws MessageRunTimeException if no topic was set
     */
    public Message build(){
        if(messageId == null){
            messageId = UUID.randomUUID().toString();
        }
        if(topic == null){
            throw new MessageRunTimeException("this topic is null");
        }
        return new Message(messageId,topic,routingKey,attributes,delayMills,messageType);
    }

}

  

4、创建异常类

MessageException类

/**
 * Checked exception for message-related failures.
 */
public class MessageException extends Exception {

    private static final long serialVersionUID = -8283764568495174322L;

    /** Creates an exception without message or cause. */
    public MessageException() {
    }

    /**
     * @param message detail message
     */
    public MessageException(String message) {
        super(message);
    }

    /**
     * @param cause underlying cause
     */
    public MessageException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message detail message
     * @param cause   underlying cause
     */
    public MessageException(String message, Throwable cause) {
        super(message, cause);
    }
}

  

创建MessageRunTimeException 类

/**
 * Unchecked exception for message-related failures.
 */
public class MessageRunTimeException extends RuntimeException {

    private static final long serialVersionUID = -2591307228826723236L;

    /** Creates an exception without message or cause. */
    public MessageRunTimeException() {
    }

    /**
     * @param message detail message
     */
    public MessageRunTimeException(String message) {
        super(message);
    }

    /**
     * @param cause underlying cause
     */
    public MessageRunTimeException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message detail message
     * @param cause   underlying cause
     */
    public MessageRunTimeException(String message, Throwable cause) {
        super(message, cause);
    }
}

  

5、创建消息生产者接口

/**
 * Entry point for sending messages.
 */
public interface MessageProducer {

    /**
     * Sends a single message.
     *
     * @param message the message to send
     * @throws MessageRunTimeException declared for send failures
     */
    void send(Message message) throws MessageRunTimeException;

    /**
     * Sends a message together with a {@link SendCallback} that runs the corresponding
     * business logic.
     *
     * @param message the message to send
     * @param sendCallback callback offering success and failure hooks
     * @throws MessageRunTimeException declared for send failures
     */
    void send(Message message, SendCallback sendCallback) throws MessageRunTimeException;

    /**
     * Sends a list of messages.
     *
     * @param messages the messages to send
     * @throws MessageRunTimeException declared for send failures
     */
    void send(List<Message> messages) throws MessageRunTimeException;
}

  

发送回调接口。 回调函数处理

/**
 * Callback passed to {@code MessageProducer.send(Message, SendCallback)}.
 */
public interface SendCallback {

    /** Hook for the success case. */
    void onSuccess();

    /** Hook for the failure case. */
    void onFailure();
}

  

6、创建消费者监听消息类

/**
 * Consumer-side listener for messages.
 */
public interface MessageListener {

    /**
     * Handles one message.
     *
     * @param message the message handed to the listener
     */
    void onMessage(Message message);
}

 

三、rabbit-common : 公共模块 

1、添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<!-- rabbit-common: shared module; depends on rabbit-api plus AMQP, JDBC, MyBatis, Druid and MySQL. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rabbit-parent</artifactId>
        <groupId>com.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbit-common</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>rabbit-api</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>

        <!-- NOTE(review): the version is hard-coded here and does not use the druid.version property
             (1.0.24) declared in rabbit-parent; confirm which version is intended. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
    </dependencies>

</project>

  

2、创建序列化和反序列化接口

/**
 * Serialization / deserialization contract used by the message converters.
 */
public interface Serializer {

    /**
     * Serializes an object to bytes.
     *
     * @param data the object to serialize
     * @return the serialized bytes
     */
    byte[] serializeRaw(Object data);

    /**
     * Serializes an object to a string.
     *
     * @param data the object to serialize
     * @return the serialized text
     */
    String serialize(Object data);

    // NOTE(review): despite the "deserialize" name this returns byte[]; the intended contract
    // is not visible here — confirm with implementers.
    byte[] deserializeRaw(Object data);

    /**
     * Deserializes an object from text.
     *
     * @param content serialized text
     * @param <T> type the caller expects
     * @return the deserialized object
     */
    <T> T deserialize(String content);

    /**
     * Deserializes an object from bytes.
     *
     * @param content serialized bytes
     * @param <T> type the caller expects
     * @return the deserialized object
     */
    <T> T deserialize(byte[] content);

}

  

3、创建接口SerializerFactory

/**
 * Factory for {@link Serializer} instances.
 */
public interface SerializerFactory {
    /**
     * @return a serializer
     */
    Serializer create();
}

  

4、序列化反序列化实现

/**
 * {@link Serializer} backed by a shared, leniently configured Jackson {@code ObjectMapper}.
 *
 * <p>Each instance is bound to one target type used for deserialization. Serialization and
 * deserialization failures are logged and reported to the caller as {@code null}.
 */
public class JacksonSerializer implements Serializer {

    private static final Logger LOGGER = LoggerFactory.getLogger(JacksonSerializer.class);
    private static final ObjectMapper mapper = new ObjectMapper();

    static {
        // Compact output, and do not fail on properties the target type does not declare.
        mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        // Lenient parser switches, enabled in the same order as before.
        JsonParser.Feature[] lenientFeatures = {
                JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,
                JsonParser.Feature.ALLOW_COMMENTS,
                JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS,
                JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS,
                JsonParser.Feature.ALLOW_SINGLE_QUOTES,
                JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
                JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES
        };
        for (JsonParser.Feature feature : lenientFeatures) {
            mapper.configure(feature, true);
        }
    }

    /** Target type for the {@code deserialize} methods. */
    private final JavaType type;

    private JacksonSerializer(JavaType type) {
        this.type = type;
    }

    /**
     * @param type target type for deserialization
     */
    public JacksonSerializer(Type type) {
        this.type = mapper.getTypeFactory().constructType(type);
    }

    /**
     * Creates a serializer whose deserialization target is the given class.
     *
     * @param cls target class
     * @return a serializer bound to {@code cls}
     */
    public static JacksonSerializer createParametricType(Class<?> cls) {
        JavaType javaType = mapper.getTypeFactory().constructType(cls);
        return new JacksonSerializer(javaType);
    }

    /**
     * @return the JSON bytes, or {@code null} if serialization failed
     */
    @Override
    public byte[] serializeRaw(Object data) {
        byte[] bytes = null;
        try {
            bytes = mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException ex) {
            LOGGER.error("序列化出错", ex);
        }
        return bytes;
    }

    /**
     * @return the JSON text, or {@code null} if serialization failed
     */
    @Override
    public String serialize(Object data) {
        String json = null;
        try {
            json = mapper.writeValueAsString(data);
        } catch (JsonProcessingException ex) {
            LOGGER.error("序列化出错", ex);
        }
        return json;
    }

    /**
     * Stub: always returns an empty array.
     */
    @Override
    public byte[] deserializeRaw(Object data) {
        return new byte[0];
    }

    /**
     * @return the object read from {@code content}, or {@code null} if reading failed
     */
    @Override
    public <T> T deserialize(String content) {
        T value = null;
        try {
            value = mapper.readValue(content, type);
        } catch (IOException ex) {
            LOGGER.error("反序列化出错", ex);
        }
        return value;
    }

    /**
     * @return the object read from {@code content}, or {@code null} if reading failed
     */
    @Override
    public <T> T deserialize(byte[] content) {
        T value = null;
        try {
            value = mapper.readValue(content, type);
        } catch (IOException ex) {
            LOGGER.error("反序列化出错", ex);
        }
        return value;
    }

}

  

 5、序列化工厂实现

/**
 * {@link SerializerFactory} producing Jackson serializers bound to {@code Message}.
 */
public class JacksonSerializerFactory implements SerializerFactory {

    /** Shared factory instance. */
    public static final SerializerFactory INSTANCE = new JacksonSerializerFactory();

    /**
     * @return a new serializer whose deserialization target is {@code Message}
     */
    @Override
    public Serializer create() {
        Serializer messageSerializer = JacksonSerializer.createParametricType(Message.class);
        return messageSerializer;
    }

}

  

6、普通消息Convert

/**
 * {@code MessageConverter} that delegates body conversion to a {@link Serializer}.
 */
public class GenericMessageConverter implements MessageConverter {

    private Serializer serializer;

    /**
     * @param serializer serializer used for both directions; must not be {@code null}
     */
    public GenericMessageConverter(Serializer serializer) {
        this.serializer = Preconditions.checkNotNull(serializer);
    }

    /**
     * Deserializes the body of an {@code org.springframework.amqp.core.Message} into an object.
     */
    @Override
    public Object fromMessage(org.springframework.amqp.core.Message message) throws MessageConversionException {
        byte[] body = message.getBody();
        Object payload = this.serializer.deserialize(body);
        return payload;
    }

    /**
     * Serializes the given object and wraps the bytes, together with the supplied properties,
     * in an {@code org.springframework.amqp.core.Message}.
     */
    @Override
    public org.springframework.amqp.core.Message toMessage(Object object, MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] body = this.serializer.serializeRaw(object);
        return new org.springframework.amqp.core.Message(body, messageProperties);
    }

}

  

  

7、扩展消息Convert,

引用了GenericMessageConverter,可以理解为装饰者模式。比如设置过期时间。

/**
 * {@code MessageConverter} that wraps a {@link GenericMessageConverter} and copies the delay
 * of the API message into the AMQP message properties before delegating.
 */
public class RabbitMessageConverter implements MessageConverter {

	private GenericMessageConverter delegate;

	/**
	 * @param genericMessageConverter converter doing the body conversion; must not be {@code null}
	 */
	public RabbitMessageConverter(GenericMessageConverter genericMessageConverter) {
		Preconditions.checkNotNull(genericMessageConverter);
		this.delegate = genericMessageConverter;
	}

	/**
	 * Sets the delay from the API message on {@code messageProperties} and delegates the conversion.
	 *
	 * @param object must be a {@code com.example.api.Message}
	 * @throws MessageConversionException if {@code object} is not a {@code com.example.api.Message}
	 */
	@Override
	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
		// Report a wrong payload type through the declared exception instead of a raw ClassCastException.
		if (!(object instanceof com.example.api.Message)) {
			throw new MessageConversionException("unsupported payload type: "
					+ (object == null ? "null" : object.getClass().getName()));
		}
		com.example.api.Message message = (com.example.api.Message) object;
		messageProperties.setDelay(message.getDelayMills());
		return this.delegate.toMessage(object, messageProperties);
	}

	/**
	 * Delegates the conversion and returns the result as a {@code com.example.api.Message}.
	 */
	@Override
	public Object fromMessage(Message message) throws MessageConversionException {
		return (com.example.api.Message) this.delegate.fromMessage(message);
	}

}

  

四、rabbit-core-producer

1、增加依赖

<?xml version="1.0" encoding="UTF-8"?>
<!-- rabbit-core-producer: producer module; builds on rabbit-common. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rabbit-parent</artifactId>
        <groupId>com.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbit-core-producer</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>rabbit-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <!-- NOTE(review): a beta TestNG release at compile scope; presumably should be test scope -
             confirm that no main code depends on it before changing. -->
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>7.0.0-beta1</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

</project>

 

2、自动装配

在META-INF文件夹下创建spring.factories文件

 内容为:

# Auto Configure
# The trailing backslash continues the value on the next line; without it the key is
# registered with an empty value and the auto-configuration class is never loaded.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.producer.autoconfigure.RabbitProducerAutoConfiguration

  

3、发送消息的实际实现类

/**
 * {@link MessageProducer} implementation that dispatches a message to the {@link RabbitBroker}
 * method matching its message type.
 */
@Component
public class ProducerClient implements MessageProducer {

    @Autowired
    RabbitBroker rabbitBroker;

    /**
     * Sends the message through the broker method selected by {@code message.getMessageType()}.
     *
     * @param message the message to send; it and its topic must not be {@code null}
     * @throws NullPointerException if the message or its topic is {@code null}
     * @throws MessageRunTimeException if the message type is not one of the {@link MessageType} constants
     */
    @Override
    public void send(Message message) throws MessageRunTimeException {
        Preconditions.checkNotNull(message);
        Preconditions.checkNotNull(message.getTopic());
        String messageType = message.getMessageType();
        switch (messageType){
            case MessageType.RAPID:
                rabbitBroker.rapidSend(message);
                break;
            case MessageType.CONFIRM:
                rabbitBroker.confirmSend(message);
                break;
            case MessageType.RELIANT:
                rabbitBroker.reliantSend(message);
                break;
            default:
                // An unknown type used to be dropped silently; fail loudly so the caller notices.
                throw new MessageRunTimeException("unsupported message type: " + messageType);
        }
    }

    /**
     * Not implemented yet: currently does nothing.
     */
    @Override
    public void send(Message message, SendCallback sendCallback) throws MessageRunTimeException {
        // TODO: callback-based sending is not implemented yet
    }

    /**
     * Not implemented yet: currently does nothing.
     */
    @Override
    public void send(List<Message> messages) throws MessageRunTimeException {
        // TODO: batch sending is not implemented yet
    }
}

  

4、创建接口 RabbitBroker 

 作用: 具体发送不同种类消息的接口

/**
 * Sends the different kinds of messages.
 */
public interface RabbitBroker {

    /** Sends a rapid message (see {@code MessageType.RAPID}). */
    void rapidSend(Message message);
    /** Sends a confirm message (see {@code MessageType.CONFIRM}). */
    void confirmSend(Message message);
    /** Sends a reliant message (see {@code MessageType.RELIANT}). */
    void reliantSend(Message message);
    // NOTE(review): takes no arguments; presumably meant for batch sending — confirm with the implementation.
    void sendMessages();
}

  

创建实现类RabbitBrokerImpl 

@Slf4j
@Component
public class RabbitBrokerImpl implements  RabbitBroker {

    @Autowired
    private RabbitTemplateContainer rabbitTemplateContainer;


    @Autowired
    private MessageStoreService messageStoreService;

    @Override
    public void rapidSend(Message message) {
        message.setMessageType(MessageType.RAPID);
        sendKernel(message);
    }

    /**
     * 发送消息的核心方法, 使用异步线程池进行发送消息
     * @param message
     */
    private void sendKernel(Message message) {
        AsyncBaseQueue.submit(() -> {
            CorrelationData correlationData = new CorrelationData(
                    String.format("%s#%s", message.getMessageId(), System.currentTimeMillis()));
            String topic = message.getTopic();
            String routingKey = message.getRoutingKey();
            RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
            rabbitTemplate.convertAndSend(topic,routingKey, message, correlationData);
            log.info("#RabbitBrokerImpl.sendKernel# 发送消息到RabbitMQ,messageId={}", message.getMessageId());
        });

    }

    @Override
    public void confirmSend(Message message) {
        message.setMessageType(MessageType.CONFIRM);
        sendKernel(message);
    }

    @Override
    public void reliantSend(Message message) {
        message.setMessageType(MessageType.RELIANT);
        BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
        if(bm == null) {
            //1. 把数据库的消息发送日志先记录好
            Date now = new Date();
            BrokerMessage brokerMessage = new BrokerMessage();
            brokerMessage.setMessageId(message.getMessageId());
            brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
            //tryCount 在最开始发送的时候不需要进行设置
            brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
            brokerMessage.setCreateTime(now);
            brokerMessage.setUpdateTime(now);
            brokerMessage.setMessage(message);
            messageStoreService.insert(brokerMessage);
        }
        //2. 执行真正的发送消息逻辑
        sendKernel(message);
    }

    @Override
    public void sendMessages() {

    }
}

  

创建异步消息队列(使用线程池)

@Slf4j
public class AsyncBaseQueue {

    private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors();

    private static final int QUEUE_SIZE = 10000;

    private static ExecutorService senderAsync = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(QUEUE_SIZE),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("rabbitmq_client_async_sender");
                    return t;
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    log.error("async sender is error rejected, runnable:{}, executor:{}", r, executor);
                }
            }
    );

    public static  void submit(Runnable runnable){
        senderAsync.submit(runnable);
    }

}

  

池化RabbitTemplate

/**
 * @description: 池化RabbitTemplate
 * 每一个topic对应一个RabbitTemplate的好处
 * 1、提高发送的效率
 * 2、可以根据不同的需求定制不同的RabbitTemplate,比如每一个topic都有自己的routingKey规则
 * @author:
 * @create: 2020-08-01 17:22
 */
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {

    private Map<String /* TOPIC */, RabbitTemplate> rabbitTemplateMap = Maps.newConcurrentMap();

    private Splitter splitter = Splitter.on("#");

    private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;

    @Autowired
    private ConnectionFactory connectionFactory;


    @Autowired
    private MessageStoreService messageStoreService;

    public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
        Preconditions.checkNotNull(message);
        String topic = message.getTopic();
        RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(topic);
        if(rabbitTemplate != null){
            return rabbitTemplate;
        }
        log.info("topic={} is not exists, create", topic);
        RabbitTemplate newTemplate = new RabbitTemplate(connectionFactory);
        newTemplate.setExchange(topic);
        newTemplate.setRoutingKey(message.getRoutingKey());
        newTemplate.setRetryTemplate(new RetryTemplate());

        // 添加序列化反序列化和converter对象
        Serializer serializer = serializerFactory.create();
        GenericMessageConverter gmc = new GenericMessageConverter(serializer);
        RabbitMessageConverter rmc = new RabbitMessageConverter(gmc);
        newTemplate.setMessageConverter(rmc);

        String messageType = message.getMessageType();
        if(!MessageType.RAPID.equals(messageType)){
            newTemplate.setConfirmCallback(this);
        }
        rabbitTemplateMap.putIfAbsent(topic,newTemplate);

        return  rabbitTemplateMap.get(topic);

    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //消息应答
       List<String> strings =  splitter.splitToList(correlationData.getId());
       String messageId = strings.get(0);
       long sendTime = Long.parseLong(strings.get(1));
        String messageType = strings.get(2);
       if(ack){
           log.info("发送消息成功,confirm messageId={}, sendTime={}" , messageId, sendTime);
       }else {
           log.info("发送消息失败,confirm messageId={}, sendTime={}" , messageId, sendTime);
       }
        if(ack) {
            //	当Broker 返回ACK成功时, 就是更新一下日志表里对应的消息发送状态为 SEND_OK

            // 	如果当前消息类型为reliant 我们就去数据库查找并进行更新
            if(MessageType.RELIANT.endsWith(messageType)) {
                this.messageStoreService.succuess(messageId);
            }
            log.info("发送消息成功,confirm messageId={}, sendTime={}" , messageId, sendTime);
        } else {
            log.info("发送消息失败,confirm messageId={}, sendTime={}" , messageId, sendTime);

        }
    }
}

  

 5、数据准备

创建数据库和表

createtable.sql

USE broker_message;

-- Send log for reliant messages. The table name must be broker_message: every statement in
-- BrokerMessageMapper.xml reads and writes a table of that name.
CREATE TABLE IF NOT EXISTS broker_message (
	message_id VARCHAR(128) NOT NULL,
	message VARCHAR(4000),
	try_count INT(4) DEFAULT 0,
	STATUS VARCHAR(10) DEFAULT '',
	next_retry TIMESTAMP ,
	create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
	update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
	PRIMARY KEY(message_id)

) ENGINE=INNODB DEFAULT CHARSET=utf8;

  

rabbit-core-producer工程下

创建BrokerMessage类

/**
 * Persistent send record of a message: the message itself plus retry and status bookkeeping.
 */
public class BrokerMessage implements Serializable {

    private static final long serialVersionUID = 7447792462810110841L;

    /** Id of the recorded message. */
    private String messageId;

    /** The recorded message. */
    private Message message;

    /** Number of tries; starts at 0. */
    private Integer tryCount = 0;

    /** Status code of the record. */
    private String status;

    /** Time of the next retry. */
    private Date nextRetry;

    private Date createTime;

    private Date updateTime;

    public String getMessageId() {
        return messageId;
    }

    /**
     * @param messageId id to store; trimmed unless {@code null}
     */
    public void setMessageId(String messageId) {
        if (messageId == null) {
            this.messageId = null;
        } else {
            this.messageId = messageId.trim();
        }
    }

    public Message getMessage() {
        return message;
    }

    public void setMessage(Message message) {
        this.message = message;
    }

    public Integer getTryCount() {
        return tryCount;
    }

    public void setTryCount(Integer tryCount) {
        this.tryCount = tryCount;
    }

    public String getStatus() {
        return status;
    }

    /**
     * @param status status to store; trimmed unless {@code null}
     */
    public void setStatus(String status) {
        if (status == null) {
            this.status = null;
        } else {
            this.status = status.trim();
        }
    }

    public Date getNextRetry() {
        return nextRetry;
    }

    public void setNextRetry(Date nextRetry) {
        this.nextRetry = nextRetry;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
}

  

定义BrokerMessageMapper 接口

/**
 * MyBatis mapper for the broker_message table; the SQL lives in BrokerMessageMapper.xml.
 */
@Mapper
public interface BrokerMessageMapper {
	
    /** Deletes the row with the given message id. */
    int deleteByPrimaryKey(String messageId);
    
    /** Inserts a row with all columns. */
    int insert(BrokerMessage record);

    /** Inserts a row, writing only the non-null properties. */
    int insertSelective(BrokerMessage record);

    /** Loads the row with the given message id. */
    BrokerMessage selectByPrimaryKey(String messageId);

    /** Updates the non-null properties of the row identified by the record's message id. */
    int updateByPrimaryKeySelective(BrokerMessage record);

    //int updateByPrimaryKeyWithBLOBs(BrokerMessage record);

    /** Updates all non-key columns of the row identified by the record's message id. */
    int updateByPrimaryKey(BrokerMessage record);
	
	/** Sets status and update_time of the row with the given message id. */
	void changeBrokerMessageStatus(@Param("brokerMessageId") String brokerMessageId, @Param("brokerMessageStatus") String brokerMessageStatus, @Param("updateTime") Date updateTime);

	/** Selects the rows with the given status whose next_retry is earlier than the database's current time. */
	List<BrokerMessage> queryBrokerMessageStatus4Timeout(@Param("brokerMessageStatus") String brokerMessageStatus);
	
	/** Selects the rows with the given status. */
	List<BrokerMessage> queryBrokerMessageStatus(@Param("brokerMessageStatus") String brokerMessageStatus);
	
	/** Increments try_count by one and sets update_time for the row with the given message id. */
	int update4TryCount(@Param("brokerMessageId") String brokerMessageId, @Param("updateTime") Date updateTime);
	
}

增加BrokerMessageMapper.xml文件

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!-- SQL for BrokerMessageMapper; every statement targets the broker_message table. -->
<mapper namespace="com.example.producer.mapper.BrokerMessageMapper" >
  <!-- Row mapping; the message column is converted through MessageJsonTypeHandler. -->
  <resultMap id="BaseResultMap" type="com.example.producer.entity.BrokerMessage" >
    <id column="message_id" property="messageId" jdbcType="VARCHAR" />
    <result column="message" property="message" jdbcType="VARCHAR" typeHandler="com.example.common.mybatis.handler.MessageJsonTypeHandler" />
    <result column="try_count" property="tryCount" jdbcType="INTEGER" />
    <result column="status" property="status" jdbcType="VARCHAR" />
    <result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
    <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
  </resultMap>
  <!-- Full column list, included by selectByPrimaryKey. -->
  <sql id="Base_Column_List" >
    message_id, message, try_count, status, next_retry, create_time, update_time
  </sql>
  <!-- Loads one row by message id. -->
  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
    select 
    <include refid="Base_Column_List" />
    from broker_message
    where message_id = #{messageId,jdbcType=VARCHAR}
  </select>
  <!-- Deletes one row by message id. -->
  <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
    delete from broker_message
    where message_id = #{messageId,jdbcType=VARCHAR}
  </delete>
  <!-- Inserts a row with all columns. -->
  <insert id="insert" parameterType="com.example.producer.entity.BrokerMessage" >
    insert into broker_message (message_id, message, try_count, 
      status, next_retry, create_time, 
      update_time)
    values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR, typeHandler=com.example.common.mybatis.handler.MessageJsonTypeHandler}, #{tryCount,jdbcType=INTEGER}, 
      #{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP}, 
      #{updateTime,jdbcType=TIMESTAMP})
  </insert>
  <!-- Inserts a row, listing only the columns whose property is non-null. -->
  <insert id="insertSelective" parameterType="com.example.producer.entity.BrokerMessage" >
    insert into broker_message
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="messageId != null" >
        message_id,
      </if>
      <if test="message != null" >
        message,
      </if>
      <if test="tryCount != null" >
        try_count,
      </if>
      <if test="status != null" >
        status,
      </if>
      <if test="nextRetry != null" >
        next_retry,
      </if>
      <if test="createTime != null" >
        create_time,
      </if>
      <if test="updateTime != null" >
        update_time,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="messageId != null" >
        #{messageId,jdbcType=VARCHAR},
      </if>
      <if test="message != null" >
        #{message,jdbcType=VARCHAR, typeHandler=com.example.common.mybatis.handler.MessageJsonTypeHandler},
      </if>
      <if test="tryCount != null" >
        #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        #{status,jdbcType=VARCHAR},
      </if>
      <if test="nextRetry != null" >
        #{nextRetry,jdbcType=TIMESTAMP},
      </if>
      <if test="createTime != null" >
        #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </trim>
  </insert>
  <!-- Updates only the columns whose property is non-null, for the row with the given message id. -->
  <update id="updateByPrimaryKeySelective" parameterType="com.example.producer.entity.BrokerMessage" >
    update broker_message
    <set >
      <if test="message != null" >
        message = #{message,jdbcType=VARCHAR, typeHandler=com.example.common.mybatis.handler.MessageJsonTypeHandler},
      </if>
      <if test="tryCount != null" >
        try_count = #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        status = #{status,jdbcType=VARCHAR},
      </if>
      <if test="nextRetry != null" >
        next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      </if>
      <if test="createTime != null" >
        create_time = #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        update_time = #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </set>
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>
  <!-- Updates all non-key columns of the row with the given message id. -->
  <update id="updateByPrimaryKey" parameterType="com.example.producer.entity.BrokerMessage" >
    update broker_message
    set message = #{message,jdbcType=VARCHAR, typeHandler=com.example.common.mybatis.handler.MessageJsonTypeHandler},
      try_count = #{tryCount,jdbcType=INTEGER},
      status = #{status,jdbcType=VARCHAR},
      next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      create_time = #{createTime,jdbcType=TIMESTAMP},
      update_time = #{updateTime,jdbcType=TIMESTAMP}
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>
  

  <!-- Sets status and update_time of one row. -->
  <update id="changeBrokerMessageStatus" >
    update broker_message bm
    set bm.status = #{brokerMessageStatus,jdbcType=VARCHAR},
      	bm.update_time = #{updateTime, jdbcType=TIMESTAMP}
    where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}  
  </update>
  
  
  <!-- Rows with the given status whose next_retry is earlier than sysdate(). -->
  <select id="queryBrokerMessageStatus4Timeout" resultMap="BaseResultMap" >
  	<![CDATA[
	    select message_id, message, try_count, status, next_retry, create_time, update_time
	    from broker_message bm
	    where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}  	
	    and bm.next_retry < sysdate()
    ]]>
  </select>
  
  <!-- Rows with the given status. -->
  <select id="queryBrokerMessageStatus" resultMap="BaseResultMap" >
	    select message_id, message, try_count, status, next_retry, create_time, update_time
	    from broker_message bm
	    where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}  	
  </select>
  
  
   <!-- Increments try_count by one and sets update_time of one row. -->
   <update id="update4TryCount" >
    update broker_message bm
    set bm.try_count = bm.try_count + 1,
      bm.update_time = #{updateTime,jdbcType=TIMESTAMP}
    where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
   </update>
   
   
</mapper>

 

增加配置文件rabbit-producer-message.properties

使用了阿里巴巴的druid的数据源

# DataSource implementation class; read through @Value in RabbitProducerDataSourceConfiguration.
rabbit.producer.druid.type=com.alibaba.druid.pool.DruidDataSource
# Everything under rabbit.producer.druid.jdbc is bound to the data source via
# @ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc").
# Host and password below are placeholders; replace them for a real environment.
rabbit.producer.druid.jdbc.url=jdbc:mysql://118.xx.xx.101:3306/broker_message?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT
# NOTE(review): legacy driver class name; newer MySQL connectors presumably expect
# com.mysql.cj.jdbc.Driver - confirm against the connector version in use.
rabbit.producer.druid.jdbc.driver-class-name=com.mysql.jdbc.Driver
rabbit.producer.druid.jdbc.username=root
rabbit.producer.druid.jdbc.password=xxx
rabbit.producer.druid.jdbc.initialSize=5
rabbit.producer.druid.jdbc.minIdle=1
rabbit.producer.druid.jdbc.maxActive=100
rabbit.producer.druid.jdbc.maxWait=60000
rabbit.producer.druid.jdbc.timeBetweenEvictionRunsMillis=60000
rabbit.producer.druid.jdbc.minEvictableIdleTimeMillis=300000
rabbit.producer.druid.jdbc.validationQuery=SELECT 1 FROM DUAL
rabbit.producer.druid.jdbc.testWhileIdle=true
rabbit.producer.druid.jdbc.testOnBorrow=false
rabbit.producer.druid.jdbc.testOnReturn=false
rabbit.producer.druid.jdbc.poolPreparedStatements=true
rabbit.producer.druid.jdbc.maxPoolPreparedStatementPerConnectionSize= 20
# NOTE(review): the log4j filter presumably needs log4j on the classpath - verify.
rabbit.producer.druid.jdbc.filters=stat,wall,log4j
rabbit.producer.druid.jdbc.useGlobalDataSourceStat=true

  

增加数据库配置

1) RabbitProducerDataSourceConfiguration 增加数据源,并读取配置文件

/**
 * Declares the data source used by the RabbitMQ producer component.
 *
 * <p>Settings are loaded from {@code rabbit-producer-message.properties} on the classpath.
 */
@Configuration
@PropertySource({"classpath:rabbit-producer-message.properties"})
public class RabbitProducerDataSourceConfiguration {

	// A logger is a shared constant; final prevents accidental reassignment.
	private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RabbitProducerDataSourceConfiguration.class);

	/** Data source implementation class, resolved from {@code rabbit.producer.druid.type}. */
	@Value("${rabbit.producer.druid.type}")
	private Class<? extends DataSource> dataSourceType;

	/**
	 * Creates the primary data source bean, registered as {@code rabbitProducerDataSource}.
	 *
	 * <p>The instance is built with {@link DataSourceBuilder} using {@link #dataSourceType};
	 * properties under the {@code rabbit.producer.druid.jdbc} prefix are bound onto it through
	 * the {@code @ConfigurationProperties} annotation on this method.
	 *
	 * @return the newly built data source
	 * @throws SQLException declared for callers; kept so the method signature is unchanged
	 */
	@Bean(name = "rabbitProducerDataSource")
	@Primary
	@ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc")
	public DataSource rabbitProducerDataSource() throws SQLException {
		DataSource rabbitProducerDataSource = DataSourceBuilder.create().type(dataSourceType).build();
		LOGGER.info("============= rabbitProducerDataSource : {} ================", rabbitProducerDataSource);
		return rabbitProducerDataSource;
	}

	/**
	 * Returns a fresh, unconfigured {@link DataSourceProperties}.
	 *
	 * <p>This method carries no {@code @Bean} annotation, so it is a plain method of this class.
	 *
	 * @return a new {@code DataSourceProperties} instance
	 */
	public DataSourceProperties primaryDataSourceProperties() {
		return new DataSourceProperties();
	}

	/**
	 * Builds a data source from {@link #primaryDataSourceProperties()}.
	 *
	 * <p>This method carries no {@code @Bean} annotation, so it is a plain method of this class.
	 *
	 * @return the data source produced by the properties' builder
	 */
	public DataSource primaryDataSource() {
		return primaryDataSourceProperties().initializeDataSourceBuilder().build();
	}

}

  

2)执行SQL脚本createtable.sql,如果表不存在,则创建表

/**
 * Registers a {@link DataSourceInitializer} that applies the {@code createtable.sql}
 * classpath script to the injected data source.
 */
@Configuration
public class BrokerMessageConfiguration {

    /** Schema script resolved from {@code classpath:createtable.sql}. */
    @Value("classpath:createtable.sql")
    private Resource schemaScript;

    /** Data source the script is applied to. */
    @Autowired
    private DataSource rabbitProducerDataSource;

    /**
     * Builds the initializer bean: a {@link ResourceDatabasePopulator} holding the schema
     * script is attached to a {@link DataSourceInitializer} bound to the injected data source.
     *
     * @return the configured initializer
     */
    @Bean
    public DataSourceInitializer initDataSourceInitializer() {
        System.err.println("--------------rabbitProducerDataSource-----------:" + rabbitProducerDataSource);

        ResourceDatabasePopulator schemaPopulator = new ResourceDatabasePopulator();
        schemaPopulator.addScript(schemaScript);

        DataSourceInitializer dataSourceInitializer = new DataSourceInitializer();
        dataSourceInitializer.setDataSource(rabbitProducerDataSource);
        dataSourceInitializer.setDatabasePopulator(schemaPopulator);
        return dataSourceInitializer;
    }
}

  

3) 整合MyBatis

/**
 * MyBatis wiring for the producer: exposes a {@link SqlSessionFactory} built on the
 * producer data source and a {@link SqlSessionTemplate} on top of a session factory.
 */
@Configuration
@AutoConfigureAfter(value = {RabbitProducerDataSourceConfiguration.class})
public class RabbitProducerMyBatisConfiguration {

	/** Producer data source, looked up by bean name. */
	@Resource(name= "rabbitProducerDataSource")
	private DataSource rabbitProducerDataSource;

	/**
	 * Creates the session factory bean named {@code rabbitProducerSqlSessionFactory}.
	 *
	 * <p>Mapper XML files are loaded from {@code classpath:com/example/producer/mapping/*.xml},
	 * and the cache-enabled flag is switched on in the resulting configuration.
	 *
	 * @param rabbitProducerDataSource data source handed to the factory (this parameter
	 *        shadows the field of the same name)
	 * @return the built session factory
	 * @throws RuntimeException wrapping any exception raised while loading mappers or
	 *         building the factory
	 */
	@Bean(name="rabbitProducerSqlSessionFactory")
	public SqlSessionFactory rabbitProducerSqlSessionFactory(DataSource rabbitProducerDataSource) {
		SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
		factoryBean.setDataSource(rabbitProducerDataSource);
		ResourcePatternResolver mapperResolver = new PathMatchingResourcePatternResolver();

		try {
			factoryBean.setMapperLocations(
					mapperResolver.getResources("classpath:com/example/producer/mapping/*.xml"));
			SqlSessionFactory builtFactory = factoryBean.getObject();
			builtFactory.getConfiguration().setCacheEnabled(true);
			return builtFactory;
		} catch (Exception cause) {
			throw new RuntimeException(cause);
		}
	}

	/**
	 * Creates the session template bean named {@code rabbitProducerSqlSessionTemplate}.
	 *
	 * @param sqlSessionFactory session factory the template delegates to
	 * @return a new template over the given factory
	 */
	@Bean(name="rabbitProducerSqlSessionTemplate")
	public SqlSessionTemplate rabbitProducerSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
		SqlSessionTemplate sessionTemplate = new SqlSessionTemplate(sqlSessionFactory);
		return sessionTemplate;
	}

}

  

4) 配置Mapper接口扫描

/**
 * Registers the MyBatis mapper scanner for the producer's mapper interfaces.
 */
@Configuration
@AutoConfigureAfter(RabbitProducerDataSourceConfiguration.class)
public class RabbitProducerMybatisMapperScanerConfig {

	/**
	 * Creates the scanner bean named {@code rabbitProducerMapperScannerConfigurer}.
	 *
	 * <p>The scanner looks for mapper interfaces under {@code com.example.producer.mapper}
	 * and refers to the session factory by bean name
	 * ({@code rabbitProducerSqlSessionFactory}).
	 *
	 * <p>The method is {@code static} because {@link MapperScannerConfigurer} is a bean
	 * factory post-processor: Spring recommends static {@code @Bean} methods for such types
	 * so that the enclosing configuration class is not instantiated prematurely.
	 *
	 * @return the configured mapper scanner
	 */
	@Bean(name="rabbitProducerMapperScannerConfigurer")
	public static MapperScannerConfigurer rabbitProducerMapperScannerConfigurer() {
		MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
		mapperScannerConfigurer.setSqlSessionFactoryBeanName("rabbitProducerSqlSessionFactory");
		mapperScannerConfigurer.setBasePackage("com.example.producer.mapper");
		return mapperScannerConfigurer;
	}

}

   

原文地址:https://www.cnblogs.com/linlf03/p/13414434.html