线程安全——ReetrantLock(九)

前言

synchronized 属于隐式锁,即锁的持有与释放都是隐式的,我们无需干预,而本篇我们要讲解的是显式锁,即锁的持有和释放都必须由我们手动编写。

Lock

在 Java 1.5 中,官方在 concurrent 并发包中加入了 Lock 接口,该接口中提供了 lock()方法和 unLock()方法对显式加锁和显式释放锁操作进行支持。

加锁和解锁,如下:

1
2
3
4
5
6
7
Lock lock = new ReentrantLock();
lock.lock();
try{
//临界区......
}finally{
lock.unlock();
}

当前线程使用 lock()方法与 unlock()对临界区进行包围,其他线程由于无法持有锁将无法进入临界区直到当前线程释放锁,注意 unlock()操作必须在 finally 代码块中,这样可以确保即使临界区执行抛出异常,线程最终也能正常释放锁。

Lock 接口还提供了锁以下相关方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface Lock {
//加锁
void lock();

//解锁
void unlock();

//可中断获取锁,与lock()不同之处在于可响应中断操作,即在获
//取锁的过程中可中断,注意synchronized在获取锁时是不可中断的
void lockInterruptibly() throws InterruptedException;

//尝试非阻塞获取锁,调用该方法后立即返回结果,如果能够获取则返回true,否则返回false
boolean tryLock();

//根据传入的时间段获取锁,在指定时间内没有获取锁则返回false,如果在指定时间内当前线程未被中并断获取到锁则返回true
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

//获取等待通知组件,该组件与当前锁绑定,当前线程只有获得了锁
//才能调用该组件的wait()方法,而调用后,当前线程将释放锁。
Condition newCondition();

可见 Lock 对象锁还提供了 synchronized 所不具备的其他同步特性:

  • 如可中断锁的获取
  • 超时中断锁的获取
  • 等待唤醒机制的多条件变量 Condition 等

这也使得 Lock 锁在使用上具有更大的灵活性。

ReetrantLook

JDK 1.5 新增的类,实现了 Lock 接口,是一个基于 AQS 并发组件的并发控制类。ReetrantLock 本身也是一种支持重进入的锁,即该锁可以支持一个线程对资源重复加锁,同时也支持公平锁与非公平锁。

注意: ReetrantLock 支持对同一线程重加锁,但是加锁多少次,就必须解锁多少次,这样才可以成功释放锁。

并发基础组件 AQS

主要为两个类:

  • AbstractOwnableSynchronizer:抽象类,定义了存储独占当前锁的线程和获取的方法

  • AbstractQueuedSynchronizer:抽象类,AQS 框架核心类,其内部以虚拟队列的方式管理线程的锁获取与锁释放,其中获取锁(tryAcquire 方法)和释放锁(tryRelease 方法)并没有提供默认实现,需要子类重写这两个方法实现具体逻辑,目的是使开发人员可以自由定义获取锁以及释放锁的方式。

AbstractQueuedSynchronizer 又称为队列同步器(后面简称 AQS),我们来看看它的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* AQS抽象类
*/
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
//指向同步队列队头
private transient volatile Node head;

//指向同步的队尾
private transient volatile Node tail;

//同步状态,0代表锁未被占用,1代表锁已被占用
private volatile int state;

//省略其他代码......
}

AbstractQueuedSynchronizer 内部通过 state 来控制同步状态

  • state=0 时,则说明没有任何线程占有共享资源的锁。
  • state=1 时,则说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待。

AQS 内部通过内部类 Node 构成双向链表完成 FIFO 的同步队列,同时利用内部类 ConditionObject 构建等待队列。 当 Condition 调用 wait() 方法后,线程将会加入等待队列中,而当 Condition 调用 signal() 方法后,线程将从等待队列转移动同步队列中进行锁竞争。

同步模型如下:

JAVA并发——ReetrantLock_2020-03-17-17-13-50.png

Node 是 AQS 的内部类,其数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static final class Node {
//共享模式
static final Node SHARED = new Node();
//独占模式
static final Node EXCLUSIVE = null;

//标识线程已处于结束状态
static final int CANCELLED = 1;
//等待被唤醒状态
static final int SIGNAL = -1;
//条件状态,
static final int CONDITION = -2;
//在共享模式中使用表示获得的同步状态会被传播
static final int PROPAGATE = -3;

//等待状态,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种
volatile int waitStatus;

//同步队列中前驱结点
volatile Node prev;

//同步队列中后继结点
volatile Node next;

//请求锁的线程
volatile Thread thread;

//等待队列中的后继结点,这个与Condition有关,稍后会分析
Node nextWaiter;

//判断是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}

//获取前驱结点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

//.....
}

主要包含了需要同步的线程本身 以及 线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。每个 Node 结点内部关联其前继结点 prev 和后继结点 next,这样可以方便线程释放锁后快速唤醒下一个在等待的线程。

SHARED 和 EXCLUSIVE 常量分别代表共享模式和独占模式

  • 共享模式是一个锁允许多条线程同时操作,如信号量 Semaphore
  • 独占模式则是同一个时间段只能有一个线程对共享资源进行操作,多余的请求线程需要排队等待,如 ReentranLock。

基于 ReetrantLock 分析 AQS 独占模式实现过程

ReentrantLock 内部存在 3 个实现类,分别是 SyncNonfairSyncFairSync

其中 Sync 继承自 AQS 实现了解锁 tryRelease() 方法,

NonfairSync(非公平锁)、 FairSync(公平锁)则继承自 Sync,实现了获取锁的 tryAcquire() 方法

ReentrantLock 的所有方法调用都通过间接调用 AQS 和 Sync 类及其子类来完成的。

非公平锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 非公平锁实现
*/
static final class NonfairSync extends Sync {
//加锁
final void lock() {
//执行CAS操作,获取同步状态
if (compareAndSetState(0, 1))
//成功则将独占锁线程设置为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
//否则再次请求同步状态
acquire(1);
}
}

也就是说,通过 CAS 机制保证并发的情况下只有一个线程可以成功将 state 设置为 1,获取到锁;

此时,其它线程在执行 compareAndSetState 时,因为 state 此时不是 0,所以会失败并返回 false,执行 acquire(1);再次请求同步:

1
2
3
4
5
6
public final void acquire(int arg) {
//再次尝试获取同步状态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

这里传入参数 arg 是 state 的值,因为要获取锁,所以这里一般传递参数为 1,进入方法后首先会执行 tryAcquire(1) 方法,在前面分析过该方法在 AQS 中并没有具体实现,而是交由子类实现,因此该方法是由 ReetrantLock 类内部类实现的:

1
2
3
4
5
6
7
//NonfairSync类
static final class NonfairSync extends Sync {

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

JAVA并发——ReetrantLock_2020-03-17-17-41-52.png

假设有三个线程:线程 1 已经获得到了锁,线程 2 正在同步队列中排队,此时线程 3 执行 lock 方法尝试获取锁的时,线程 1 正好释放了锁,将 state 更新为 0,那么线程 3 就可能在线程 2 还没有被唤醒之前去获取到这个锁。

如果此时还没有获取到锁(nonfairTryAcquire 返回 false),那么接下来会把该线程封装成 node 去同步队列里排队,代码层面上执行的是 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

ReetrantLock 为独占锁,所以传入的参数为 Node.EXCLUSIVE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Node addWaiter(Node mode) {
//将请求同步状态失败的线程封装成结点
Node node = new Node(Thread.currentThread(), mode);

Node pred = tail;
//如果是第一个结点加入肯定为空,跳过。
//如果非第一个结点则直接执行CAS入队操作,尝试在尾部快速添加
if (pred != null) {
node.prev = pred;
//使用CAS执行尾部结点替换,尝试在尾部快速添加
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果第一次加入或者CAS操作没有成功执行enq入队操作
enq(node);
return node;
}

同步队列中的结点会进入一个自旋过程,自旋的意思就是原地转圈圈:即结点都在观察时机准备获取同步状态,自旋过程是在 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法中执行的,先看前半部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋,死循环
for (;;) {
//获取前结点
final Node p = node.predecessor();
当且仅当p为头结点才尝试获取同步状态,FIFO
if (p == head && tryAcquire(arg)) {
//此时当前node前驱节点为head且已经tryAcquire获取到了锁,正在执行了它的相关信息
//已经没有任何用处了,所以现在需要考虑将它GC掉
//将node设置为头结点
setHead(node);
//清空原来头结点的引用便于GC
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果前驱结点不是head,判断是否挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//最终都没能获取同步状态,结束该线程的请求
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
//设置为头结点
private void setHead(Node node) {
head = node;
//清空结点数据以便于GC
node.thread = null;
node.prev = null;
}

当然如果前驱结点不是 head 而它又没有获取到锁,那么执行如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//如果前驱结点不是head,判断是否挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())

interrupted = true;
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取当前结点的等待状态
int ws = pred.waitStatus;
//如果为等待唤醒(SIGNAL)状态则返回true
if (ws == Node.SIGNAL)
return true;
//如果ws>0 则说明是结束状态,
//遍历前驱结点直到找到没有结束状态的结点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果ws小于0又不是SIGNAL状态,
//则将其设置为SIGNAL状态,代表该结点的线程正在等待唤醒。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

private final boolean parkAndCheckInterrupt() {
//将当前线程挂起,线程会阻塞住
LockSupport.park(this);
//获取线程中断状态,interrupted()是判断当前中断状态,
//并非中断线程,因此可能true也可能false,并返回
return Thread.interrupted();
}

通常我们在设计队列时,我们需要考虑如何最大化的减少后续排队节点对于 CPU 的消耗,而在 AQS 中,只要当前节点的前驱节点不是头结点,再把当前节点加到队列后就会执行 LockSupport.park(this);将当前线程挂起,这样可以最大程度减少 CPU 消耗。

AQS 通过最简单的 CAS 和 LockSupport 的 park,设计出了高效的队列模型和机制:

  1. AQS 结构其实是在第二个线程获取锁的时候再初始化的,就是 lazy-Init 的思想,最大程度减少不必要的代码执行的开销

  2. 为了最大程度上提升效率,尽量避免线程间的通讯,采用了双向链表的 Node 结构去存储线程

  3. 为了最大程度上避免 CPU 上下文切换执行的消耗,在设计排队线程时,只有头结点的下一个的线程在一直重复执行获取锁,队列后面的线程会通过 LockSupport 进行休眠。

非公平锁的释放:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//ReentrantLock类的unlock
public void unlock() {
sync.release(1);
}

//AQS类的release()方法
public final boolean release(int arg) {
//尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒后继结点的线程
unparkSuccessor(h);
return true;
}
return false;
}

//ReentrantLock类中的内部类Sync实现的tryRelease(int releases)
protected final boolean tryRelease(int releases) {

int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//判断状态是否为0,如果是则说明已释放同步状态
if (c == 0) {
free = true;
//设置Owner为null
setExclusiveOwnerThread(null);
}
//设置更新同步状态
setState(c);
return free;
}

一句话总结:释放锁首先就是把 volatile 类型的变量 state 减 1。state 从 1 变成 0.

unparkSuccessor(h)的作用的唤醒后续的节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void unparkSuccessor(Node node) {
//这里,node是head节点。
int ws = node.waitStatus;
if (ws < 0)//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;//找到下一个需要唤醒的结点s
if (s == null || s.waitStatus > 0) {//如果为空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}

从代码执行操作来看,这里主要作用是用 unpark()唤醒同步队列中最前边未放弃线程(也就是状态为 CANCELLED 的线程结点 s)。

公平锁实现

非公平锁与公平锁最大的区别,即公平锁在线程请求到来时先会判断同步队列是否存在结点,如果存在先执行同步队列中的结点线程,当前线程将封装成 node 加入同步队列等待。

而非公平锁呢,当线程请求到来时,不管同步队列是否存在线程结点,直接上去尝试获取同步状态,获取成功直接访问共享资源。

请注意在绝大多数情况下,非公平锁才是我们理想的选择,毕竟从效率上来说非公平锁总是胜于公平锁。

总结

重入锁 ReentrantLock,是一个基于 AQS 并发框架的并发控制类,其内部实现了 3 个类,分别是 Sync、NoFairSync 以及 FairSync 类,其中 Sync 继承自 AQS,实现了释放锁的模板方法 tryRelease(int),而 NoFairSync 和 FairSync 都继承自 Sync,实现各种获取锁的方法 tryAcquire(int)。

ReentrantLock 的所有方法实现几乎都间接调用了这 3 个类,因此当我们在使用 ReentrantLock 时,大部分使用都是在间接调用 AQS 同步器中的方法。

AQS 在设计时将性能优化到了极致,具体体现在同步队列的 park 和 unpark,初始化 AQS 时的懒加载,以及线程之间通过 Node 这样的数据结构从而避免线程间通讯造成的额外开销,这种由释放锁的线程主动唤醒后续线程的方式也是我们再实际过程中可以借鉴的。

文章目录
  1. 1. 前言
  2. 2. Lock
  3. 3. ReetrantLook
    1. 3.1. 并发基础组件 AQS
    2. 3.2. 基于 ReetrantLock 分析 AQS 独占模式实现过程
      1. 3.2.1. 非公平锁实现
      2. 3.2.2. 公平锁实现
  4. 4. 总结
|