【mysql】mysql增加version字段实现乐观锁,实现高并发下的订单库存的并发控制,通过开启多线程同时处理模拟多个请求同时到达的情况 + 同一事务中使用多个乐观锁的情况处理

mysql增加version字段实现乐观锁,实现高并发下的订单库存的并发控制,通过开启多线程同时处理模拟多个请求同时到达的情况

=============================================================

完整的代码请到GIthub查看:https://github.com/AngelSXD/swapping

多个线程处理完后再做事情:https://www.cnblogs.com/sxdcgaq8080/p/9456006.html

=============================================================

先说说同一个事务中使用一个乐观锁的情况:

核心功能点:

1.先做查询 【查询时候把version带出来】

<select id="findByUid" parameterType="String" resultType="com.sxd.swapping.domain.GoodsStock">
        select
        uid uid,
        version version,
        sale_num saleNum,
        stock stock
        from
        goods_stock
        where
        uid = #{uid}

    </select>

2.再做更新【更新的时候判断version是不是查出来时候的version,如果是,则更新,更新时顺便version+1即可。否则不更新】

    <update id="updateStock" parameterType="com.sxd.swapping.domain.GoodsStock">
        update
        goods_stock
        set
        <if test="stock != -1">
            stock = stock - #{buyNum},
        </if>
        sale_num = sale_num + #{buyNum},
        version  = version + 1
        where
        uid = #{uid}
        and
        version = #{version}
    </update>

=============================================================

1.实体对应数据表

/**
 * 低配版本的 商品库存表
 */
@Entity
@Table
@Getter
@Setter
public class GoodsStock  extends BaseBean {

    private String goodsName;//商品名称

    private String goodsPrice;//商品价格

    private Long buyNum;//购买数量

    private Long saleNum;//销售量

    private Long stock;//商品库存       库存为-1  代表无限量库存

    private Integer version;//版本号

    @Transient
    private Integer threadCount;//模拟并发访问的线程数量 实际业务中不用这个字段  仅用作本次测试接口使用

}
View Code

2.mybatis的mapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.sxd.swapping.dao.mybatis.GoodsStockMapper">

    <update id="updateStock" parameterType="com.sxd.swapping.domain.GoodsStock">
        update
        goods_stock
        set
        <if test="stock != -1">
            stock = stock - #{buyNum},
        </if>
        sale_num = sale_num + #{buyNum},
        version  = version + 1
        where
        uid = #{uid}
        and
        version = #{version}
    </update>


    <select id="findByUid" parameterType="String" resultType="com.sxd.swapping.domain.GoodsStock">
        select
        uid uid,
        version version,
        sale_num saleNum,
        stock stock
        from
        goods_stock
        where
        uid = #{uid}

    </select>

</mapper>
View Code

mybatis的mapper.java

package com.sxd.swapping.dao.mybatis;

import com.sxd.swapping.domain.GoodsStock;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

@Mapper
public interface GoodsStockMapper {

    int updateStock(GoodsStock goodsStock);

    GoodsStock findByUid(@Param("uid") String uid);
}
View Code

3.serviceImpl层代码

    @Autowired
    GoodsStockMapper mapper;

    /**
     * 数据库加 version 版本号
     *
     * 实现 数据库乐观锁
     *
     * 实现高并发下库存的并发控制机制
     *
     * 要保证事务一致性,要么都使用mybatis  要么都使用jpa
     * @param map
     * @param entity
     * @param threadNum
     * @return
     */
    @Override
    @Transactional
    public void updateStock(Map<Integer,String> map, GoodsStock entity, Integer threadNum) {

        String  uid = entity.getUid();
        Long buyNum = entity.getBuyNum();
        String msg = "";
        //判断库存是否足够
        GoodsStock old = mapper.findByUid(uid);
        Long stock = old.getStock();
        System.out.println("线程"+threadNum+"---------->正在工作");
        if (stock >= buyNum){
            old.setBuyNum(buyNum);
            if (mapper.updateStock(old) > 0 ){
                msg = "库存扣除成功,剩余库存数量:";
            }else {
                msg = "库存扣除失败,剩余库存数量:";
            }
            Long nowStock = mapper.findByUid(uid).getStock();
            msg +=nowStock;
        }else {
            msg = "库存不足,剩余库存数量:"+stock;
        }
        map.put(threadNum,msg);
    }
View Code

4.controller层代码:

 /**
     * uid代表            同一时间 大家都来买这一件东西
     * threadCount代表    同时会有多少人在操作
     * buyNum代表         同一个人的一次购买量
     * @param entity
     * @return
     */
    @RequestMapping(value = "/concurrentStock",method = RequestMethod.POST)
    public UniVerResponse<Map<Integer,String>> concurrentStock(@RequestBody GoodsStock entity){
        UniVerResponse.checkField(entity,"uid","threadCount","buyNum");
        UniVerResponse<Map<Integer,String>> res = new UniVerResponse<>();

        String uid = entity.getUid();

        GoodsStock old = service.findByUid(uid);
        if (old != null){
            //设置一个线程安全的Map记录各个线程是否成功执行
            Map<Integer,String> map = new ConcurrentHashMap<Integer, String>();


            Integer threadCount = entity.getThreadCount();
            //所有线程阻塞,然后统一开始
            CountDownLatch begin = new CountDownLatch(1);

            //主线程阻塞,直到所有分线程执行完毕
            CountDownLatch end = new CountDownLatch(threadCount);

            //开始多线程
            begin.countDown();
            for (Integer i = 0; i < threadCount; i++) {
                Runnable runnable = buyGoods(map,entity,i,begin,end);
                new Thread(runnable).start();
            }

            //多个线程都执行结束
            try {
                end.await();
                res.beTrue(map);
            } catch (InterruptedException e) {
                e.printStackTrace();
                res.beFalse("多线程执行失败",UniVerResponse.ERROR_BUSINESS,null);
            }
        }else {
            res.beFalse("商品不存在",UniVerResponse.ERROR_BUSINESS,null);
        }
        return  res;
    }


    //多线程的方法
    public Runnable buyGoods(Map<Integer,String> map, GoodsStock entity, Integer threadNum,CountDownLatch begin,CountDownLatch end){
        Runnable runnable = new Runnable() {
            @Override
            public void run() {

                try {
                    System.out.println("线程"+threadNum+":--------------------->开始工作");
                    begin.await();

                    service.updateStock(map,entity,threadNum);

                    end.countDown();
                    System.out.println("线程"+threadNum+":--------------------->结束工作");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        return runnable;
    }
View Code

5.发送请求

第一次请求:

下图所示,仅有线程编号为3的 线程  购买成功,其他都购买失败。

第二次请求:

第三次请求:

第四次请求:

最后一次请求:

二.再说说在同一个事务中使用多个乐观锁的情况

===============================================================================================

下面仅写一段代码举个例子即可:

即 第一步操作,第二步 都会使用乐观锁

如果执行失败有两种情况:

  1.数据库连接断开,sql真正的执行出错

  2.sql成功执行,但是其实update执行失败,因为version对应不起来

所以需要注意的是 如果使用乐观锁执行失败[失败情况2],那么需要自己手动去抛出异常,去保证事务的一致性!!!

因为失败情况1自己会抛出RuntimeException

因为下面示例代码中的第一步操作如果失败了会直接返回  所以并没有去抛异常

 /**
     * 进行兑换
     *
     * 1.减少会员积分总数[加乐观锁]
     *
     * 2.减少商品库存 增加商品销量[加乐观锁]
     *
     * 3.新增兑换记录
     *
     *
     * @param entity
     * @return
     */
    @Override
    @Transactional
    public boolean insert(ExchangeOrder entity,String integralUid,Integer buyIntegral) {

        boolean isSuccess = false;

        //1.减少会员积分
        IntegralDetail integralDetail = integralDetailMapper.findByIntegralId(integralUid);
        integralDetail.setIntegralValue(buyIntegral);//sql 做减操作
        isSuccess = (integralDetailMapper.deductIntegral(integralDetail) > 0);

        if (isSuccess){
            //2.减少商品库存  增加商品销量
            IntegralGoods integralGoods = integralGoodsMapper.findByUid(entity.getIntegralGoodsId());
            //无限库存不做修改
            if (integralGoods.getStock() != -1) {
                integralGoods.setStock(entity.getBuyNum());
            }
            //增加销量
            integralGoods.setSaleNum(entity.getBuyNum());
            integralGoods.initUpdateDataMen();
            isSuccess = (integralGoodsMapper.updateStock(integralGoods) > 0);

            if (isSuccess){
                //3.新增兑换记录
                mapper.insert(entity);
            }else{
                throw new RunException("销量增加失败,请稍后再试");
            }
        }

        return isSuccess;
    }
原文地址:https://www.cnblogs.com/sxdcgaq8080/p/9454161.html