Posts Spring Boot & MyBatis 实现乐观锁和悲观锁
Post
Cancel

Spring Boot & MyBatis 实现乐观锁和悲观锁

以转账操作为例,实现并测试乐观锁和悲观锁。

全部代码:https://github.com/imcloudfloating/Lock_Demo

死锁问题

当 A, B 两个账户同时向对方转账时,会出现如下情况:

时刻事务 1 (A 向 B 转账)事务 2 (B 向 A 转账)
T1Lock ALock B
T2Lock B (由于事务 2 已经 Lock A,等待)Lock A (由于事务 1 已经 Lock B,等待)

由于两个事务都在等待对方释放锁,于是死锁产生了,解决方案:按照主键的大小来加锁,总是先锁主键较小或较大的那行数据。

建立数据表并插入数据(MySQL)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
create table account
(
    id      int auto_increment
        primary key,
    deposit decimal(10, 2) default 0.00 not null,
    version int            default 0    not null
);

INSERT INTO vault.account (id, deposit, version) VALUES (1, 1000, 0);
INSERT INTO vault.account (id, deposit, version) VALUES (2, 1000, 0);
INSERT INTO vault.account (id, deposit, version) VALUES (3, 1000, 0);
INSERT INTO vault.account (id, deposit, version) VALUES (4, 1000, 0);
INSERT INTO vault.account (id, deposit, version) VALUES (5, 1000, 0);
INSERT INTO vault.account (id, deposit, version) VALUES (6, 1000, 0);
INSERT INTO vault.account (id, deposit, version) VALUES (7, 1000, 0);
INSERT INTO vault.account (id, deposit, version) VALUES (8, 1000, 0);
INSERT INTO vault.account (id, deposit, version) VALUES (9, 1000, 0);
INSERT INTO vault.account (id, deposit, version) VALUES (10, 1000, 0);

Mapper 文件

悲观锁使用 select … for update,乐观锁使用 version 字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.cloud.demo.mapper.AccountMapper">
    <select id="selectById" resultType="com.cloud.demo.model.Account">
        select *
        from account
        where id = #{id}
    </select>
    <update id="updateDeposit" keyProperty="id" parameterType="com.cloud.demo.model.Account">
        update account
        set deposit=#{deposit},
            version = version + 1
        where id = #{id}
          and version = #{version}
    </update>
    <select id="selectByIdForUpdate" resultType="com.cloud.demo.model.Account">
        select *
        from account
        where id = #{id} for
        update
    </select>
    <update id="updateDepositPessimistic" keyProperty="id" parameterType="com.cloud.demo.model.Account">
        update account
        set deposit=#{deposit}
        where id = #{id}
    </update>
    <select id="getTotalDeposit" resultType="java.math.BigDecimal">
        select sum(deposit) from account;
    </select>
</mapper>

Mapper 接口

1
2
3
4
5
6
7
8
@Component
public interface AccountMapper {
    Account selectById(int id);
    Account selectByIdForUpdate(int id);
    int updateDepositWithVersion(Account account);
    void updateDeposit(Account account);
    BigDecimal getTotalDeposit();
}

Account POJO

1
2
3
4
5
6
@Data
public class Account {
    private int id;
    private BigDecimal deposit;
    private int version;
}

AccountService

在 transferOptimistic 方法上有个自定义注解 @Retry,这个用来实现乐观锁失败后重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@Slf4j
@Service
public class AccountService {

    public enum Result{
        SUCCESS,
        DEPOSIT_NOT_ENOUGH,
        FAILED,
    }

    @Resource
    private AccountMapper accountMapper;

    private BiPredicate<BigDecimal, BigDecimal> isDepositEnough = (deposit, value) -> deposit.compareTo(value) > 0;

    /**
     * 转账操作,悲观锁
     *
     * @param fromId 扣款账户
     * @param toId   收款账户
     * @param value  金额
     */
    @Transactional(isolation = Isolation.READ_COMMITTED)
    public Result transferPessimistic(int fromId, int toId, BigDecimal value) {
        Account from, to;

        try {
            // 先锁 id 较大的那行,避免死锁
            if (fromId > toId) {
                from = accountMapper.selectByIdForUpdate(fromId);
                to = accountMapper.selectByIdForUpdate(toId);
            } else {
                to = accountMapper.selectByIdForUpdate(toId);
                from = accountMapper.selectByIdForUpdate(fromId);
            }
        } catch (Exception e) {
            log.error(e.getMessage());
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return Result.FAILED;
        }

        if (!isDepositEnough.test(from.getDeposit(), value)) {
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            log.info(String.format("Account %d is not enough.", fromId));
            return Result.DEPOSIT_NOT_ENOUGH;
        }

        from.setDeposit(from.getDeposit().subtract(value));
        to.setDeposit(to.getDeposit().add(value));

        accountMapper.updateDeposit(from);
        accountMapper.updateDeposit(to);

        return Result.SUCCESS;
    }

    /**
     * 转账操作,乐观锁
     *  @param fromId 扣款账户
     * @param toId   收款账户
     * @param value  金额
     */
    @Retry
    @Transactional(isolation = Isolation.REPEATABLE_READ)
    public Result transferOptimistic(int fromId, int toId, BigDecimal value) {
        Account from = accountMapper.selectById(fromId),
                to = accountMapper.selectById(toId);

        if (!isDepositEnough.test(from.getDeposit(), value)) {
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return Result.DEPOSIT_NOT_ENOUGH;
        }

        from.setDeposit(from.getDeposit().subtract(value));
        to.setDeposit(to.getDeposit().add(value));

        int r1, r2;

        // 先锁 id 较大的那行,避免死锁
        if (from.getId() > to.getId()) {
            r1 = accountMapper.updateDepositWithVersion(from);
            r2 = accountMapper.updateDepositWithVersion(to);
        } else {
            r2 = accountMapper.updateDepositWithVersion(to);
            r1 = accountMapper.updateDepositWithVersion(from);
        }

        if (r1 < 1 || r2 < 1) {
            // 失败,抛出重试异常,执行重试
            throw new RetryException("Transfer failed, retry.");
        } else {
            return Result.SUCCESS;
        }
    }
}

使用Spring AOP 实现乐观锁失败后重试

自定义注解 Retry

1
2
3
4
5
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Retry {
    int value() default 3; // 重试次数
}

重试异常 RetryException

1
2
3
4
5
public class RetryException extends RuntimeException {
    public RetryException(String message) {
        super(message);
    }
}

重试的切面类

tryAgain 方法使用了 @Around 注解(表示环绕通知),可以决定目标方法在何时执行,或者不执行,以及自定义返回结果。这里首先通过 ProceedingJoinPoint.proceed() 方法执行目标方法,如果抛出了重试异常,那么重新执行直到满三次,三次都不成功则回滚并返回 FAILED。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
@Aspect
@Component
public class RetryAspect {

    @Pointcut("@annotation(com.cloud.demo.annotation.Retry)")
    public void retryPointcut() {

    }

    @Around("retryPointcut() && @annotation(retry)")
    @Transactional(isolation = Isolation.READ_COMMITTED)
    public Object tryAgain(ProceedingJoinPoint joinPoint, Retry retry) throws Throwable {
        int count = 0;
        do {
            count++;
            try {
                return joinPoint.proceed();
            } catch (RetryException e) {
                if (count > retry.value()) {
                    log.error("Retry failed!");
                    TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                    return AccountService.Result.FAILED;
                }
            }
        } while (true);
    }
}

单元测试

用多个线程模拟并发转账,经过测试,悲观锁除了账户余额不足,或者数据库连接不够以及等待超时,全部成功;乐观锁即使加了重试,成功的线程也很少,500 个平均也就十几个成功。

所以对于写多读少的操作,使用悲观锁,对于读多写少的操作,可以使用乐观锁。

完整代码请见 Github:https://github.com/imcloudfloating/Lock_Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
class AccountServiceTest {

    // 并发数
    private static final int COUNT = 500;

    @Resource
    AccountMapper accountMapper;

    @Resource
    AccountService accountService;

    private CountDownLatch latch = new CountDownLatch(COUNT);
    private List<Thread> transferThreads = new ArrayList<>();
    private List<Pair<Integer, Integer>> transferAccounts = new ArrayList<>();

    @BeforeEach
    void setUp() {
        Random random = new Random(currentTimeMillis());
        transferThreads.clear();
        transferAccounts.clear();

        for (int i = 0; i < COUNT; i++) {
            int from = random.nextInt(10) + 1;
            int to;
            do{
                to = random.nextInt(10) + 1;
            } while (from == to);
            transferAccounts.add(new Pair<>(from, to));
        }
    }

    /**
     * 测试悲观锁
     */
    @Test
    void transferByPessimisticLock() throws Throwable {
        for (int i = 0; i < COUNT; i++) {
            transferThreads.add(new Transfer(i, true));
        }
        for (Thread t : transferThreads) {
            t.start();
        }
        latch.await();

        Assertions.assertEquals(accountMapper.getTotalDeposit(),
                BigDecimal.valueOf(10000).setScale(2, RoundingMode.HALF_UP));
    }

    /**
     * 测试乐观锁
     */
    @Test
    void transferByOptimisticLock() throws Throwable {
        for (int i = 0; i < COUNT; i++) {
            transferThreads.add(new Transfer(i, false));
        }
        for (Thread t : transferThreads) {
            t.start();
        }
        latch.await();

        Assertions.assertEquals(accountMapper.getTotalDeposit(),
                BigDecimal.valueOf(10000).setScale(2, RoundingMode.HALF_UP));
    }

    /**
     * 转账线程
     */
    class Transfer extends Thread {
        int index;
        boolean isPessimistic;

        Transfer(int i, boolean b) {
            index = i;
            isPessimistic = b;
        }

        @Override
        public void run() {
            BigDecimal value = BigDecimal.valueOf(
                    new Random(currentTimeMillis()).nextFloat() * 100
            ).setScale(2, RoundingMode.HALF_UP);

            AccountService.Result result = AccountService.Result.FAILED;
            int fromId = transferAccounts.get(index).getKey(),
                    toId = transferAccounts.get(index).getValue();
            try {
                if (isPessimistic) {
                    result = accountService.transferPessimistic(fromId, toId, value);
                } else {
                    result = accountService.transferOptimistic(fromId, toId, value);
                }
            } catch (Exception e) {
                log.error(e.getMessage());
            } finally {
                if (result == AccountService.Result.SUCCESS) {
                    log.info(String.format("Transfer %f from %d to %d success", value, fromId, toId));
                }
                latch.countDown();
            }
        }
    }
}

MySQL 配置

1
2
3
innodb_rollback_on_timeout='ON'
max_connections=1000
innodb_lock_wait_timeout=500
OLDER POST NEWER POST

Comments powered by Disqus.

内容

Search Results