提交 52d945ac 编写于 作者: 沉默王二's avatar 沉默王二 💬

并发编程第二弹

上级 5d168dcc
......@@ -7,8 +7,6 @@ tag:
- Java
---
# 1.ConcurrentHashmap简介 #
在使用HashMap时,在多线程情况下扩容会出现CPU接近100%的情况,因为hashmap并不是线程安全的,通常我们可以使用在java体系中古老的hashtable类,该类基本上所有的方法都采用synchronized进行线程安全的控制,可想而知,在高并发的情况下,每次只有一个线程能够获取对象监视器锁,这样的并发性能的确不令人满意。
另外一种方式通过Collections的`Map<K,V> synchronizedMap(Map<K,V> m)`将hashmap包装成一个线程安全的map。比如SynchronzedMap的put方法源码为:
......@@ -31,10 +29,11 @@ JDK 1.6版本关键要素:
至于为什么不用ReentrantLock而是Synchronzied呢?实际上,synchronzied做了很多的优化,包括偏向锁,轻量级锁,重量级锁,可以依次向上升级锁状态,但不能降级,因此,使用synchronized相较于ReentrantLock的性能会持平甚至在某些情况更优,具体的性能测试可以去网上查阅一些资料。另外,底层数据结构改变为采用数组+链表+红黑树的数据形式。
# 2.关键属性及类 #
## 关键属性及类
在了解ConcurrentHashMap的具体方法实现前,我们需要系统的来看一下几个关键的地方。
> **ConcurrentHashMap的关键属性**
### **ConcurrentHashMap的关键属性**
1. **table**
`volatile Node<K,V>[] table`:
......@@ -51,8 +50,8 @@ JDK 1.6版本关键要素:
该属性用来控制table数组的大小,根据是否初始化和是否正在扩容有几种情况:
- **当值为负数时:**如果为-1表示正在初始化,如果为-N则表示当前正有N-1个线程进行扩容操作;
- **当值为正数时:**如果当前数组为null的话表示table在初始化过程中,sizeCtl表示为需要新建数组的长度;
- **当值为负数时:** 如果为-1表示正在初始化,如果为-N则表示当前正有N-1个线程进行扩容操作;
- **当值为正数时:** 如果当前数组为null的话表示table在初始化过程中,sizeCtl表示为需要新建数组的长度;
- 若已经初始化了,表示当前数据容器(table数组)可用容量也可以理解成临界值(插入节点数超过了该临界值就需要扩容),具体指为数组的长度n 乘以 加载因子loadFactor;
- 当值为0时,即数组长度为默认初始值。
......@@ -66,360 +65,403 @@ JDK 1.6版本关键要素:
而在大量的同步组件和并发容器的实现中使用CAS是通过`sun.misc.Unsafe`类实现的,该类提供了一些可以直接操控内存和线程的底层操作,可以理解为java中的“指针”。该成员变量的获取是在静态代码块中:
```java
static {
try {
U = sun.misc.Unsafe.getUnsafe();
.......
} catch (Exception e) {
throw new Error(e);
}
}
static {
try {
U = sun.misc.Unsafe.getUnsafe();
.......
} catch (Exception e) {
throw new Error(e);
}
}
```
> **ConcurrentHashMap中关键内部类**
### **ConcurrentHashMap中关键内部类**
#### 1. **Node**
1. **Node**
Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域
```java
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
......
}
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
......
}
```
另外可以看出很多属性都是用volatile进行修饰的,也就是为了保证内存可见性。
2. **TreeNode**
#### 2. **TreeNode**
树节点,继承于承载数据的Node类。而红黑树的操作是针对TreeBin类的,从该类的注释也可以看出,也就是TreeBin会将TreeNode进行再一次封装
```java
**
* Nodes for use in TreeBins
*/
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
......
}
**
* Nodes for use in TreeBins
*/
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
......
}
```
3. **TreeBin**
#### 3. **TreeBin**
这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象。
```java
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
......
}
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
......
}
```
4. **ForwardingNode**
#### 4. **ForwardingNode**
在扩容时才会出现的特殊节点,其key,value,hash全部为null。并拥有nextTable指针引用新的table数组。
```java
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
.....
}
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
.....
}
```
> **CAS关键操作**
## **CAS关键操作**
在上面我们提及到在ConcurrentHashMap中会大量使用CAS修改它的属性和一些操作。因此,在理解ConcurrentHashMap的方法前我们需要了解下面几个常用的利用CAS算法来保障线程安全的操作。
1. **tabAt**
### 1. **tabAt**
```java
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
```
该方法用来获取table数组中索引为i的Node元素。
2. **casTabAt**
### 2. **casTabAt**
```java
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
```
利用CAS操作设置table数组中索引为i的元素
3. **setTabAt**
利用CAS操作设置table数组中索引为i的元素
### 3. **setTabAt**
```java
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
```
该方法用来设置table数组中索引为i的元素
该方法用来设置table数组中索引为i的元素
## 重点方法讲解
# 3.重点方法讲解 #
在熟悉上面的这核心信息之后,我们接下来就来依次看看几个常用的方法是怎样实现的。
## 3.1 实例构造器方法 ##
### 实例构造器方法
在使用ConcurrentHashMap第一件事自然而然就是new 出来一个ConcurrentHashMap对象,一共提供了如下几个构造器方法:
```java
// 1. 构造一个空的map,即table数组还未初始化,初始化放在第一次插入数据时,默认大小为16
ConcurrentHashMap()
// 2. 给定map的大小
ConcurrentHashMap(int initialCapacity)
// 3. 给定一个map
ConcurrentHashMap(Map<? extends K, ? extends V> m)
// 4. 给定map的大小以及加载因子
ConcurrentHashMap(int initialCapacity, float loadFactor)
// 5. 给定map大小,加载因子以及并发度(预计同时操作数据的线程)
ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
// 1. 构造一个空的map,即table数组还未初始化,初始化放在第一次插入数据时,默认大小为16
ConcurrentHashMap()
// 2. 给定map的大小
ConcurrentHashMap(int initialCapacity)
// 3. 给定一个map
ConcurrentHashMap(Map<? extends K, ? extends V> m)
// 4. 给定map的大小以及加载因子
ConcurrentHashMap(int initialCapacity, float loadFactor)
// 5. 给定map大小,加载因子以及并发度(预计同时操作数据的线程)
ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
```
ConcurrentHashMap一共给我们提供了5中构造器方法,具体使用请看注释,我们来看看第2种构造器,传入指定大小时的情况,该构造器源码为:
```java
public ConcurrentHashMap(int initialCapacity) {
//1. 小于0直接抛异常
if (initialCapacity < 0)
throw new IllegalArgumentException();
//2. 判断是否超过了允许的最大值,超过了话则取最大值,否则再对该值进一步处理
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//3. 赋值给sizeCtl
this.sizeCtl = cap;
}
public ConcurrentHashMap(int initialCapacity) {
//1. 小于0直接抛异常
if (initialCapacity < 0)
throw new IllegalArgumentException();
//2. 判断是否超过了允许的最大值,超过了话则取最大值,否则再对该值进一步处理
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//3. 赋值给sizeCtl
this.sizeCtl = cap;
}
```
这段代码的逻辑请看注释,很容易理解,如果小于0就直接抛出异常,如果指定值大于了所允许的最大值的话就取最大值,否则,在对指定值做进一步处理。最后将cap赋值给sizeCtl,关于sizeCtl的说明请看上面的说明,**当调用构造器方法之后,sizeCtl的大小应该就代表了ConcurrentHashMap的大小,即table数组长度**。tableSizeFor做了哪些事情了?源码为:
```java
/**
* Returns a power of two table size for the given desired capacity.
* See Hackers Delight, sec 3.2
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
/**
* Returns a power of two table size for the given desired capacity.
* See Hackers Delight, sec 3.2
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
```
通过注释就很清楚了,该方法会将调用构造器方法时指定的大小转换成一个2的幂次方数,也就是说ConcurrentHashMap的大小一定是2的幂次方,比如,当指定大小为18时,为了满足2的幂次方特性,实际上concurrentHashMapd的大小为2的5次方(32)。
另外,需要注意的是,**调用构造器方法的时候并未构造出table数组(可以理解为ConcurrentHashMap的数据容器),只是算出table数组的长度,当第一次向ConcurrentHashMap插入数据的时候才真正的完成初始化创建table数组的工作**
## 3.2 initTable方法 ##
### initTable方法
直接上源码:
```java
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 1. 保证只有一个线程正在进行初始化操作
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 2. 得出数组的大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 3. 这里才真正的初始化数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 4. 计算数组中可用的大小:实际大小n*0.75(加载因子)
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 1. 保证只有一个线程正在进行初始化操作
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 2. 得出数组的大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 3. 这里才真正的初始化数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 4. 计算数组中可用的大小:实际大小n*0.75(加载因子)
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
```
代码的逻辑请见注释,有可能存在一个情况是多个线程同时走到这个方法中,为了保证能够正确初始化,在第1步中会先通过if进行判断,若当前已经有一个线程正在初始化即sizeCtl值变为-1,这个时候其他线程在If判断为true从而调用Thread.yield()让出CPU时间片。正在进行初始化的线程会调用U.compareAndSwapInt方法将sizeCtl改为-1即正在初始化的状态。另外还需要注意的事情是,在第四步中会进一步计算数组中可用的大小即为数组实际大小n乘以加载因子0.75.可以看看这里乘以0.75是怎么算的,0.75为四分之三,这里`n - (n >>> 2)`是不是刚好是`n-(1/4)n=(3/4)n`,挺有意思的吧:)。如果选择是无参的构造器的话,这里在new Node数组的时候会使用默认大小为`DEFAULT_CAPACITY`(16),然后乘以加载因子0.75为12,也就是说数组的可用大小为12。
代码的逻辑请见注释,有可能存在一个情况是多个线程同时走到这个方法中,为了保证能够正确初始化,在第1步中会先通过if进行判断,若当前已经有一个线程正在初始化即sizeCtl值变为-1,这个时候其他线程在If判断为true从而调用Thread.yield()让出CPU时间片。
正在进行初始化的线程会调用U.compareAndSwapInt方法将sizeCtl改为-1即正在初始化的状态。
另外还需要注意的事情是,在第四步中会进一步计算数组中可用的大小即为数组实际大小n乘以加载因子0.75.可以看看这里乘以0.75是怎么算的,0.75为四分之三,这里`n - (n >>> 2)`是不是刚好是`n-(1/4)n=(3/4)n`,挺有意思的吧:)。
如果选择是无参的构造器的话,这里在new Node数组的时候会使用默认大小为`DEFAULT_CAPACITY`(16),然后乘以加载因子0.75为12,也就是说数组的可用大小为12。
### put方法
## 3.3 put方法 ##
使用ConcurrentHashMap最长用的也应该是put和get方法了吧,我们先来看看put方法是怎样实现的。调用put方法时实际具体实现是putVal方法,源码如下:
```java
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//1. 计算key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//4. 当前正在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
//5. 当前为链表,在链表中插入新的键值对
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 6.当前为红黑树,将新的键值对插入到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 7.插入完键值对后再根据实际大小看是否需要转换成红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容
addCount(1L, binCount);
return null;
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//1. 计算key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//4. 当前正在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
//5. 当前为链表,在链表中插入新的键值对
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 6.当前为红黑树,将新的键值对插入到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 7.插入完键值对后再根据实际大小看是否需要转换成红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容
addCount(1L, binCount);
return null;
}
```
put方法的代码量有点长,我们按照上面的分解的步骤一步步来看。**从整体而言,为了解决线程安全的问题,ConcurrentHashMap使用了synchronzied和CAS的方式**。在之前了解过HashMap以及1.8版本之前的ConcurrenHashMap都应该知道ConcurrentHashMap结构图,为了方面下面的讲解这里先直接给出,如果对这有疑问的话,可以在网上随便搜搜即可。
put方法的代码量有点长,我们按照上面的分解的步骤一步步来看。
![ConcurrentHashMap散列桶数组结构示意图](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/ConcurrentHashMap-179ac2e9-38e4-4f80-9b93-a966613e8838.png)
**从整体而言,为了解决线程安全的问题,ConcurrentHashMap使用了synchronzied和CAS的方式**
在之前了解过HashMap以及1.8版本之前的ConcurrenHashMap都应该知道ConcurrentHashMap结构图,为了方面下面的讲解这里先直接给出,如果对这有疑问的话,可以在网上随便搜搜即可。
![ConcurrentHashMap散列桶数组结构示意图](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/ConcurrentHashMap-01.png)
如图(图片摘自网络),ConcurrentHashMap是一个哈希桶数组,如果不出现哈希冲突的时候,每个元素均匀的分布在哈希桶数组中。当出现哈希冲突的时候,是**标准的链地址的解决方式**,将hash值相同的节点构成链表的形式,称为“拉链法”,另外,在1.8版本中为了防止拉链过长,当链表的长度大于8的时候会将链表转换成红黑树。
table数组中的每个元素实际上是单链表的头结点或者红黑树的根节点。当插入键值对时首先应该定位到要插入的桶,即插入table数组的索引i处。那么,怎样计算得出索引i呢?当然是根据key的hashCode值。
如图(图片摘自网络),ConcurrentHashMap是一个哈希桶数组,如果不出现哈希冲突的时候,每个元素均匀的分布在哈希桶数组中。当出现哈希冲突的时候,是**标准的链地址的解决方式**,将hash值相同的节点构成链表的形式,称为“拉链法”,另外,在1.8版本中为了防止拉链过长,当链表的长度大于8的时候会将链表转换成红黑树。table数组中的每个元素实际上是单链表的头结点或者红黑树的根节点。当插入键值对时首先应该定位到要插入的桶,即插入table数组的索引i处。那么,怎样计算得出索引i呢?当然是根据key的hashCode值。
> 1. spread()重哈希,以减小Hash冲突
#### 1. spread()重哈希,以减小Hash冲突
我们知道对于一个hash表来说,hash值分散的不够均匀的话会大大增加哈希冲突的概率,从而影响到hash表的性能。因此通过spread方法进行了一次重hash从而大大减小哈希冲突的可能性。spread方法为:
```java
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
```
该方法主要是**将key的hashCode的低16位于高16位进行异或运算**,这样不仅能够使得hash值能够分散能够均匀减小hash冲突的概率,另外只用到了异或运算,在性能开销上也能兼顾,做到平衡的trade-off。
> 2.初始化table
#### 2. 初始化table
紧接着到第2步,会判断当前table数组是否初始化了,没有的话就调用initTable进行初始化,该方法在上面已经讲过了。
> 3.能否直接将新值插入到table数组中
#### 3. 能否直接将新值插入到table数组中
从上面的结构示意图就可以看出存在这样一种情况,如果插入值待插入的位置刚好所在的table数组为null的话就可以直接将值插入即可。那么怎样根据hash确定在table中待插入的索引i呢?很显然可以通过hash值与数组的长度取模操作,从而确定新值插入到数组的哪个位置。而之前我们提过ConcurrentHashMap的大小总是2的幂次方,(n - 1) & hash运算等价于对长度n取模,也就是hash%n,但是位运算比取模运算的效率要高很多,Doug lea大师在设计并发容器的时候也是将性能优化到了极致,令人钦佩。
确定好数组的索引i后,就可以可以tabAt()方法(该方法在上面已经说明了,有疑问可以回过头去看看)获取该位置上的元素,如果当前Node f为null的话,就可以直接用casTabAt方法将新值插入即可。
> 4.当前是否正在扩容
#### 4.当前是否正在扩容
如果当前节点不为null,且该节点为特殊节点(forwardingNode)的话,就说明当前concurrentHashMap正在进行扩容操作,关于扩容操作,下面会作为一个具体的方法进行讲解。
那么怎样确定当前的这个Node是不是特殊的节点了?是通过判断该节点的hash值是不是等于-1(MOVED),代码为`(fh = f.hash) == MOVED`,对MOVED的解释在源码上也写的很清楚了:
如果当前节点不为null,且该节点为特殊节点(forwardingNode)的话,就说明当前concurrentHashMap正在进行扩容操作,关于扩容操作,下面会作为一个具体的方法进行讲解。那么怎样确定当前的这个Node是不是特殊的节点了?是通过判断该节点的hash值是不是等于-1(MOVED),代码为(fh = f.hash) == MOVED,对MOVED的解释在源码上也写的很清楚了:
```java
static final int MOVED = -1; // hash for forwarding nodes
static final int MOVED = -1; // hash for forwarding nodes
```
> 5.当table[i]为链表的头结点,在链表中插入新值
#### 5. 当table[i]为链表的头结点,在链表中插入新值
在table[i]不为null并且不为forwardingNode时,并且当前Node f的hash值大于`0(fh >= 0)`的话说明当前节点f为当前桶的所有的节点组成的链表的头结点。那么接下来,要想向ConcurrentHashMap插入新值的话就是向这个链表插入新值。通过synchronized (f)的方式进行加锁以实现线程安全性。往链表中插入节点的部分代码为:
在table[i]不为null并且不为forwardingNode时,并且当前Node f的hash值大于0(fh >= 0)的话说明当前节点f为当前桶的所有的节点组成的链表的头结点。那么接下来,要想向ConcurrentHashMap插入新值的话就是向这个链表插入新值。通过synchronized (f)的方式进行加锁以实现线程安全性。往链表中插入节点的部分代码为:
```java
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到hash值相同的key,覆盖旧值即可
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
//如果到链表末尾仍未找到,则直接将新值插入到链表末尾即可
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到hash值相同的key,覆盖旧值即可
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
//如果到链表末尾仍未找到,则直接将新值插入到链表末尾即可
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
```
这部分代码很好理解,就是两种情况:1. 在链表中如果找到了与待插入的键值对的key相同的节点,就直接覆盖即可;2. 如果直到找到了链表的末尾都没有找到的话,就直接将待插入的键值对追加到链表的末尾即可
> 6.当table[i]为红黑树的根节点,在红黑树中插入新值
#### 6.当table[i]为红黑树的根节点,在红黑树中插入新值
按照之前的数组+链表的设计方案,这里存在一个问题,即使负载因子和Hash算法设计的再合理,也免不了会出现拉链过长的情况,一旦出现拉链过长,甚至在极端情况下,查找一个节点会出现时间复杂度为O(n)的情况,则会严重影响ConcurrentHashMap的性能,于是,在JDK1.8版本中,对数据结构做了进一步的优化,引入了红黑树。而当链表长度太长(默认超过8)时,链表就转换为红黑树,利用红黑树快速增删改查的特点提高ConcurrentHashMap的性能,其中会用到红黑树的插入、删除、查找等算法。当table[i]为红黑树的树节点时的操作为:
```java
if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
```
首先在if中通过`f instanceof TreeBin`判断当前table[i]是否是树节点,这下也正好验证了我们在最上面介绍时说的TreeBin会对TreeNode做进一步封装,对红黑树进行操作的时候针对的是TreeBin而不是TreeNode。这段代码很简单,调用putTreeVal方法完成向红黑树插入新节点,同样的逻辑,**如果在红黑树中存在于待插入键值对的Key相同(hash值相等并且equals方法判断为true)的节点的话,就覆盖旧值,否则就向红黑树追加新节点**
> 7.根据当前节点个数进行调整
#### 7. 根据当前节点个数进行调整
当完成数据新节点插入之后,会进一步对当前链表大小进行调整,这部分代码为:
```java
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
```
很容易理解,如果当前链表节点个数大于等于8(TREEIFY_THRESHOLD)的时候,就会调用treeifyBin方法将tabel[i](第i个散列桶)拉链转换成红黑树。
至此,关于Put方法的逻辑就基本说的差不多了,现在来做一些总结:
......@@ -437,186 +479,188 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步
8. 对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容。
## 3.4 get方法 ##
### get方法
看完了put方法再来看get方法就很容易了,用逆向思维去看就好,这样存的话我反过来这么取就好了。get方法源码为:
```java
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 1. 重hash
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 2. table[i]桶节点的key与查找的key相同,则直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 3. 当前节点hash小于0说明为树节点,在红黑树中查找即可
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
//4. 从链表中查找,查找到则返回该节点的value,否则就返回null即可
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 1. 重hash
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 2. table[i]桶节点的key与查找的key相同,则直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 3. 当前节点hash小于0说明为树节点,在红黑树中查找即可
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
//4. 从链表中查找,查找到则返回该节点的value,否则就返回null即可
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
```
代码的逻辑请看注释,首先先看当前的hash桶数组节点即table[i]是否为查找的节点,若是则直接返回;若不是,则继续再看当前是不是树节点?通过看节点的hash值是否为小于0,如果小于0则为树节点。如果是树节点在红黑树中查找节点;如果不是树节点,那就只剩下为链表的形式的一种可能性了,就向后遍历查找节点,若查找到则返回节点的value即可,若没有找到就返回null。
## 3.5 transfer方法 ##
### transfer方法
当ConcurrentHashMap容量不足的时候,需要对table进行扩容。这个方法的基本思想跟HashMap是很像的,但是由于它是支持并发扩容的,所以要复杂的多。原因是它支持多线程进行扩容操作,而并没有加锁。我想这样做的目的不仅仅是为了满足concurrent的要求,而是希望利用并发处理去减少扩容带来的时间影响。transfer方法源码为:
```java
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//1. 新建Node数组,容量为之前的两倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//2. 新建forwardingNode引用,在之后会用到
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 3. 确定遍历中的索引i
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//4.将原数组中的元素复制到新数组中去
//4.5 for循环退出,扩容结束修改sizeCtl属性
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//4.1 当前数组中第i个元素为null,用CAS设置成特殊节点forwardingNode(可以理解成占位符)
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//4.2 如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//4.3 处理当前节点为链表的头结点的情况,构造两个链表,一个是原链表 另一个是原链表的反序排列
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i--操作
advance = true;
}
//4.4 处理当前节点是TreeBin时的情况,操作和上面的类似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//1. 新建Node数组,容量为之前的两倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//2. 新建forwardingNode引用,在之后会用到
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 3. 确定遍历中的索引i
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//4.将原数组中的元素复制到新数组中去
//4.5 for循环退出,扩容结束修改sizeCtl属性
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//4.1 当前数组中第i个元素为null,用CAS设置成特殊节点forwardingNode(可以理解成占位符)
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//4.2 如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//4.3 处理当前节点为链表的头结点的情况,构造两个链表,一个是原链表 另一个是原链表的反序排列
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i--操作
advance = true;
}
//4.4 处理当前节点是TreeBin时的情况,操作和上面的类似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
```
代码逻辑请看注释,整个扩容操作分为**两个部分**
......@@ -630,136 +674,139 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步
3. 如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上
4. 遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。设置为新容量的0.75倍代码为 `sizeCtl = (n << 1) - (n >>> 1)`,仔细体会下是不是很巧妙,n<<1相当于n右移一位表示n的两倍即2n,n>>>1左右一位相当于n除以2即0.5n,然后两者相减为2n-0.5n=1.5n,是不是刚好等于新容量的0.75倍即2n*0.75=1.5n。最后用一个示意图来进行总结(图片摘自网络):
![ConcurrentHashMap扩容示意图](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/ConcurrentHashMap-eccd27ce-b994-4db2-9b4a-ead9b54647bb.png)
![ConcurrentHashMap扩容示意图](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/ConcurrentHashMap-02.png)
## 3.6 与size相关的一些方法 ##
### 与size相关的一些方法
对于ConcurrentHashMap来说,这个table里到底装了多少东西其实是个不确定的数量,因为**不可能在调用size()方法的时候像GC的“stop the world”一样让其他线程都停下来让你去统计,因此只能说这个数量是个估计值。对于这个估计值**,ConcurrentHashMap也是大费周章才计算出来的。
为了统计元素个数,ConcurrentHashMap定义了一些变量和一个内部类
```java
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
/******************************************/
/**
* 实际上保存的是hashmap中的元素个数 利用CAS锁进行更新
但它并不用返回当前hashmap的元素个数
*/
private transient volatile long baseCount;
/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
private transient volatile int cellsBusy;
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
/******************************************/
/**
* 实际上保存的是hashmap中的元素个数 利用CAS锁进行更新
但它并不用返回当前hashmap的元素个数
*/
private transient volatile long baseCount;
/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
private transient volatile int cellsBusy;
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;
```
> **mappingCount与size方法**
#### **mappingCount与size方法**
**mappingCount****size**方法的类似 从给出的注释来看,应该使用mappingCount代替size方法 两个方法都没有直接返回basecount 而是统计一次这个值,而这个值其实也是一个大概的数值,因此可能在统计的时候有其他线程正在执行插入或删除操作。
```java
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
/**
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMap may
* contain more mappings than can be represented as an int. The
* value returned is an estimate; the actual count may differ if
* there are concurrent insertions or removals.
*
* @return the number of mappings
* @since 1.8
*/
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;//所有counter的值求和
}
}
return sum;
}
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
/**
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMap may
* contain more mappings than can be represented as an int. The
* value returned is an estimate; the actual count may differ if
* there are concurrent insertions or removals.
*
* @return the number of mappings
* @since 1.8
*/
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;//所有counter的值求和
}
}
return sum;
}
```
> **addCount方法**
#### **addCount方法**
在put方法结尾处调用了addCount方法,把当前ConcurrentHashMap的元素个数+1这个方法一共做了两件事,更新baseCount的值,检测是否进行扩容。
```java
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//利用CAS方法更新baseCount的值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//如果check值大于等于0 则需要检验是否需要进行扩容操作
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//如果已经有其他线程在执行扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//利用CAS方法更新baseCount的值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//如果check值大于等于0 则需要检验是否需要进行扩容操作
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//如果已经有其他线程在执行扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
```
# 4. 总结 #
## 总结
JDK6,7中的ConcurrentHashmap主要使用Segment来实现减小锁粒度,分割成若干个Segment,在put的时候需要锁住Segment,get时候不加锁,使用volatile来保证可见性,当要统计全局时(比如size),首先会尝试多次计算modcount来确定,这几次尝试中,是否有其他线程进行了修改操作,如果没有,则直接返回size。如果有,则需要依次锁住所有的Segment来计算。
......
......@@ -11,69 +11,74 @@ tag:
LockSupport位于java.util.concurrent.locks包下,有兴趣的可以直接去看源码,该类的方法并不是很多。LockSupprot是线程的阻塞原语,用来阻塞线程和唤醒线程。每个使用LockSupport的线程都会与一个许可关联,如果该许可可用,并且可在线程中使用,则调用park()将会立即返回,否则可能阻塞。如果许可尚不可用,则可以调用 unpark 使其可用。但是注意许可**不可重入**,也就是说只能调用一次park()方法,否则会一直阻塞。
## LockSupport方法介绍
LockSupport中的方法不多,这里将这些方法做一个总结:
> **阻塞线程**
## **阻塞线程**
1. `void park()`:阻塞当前线程,如果调用unpark方法或者当前线程被中断,从能从park()方法中返回
2. `void park(Object blocker)`:功能同方法1,入参增加一个Object对象,用来记录导致线程阻塞的阻塞对象,方便进行问题排查;
3. `void parkNanos(long nanos)`:阻塞当前线程,最长不超过nanos纳秒,增加了超时返回的特性;
4. `void parkNanos(Object blocker, long nanos)`:功能同方法3,入参增加一个Object对象,用来记录导致线程阻塞的阻塞对象,方便进行问题排查;
5. `void parkUntil(long deadline)`:阻塞当前线程,知道deadline;
6. `void parkUntil(Object blocker, long deadline)`:功能同方法5,入参增加一个Object对象,用来记录导致线程阻塞的阻塞对象,方便进行问题排查;
1. void park():阻塞当前线程,如果调用unpark方法或者当前线程被中断,从能从park()方法中返回
2. void park(Object blocker):功能同方法1,入参增加一个Object对象,用来记录导致线程阻塞的阻塞对象,方便进行问题排查;
3. void parkNanos(long nanos):阻塞当前线程,最长不超过nanos纳秒,增加了超时返回的特性;
4. void parkNanos(Object blocker, long nanos):功能同方法3,入参增加一个Object对象,用来记录导致线程阻塞的阻塞对象,方便进行问题排查;
5. void parkUntil(long deadline):阻塞当前线程,知道deadline;
6. void parkUntil(Object blocker, long deadline):功能同方法5,入参增加一个Object对象,用来记录导致线程阻塞的阻塞对象,方便进行问题排查;
## **唤醒线程**
> **唤醒线程**
`void unpark(Thread thread)`:唤醒处于阻塞状态的指定线程
void unpark(Thread thread):唤醒处于阻塞状态的指定线程
实际上LockSupport阻塞和唤醒线程的功能是依赖于sun.misc.Unsafe,这是一个很底层的类,有兴趣的可以去查阅资料,比如park()方法的功能实现则是靠unsafe.park()方法。
实际上LockSupport阻塞和唤醒线程的功能是依赖于sun.misc.Unsafe,这是一个很底层的类,有兴趣的可以去查阅资料,比如park()方法的功能实现则是靠unsafe.park()方法。另外在阻塞线程这一系列方法中还有一个很有意思的现象就是,每个方法都会新增一个带有Object的阻塞对象的重载方法。那么增加了一个Object对象的入参会有什么不同的地方了?示例代码很简单就不说了,直接看dump线程的信息。
另外在阻塞线程这一系列方法中还有一个很有意思的现象就是,每个方法都会新增一个带有Object的阻塞对象的重载方法。那么增加了一个Object对象的入参会有什么不同的地方了?示例代码很简单就不说了,直接看dump线程的信息。
**调用park()方法dump线程**
```java
"main" #1 prio=5 os_prio=0 tid=0x02cdcc00 nid=0x2b48 waiting on condition [0x00d6f000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
at learn.LockSupportDemo.main(LockSupportDemo.java:7)
"main" #1 prio=5 os_prio=0 tid=0x02cdcc00 nid=0x2b48 waiting on condition [0x00d6f000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
at learn.LockSupportDemo.main(LockSupportDemo.java:7)
```
**调用park(Object blocker)方法dump线程**
```
"main" #1 prio=5 os_prio=0 tid=0x0069cc00 nid=0x6c0 waiting on condition [0x00dcf000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x048c2d18> (a java.lang.String)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at learn.LockSupportDemo.main(LockSupportDemo.java:7)
"main" #1 prio=5 os_prio=0 tid=0x0069cc00 nid=0x6c0 waiting on condition [0x00dcf000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x048c2d18> (a java.lang.String)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at learn.LockSupportDemo.main(LockSupportDemo.java:7)
```
通过分别调用这两个方法然后dump线程信息可以看出,带Object的park方法相较于无参的park方法会增加 `parking to wait for <0x048c2d18> (a java.lang.String)`的信息,这种信息就类似于记录“案发现场”,有助于工程人员能够迅速发现问题解决问题。
有个有意思的事情是,我们都知道如果使用synchronzed阻塞了线程dump线程时都会有阻塞对象的描述,在java 5推出LockSupport时遗漏了这一点,在java 6时进行了补充。还有一点需要需要的是:**synchronzed致使线程阻塞,线程会进入到BLOCKED状态,而调用LockSupprt方法阻塞线程会致使线程进入到WAITING状态。**
有个有意思的事情是,我们都知道如果使用synchronzed阻塞了线程dump线程时都会有阻塞对象的描述,在java 5推出LockSupport时遗漏了这一点,在java 6时进行了补充。
## 一个例子
还有一点需要需要的是:**synchronzed致使线程阻塞,线程会进入到BLOCKED状态,而调用LockSupprt方法阻塞线程会致使线程进入到WAITING状态。**
用一个很简单的例子说说这些方法怎么用。
```java
public class LockSupportDemo {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "被唤醒");
});
thread.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(thread);
}
}
public class LockSupportDemo {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "被唤醒");
});
thread.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(thread);
}
}
```
thread线程调用LockSupport.park()致使thread阻塞,当mian线程睡眠3秒结束后通过LockSupport.unpark(thread)方法唤醒thread线程,thread线程被唤醒执行后续操作。另外,还有一点值得关注的是,**LockSupport.unpark(thread)可以指定线程对象唤醒指定的线程**
thread线程调用LockSupport.park()致使thread阻塞,当mian线程睡眠3秒结束后通过LockSupport.unpark(thread)方法唤醒thread线程,thread线程被唤醒执行后续操作。另外,还有一点值得关注的是,**`LockSupport.unpark(thread)`可以指定线程对象唤醒指定的线程**
---
......
......@@ -85,13 +85,15 @@ protected final boolean tryAcquire(int acquires) {
该方法是获取读锁被获取的次数,是将同步状态(int c)右移16次,即取同步状态的高16位,现在我们可以得出另外一个结论**同步状态的高16位用来表示读锁被获取的次数**。现在还记得我们开篇说的需要弄懂的第一个问题吗?读写锁是怎样实现分别记录读锁和写锁的状态的,现在这个问题的答案就已经被我们弄清楚了,其示意图如下图所示:
![读写锁的读写状态设计.png](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/ReentrantReadWriteLock-609029e0-d0ed-41ee-9779-65e647bf91bc.png)
![读写锁的读写状态设计](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/ReentrantReadWriteLock-f714bdd6-917a-4d25-ac11-7e85b0ec1b14.png)
现在我们回过头来看写锁获取方法tryAcquire,其主要逻辑为:**当读锁已经被读线程获取或者写锁已经被其他写线程获取,则写锁获取失败;否则,获取成功并支持重入,增加写状态。**
### 写锁的释放
写锁释放通过重写AQS的tryRelease方法,源码为:
```java
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
......@@ -112,8 +114,11 @@ protected final boolean tryRelease(int releases) {
源码的实现逻辑请看注释,不难理解与ReentrantLock基本一致,这里需要注意的是,减少写状态` int nextc = getState() - releases;`只需要用**当前同步状态直接减去写状态的原因正是我们刚才所说的写状态是由同步状态的低16位表示的**
## 读锁详解
### 读锁的获取
看完了写锁,现在来看看读锁,读锁不是独占式锁,即同一时刻该锁可以被多个读线程获取也就是一种共享式锁。按照之前对AQS介绍,实现共享式同步组件的同步语义需要通过重写AQS的tryAcquireShared方法和tryReleaseShared方法。读锁的获取实现方法为:
```java
protected final int tryAcquireShared(int unused) {
/*
......@@ -166,9 +171,12 @@ protected final int tryAcquireShared(int unused) {
```
代码的逻辑请看注释,需要注意的是 **当写锁被其他线程获取后,读锁获取失败**,否则获取成功利用CAS更新同步状态。另外,当前同步状态需要加上SHARED_UNIT(`(1 << SHARED_SHIFT)`即0x00010000)的原因这是我们在上面所说的同步状态的高16位用来表示读锁被获取的次数。如果CAS失败或者已经获取读锁的线程再次获取读锁时,是靠fullTryAcquireShared方法实现的,这段代码就不展开说了,有兴趣可以看看。
代码的逻辑请看注释,需要注意的是 **当写锁被其他线程获取后,读锁获取失败**,否则获取成功利用CAS更新同步状态。
另外,当前同步状态需要加上SHARED_UNIT(`(1 << SHARED_SHIFT)`即0x00010000)的原因这是我们在上面所说的同步状态的高16位用来表示读锁被获取的次数。如果CAS失败或者已经获取读锁的线程再次获取读锁时,是靠fullTryAcquireShared方法实现的,这段代码就不展开说了,有兴趣可以看看。
### 读锁的释放
读锁释放的实现主要通过方法tryReleaseShared,源码如下,主要逻辑请看注释:
```java
......@@ -208,34 +216,35 @@ protected final boolean tryReleaseShared(int unused) {
## 锁降级
读写锁支持锁降级,**遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁**,不支持锁升级,关于锁降级下面的示例代码摘自ReentrantWriteReadLock源码中:
```java
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
```
......
......@@ -10,6 +10,7 @@ tag:
## Condition简介
任何一个java对象都天然继承于Object类,在线程间实现通信的往往会应用到Object的几个方法:
- wait()
......@@ -31,21 +32,22 @@ tag:
参照Object的wait和notify/notifyAll方法,Condition也提供了同样的方法:
> **针对Object的wait方法**
**针对Object的wait方法**
1. void await() throws InterruptedException:当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;
2. long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者**超时**
3. boolean await(long time, TimeUnit unit)throws InterruptedException:同第二种,支持自定义时间单位
4. boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者**到了某个时间**
1. `void await() throws InterruptedException`:当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;
2. `long awaitNanos(long nanosTimeout)`:当前线程进入等待状态直到被通知,中断或者**超时**
3. `boolean await(long time, TimeUnit unit)throws InterruptedException`:同第二种,支持自定义时间单位
4. `boolean awaitUntil(Date deadline) throws InterruptedException`:当前线程进入等待状态直到被通知,中断或者**到了某个时间**
> **针对Object的notify/notifyAll方法**
**针对Object的`notify/notifyAll`方法**
1. void signal():唤醒一个等待在condition上的线程,将该线程从**等待队列**中转移到**同步队列**中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。
2. void signalAll():与1的区别在于能够唤醒所有等待在condition上的线程
1. `void signal()`:唤醒一个等待在condition上的线程,将该线程从**等待队列**中转移到**同步队列**中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。
2. `void signalAll()`:与1的区别在于能够唤醒所有等待在condition上的线程
## Condition实现原理分析
### 等待队列
要想能够深入的掌握condition还是应该知道它的实现原理,现在我们一起来看看condiiton的源码。
......@@ -57,133 +59,158 @@ tag:
在锁机制的实现上,AQS内部维护了一个同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到**同步队列**,同样的,condition内部也是使用同样的方式,内部维护了一个 **等待队列**,所有调用condition.await方法的线程会加入到等待队列中,并且线程状态转换为等待状态。另外注意到ConditionObject中有两个成员变量:
```java
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
```
这样我们就可以看出来ConditionObject通过持有等待队列的头尾指针来管理等待队列。Node类有这样一个属性:
```java
//后继节点
Node nextWaiter;
//后继节点
Node nextWaiter;
```
进一步说明,**等待队列是一个单向队列**,而在之前说AQS时知道同步队列是一个双向队列。接下来我们用一个demo,通过debug进去看是不是符合我们的猜想:
```java
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
});
thread.start();
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
});
thread.start();
}
}
```
这段代码没有任何实际意义,甚至很臭,只是想说明下我们刚才所想的。新建了10个线程,没有线程先获取锁,然后调用condition.await方法释放锁将当前线程加入到等待队列中,通过debug控制当走到第10个线程的时候查看`firstWaiter`即等待队列中的头结点,debug模式下情景图如下:
![debug模式下情景图](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/condition-8c405604-f38b-4c9d-930b-1d9b20b36b40.png)
![debug模式下情景图](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/condition-01.png)
从这个图我们可以很清楚的看到这样几点:
从这个图我们可以很清楚的看到这样几点:1. 调用condition.await方法后线程依次尾插入到等待队列中,如图队列中的线程引用依次为Thread-0,Thread-1,Thread-2....Thread-8;2. 等待队列是一个单向队列。通过我们的猜想然后进行实验验证,我们可以得出等待队列的示意图如下图所示:
1. 调用condition.await方法后线程依次尾插入到等待队列中,如图队列中的线程引用依次为Thread-0,Thread-1,Thread-2....Thread-8;
2. 等待队列是一个单向队列。通过我们的猜想然后进行实验验证,我们可以得出等待队列的示意图如下图所示:
![等待队列的示意图](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/condition-5b7604b7-6bc1-49e9-b44f-ec148d8e2ac1.png)
![等待队列的示意图](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/condition-02.png)
同时还有一点需要注意的是:我们可以多次调用lock.newCondition()方法创建多个condition对象,也就是一个lock可以持有多个等待队列。而在之前利用Object的方式实际上是指在**对象Object对象监视器上只能拥有一个同步队列和一个等待队列,而并发包中的Lock拥有一个同步队列和多个等待队列**。示意图如下:
同时还有一点需要注意的是:我们可以多次调用`lock.newCondition()`方法创建多个condition对象,也就是一个lock可以持有多个等待队列。
而在之前利用Object的方式实际上是指在**对象Object对象监视器上只能拥有一个同步队列和一个等待队列,而并发包中的Lock拥有一个同步队列和多个等待队列**。示意图如下:
![AQS持有多个Condition.png](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/condition-8fbcb5ad-8426-4684-abfe-c8018a770482.png)
![AQS持有多个Condition](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/condition-03.png)
如图所示,ConditionObject是AQS的内部类,因此每个ConditionObject能够访问到AQS提供的方法,相当于每个Condition都拥有所属同步器的引用。
###await实现原理
**当调用condition.await()方法后会使得当前获取lock的线程进入到等待队列,如果该线程能够从await()方法返回的话一定是该线程获取了与condition相关联的lock**。接下来,我们还是从源码的角度去看,只有熟悉了源码的逻辑我们的理解才是最深的。await()方法源码为:
### await实现原理
**当调用`condition.await()`方法后会使得当前获取lock的线程进入到等待队列,如果该线程能够从await()方法返回的话一定是该线程获取了与condition相关联的lock**
接下来,我们还是从源码的角度去看,只有熟悉了源码的逻辑我们的理解才是最深的。await()方法源码为:
```java
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 将当前线程包装成Node,尾插入到等待队列中
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待获取到同步状态(即获取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 5. 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 将当前线程包装成Node,尾插入到等待队列中
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待获取到同步状态(即获取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 5. 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
```
代码的主要逻辑**请看注释**,我们都知道**当当前线程调用condition.await()方法后,会使得当前线程释放lock然后加入到等待队列中,直至被signal/signalAll后会使得当前线程从等待队列中移至到同步队列中去,直到获得了lock后才会从await方法返回,或者在等待时被中断会做中断处理**。那么关于这个实现过程我们会有这样几个问题:1. 是怎样将当前线程添加到等待队列中去的?2.释放锁的过程?3.怎样才能从await方法退出?而这段代码的逻辑就是告诉我们这三个问题的答案。具体**请看注释**,在第1步中调用addConditionWaiter将当前线程添加到等待队列中,该方法源码为:
代码的主要逻辑**请看注释**,我们都知道**当当前线程调用`condition.await()`方法后,会使得当前线程释放lock然后加入到等待队列中,直至被`signal/signalAll`后会使得当前线程从等待队列中移至到同步队列中去,直到获得了lock后才会从await方法返回,或者在等待时被中断会做中断处理**
那么关于这个实现过程我们会有这样几个问题:
1. 是怎样将当前线程添加到等待队列中去的?
2. 释放锁的过程?
3. 怎样才能从await方法退出?
而这段代码的逻辑就是告诉我们这三个问题的答案。具体**请看注释**,在第1步中调用addConditionWaiter将当前线程添加到等待队列中,该方法源码为:
```java
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//将当前线程包装成Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
//尾插入
t.nextWaiter = node;
//更新lastWaiter
lastWaiter = node;
return node;
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//将当前线程包装成Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
//尾插入
t.nextWaiter = node;
//更新lastWaiter
lastWaiter = node;
return node;
}
```
这段代码就很容易理解了,将当前节点包装成Node,如果等待队列的firstWaiter为null的话(等待队列为空队列),则将firstWaiter指向当前的Node,否则,更新lastWaiter(尾节点)即可。就是**通过尾插入的方式将当前线程封装的Node插入到等待队列中即可**,同时可以看出等待队列是一个**不带头结点的链式队列**,之前我们学习AQS时知道同步队列**是一个带头结点的链式队列**,这是两者的一个区别。将当前节点插入到等待对列之后,会使当前线程释放lock,由fullyRelease方法实现,fullyRelease源码为:
这段代码就很容易理解了,将当前节点包装成Node,如果等待队列的firstWaiter为null的话(等待队列为空队列),则将firstWaiter指向当前的Node,否则,更新lastWaiter(尾节点)即可。就是**通过尾插入的方式将当前线程封装的Node插入到等待队列中即可**,同时可以看出等待队列是一个**不带头结点的链式队列**,之前我们学习AQS时知道同步队列**是一个带头结点的链式队列**,这是两者的一个区别。
将当前节点插入到等待对列之后,会使当前线程释放lock,由fullyRelease方法实现,fullyRelease源码为:
```java
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
//成功释放同步状态
failed = false;
return savedState;
} else {
//不成功释放同步状态抛出异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
//成功释放同步状态
failed = false;
return savedState;
} else {
//不成功释放同步状态抛出异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
```
这段代码就很容易理解了,**调用AQS的模板方法release方法释放AQS的同步状态并且唤醒在同步队列中头结点的后继节点引用的线程**,如果释放成功则正常返回,若失败的话就抛出异常。到目前为止,这两段代码已经解决了前面的两个问题的答案了,还剩下第三个问题,怎样从await方法退出?现在回过头再来看await方法有这样一段逻辑:
```java
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
```
很显然,当线程第一次调用condition.await()方法时,会进入到这个while()循环中,然后通过LockSupport.park(this)方法使得当前线程进入等待状态,那么要想退出这个await方法第一个前提条件自然而然的是要先退出这个while循环,出口就只剩下两个地方:
......@@ -194,9 +221,11 @@ tag:
总结下,就是**当前线程被中断或者调用condition.signal/condition.signalAll方法当前节点移动到了同步队列后** ,这是当前线程退出await方法的前提条件。
当退出while循环后就会调用`acquireQueued(node, savedState)`,该方法的作用是在**自旋过程中线程不断尝试获取同步状态,直至成功(线程获取到lock)**。这样也说明了**退出await方法必须是已经获得了condition引用(关联)的lock**。到目前为止,开头的三个问题我们通过阅读源码的方式已经完全找到了答案,也对await方法的理解加深。await方法示意图如下图:
当退出while循环后就会调用`acquireQueued(node, savedState)`,该方法的作用是在**自旋过程中线程不断尝试获取同步状态,直至成功(线程获取到lock)**。这样也说明了**退出await方法必须是已经获得了condition引用(关联)的lock**
到目前为止,开头的三个问题我们通过阅读源码的方式已经完全找到了答案,也对await方法的理解加深。await方法示意图如下图:
![await方法示意图](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/condition-95612e8e-e81e-4699-806a-c639dae647eb.png)
![await方法示意图](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/condition-04.png)
......@@ -227,19 +256,22 @@ condition还额外支持了超时机制,使用者可调用方法awaitNanos,awa
这段方法与上面的await方法基本一致,只不过减少了对中断的处理,并省略了reportInterruptAfterWait方法抛被中断的异常。
### signal/signalAll实现原理
**调用condition的signal或者signalAll方法可以将等待队列中等待时间最长的节点移动到同步队列中**,使得该节点能够有机会获得lock。按照等待队列是先进先出(FIFO)的,所以等待队列的头节点必然会是等待时间最长的节点,也就是每次调用condition的signal方法是将头节点移动到同步队列中。我们来通过看源码的方式来看这样的猜想是不是对的,signal方法源码为:
```java
public final void signal() {
//1. 先检测当前线程是否已经获取lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
public final void signal() {
//1. 先检测当前线程是否已经获取lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
```
signal方法首先会检测当前线程是否已经获取lock,如果没有获取lock会直接抛出异常,如果获取的话再得到等待队列的头指针引用的节点,之后的操作的doSignal方法也是基于该节点。下面我们来看看doSignal方法做了些什么事情,doSignal方法源码为:
```java
private void doSignal(Node first) {
do {
......@@ -253,32 +285,35 @@ signal方法首先会检测当前线程是否已经获取lock,如果没有获
}
```
具体逻辑请看注释,真正对头节点做处理的逻辑在**transferForSignal**放,该方法源码为:
```java
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//1. 更新状态为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//2.将该节点移入到同步队列中去
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//1. 更新状态为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//2.将该节点移入到同步队列中去
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
```
关键逻辑请看注释,这段代码主要做了两件事情1.将头结点的状态更改为CONDITION;2.调用enq方法,将该节点尾插入到同步队列中,关于enq方法请看AQS的底层实现这篇文章。现在我们可以得出结论:**调用condition的signal的前提条件是当前线程已经获取了lock,该方法会使得等待队列中的头节点即等待时间最长的那个节点移入到同步队列,而移入到同步队列后才有机会使得等待线程被唤醒,即从await方法中的LockSupport.park(this)方法中返回,从而才有机会使得调用await方法的线程成功退出**。signal执行示意图如下图:
关键逻辑请看注释,这段代码主要做了两件事情1.将头结点的状态更改为CONDITION;2.调用enq方法,将该节点尾插入到同步队列中,关于enq方法请看AQS的底层实现这篇文章。现在我们可以得出结论:
![signal执行示意图](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/condition-25606ca9-a108-43a4-a656-9346607186b1.png)
**调用condition的signal的前提条件是当前线程已经获取了lock,该方法会使得等待队列中的头节点即等待时间最长的那个节点移入到同步队列,而移入到同步队列后才有机会使得等待线程被唤醒,即从await方法中的LockSupport.park(this)方法中返回,从而才有机会使得调用await方法的线程成功退出**。signal执行示意图如下图:
![signal执行示意图](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/condition-05.png)
......@@ -287,82 +322,86 @@ signal方法首先会检测当前线程是否已经获取lock,如果没有获
sigllAll与sigal方法的区别体现在doSignalAll方法上,前面我们已经知道d**oSignal方法只会对等待队列的头节点进行操作,**,而doSignalAll的源码为:
```java
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
```
该方法只不过时间等待队列中的每一个节点都移入到同步队列中,即“通知”当前调用condition.await()方法的每一个线程。
## await与signal/signalAll的结合思考
文章开篇提到等待/通知机制,通过使用condition提供的await和signal/signalAll方法就可以实现这种机制,而这种机制能够解决最经典的问题就是“生产者与消费者问题”,关于“生产者消费者问题”之后会用单独的一篇文章进行讲解,这也是面试的高频考点。await和signal和signalAll方法就像一个开关控制着线程A(等待方)和线程B(通知方)。它们之间的关系可以用下面一个图来表现得更加贴切:
![condition下的等待通知机制.png](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/condition-b39062f6-440f-4870-8a44-4f0668db969b.png)
![condition下的等待通知机制.png](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/condition-06.png)
如图,**线程awaitThread先通过lock.lock()方法获取锁成功后调用了condition.await方法进入等待队列,而另一个线程signalThread通过lock.lock()方法获取锁成功后调用了condition.signal或者signalAll方法,使得线程awaitThread能够有机会移入到同步队列中,当其他线程释放lock后使得线程awaitThread能够有机会获取lock,从而使得线程awaitThread能够从await方法中退出执行后续操作。如果awaitThread获取lock失败会直接进入到同步队列**
## 一个例子
我们用一个很简单的例子说说condition的用法:
```java
public class AwaitSignal {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
private static volatile boolean flag = false;
public static void main(String[] args) {
Thread waiter = new Thread(new waiter());
waiter.start();
Thread signaler = new Thread(new signaler());
signaler.start();
}
static class waiter implements Runnable {
@Override
public void run() {
lock.lock();
try {
while (!flag) {
System.out.println(Thread.currentThread().getName() + "当前条件不满足等待");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "接收到通知条件满足");
} finally {
lock.unlock();
}
}
}
static class signaler implements Runnable {
@Override
public void run() {
lock.lock();
try {
flag = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
}
}
public class AwaitSignal {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
private static volatile boolean flag = false;
public static void main(String[] args) {
Thread waiter = new Thread(new waiter());
waiter.start();
Thread signaler = new Thread(new signaler());
signaler.start();
}
static class waiter implements Runnable {
@Override
public void run() {
lock.lock();
try {
while (!flag) {
System.out.println(Thread.currentThread().getName() + "当前条件不满足等待");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "接收到通知条件满足");
} finally {
lock.unlock();
}
}
}
static class signaler implements Runnable {
@Override
public void run() {
lock.lock();
try {
flag = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
}
}
```
输出结果为:
```
Thread-0当前条件不满足等待
Thread-0接收到通知,条件满足
Thread-0当前条件不满足等待
Thread-0接收到通知,条件满足
```
开启了两个线程waiter和signaler,waiter线程开始执行的时候由于条件不满足,执行condition.await方法使该线程进入等待状态同时释放锁,signaler线程获取到锁之后更改条件,并通知所有的等待线程后释放锁。这时,waiter线程获取到锁,并由于signaler线程更改了条件此时相对于waiter来说条件满足,继续执行。
......
......@@ -55,7 +55,9 @@ public class TestVector {
并发容器是Java 5 提供的在多线程编程下用于代替同步容器,针对不同的应用场景进行设计,提高容器的并发访问性,同时定义了线程安全的复合操作。
## 并发容器类介绍
整体架构(列举常用的容器类)
![](https://cdn.jsdelivr.net/gh/itwanger/toBeBetterJavaer/images/thread/map-a6a020a3-4573-4cf8-b5ae-1541ae45801c.png)
......@@ -85,13 +87,13 @@ public interface ConcurrentMap<K, V> extends Map<K, V> {
}
```
**putIfAbsent:**与原有put方法不同的是,putIfAbsent方法中如果插入的key相同,则不替换原有的value值;
**putIfAbsent:** 与原有put方法不同的是,putIfAbsent方法中如果插入的key相同,则不替换原有的value值;
**remove:**与原有remove方法不同的是,新remove方法中增加了对value的判断,如果要删除的key-value不能与Map中原有的key-value对应上,则不会删除该元素;
**remove:** 与原有remove方法不同的是,新remove方法中增加了对value的判断,如果要删除的key-value不能与Map中原有的key-value对应上,则不会删除该元素;
**replace(K,V,V):**增加了对value值的判断,如果key-oldValue能与Map中原有的key-value对应上,才进行替换操作;
**replace(K,V,V):** 增加了对value值的判断,如果key-oldValue能与Map中原有的key-value对应上,才进行替换操作;
**replace(K,V):**与上面的replace不同的是,此replace不会对Map中原有的key-value进行比较,如果key存在则直接替换;
**replace(K,V):** 与上面的replace不同的是,此replace不会对Map中原有的key-value进行比较,如果key存在则直接替换;
#### ConcurrentHashMap类
......@@ -581,7 +583,7 @@ public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable {
~~~
上面就是参考CopyOnWriteArrayList实现的CopyOnWriteMap,我们可以用这个容器来做什么呢?结合我们之前说的CopyOnWrite的复制思想,它最适用于“读多写少”的并发场景。
**场景:**假如我们有一个搜索的网站需要屏蔽一些“关键字”,“黑名单”每晚定时更新,每当用户搜索的时候,“黑名单”中的关键字不会出现在搜索结果当中,并且提示用户敏感字。
**场景:** 假如我们有一个搜索的网站需要屏蔽一些“关键字”,“黑名单”每晚定时更新,每当用户搜索的时候,“黑名单”中的关键字不会出现在搜索结果当中,并且提示用户敏感字。
~~~java
// 黑名单服务
......
......@@ -8,9 +8,7 @@ tag:
# 深入理解Java并发重入锁ReentrantLock
ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程中使用频率很高的一个锁,**支持重入性,表示能够对共享资源能够重复加锁,即当前线程获取该锁再次获取不会被阻塞**。在java关键字synchronized隐式支持重入性,synchronized通过获取自增,释放自减的方式实现重入。与此同时,ReentrantLock还支持**公平锁和非公平锁**两种方式。
## 重入性的实现原理
ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程中使用频率很高的一个锁,**支持重入性,表示能够对共享资源能够重复加锁,即当前线程获取该锁再次获取不会被阻塞**
要想支持重入性,就要解决两个问题:
......@@ -45,6 +43,7 @@ final boolean nonfairTryAcquire(int acquires) {
```
这段代码的逻辑也很简单,具体请看注释。为了支持重入性,在第二步增加了处理逻辑,如果该锁已经被线程所占有了,会继续检查占有线程是否为当前线程,如果是的话,同步状态加1返回true,表示可以再次获取成功。每次重新获取都会对同步状态进行加一的操作,那么释放的时候处理思路是怎样的了?(依然还是以非公平锁为例)核心方法为tryRelease:
```java
protected final boolean tryRelease(int releases) {
//1. 同步状态减1
......@@ -106,12 +105,13 @@ protected final boolean tryAcquire(int acquires) {
return true;
}
return false;
}
}
```
这段代码的逻辑与nonfairTryAcquire基本上一直,唯一的不同在于增加了hasQueuedPredecessors的逻辑判断,方法名就可知道该方法用来判断当前节点在同步队列中是否有前驱节点的判断,如果有前驱节点说明有线程比当前线程更早的请求资源,根据公平性,当前线程请求资源失败。如果当前节点没有前驱节点的话,再才有做后面的逻辑判断的必要性。**公平锁每次都是从同步队列中的第一个节点获取到锁,而非公平性锁则不一定,有可能刚释放锁的线程能再次获取到锁**
这段代码的逻辑与nonfairTryAcquire基本上一直,唯一的不同在于增加了hasQueuedPredecessors的逻辑判断,方法名就可知道该方法用来判断当前节点在同步队列中是否有前驱节点的判断,如果有前驱节点说明有线程比当前线程更早的请求资源,根据公平性,当前线程请求资源失败。如果当前节点没有前驱节点的话,再才有做后面的逻辑判断的必要性。
**公平锁每次都是从同步队列中的第一个节点获取到锁,而非公平性锁则不一定,有可能刚释放锁的线程能再次获取到锁**
---
......
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册