Mybaties插入、修改和删除操作自动同步至Elasticsearch

前段时间写了一个项目,用于拦截mybaties插入、更新和删除操作,并将修改动作自动同步至elasticsearch。项目代码中的更新操作带上了@Param注解,用于定位更新的数据,且第一个参数用的是唯一键。上文已经介绍了Mybaties插件基本原理,项目基于该原理实现数据库操作的同步。

代码已开源至GitHub,点击地址查看源码。

本项目运行的Elasticsearch版本是7.9.0,java客户端采用spring-data-elasticsearch-4.0.0,之前的很多接口方法都过时。

为了更清晰的理解这个同步器,项目中简单模拟了一个简单的电商场景,包含下单、支付、发货、收货和删除操作,数据库含两张表:订单主表“test_trade_order”和商品明细表“test_trade_order_line”,具体字段可看项目源码。我这里给出了Elasticsearch实体,字段和数据库对应。

下面列出了部分代码。

1. OrderEntity和OrderLineEntity

/**
 * 单据ES实体
 *
 * @author Doflamingo
 */
@Data
// 用了表达式生成索引名
@Document(indexName = "#{esConfig.name(T (com.mingo.es.sync.constant.type.EsIndexType).ORDER)}")
public class OrderEntity implements Serializable {

    private static final long serialVersionUID = 1L;

    @Id
    @Field(type = FieldType.Keyword)
    private String id;

    @Field(type = FieldType.Keyword)
    private String tradeNo;

    @Field(type = FieldType.Long)
    private Long buyerId;

    @Field(type = FieldType.Long)
    private Long sellerId;

    @Field(type = FieldType.Integer)
    private Integer type;

    @Field(type = FieldType.Integer)
    private Integer status;

    @Field(type = FieldType.Double)
    private BigDecimal amount;

    @Field(type = FieldType.Double)
    private BigDecimal discountAmount;

    @Field(type = FieldType.Double)
    private BigDecimal originAmount;

    @Field(type = FieldType.Double)
    private BigDecimal payAmount;

    @Field(type = FieldType.Date)
    private Date payTime;

    @Field(type = FieldType.Date)
    private Date deliveryTime;

    @Field(type = FieldType.Date)
    private Date receivingTime;

    @Field(type = FieldType.Date)
    private Date createTime;

    @Field(type = FieldType.Date)
    private Date updateTime;

    @Field(type = FieldType.Integer)
    private Integer version;

    @Field(type = FieldType.Text)
    private String extData;

    @Field(type = FieldType.Integer)
    private Integer deleted;

    @Field(type = FieldType.Nested)
    private List<OrderLineEntity> lines;
}
/**
 * 单据明细es Entity
 */
@Data
public class OrderLineEntity implements Serializable {

    private static final long serialVersionUID = 1L;

    @Field(type = FieldType.Long)
    private Long id;

    @Field(type = FieldType.Keyword)
    private String tradeNo;

    @Field(type = FieldType.Text)
    private String lineNo;

    @Field(type = FieldType.Text)
    private String itemCode;

    @Field(type = FieldType.Text)
    private String itemName;

    @Field(type = FieldType.Text)
    private String unitCode;

    @Field(type = FieldType.Text)
    private String unitName;

    @Field(type = FieldType.Integer)
    private Integer type;

    @Field(type = FieldType.Double)
    private BigDecimal itemPrice;

    @Field(type = FieldType.Double)
    private BigDecimal price;

    @Field(type = FieldType.Double)
    private BigDecimal discountPrice;

    @Field(type = FieldType.Double)
    private BigDecimal itemQty;

    @Field(type = FieldType.Double)
    private BigDecimal totalPrice;

    @Field(type = FieldType.Double)
    private BigDecimal paidPrice;

    @Field(type = FieldType.Date)
    private Date createTime;

    @Field(type = FieldType.Date)
    private Date updateTime;

    @Field(type = FieldType.Integer)
    private Integer version;

    @Field(type = FieldType.Text)
    private String extData;

    @Field(type = FieldType.Integer)
    private Integer deleted;
}

2. ElasticsearchInterceptor拦截器

本文的场景还不到拦截到参数、结果集或Statement的粒度,所以将数据库表插入、更新和删除操作同步至elasticsearch,只需用mybaties插件拦截Executorint update(MappedStatement ms, Object parameter) throws SQLException方法,也就是业务在执行insert、update或者delete操作前后都会被该方法拦截。将操作拦截下来,判断结果是否正确或者满足,在执行elasticsearch的对应操作。

代码如下

/**
 * MyBaties 同步Elasticsearch拦截
 * Executor.update(MappedStatement ms, Object parameter)
 *
 * @author Doflamingo
 */
@Component
@Slf4j
@Intercepts({
        @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
})
public class ElasticsearchInterceptor implements Interceptor {

    /**
     * ES同步处理器
     */
    private final Map<String, AbstractEsSyncHandler> handlerMap = new HashMap<>();

    /**
     * 拦截处理
     *
     * @param invocation
     * @return
     * @throws Throwable
     */
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        Object res = invocation.proceed();

        Object[] args = invocation.getArgs();
        if (2 == args.length) {
            MappedStatement statement = (MappedStatement) args[0];

            // mapper中的方法
            // 如 com.mingo.es.sync.mybaties.mapper.OrderMapper.insert
            String key = statement.getId();
            // 处理
            AbstractEsSyncHandler esSynHandler = handlerMap.get(key);
            if (null != esSynHandler) {
                try {
                    esSynHandler.handler(statement, args[1], res);
                } catch (Exception e) {
                    log.error("ES同步异常:", e);
                    // 为了简便,示例中这里抛出了异常,会影响业务,如果是含事务场景将会回滚数据库
                    // 如果不想影响业务,不是强实时,这里可以走顺序消息,异步存入ES
                    throw new EsSyncException("ES同步异常:" + e.getMessage());
                }
            }
        }

        return res;
    }

    /**
     * 生成代理对象
     *
     * @param target
     * @return
     */
    @Override
    public Object plugin(Object target) {
        // 代理对象,用于触发intercept()
        return Plugin.wrap(target, this);
    }

    @Override
    public void setProperties(Properties properties) {
    }

    /**
     * 注册es处理器
     *
     * @param key
     * @param synHandler
     */
    public void regHandler(String key, AbstractEsSyncHandler synHandler) {
        handlerMap.put(key, synHandler);
    }
}

3. ES同步处理器

这里只给出了插入和更新处理。

/**
 * 插入数据es同步处理器
 *
 * @author Doflamingo
 */
@Slf4j
@Component("insertEsSyncHandler")
class InsertSyncHandlerImpl extends AbstractEsSyncHandler {

    public InsertSyncHandlerImpl(ElasticsearchInterceptor interceptor) {
        // 注册
        interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.insert", this);
    }

    /**
     * 创建文档
     *
     * @param statement
     * @param parameter
     */
    private void create(MappedStatement statement, Object parameter) {
        log.info("保存索引:{}", ((OrderDO) parameter).getTradeNo());
        // es实体
        OrderEntity document = convertor.doToEsEntity((OrderDO) parameter);
        // 保存
        OrderEntity save = repository.save(document);
        log.info("保存索引成功:{}", save.getId());
    }

    /**
     * 同步处理器
     *
     * @param statement
     * @param parameter
     */
    @Override
    public void handler(MappedStatement statement, Object parameter, Object res) {
        // sql执行失败不会保存,这里判断了返回结果是否null或者大于0
        if (this.checkResult(res)) {
            this.create(statement, parameter);
        }
    }
}
/**
 * 更新数据es同步处理器
 *
 * @author Doflamingo
 */
@Slf4j
@Component("updateEsSyncHandler")
class UpdateSyncHandlerImpl extends AbstractEsSyncHandler {

    public UpdateSyncHandlerImpl(ElasticsearchInterceptor interceptor) {
        // 注册
        interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.updatePayStatus", this);
        interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.updateDeliveryStatus", this);
        interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.updateReceivingStatus", this);
        interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.updateStatus", this);
    }

    /**
     * 修改文档
     *
     * @param statement
     * @param parameter
     */
    private void update(MappedStatement statement, Object parameter) {
        String tradeNo = null;
        Object obj = ((MapperMethod.ParamMap) parameter).get("param1");
        if (obj instanceof OrderDO) {
            tradeNo = ((OrderDO) obj).getTradeNo();
        } else {
            tradeNo = obj.toString();
        }
        Optional<OrderEntity> optional = repository.findById(tradeNo);
        log.info("修改索引:{}", tradeNo);
        optional.ifPresent(orderEntity -> {
            this.updateEntity(orderEntity, (MapperMethod.ParamMap) parameter);
            repository.save(orderEntity);
            log.info("修改索引成功:{}", orderEntity.getTradeNo());
        });
    }

    /**
     * 更新索引字段
     *
     * @param orderEntity
     * @param paramMap
     */
    private void updateEntity(OrderEntity orderEntity, MapperMethod.ParamMap paramMap) {
        Field[] fields = orderEntity.getClass().getDeclaredFields();
        Map<String, Field> fieldMap = Arrays.stream(fields).collect(Collectors.toMap(Field::getName, Function.identity()));
        paramMap.forEach((k, v) -> {
            if (!"tradeNo".equals(k) && !"id".equals(k) && !k.toString().startsWith("param")) {
                Field field = fieldMap.get(k);
                if (null != field) {
                    field.setAccessible(true);
                    try {
                        field.set(orderEntity, v);
                    } catch (IllegalAccessException e) {
                    }
                }
            }
        });
    }

    /**
     * 同步处理器
     *
     * @param statement
     * @param parameter
     */
    @Override
    public void handler(MappedStatement statement, Object parameter, Object res) {
        if (this.checkResult(res)) {
            this.update(statement, parameter);
        }
    }
}

4. 测试类TestOrderServiceImplTest

package com.mingo.es.sync.service.impl;

import com.mingo.es.sync.mybaties.dataobjact.OrderDO;
import com.mingo.es.sync.mybaties.dataobjact.OrderLineDO;
import com.mingo.es.sync.repository.EsOrderDocRepository;
import com.mingo.es.sync.service.TestOrderService;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.math.BigDecimal;
import java.util.Date;

@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
class TestOrderServiceImplTest {

    @Autowired
    private TestOrderService testOrderService;

    @Autowired
    protected EsOrderDocRepository repository;

    // 测试单号
    private final String testTradeNo = "20200919018243198";

    // 插入一条数据
    @Test
    void create() {
        OrderDO orderDO = new OrderDO();
        orderDO.setTradeNo(testTradeNo);
        orderDO.setAmount(BigDecimal.ONE);
        orderDO.setBuyerId(9527L);
        orderDO.setCreateTime(new Date());
        orderDO.setDiscountAmount(BigDecimal.ZERO);
        orderDO.setOriginAmount(BigDecimal.ONE);
        orderDO.setType(1);
        orderDO.setSellerId(18899L);
        orderDO.setStatus(1);

        OrderLineDO lineDO = new OrderLineDO();
        lineDO.setCreateTime(orderDO.getCreateTime());
        lineDO.setDiscountPrice(BigDecimal.ZERO);
        lineDO.setItemCode("6352678819");
        lineDO.setItemName("泡椒凤爪");
        lineDO.setUnitCode("DAI");
        lineDO.setUnitName("袋");
        lineDO.setItemPrice(BigDecimal.ONE);
        lineDO.setPrice(BigDecimal.ONE);
        lineDO.setPaidPrice(BigDecimal.ONE);
        lineDO.setDiscountPrice(BigDecimal.ZERO);
        lineDO.setTotalPrice(BigDecimal.ONE);
        lineDO.setItemQty(BigDecimal.ONE);
        lineDO.setLineNo("1");
        lineDO.setTradeNo(orderDO.getTradeNo());
        lineDO.setType(1);
        orderDO.setLines(Lists.newArrayList(lineDO));
        testOrderService.create(orderDO);
    }

    // 支付操作,更新数据
    @Test
    void updatePayStatus() {
        testOrderService.updatePayStatus(testTradeNo, 3, BigDecimal.ONE, new Date());
    }
    
    // 发货回调操作,更新数据
    @Test
    void updateDeliveryStatus() {
        testOrderService.updateDeliveryStatus(testTradeNo, 5,  new Date());
    }

    // 收货回调操作,更新数据
    @Test
    void updateReceivingStatus() {
        testOrderService.updateReceivingStatus(testTradeNo, 7,  new Date());
    }

    // 取消订单,更新数据
    @Test
    void cancel() {
        testOrderService.cancel(testTradeNo);
    }

    // 物理删除操作,删除数据
    @Test
    void delete() {
        testOrderService.delete(testTradeNo);
    }
}

4.1 插入数据运行结果

数据库表:

ES索引:

4.2 支付更新数据运行结果

数据库表:

ES索引:

另外,发货、收货和逻辑删除的更新操作一样,这里没列出结果。

4.3 物理删除数据

数据库表:

ES索引:

数据同步的一致。

5. 并发测试类ConcurrentTestOrderServiceImplTest

package com.mingo.es.sync.service.impl;

import com.mingo.es.sync.document.OrderEntity;
import com.mingo.es.sync.mybaties.dataobjact.OrderDO;
import com.mingo.es.sync.mybaties.dataobjact.OrderLineDO;
import com.mingo.es.sync.repository.EsOrderDocRepository;
import com.mingo.es.sync.service.TestOrderService;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
class ConcurrentTestOrderServiceImplTest {

    @Autowired
    private TestOrderService testOrderService;

    @Autowired
    protected EsOrderDocRepository repository;

    private final AtomicInteger atomicInteger = new AtomicInteger(1);
    private final String YYYYMMDDHHMMSS = "yyyyMMddHHmmssSSS";
    private final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern(YYYYMMDDHHMMSS);

    public String getTradeNo() {
        return LocalDateTime.now().format(FORMATTER) + atomicInteger.getAndAdd(1);
    }

    // 12个线程
    private final ExecutorService executorService =
            new ThreadPoolExecutor(
                    12,
                    12,
                    0,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingDeque<>()
            );

    /**
     * 用于测试该时间后的数据删除:2020-09-19 22:05:33
     */
    private final Date start = new Date(1600524351243L);

    @Test
    void create() throws Exception {

        for (int i = 500; i > 0; i--) {
            // 启动
            executorService.execute(() -> {
                OrderDO orderDO = new OrderDO();
                orderDO.setTradeNo(this.getTradeNo());
                orderDO.setAmount(BigDecimal.ONE);
                orderDO.setBuyerId(9527L);
                orderDO.setCreateTime(new Date());
                orderDO.setDiscountAmount(BigDecimal.ZERO);
                orderDO.setOriginAmount(BigDecimal.ONE);
                orderDO.setType(1);
                orderDO.setSellerId(18899L);
                orderDO.setStatus(1);

                OrderLineDO lineDO = new OrderLineDO();
                lineDO.setCreateTime(orderDO.getCreateTime());
                lineDO.setDiscountPrice(BigDecimal.ZERO);
                lineDO.setItemCode("6352678819");
                lineDO.setItemName("泡椒凤爪");
                lineDO.setUnitCode("DAI");
                lineDO.setUnitName("袋");
                lineDO.setItemPrice(BigDecimal.ONE);
                lineDO.setPrice(BigDecimal.ONE);
                lineDO.setPaidPrice(BigDecimal.ONE);
                lineDO.setDiscountPrice(BigDecimal.ZERO);
                lineDO.setTotalPrice(BigDecimal.ONE);
                lineDO.setItemQty(BigDecimal.ONE);
                lineDO.setLineNo("1");
                lineDO.setTradeNo(orderDO.getTradeNo());
                lineDO.setType(1);
                orderDO.setLines(Lists.newArrayList(lineDO));
                testOrderService.create(orderDO);
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(20, TimeUnit.SECONDS);
    }

    @Test
    void updatePayStatus() {
        List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
        entities.parallelStream().forEach(entity ->
                testOrderService.updatePayStatus(entity.getTradeNo(), 3, BigDecimal.ONE, new Date())
        );
    }

    @Test
    void updateDeliveryStatus() {
        List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
        entities.parallelStream().forEach(entity ->
                testOrderService.updateDeliveryStatus(entity.getTradeNo(), 5, new Date())
        );
    }

    @Test
    void updateReceivingStatus() {
        List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
        entities.parallelStream().forEach(entity ->
                testOrderService.updateReceivingStatus(entity.getTradeNo(), 7, new Date())
        );
    }

    @Test
    void cancel() {
        List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
        entities.parallelStream().forEach(entity ->
                testOrderService.cancel(entity.getTradeNo())
        );
    }

    @Test
    void delete() {
        List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
        entities.parallelStream().forEach(entity ->
                testOrderService.delete(entity.getTradeNo())
        );
    }
}

spring-elasticsearch-data功能强大,可以通过方法命名简化查询。EsOrderDocRepository代码

package com.mingo.es.sync.repository;

import com.mingo.es.sync.document.OrderEntity;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

import java.util.Date;
import java.util.List;

/**
 * EsOrderDocRepository
 *
 * @author Doflamingo
 */
public interface EsOrderDocRepository extends ElasticsearchRepository<OrderEntity, String> {

    /**
     * 根据tradeNo查询单据信息
     *
     * @param tradeNo
     * @return
     */
    OrderEntity findByTradeNo(String tradeNo);

    /**
     * 查询大于某一个时刻的单据
     *
     * @return
     */
    List<OrderEntity> findByCreateTimeGreaterThan(Date createTime);
}

并发场景下插入很更新正常。

原创 Doflamingo https://www.cnblogs.com/doflamingo
原文地址:https://www.cnblogs.com/doflamingo/p/13698012.html