提交 148b7f19 编写于 作者: 沉默王二's avatar 沉默王二 💬

上级 49057d6c
......@@ -294,7 +294,6 @@ export const sidebarConfig = sidebar({
"cas",
"aqs",
"lock",
"suo",
"pianxiangsuo",
"reentrantLock",
"ReentrantReadWriteLock",
......
......@@ -264,7 +264,7 @@ head:
- [synchronized锁的到底是什么?](thread/synchronized.md)
- [CAS详解](thread/cas.md)
- [AQS详解](thread/aqs.md)
- [大致了解下Java的锁接口和](thread/lock.md)
- [JUC 包下的那些](thread/lock.md)
- [公司空降一个美团大佬,彻底把Java中的锁”讲清楚了](thread/suo.md)
- [Java 15 终于把难搞的偏向锁移除了](thread/pianxiangsuo.md)
- [深入理解Java并发重入锁ReentrantLock](thread/reentrantLock.md)
......
......@@ -28,7 +28,16 @@ AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高
### AQS 的数据结构
AQS 内部使用了一个 [volatile](https://javabetter.cn/thread/volatile.html) 的变量 state 来作为资源的标识。同时定义了几个获取和改变 state 的 protected 方法,子类可以覆盖这些方法来实现自己的逻辑:
AQS 内部使用了一个 [volatile](https://javabetter.cn/thread/volatile.html) 的变量 state 来作为资源的标识。
```java
/**
* The synchronization state.
*/
private volatile int state;
```
同时定义了几个获取和改变 state 的 protected 方法,子类可以覆盖这些方法来实现自己的逻辑:
```java
getState()
......@@ -38,20 +47,22 @@ compareAndSetState()
这三种操作均是原子操作,其中 compareAndSetState 的实现依赖于 Unsafe 的 `compareAndSwapInt()` 方法。
而 AQS 类本身实现的是一些排队和阻塞的机制,比如具体线程等待队列的维护(如获取资源失败入队/唤醒出队等)。它内部使用了一个先进先出(FIFO)的双端队列,并使用了两个指针 head 和 tail 用于标识队列的头部和尾部。其数据结构如图
AQS 内部使用了一个先进先出(FIFO)的[双端队列](https://javabetter.cn/collection/arraydeque.html),并使用了两个指针 head 和 tail 用于标识队列的头部和尾部。其数据结构如下图所示
![](https://cdn.tobebetterjavaer.com/tobebetterjavaer/images/thread/aqs-c294b5e3-69ef-49bb-ac56-f825894746ab.png)
但它并不是直接储存线程,而是储存拥有线程的 Node 节点。
## 资源共享模式
![](https://cdn.tobebetterjavaer.com/stutymore/aqs-20230805211157.png)
### 资源共享模式
资源有两种共享模式,或者说两种同步方式:
- 独占模式(Exclusive):资源是独占的,一次只能一个线程获取。如 ReentrantLock
- 共享模式(Share):同时可以被多个线程获取,具体的资源个数可以通过参数指定。如 Semaphore/CountDownLatch
- 独占模式(Exclusive):资源是独占的,一次只能有一个线程获取。如 [ReentrantLock](https://javabetter.cn/thread/reentrantLock.html)
- 共享模式(Share):同时可以被多个线程获取,具体的资源个数可以通过参数指定。如 Semaphore/[CountDownLatch](https://javabetter.cn/thread/CountDownLatch.html)
一般情况下,子类只需要根据需求实现其中一种模式,当然也有同时实现两种模式的同步类,如`ReadWriteLock`
一般情况下,子类只需要根据需求实现其中一种模式就可以,当然也有同时实现两种模式的同步类,如`ReadWriteLock`
AQS 中关于这两种资源共享模式的定义源码(均在内部类 Node 中)。我们来看看 Node 的结构:
......@@ -100,23 +111,19 @@ private Node addWaiter(Node mode) {
}
```
> 注意:通过 Node 我们可以实现两个队列,一是通过 prev 和 next 实现 CLH 队列(线程同步队列,双向队列),二是 nextWaiter 实现 Condition 条件上的等待线程队列(单向队列),这个 Condition 主要用在 ReentrantLock 类中。
注意:通过 Node 我们可以实现两个队列,一是通过 prev 和 next 实现 CLH 队列(线程同步队列、双向队列),二是 nextWaiter 实现 Condition 条件上的等待线程队列(单向队列),这个 Condition 主要用在 ReentrantLock 类中。
## AQS 的主要方法源码解析
### AQS 的主要源码解析
AQS 的设计是基于**模板方法模式**的,它有一些方法必须要子类去实现的,它们主要有:
- isHeldExclusively():该线程是否正在独占资源。只有用到 condition 才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- `isHeldExclusively()`:该线程是否正在独占资源。只有用到 condition 才需要去实现它。
- `tryAcquire(int)`:独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
- `tryRelease(int)`:独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
- `tryAcquireShared(int)`:共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- `tryReleaseShared(int)`:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。
这些方法虽然都是`protected`方法,但是它们并没有在 AQS 具体实现,而是直接抛出异常(这里不使用抽象方法的目的是:避免强迫子类中把所有的抽象方法都实现一遍,减少无用功,这样子类只需要实现自己关心的抽象方法即可,比如 Semaphore 只需要实现 tryAcquire 方法而不用实现其余不需要用到的模版方法):
这些方法虽然都是`protected`的,但是它们并没有在 AQS 具体实现,而是直接抛出异常:
```java
protected boolean tryAcquire(int arg) {
......@@ -124,11 +131,15 @@ protected boolean tryAcquire(int arg) {
}
```
这里不使用抽象方法的目的是:避免强迫子类中把所有的抽象方法都实现一遍,减少无用功,这样子类只需要实现自己关心的抽象方法即可,比如 Semaphore 只需要实现 tryAcquire 方法而不用实现其余不需要用到的模版方法:
![](https://cdn.tobebetterjavaer.com/stutymore/aqs-20230805211732.png)
而 AQS 实现了一系列主要的逻辑。下面我们从源码来分析一下获取和释放资源的主要逻辑:
### 获取资源
#### 获取资源
获取资源的入口是 acquire(int arg)方法。arg 是要获取的资源的个数,在独占模式下始终为 1。我们先来看看这个方法的逻辑:
获取资源的入口是 `acquire(int arg)`方法。arg 是要获取的资源个数,在独占模式下始终为 1。我们先来看看这个方法的逻辑:
```java
public final void acquire(int arg) {
......@@ -138,25 +149,22 @@ public final void acquire(int arg) {
}
```
首先调用 tryAcquire(arg)尝试去获取资源。前面提到了这个方法是在子类具体实现的。
首先调用 tryAcquire 尝试去获取资源。前面提到了这个方法是在子类中具体实现的。
如果获取资源失败,就通过 addWaiter(Node.EXCLUSIVE)方法把这个线程插入到等待队列中。其中传入的参数代表要插入的 Node 是独占式的。这个方法的具体实现:
如果获取资源失败,就通过 `addWaiter(Node.EXCLUSIVE)` 方法把这个线程插入到等待队列中。其中传入的参数代表要插入的 Node 是独占式的。这个方法的具体实现:
```java
private Node addWaiter(Node mode) {
// 生成该线程对应的Node节点
Node node = new Node(Thread.currentThread(), mode);
// 将Node插入队列中
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 使用CAS尝试,如果成功就返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果等待队列为空或者上述CAS失败,再自旋CAS插入
enq(node);
return node;
}
......@@ -179,45 +187,48 @@ private Node enq(final Node node) {
}
```
> 上面的两个函数比较好理解,就是在队列的尾部插入新的 Node 节点,但是需要注意的是由于 AQS 中会存在多个线程同时争夺资源的情况,因此肯定会出现多个线程同时插入节点的操作,在这里是通过 CAS 自旋的方式保证了操作的线程安全性。
上面的两个方法比较好理解,就是在队列的尾部插入新的 Node 节点,但是需要注意的是由于 AQS 中会存在多个线程同时争夺资源的情况,因此肯定会出现多个线程同时插入节点的操作,在这里是通过 CAS 自旋的方式保证了操作的线程安全性。
OK,现在回到最开始的 aquire(int arg)方法。现在通过 addWaiter 方法,已经把一个 Node 放到等待队列尾部了。而处于等待队列的结点是从头结点一个一个去获取资源的。具体的实现我们来看看 acquireQueued 方法
OK,现在回到最开始的 aquire 方法。现在通过 addWaiter 方法,已经把一个 Node 放到等待队列尾部了。而处于等待队列的结点是从头结点一个一个去获取资源的。具体的实现我们来看看 acquireQueued 方法:
```java
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// interrupted用于记录线程是否被中断过
boolean interrupted = false;
// 自旋
for (;;) {
for (;;) { // 自旋操作
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果node的前驱结点p是head,表示node是第二个结点,就可以尝试去获取资源了
// 如果前驱节点是head节点,并且尝试获取同步状态成功
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将head指向该结点。
// 所以head所指的结点,就是当前获取到资源的那个结点或null。
// 设置当前节点为head节点
setHead(node);
p.next = null; // help GC
// 前驱节点的next引用设为null,帮助垃圾回收器回收该节点
p.next = null;
// 获取同步状态成功,将failed设为false
failed = false;
// 返回线程是否被中断过
return interrupted;
}
// 如果自己可以休息了,就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果应该让当前线程阻塞并且线程在阻塞时被中断,则将interrupted设为true
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果获取同步状态失败,取消尝试获取同步状态
if (failed)
cancelAcquire(node);
}
}
```
> 这里 parkAndCheckInterrupt 方法内部使用到了 LockSupport.park(this),顺便简单介绍一下 park
>
> LockSupport 类是 Java 6 引入的一个类,提供了基本的线程同步原语。LockSupport 实际上是调用了 Unsafe 类里的函数,归结到 Unsafe 里,只有两个函数
>
> - park(boolean isAbsolute, long time):阻塞当前线程
> - unpark(Thread jthread):使给定的线程停止阻塞
这里 parkAndCheckInterrupt 方法内部使用到了 `LockSupport.park(this)`,顺便简单介绍一下 park 方法
LockSupport 类是 Java 6 引入的一个类,提供了基本的线程同步原语。LockSupport 实际上是调用了 Unsafe 类里的方法,归结到 Unsafe 里,只有两个
- `park(boolean isAbsolute, long time)`:阻塞当前线程
- `unpark(Thread jthread)`:使给定的线程停止阻塞
所以**结点进入等待队列后,是调用 park 使它进入阻塞状态的。只有头结点的线程是处于活跃状态的**
......@@ -227,13 +238,13 @@ final boolean acquireQueued(final Node node, int arg) {
- acquireShared:申请共享模式的资源
- acquireSharedInterruptibly:申请可中断的资源(共享模式)
> 可中断的意思是,在线程中断时可能会抛出`InterruptedException`
可中断的意思是,在线程中断时可能会抛出`InterruptedException`
总结起来的一个流程图:
![acquire流程](https://cdn.tobebetterjavaer.com/tobebetterjavaer/images/thread/aqs-a0689bb2-9b18-419d-9617-6d292fbd439d.jpg)
## 释放资源
#### 释放资源
释放资源相比于获取资源来说,会简单许多。在 AQS 中只有一小段实现。源码:
......@@ -270,13 +281,11 @@ private void unparkSuccessor(Node node) {
}
```
---
`java.util.concurrent.locks.ReentrantLock`的实现中,`tryRelease(arg)`会减少持有锁的数量,如果持有锁的数量变为0,释放锁并返回true。
如果`tryRelease(arg)`成功释放了锁,那么接下来会检查队列的头结点。如果头结点存在并且waitStatus不为0(这意味着有线程在等待),那么会调用`unparkSuccessor(Node h)`方法来唤醒等待的线程。
> 编辑:沉默王二,内容大部分来源以下三个开源仓库:
>
> - [深入浅出 Java 多线程](http://concurrent.redspider.group/)
> - [并发编程知识总结](https://github.com/CL0610/Java-concurrency)
> - [Java 八股文](https://github.com/CoderLeixiaoshuai/java-eight-part)
> 编辑:沉默王二,编辑前的内容来源于朋友开源的这个仓库:[深入浅出 Java 多线程](http://concurrent.redspider.group/),强烈推荐。
---
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册