Java 多线程编程之四 CAS、ABA 问题、锁

大纲

CAS

CAS 的介绍

CAS 的全称是 Compare-And-Swap(比较并交换),它是 CPU 并发原语,用于比较当前内存中的值与预期值是否相等,如果相等则进行更新操作,整个操作是原子性的,可用于解决多线程并发访问共享变量时的数据一致性问题。CAS 并发原语体现在 Java 语言中就是 sun.misc.Unsafe 类的各个方法,调用 UnSafe 类中的 CAS 方法,JVM 会帮开发者实现出 CAS 汇编指令,这是一种完全依赖于硬件平台的功能,通过它可以实现原子操作。再次强调,由于 CAS 是一种系统原语,原语属于操作系统范畴,是由若干条指令组成的,用于完成某项功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断(打断、加塞)。简而言之,CAS 是一系列 CPU 的原子指令,不会造成所谓的数据不一致问题,也就是说 CAS 是线程安全的。

提示

  • CAS 有 3 个操作数,内存值 V,旧的预期值 A,要修改的更新值 B。
  • CAS 会比较当前线程工作内存中的值和主内存中的值,如果相同则执行规定操作,否则继续比较(自旋),直到主内存和工作内存中的值一致为止。

Java 中的 CAS 实现

  • 在 Java 中,CAS 是一种无锁的同步方式,常用于实现原子操作和乐观锁,适用于并发竞争不激烈的业务场景,可以提高并发性能。
  • 在 Java 中,CAS 是使用 java.util.concurrent.atomic 包中提供的原子类实现的,常用的原子类包括 AtomicInteger、AtomicReference 等,而原子类的底层是依靠 Unsafe 类实现原子性。

CAS 的原理

AtomicInteger 类

首先看看 AtomicInteger.getAndIncrement() 方法的源码,可以发现底层调用了一个 Unsafe 类的 getAndAddInt() 方法。

Unsafe 类

在 Java 中,CAS 操作的执行都依赖于 Unsafe 类的方法。Unsafe 是 CAS 的核心类,由于 Java 方法无法直接访问底层操作系统,需要通过本地(Native)方法来访问。Unsafe 相当于一个后门,基于该类可以直接操作特定的内存数据。Unsafe 类存在 于 sun.misc 包中,其内部方法可以像 C 语言的指针一样直接操作内存。特别注意,Unsafe 类的所有方法都是 native 修饰的,也就是说 Unsafe 类中的方法都直接调用操作系统底层资源执行相应的任务。Atomic 修饰的包装类(如 AtomicInteger)之所以能够保证原子性,依靠的就是底层的 Unsafe 类。

valueOffset 变量

valueOffset 是 AtomicInteger 类中变量 value 的内存地址偏移量,因为 Unsafe 类就是根据内存地址偏移量来获取数据的。从下面的源码能够看到,通过 this 指针和 valueOffset 得知变量 value 的完整内存地址后,就可以获取到变量 value 的值,然后执行加 1 的操作。

保证多线程之间的内存可见性

变量 v 是线程将变量 value 从主内存中拷贝到工作内存中的副本。也就是说,线程每次都要从主内存拿到最新的值,并拷贝一份副本到自己的工作内存,然后再执行 weakCompareAndSetInt() 方法跟主内存的值进行比较。线程不可以越过高速缓存直接操作主内存。在执行加 1 操作之前,需要比较工作内存和主内存中的变量值是否一致,假设执行 weakCompareAndSetInt() 方法返回 false,那么就会一直执行 do while 循环,直到期望的值(工作内存中的值)和当前值(主内存中的值)一致。这里没有用 synchronized,而是用 CAS,这样既可以提高并发性能,又可以够保证数据一致性。因为每个线程进来后,都会进入的 do while 循环,然后不断地获取主内存中的变量值,并判断工作内存中的变量值是否为最新值,然后再将更改后的变量值写回主内存。由于 AtomicInteger 中的 value 变量被 volatile 修饰,当某个线程将更改后的变量值写回主内存后,会立刻通知其他线程,这样其他线程就可以立刻感知到最新的变量值。值得一提的是,CAS 思想 + Unsafe 类 = 自旋(自我旋转)。

  • 假设线程 A 和线程 B 同时执行 getAndAddInt() 方法(两者分别跑在不同的 CPU 上)
    • AtomicInteger 里面的 value 原始值为 3,即主内存中 AtomicInteger 的 value 为 3,根据 JMM 模型,线程 A 和线程 B 各自持有一份值为 3 的副本,分别存储在各自的工作内存
    • 线程 A 通过 getIntVolatile() 方法拿到的 value 值为 3,此时刚好线程 A 被挂起(该线程失去 CPU 执行权)
    • 线程 B 通过 getIntVolatile() 方法拿到的 value 值也是 3,此时刚好线程 B 没有被挂起,并执行了 weakCompareAndSetInt() 方法,比较值时主内存中的值也是 3,线程 B 成功修改主内存中的值为 4
    • 线程 A 恢复运行,执行 weakCompareAndSetInt() 方法进行比较,发现自己工作内存中的值 3 和主内存中的值 4 不一致,说明该值已经被其它线程修改过了,那么线程 A 本次修改失败,只能够重新读取主内存中的值,也就是再执行一次 do while 循环
    • 线程 A 重新获取 value 值,因为变量 value 被 volatile 修饰,所以其它线程对它的修改,线程 A 总能够立刻感知到最新值,线程 A 继续执行 weakCompareAndSetInt() 方法进行比较和交换,直到成功执行写入操作为止

底层汇编代码

提示

  • Unsafe 类中的 weakCompareAndSetInt() 是一个本地(Native)方法,该方法的实现位于 unsafe.cpp 中。
  • 更多关于 Unsafe 类的介绍,请阅读 《Unsafe 介绍及 CAS 原理解析》

在下述的 C++ 代码中,会调用 Atomic::cmpxchg(x, addr, e) 函数实现比较和交换,其中参数 x 是即将更新的值,参数 e 是原内存的值

1
2
3
4
5
6
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

在下述的 Linux x86 实现中,Atomic::cmpxchg() 函数直接内嵌调用了 cmpxchgl 汇编指令

1
2
3
4
5
6
7
8
inline jint Atomic::cmpxchg(jint exchange_value, volatile jint *dest, jint compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP( % 4) "cmpxchgl %1,(%3)"
: "=a"(exchange_value)
: "r"(exchange_value), "a"(compare_value), "r"(dest), "r"(mp)
: "cc", "memory");
return exchange_value;
}

CAS 的优缺点

  • 优点

    • 无锁:CAS 是一种基于硬件原子操作的无锁算法,避免了传统锁机制带来的性能损耗。
    • 高效:由于 CAS 是在硬件层面实现的原子操作,比使用锁的方式执行速度更快。
    • 避免死锁:CAS 操作不会导致线程死锁,因为它不需要获取锁来完成操作,而是基于原子性来完成操作。
    • 可实现乐观并发控制:CAS 实现了乐观锁,当多个线程尝试更新同一内存位置时,只有一个线程能够成功,其他线程可以根据失败重试或者执行其他操作。
  • 缺点

    • 自旋重试:CAS 操作在并发量大的情况下,可能会导致自旋重试次数过多,消耗 CPU 资源。
    • ABA 问题:CAS 操作可能会出现 ABA 问题,即在进行比较时,内存位置值已经被改变两次,导致原子操作无法正确判断值是否被修改。
    • 不能保证多个操作的原子性:CAS 只能保证单个操作的原子性,不能保证多个操作的原子性,这在复杂的业务场景下可能会带来一些问题。

总的来说,CAS 是一种高效的无锁机制,适合在并发度不高、竞争不激烈的业务场景下使用,但需要小心处理 CAS 可能带来的 ABA 问题和自旋重试次数过多的情况。

注意事项

  • 在多线程环境下,对一个共享变量执行写操作时,可以通过循环 CAS 的方式来保证操作的原子性。对于多个共享变量的写操作,循环 CAS 就无法保证操作的原子性,这个时候只能用锁来保证原子性。
  • CAS 有可能需要多次比较,导致循环时间长,CPU 资源开销大。因为执行的是 do while 循环,如果比较不成功会一直在循环。最极端的情况,就是某个线程一直取到的值(预期值)和内存中的值不一样,这样就会无限循环。

ABA 问题

面试连环套路

面试官往往会通过 AtomicInteger 引出一系列问题,环环相扣,如 CAS -> Unsafe 类 -> CAS 底层实现 -> ABA -> 原子引用更新 -> 如何规避 ABA 问题。

ABA 问题概述

假设现在有两个线程,分别是 T1 和 T2,然后 T1 执行某个操作的时间为 10 秒,T2 执行某个时间的操作是 2 秒。刚开始两个线程分别从主内存中获取 A 值,但是因为 T2 线程的执行速度更快,它先把主内存的值改成 B,然后再将主内存的值修改成 A,然后 T2 线程执行完毕。T1 线程在 10 秒后,执行完毕,判断主内存中的值为 A,并且和自己预期的值一样,它就简单认为主内存中的值没有被改动过,就直接将主内存中的值修改成 B;但是实际上主内存中的值在这期间可能经历了 ABCDEFA 一系列变化,也就是在这期间主内存中的值经历了 “狸猫换太子”。简而言之,ABA 问题就是,在某个线程获取主内存值后,该内存值在更改并写入主内存的时候,已经被其他线程修改了 N 次,只是最终又被改成原来的值,导致该线程误认为主内存的值没有被其他线程更改过。这种问题可能会影响到一些使用比较操作来判断数据状态的情况,如 CAS 操作。为了解决 ABA 问题,通常需要使用一些方法,比如使用版本号、引入额外的数据等手段来避免数据发生 ABA 问题。

CAS 导致的 ABA 问题

CAS 算法实现的一个重要前提是需要取出内存中某一时刻的数据,并在当下时刻比较并交换,那么在这个时间差内数据可能会发生变化。比如,一个线程 T1 从内存位置 V 中取出 A 值,这时候另外一个线程 T2 也从内存位置 V 中取出 A 值,并且线程 T2 执行了一些操作将内存位置 V 的值改成了 B,然后线程 T2 又将内存位置 V 的值改成 A,这时候线程 T1 进行 CAS 操作时发现内存中的值仍然是 A,然后线程 T1 成功写入。尽管线程 T1 的 CAS 操作执行成功,但是不代表这个过程就是没有问题的。简而言之,CAS 最大的问题是只管开头和结尾的值,也就是头和尾的值是一样的,那就修改成功,中间的这个过程,可能会被修改过。

ABA 问题产生

原子包装类(如 AtomicInteger)或者原子引用类(如 AtomicReference)会产生 ABA 问题,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 产生 ABA 问题的代码
*/
public class ABADemo {

public static AtomicReference<Integer> atomicReference = new AtomicReference<Integer>(100);

public static void main(String[] args) {
new Thread(() -> {
atomicReference.compareAndSet(100, 101);
atomicReference.compareAndSet(101, 100);
}, "t1").start();

new Thread(() -> {
try {
// 暂定两秒t2线程,保证上面的t1线程完成了一次ABA操作
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

boolean result = atomicReference.compareAndSet(100, 102);
System.out.println(result + " " + atomicReference.get());
}, "t2").start();
}

}

程序运行输出的结果:

1
true 102

ABA 问题解决

可以使用 JDK 中的 AtomicStampedReference 时间戳原子引用类(版本号原子引用类)解决 ABA 问题,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 解决 ABA 问题的代码
*/
public class AtomicStampedReferenceDemo {

public static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<Integer>(100, 1);

public static void main(String[] args) {
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "初始版本号: " + stamp);

try {
// 暂定一秒t1线程,保证下面的t2线程拿到的初始版本号与t1的初始版本号一致
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "第一次修改后的版本号: " + atomicStampedReference.getStamp());

atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "第二次修改后的版本号: " + atomicStampedReference.getStamp());
}, "t1").start();

new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "初始版本号: " + stamp);

try {
// 暂定两秒t2线程,保证上面的t1线程完成了一次ABA操作
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

boolean result = atomicStampedReference.compareAndSet(100, 102, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "是否修改成功:" + result + ",当前实际最新的版本号为: " + atomicStampedReference.getStamp());
System.out.println("当前实际最新值为:" + atomicStampedReference.getReference());
}, "t2").start();
}

}

程序运行输出的结果:

1
2
3
4
5
6
t1初始版本号: 1
t2初始版本号: 1
t1第一次修改后的版本号: 2
t1第二次修改后的版本号: 3
t2是否修改成功:false,当前实际最新的版本号为: 3
当前实际最新值为:100

观察程序的运行结果可以发现,线程 t1,在进行 ABA 操作后,版本号变更成了 3;而线程 t2 在进行操作的时候,就会出现操作失败,因为版本号和当初拿到的不一样,这就可以避免 ABA 问题的产生。

ABA 问题总结

利用原子引用 + 版本号(类似时间戳)机制可以解决 ABA 问题,比如可以直接使用 JDK 提供的版本号原子引用类 AtomicStampedReference

Java 常用锁

自旋锁

自旋锁的介绍

自旋锁(SpinLock)是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式不断尝试获取锁。这样的优点是减少线程上下文切换的消耗,缺点是循环获取锁的操作会消耗 CPU 资源。上面提到的 CAS(比较并交换),底层使用的就是自旋。自旋的本质就是多次尝试,多次访问,不会阻塞的状态就是自旋。在 CAS 中,Unsafe 类使用自旋锁的代码如下图所示:

juc-spinlock-1

自旋锁的优缺点

自旋锁是一种用于多线程编程中实现互斥访问的同步机制。它的优缺点如下:

  • 优点
    • 效率相对较高:自旋锁适用于对共享资源的竞争短暂且频繁的情况,相较于其他同步机制(如互斥锁),它的开销更小,因为当获取锁失败时,线程会进入忙等待状态(自旋),不会进入阻塞状态,减少了线程上下文切换的开销。
    • 预期等待时间短:由于自旋锁不会将请求资源的线程挂起,当持有锁的线程释放锁后,其他线程能够更快地获取锁,减少了等待时间。
  • 缺点
    • 占用 CPU 资源:自旋锁会导致线程在获取锁之前处于忙等状态,消耗 CPU 资源,尤其是在高并发的情况下,如果自旋时间过长,会增加系统的负担,降低 CPU 利用率。
    • 优先级反转:当一个低优先级线程持有锁,而高优先级线程在自旋等待时,可能会导致高优先级线程长时间得不到执行(即优先级反转问题)。
    • 存在死锁风险:自旋锁使用不当可能会导致死锁情况的发生,比如在多个线程之间相互等待对方释放锁。

手写一个自旋锁的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* 手写自旋锁
*/
public class SpinLockDemo {

// 原子引用线程
private AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void myLock() {
// 当前线程
Thread thread = Thread.currentThread();
System.out.println(thread.getName() + " come in");
// 自旋锁实现,如果期望值是 null,则更新为当前线程,否者一直自旋
while (!atomicReference.compareAndSet(null, thread)) {

}
}

public void myUnLock() {
// 当前线程
Thread thread = Thread.currentThread();
// 释放自旋锁
atomicReference.compareAndSet(thread, null);
System.out.println(thread.getName() + " unlock");
}

public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();

new Thread(() -> {
// 获取锁
spinLockDemo.myLock();

// 等待 5 秒,模拟业务执行
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { }

// 释放锁
spinLockDemo.myUnLock();
}, "t1").start();

// 让 main 线程暂停 1 秒,保证 t1 线程比 t2 线程先执行
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { }

new Thread(() -> {
// 获取锁
spinLockDemo.myLock();

// 等待 1 秒,模拟业务执行
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { }

// 释放锁
spinLockDemo.myUnLock();
}, "t2").start();
}

}

程序运行输出的结果:

1
2
3
4
t1 come in
t2 come in
t1 unlock
t2 unlock

可重入锁(递归锁)

可重入锁的介绍

可重入锁又叫 “递归锁”,指的是同一个线程在外层函数获得锁之后,在内层递归函数仍然能够获取到该锁,也就是同一个线程在外层方法获取到锁的时候,在进入内层方法会自动获取到锁(代码如下)。简而言之,可重入锁保证了线程可以进入任何一个它已经拥有锁的所有同步代码块。ReentrantLockSynchronized 都是典型的可重入锁。可重入锁最大的作用是可以避免死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 只要线程进入 method1 方法后,那么它也能直接进入 method2 方法,因为它们所拥有的锁是同一把锁
*/
public class LockDemo {

public synchronized void method1() {
method2();
}

public synchronized void method2() {

}
}

Synchronized 的验证代码

首先编写了一个资源类 Phone,拥有两个加了 synchronized 的同步方法,分别是 sendSMS()sendEmail()。另外,在 sendSMS() 方法中,调用了 sendEmail() 方法。最后在主线程中,开启了两个线程进行测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 验证 Synchronized 是可重入锁 (递归锁)
*/
class Phone {

public synchronized void sendSMS() throws Exception {
System.out.println(Thread.currentThread().getName() + " invoked sendSMS()");
// 在同步方法中,调用另外一个同步方法
sendEmail();
}

public synchronized void sendEmail() throws Exception {
System.out.println(Thread.currentThread().getName() + " invoked sendEmail()");
}
}

public class SynchronizedDemo {

public static void main(String[] args) {
Phone phone = new Phone();

// 线程一
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "t1").start();

// 线程二
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "t2").start();
}

}

程序运行输出的结果:

1
2
3
4
t1 invoked sendSMS()
t1 invoked sendEmail()
t2 invoked sendSMS()
t2 invoked sendEmail()

从上述运行结果可以发现,当 t1 线程进入 sendSMS() 方法的时候,拥有了一把锁,同时 t2 线程无法进入,直到 t1 线程拿着锁,执行完 sendEmail() 方法后,才释放锁,这样 t2 才能够执行 sendSMS() 方法。

ReentrantLock 的验证代码

首先资源类 Phone 实现了 Runnable 接口,重写了 run() 方法,并在里面调用 getLock() 方法。另外,在进入 getLock() 方法的时候就加了锁,而且在方法里面又调用另外一个加了锁的 setLock() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 验证 ReentrantLock 是可重入锁 (递归锁)
*/
class Phone implements Runnable {

private Lock lock = new ReentrantLock();

public void getLock() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " get Lock");
setLock();
} finally {
lock.unlock();
}
}

public void setLock() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " set Lock");
} finally {
lock.unlock();
}
}

@Override
public void run() {
getLock();
}

}

public class ReentrantLockDemo {

public static void main(String[] args) {
Phone phone = new Phone();

Thread t1 = new Thread(phone, "t1");
Thread t2 = new Thread(phone, "t2");

t1.start();
t2.start();
}

}

程序运行输出的结果:

1
2
3
4
t1 get Lock
t1 set Lock
t2 get Lock
t2 set Lock

可以发现运行结果跟加 synchronized 修饰的方法是一致的,都是线程在外层的方法获取到锁之后,线程就能够直接执行内层的同步代码。

大厂面试题之一

在上述的 getLock() 方法中,如果加两把锁会是什么情况呢?

1
2
3
4
5
6
7
8
9
10
11
public void getLock() {
lock.lock();
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " get Lock");
setLock();
} finally {
lock.unlock();
lock.unlock();
}
}

最后得到的结果也是一样的,因为无论 getLock() 方法里面有多少把锁,其实它们都是同一把锁,也就是说用同一把钥匙就能够打开多把锁。

大厂面试题之二

在上述的 getLock() 方法中,如果加了两把锁,但是只解开一把锁,会出现什么情况呢?

1
2
3
4
5
6
7
8
9
10
public void getLock() {
lock.lock();
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " get Lock");
setLock();
} finally {
lock.unlock();
}
}

会出现死锁现象,也就是 t1 线程执行完成后,没有完全释放锁,导致 t2 线程一直在等待获取锁。简而言之,申请了几次锁,最后就需要释放几次锁。

大厂面试题之三

在上述的 getLock() 方法中,如果只加一把锁,但是后面解锁两次,又会出现什么情况呢?

1
2
3
4
5
6
7
8
9
10
public void getLock() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " get Lock");
setLock();
} finally {
lock.unlock();
lock.unlock();
}
}

在这种情况下,程序运行会直接抛出异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
t1 get Lock
t1 set Lock
t2 get Lock
t2 set Lock
Exception in thread "t1" Exception in thread "t2" java.lang.IllegalMonitorStateException
at java.base/java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:149)
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1302)
at java.base/java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:439)
at com.java.interview.test.Phone.getLock(ReentrantLockTest.java:17)
at com.java.interview.test.Phone.run(ReentrantLockTest.java:32)
at java.base/java.lang.Thread.run(Thread.java:834)
java.lang.IllegalMonitorStateException
at java.base/java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:149)
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1302)
at java.base/java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:439)
at com.java.interview.test.Phone.getLock(ReentrantLockTest.java:17)
at com.java.interview.test.Phone.run(ReentrantLockTest.java:32)
at java.base/java.lang.Thread.run(Thread.java:834)

公平锁和非公平锁

公平锁和非公平锁的介绍

  • 公平锁:是指多个线程按照申请锁的顺序来获取锁,类似排队打饭,先到先得。
  • 非公平锁:是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。在高并发的情况下,有可能会造成优先级反转,或者饥饿现象(即某个线程一直获取得不到锁)。
  • 在 JUC 包中,公平锁和非公平锁用的都是 ReentrantLock,而 ReentrantLock 默认是非公平锁。

公平锁和非公平锁的区别

  • 公平锁:公平锁就是很公平,在并发情况下,每个线程在获取锁时会查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,以后会按照 FIFO 规则从队列中取到自己。
  • 非公平锁:非公平锁比较粗鲁,上来就直接尝试占有锁,如果尝试失败,就再采取类似公平锁那种方式(等待队列)处理。
  • 在 JUC 包中,ReentrantLock 可以通过构造方法指定该锁是否公平(代码如下),默认是非公平锁。非公平锁的优点在于吞吐量比公平锁大。对于 synchronized 而言,也是一种非公平锁。
1
Lock lock = new ReentrantLock(true);    // 创建一个可重入锁,true 表示公平锁,false 表示非公平锁,默认是非公平锁

读锁 (共享锁)、写锁 (独占锁)、互斥锁

读锁和写锁的介绍

  • 读锁(共享锁):指该锁可被多个线程所持有,对于 ReentrantReadWriteLock 来讲,其读锁是共享锁,其写锁是独占锁。
  • 写锁(独占锁):指该锁一次只能被一个线程所持有,对于 ReentrantLockSynchronized 而言都是写锁(独占锁)。
  • 互斥锁:互斥锁是读锁(共享锁)和写锁(独占锁)的统一名称。

为什么会有写锁和读锁?

原来使用 ReentrantLock 创建锁的时候,那是独占锁,也就是说同一时刻只能有一个线程访问。但是有一个读写分离的场景,读的时候希望同时进行,那么原来写锁(独占锁)的并发性能就没这么好了,因为读锁并不会造成数据不一致的问题,因此多个线程同时读一个资源没有任何问题。所以为了满足并发量,读取共享资源应该可以同时进行,但是如果一个线程想去写共享资源,那就不应该再有其它线程可以对该资源进行读或写操作。读锁可保证并发读是非常高效的,其中读写、写读、写写的过程是互斥的,而读读是可以共存的。

读锁和写锁的使用案例

这里模拟实现一个读写缓存(如 MyBatis 的缓存实现),假设刚开始没有加锁的时候,会出现什么情况呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

class MyCache {

// 使用 volatile 修饰,必须保证可见性
private volatile Map<String, Object> cache = new HashMap<>();

public void put(String key, Object value) {
System.out.println(Thread.currentThread().getName() + " 正在写入:" + key);
try {
// 模拟网络拥堵,延迟 0.2 秒
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
cache.put(key, value);
System.out.println(Thread.currentThread().getName() + " 写入完成");
}

public Object get(String key) {
System.out.println(Thread.currentThread().getName() + " 正在读取:");
try {
// 模拟网络拥堵,延迟 0.3 秒
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object value = cache.get(key);
System.out.println(Thread.currentThread().getName() + " 读取完成:" + value);
return value;
}
}

public class ReadWriteLockDemo {

public static void main(String[] args) {
MyCache myCache = new MyCache();

// 启动 5 个线程并发写
for (int i = 0; i < 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCache.put(tempInt + "", tempInt + "");
}, "write-" + i).start();
}

// 启动 5 个线程并发读
for (int i = 0; i < 5; i++) {
// lambda 表达式内部必须是 final
final int tempInt = i;
new Thread(() -> {
myCache.get(tempInt + "");
}, "read-" + i).start();
}
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
write-0 正在写入:0
write-1 正在写入:1
write-3 正在写入:3
write-4 正在写入:4
write-2 正在写入:2
read-2 正在读取:
read-4 正在读取:
read-3 正在读取:
read-0 正在读取:
read-1 正在读取:
write-4 写入完成
read-1 读取完成:null
read-2 读取完成:null
write-2 写入完成
write-0 写入完成
read-3 读取完成:null
write-3 写入完成
write-1 写入完成
read-4 读取完成:4
read-0 读取完成:0

从上述运行结果可以发现,在并发写入的时候,写操作都被其它线程打断了,也就是写操作不具备原子性,这就造成了某个线程还没写完,其它线程又开始写,最终导致数据不一致。


上面的代码是没有加锁的,这样就会造成线程在执行写操作的时候,被其它线程频繁打断,也就是写操作不具备原子性,这个时候就需要用到读写锁来解决原子性问题。这里的读锁和写锁的区别在于,写锁允许同一时刻只有一个线程持有,可以独占式地执行写操作,而读锁允许同一时刻有多个线程同时持有,可以并发式地执行读操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建一个读写锁,它是一个读写锁融为一体的锁,在使用的时候,需要手动转换
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

// 获取写锁,当执行写操作的时候,需要转换成写锁
rwLock.writeLock().lock();

// 释放写锁
rwLock.writeLock().unlock();

// 获取读锁,当执行读操作的时候,需要转换成读锁
rwLock.readLock().lock();

// 释放读锁
rwLock.readLock().unlock();

使用读写锁改造后的完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/*
* 验证读写锁,简单模拟 MyBatis 的缓存实现
* 读操作:多个线程可以同时读同一个资源,并不会造成数据不一致的问题
* 写操作:原子 + 独占,整个过程必须是一个完整的统一体,中间不许被分割和被打断
*/
class MyCache {

// 使用 volatile 修饰,必须保证可见性
private volatile Map<String, Object> cache = new HashMap<>();

// 创建一个读写锁
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

public void put(String key, Object value) {
// 获取写锁
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 正在写入:" + key);
try {
// 模拟网络拥堵,延迟 0.2 秒
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
cache.put(key, value);
System.out.println(Thread.currentThread().getName() + " 写入完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放写锁
rwLock.writeLock().unlock();
}
}

public Object get(String key) {
// 获取读锁
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 正在读取:");
try {
// 模拟网络拥堵,延迟 0.3 秒
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object result = cache.get(key);
System.out.println(Thread.currentThread().getName() + " 读取完成:" + result);
return result;
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放读锁
rwLock.readLock().unlock();
}
return null;
}
}

public class ReadWriteLockDemo {

public static void main(String[] args) {
MyCache myCache = new MyCache();

// 启动 5 个线程并发写
for (int i = 0; i < 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCache.put(tempInt + "", tempInt + "");
}, "write-" + i).start();
}

// 启动 5 个线程并发读
for (int i = 0; i < 5; i++) {
// lambda 表达式内部必须是 final
final int tempInt = i;
new Thread(() -> {
myCache.get(tempInt + "");
}, "read-" + i).start();
}
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
write-1 正在写入:1
write-1 写入完成
write-4 正在写入:4
write-4 写入完成
write-0 正在写入:0
write-0 写入完成
write-2 正在写入:2
write-2 写入完成
read-0 正在读取:
read-2 正在读取:
read-4 正在读取:
read-0 读取完成:0
read-2 读取完成:2
read-4 读取完成:4
write-3 正在写入:3
write-3 写入完成
read-3 正在读取:
read-1 正在读取:
read-3 读取完成:3
read-1 读取完成:1

从上述运行结果可以发现,写入操作是一个一个线程执行的,并且在每个线程执行写操作期间都不会被打断,即不存在原子性问题;而执行读操作的时候,是多个线程并发执行读操作。

Java 同步辅助类

CountDownLatch (倒计时门栓)

在 Java 中,CountDownLatch 的作用是允许一个或多个线程等待其他线程完成操作。它是通过一个计数器来实现,该计数器初始化为一个正整数,每当一个线程完成了自己的任务时,计数器的值就会减 1。当计数器达到零时,所有等待的线程就会被唤醒。CountDownLatch 广泛用于多线程编程中,特别是在一些需要等待其他线程执行完毕再继续执行的场景中,比如实现线程间协作、并行计算等。常见的应用场景包括多线程计算任务划分、线程池任务等待完成、主线程等待子线程完成等。

CountDownLatch 的介绍

CountDownLatch(倒计时门栓)是一个同步辅助类,在多线程环境中用来协调多个线程之间的执行顺序。它主要有两个常用的方法,分别是 await() 方法和 countDown() 方法。当一个或多个线程调用 await() 方法时,调用线程就会被阻塞。其它线程调用 countDown() 方法时会将计数器减 1,而调用 countDown() 方法的线程则不会被阻塞。当计数器的值变成零时,因调用 await() 方法被阻塞的线程会被唤醒,然后恢复执行。

CountDownLatch 的使用案例

现在有这样一个场景,假设一个自习室里有 7 个人,其中有一个是班长,班长的主要职责就是在其它 6 个同学走了后,关灯并锁上教室门,最后自己离开。因此班长是需要最后一个走的,那么有什么方法能够控制班长这个线程是最后一个执行,而其它线程是随机执行的呢?这个时候,就比较适合使用 CountDownLatch 来解决类似的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

public static void main(String[] args) {

// 计数器
CountDownLatch countDownLatch = new CountDownLatch(6);

for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 上完自习,离开教室");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}

try {
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " 班长关灯离开教室");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
2 上完自习,离开教室
4 上完自习,离开教室
6 上完自习,离开教室
1 上完自习,离开教室
5 上完自习,离开教室
3 上完自习,离开教室
main 班长关灯离开教室

CyclicBarrier (循环屏障)

CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier),简称 “循环屏障”。它是一个同步辅助类,允许一组线程相互等待,直到它们都到达一个共同的屏障点(也叫同步点)。一旦所有线程都到达该屏障点,屏障将打开并且所有线程可以继续执行。CyclicBarrier 在多线程编程中通常用于线程间协作、并行计算等,例如要求一组线程在达到某个状态后同时执行下一阶段的任务,或者拆分一个大任务为多个子任务并行执行,最后合并执行结果。举一个现实中的例子,公司的员工全都到齐了,才可以开始开会。

CyclicBarrier 的介绍

CyclicBarrier 跟 CountDownLatch 刚好相反,也就是做加法,开始值是 0,加到某个值的时候就执行特定的任务。CyclicBarrier 可以让一组线程到达一个屏障点时被阻塞,直到最后一个线程到达屏障点时,屏障才会开门,所有被屏障拦截的线程才会继续执行。线程到达屏障点是通过调用 CyclicBarrier 的 await() 方法来实现。CyclicBarrier 类有一个核心的构造方法,它接受一个 int 类型的 parties 参数,用于指定在屏障打开之前必须调用 await() 方法的参与者数量。此外,构造方法还可以接受一个可选的 Runnable 参数,该参数定义了当屏障打开时要执行的操作。值得一提的是,CyclicBarrier 是可重用的,一旦所有等待线程都到达屏障点,屏障将重置并可以再次使用。

CyclicBarrier 的使用案例

这里将使用 CyclicBarrier,模拟集齐了 7 个龙珠,才可以召唤神龙,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

public static void main(String[] args) {

// 定义一个循环屏障,参数一是需要累加的值,参数二是需要执行的任务
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("开始召唤神龙");
});

// 启动 7 个线程,进行龙珠收集,当一个线程收集到的时候,需要让它执行 await 方法,等待 7 个线程全部收集完成后,才执行原先定义好任务
for (int i = 1; i <= 7; i++) {
final int index = i;
new Thread(() -> {
System.out.println("收集到第 " + index + " 颗龙珠");
try {
// 先到的被阻塞,等待全部线程执行完成
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}

}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
收集到第 1 颗龙珠
收集到第 3 颗龙珠
收集到第 2 颗龙珠
收集到第 7 颗龙珠
收集到第 5 颗龙珠
收集到第 6 颗龙珠
收集到第 4 颗龙珠
开始召唤神龙

Semaphore (信号量)

Semaphore (信号量) 是一种常用的同步工具,用于控制对共享资源的并发访问。在多线程编程中,Semaphore 可以用来限制并发访问某个共享资源的线程数量,避免资源竞争和提高系统性能,比如用于电商秒杀、停车场停车等业务场景。

Semaphore 的介绍

Semaphore 内部维护了一个计数器,该计数器会随着线程的获取和释放许可证而增减。它提供了两个核心方法,acquire()release(),分别用于获取许可证和释放许可证。Semaphore 可以指定初始许可证数量,这决定了同时允许多少个线程并发访问共享资源。当许可证数量为 1 时,Semaphore 可以用作互斥锁(Mutex);当许可证数量大于 1 时,Semaphore 可以用作限流器(RateLimiter)。Semaphore 的常见应用场景如下:

  • 资源池管理:Semaphore 可以限制对有限资源的并发访问,例如数据库连接池或线程池中的资源管理。
  • 控制并发线程数:Semaphore 可以控制同时执行的线程数量,例如限制同时访问某个接口的请求数量。
  • 控制任务流量:Semaphore 可以限制任务的执行速率,例如限制某个任务在单位时间内的执行次数。
  • 实现互斥锁:Semaphore 可以用于实现互斥锁的功能,通过设置 permits 为 1,保证同一时间只有一个线程可以访问共享资源。

Semaphore 的使用案例

这里将使用 Semaphore,模拟 7 辆车抢占 3 个停车位,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {

public static void main(String[] args) {

// 初始化一个信号量,参数一是 3(模拟 3 个停车位),参数二是 false(即非公平锁)
Semaphore semaphore = new Semaphore(3, false);

// 启动 7 个线程,模拟 7 台车抢占车位
for (int i = 1; i <= 7; i++) {
new Thread(() -> {
try {
// 抢占停车位,代表有一辆车已经抢占到了车位
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 抢到车位");

// 每辆车停车 3 秒
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 停车 3 秒后离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放停车位
semaphore.release();
}
}, "t-" + i).start();
}
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
t-2 抢到车位
t-3 抢到车位
t-1 抢到车位
t-1 停车 3 秒后离开车位
t-2 停车 3 秒后离开车位
t-3 停车 3 秒后离开车位
t-6 抢到车位
t-4 抢到车位
t-7 抢到车位
t-6 停车 3 秒后离开车位
t-7 停车 3 秒后离开车位
t-5 抢到车位
t-4 停车 3 秒后离开车位
t-5 停车 3 秒后离开车位