map.md 51.2 KB
Newer Older
沉默王二's avatar
沉默王二 已提交
1
---
沉默王二's avatar
沉默王二 已提交
2 3 4
title: 聊聊Java的并发集合容器ConcurrentHashMap、阻塞队列和 CopyOnWrite 容器
shortTitle: Java的并发容器
description: Java 的并发集合容器提供了在多线程环境中高效访问和操作的数据结构。这些容器通过内部的同步机制实现了线程安全,使得开发者无需显式同步代码就能在并发环境下安全使用。
沉默王二's avatar
沉默王二 已提交
5 6 7
category:
  - Java核心
tag:
沉默王二's avatar
沉默王二 已提交
8 9 10 11 12
  - Java并发编程
head:
  - - meta
    - name: keywords
      content: Java,并发编程,多线程,Thread,并发集合容器
沉默王二's avatar
沉默王二 已提交
13 14
---

沉默王二's avatar
沉默王二 已提交
15
# 14.19 Java的并发容器
沉默王二's avatar
沉默王二 已提交
16

沉默王二's avatar
沉默王二 已提交
17
java.util 包下提供了一些容器类([集合框架](https://javabetter.cn/collection/gailan.html)),其中 Vector 和 Hashtable 是线程安全的,实现方式比较粗暴,通过在方法上加「[sychronized](https://javabetter.cn/thread/synchronized-1.html)」关键字实现。
沉默王二's avatar
沉默王二 已提交
18

沉默王二's avatar
沉默王二 已提交
19
但即便是 Vector 这样线程安全的类,在应对多线程的复合操作时也需要在客户端继续加锁以保证原子性。来看下面的例子:
沉默王二's avatar
沉默王二 已提交
20 21 22 23 24 25 26 27 28 29

```java
public class TestVector {
	private Vector<String> vector;

	//方法一
	public  Object getLast(Vector vector) {
	    int lastIndex = vector.size() - 1;
	    return vector.get(lastIndex);
	}
沉默王二's avatar
沉默王二 已提交
30

沉默王二's avatar
沉默王二 已提交
31 32 33 34 35
	//方法二
	public  void deleteLast(Vector vector) {
	    int lastIndex = vector.size() - 1;
	    vector.remove(lastIndex);
	}
沉默王二's avatar
沉默王二 已提交
36

沉默王二's avatar
沉默王二 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
	//方法三
	public  Object getLastSysnchronized(Vector vector) {
		synchronized(vector){
			int lastIndex = vector.size() - 1;
			return vector.get(lastIndex);
		}
	}

	//方法四
	public  void deleteLastSysnchronized(Vector vector) {
		synchronized (vector){
			int lastIndex = vector.size() - 1;
			vector.remove(lastIndex);
		}
	}
沉默王二's avatar
沉默王二 已提交
52

沉默王二's avatar
沉默王二 已提交
53 54 55
}
```

沉默王二's avatar
沉默王二 已提交
56
如果方法一和方法二是一个组合的话,那么当方法一获取到了`vector`的 size 之后,方法二已经执行完毕,这样就会导致程序出现错误。
沉默王二's avatar
沉默王二 已提交
57

沉默王二's avatar
沉默王二 已提交
58
如果方法三与方法四组合的话,就还需在内部加锁来保证 `vector` 上的原子性操作。
沉默王二's avatar
沉默王二 已提交
59

沉默王二's avatar
沉默王二 已提交
60
于是并发容器就应用而生了,它们是线程安全的,可以在多线程环境下高效地访问和操作数据,而不需要额外的同步措施。
沉默王二's avatar
沉默王二 已提交
61

沉默王二's avatar
沉默王二 已提交
62
### 并发容器类
沉默王二's avatar
沉默王二 已提交
63

沉默王二's avatar
沉默王二 已提交
64
整体架构如下图所示:
沉默王二's avatar
沉默王二 已提交
65

沉默王二's avatar
沉默王二 已提交
66
![](https://cdn.tobebetterjavaer.com/tobebetterjavaer/images/thread/map-a6a020a3-4573-4cf8-b5ae-1541ae45801c.png)
沉默王二's avatar
沉默王二 已提交
67

沉默王二's avatar
沉默王二 已提交
68
### 并发 Map
沉默王二's avatar
沉默王二 已提交
69

沉默王二's avatar
沉默王二 已提交
70
#### ConcurrentMap 接口
沉默王二's avatar
沉默王二 已提交
71

沉默王二's avatar
沉默王二 已提交
72
ConcurrentMap 接口继承了 Map 接口,在 Map 接口的基础上又定义了四个方法:
沉默王二's avatar
沉默王二 已提交
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87

```java
public interface ConcurrentMap<K, V> extends Map<K, V> {

    //插入元素
    V putIfAbsent(K key, V value);

    //移除元素
    boolean remove(Object key, Object value);

    //替换元素
    boolean replace(K key, V oldValue, V newValue);

    //替换元素
    V replace(K key, V value);
沉默王二's avatar
沉默王二 已提交
88

沉默王二's avatar
沉默王二 已提交
89 90 91
}
```

沉默王二's avatar
沉默王二 已提交
92
**putIfAbsent:** 与原有 put 方法不同的是,putIfAbsent 如果插入的 key 相同,则不替换原有的 value 值;
沉默王二's avatar
沉默王二 已提交
93

沉默王二's avatar
沉默王二 已提交
94
**remove:** 与原有 remove 方法不同的是,新 remove 方法中增加了对 value 的判断,如果要删除的 key-value 不能与 Map 中原有的 key-value 对应上,则不会删除该元素;
沉默王二's avatar
沉默王二 已提交
95

沉默王二's avatar
沉默王二 已提交
96
**replace(K,V,V):** 增加了对 value 值的判断,如果 key-oldValue 能与 Map 中原有的 key-value 对应上,才进行替换操作;
沉默王二's avatar
沉默王二 已提交
97

沉默王二's avatar
沉默王二 已提交
98
**replace(K,V):** 与上面的 replace 不同的是,此 replace 不会对 Map 中原有的 key-value 进行比较,如果 key 存在则直接替换;
沉默王二's avatar
沉默王二 已提交
99

沉默王二's avatar
沉默王二 已提交
100
#### ConcurrentHashMap
沉默王二's avatar
沉默王二 已提交
101

沉默王二's avatar
沉默王二 已提交
102
ConcurrentHashMap 同 [HashMap](https://javabetter.cn/collection/hashmap.html) 一样,也是基于散列表的 map,但是它提供了一种与 Hashtable 完全不同的加锁策略,提供了更高效的并发性和伸缩性。
沉默王二's avatar
沉默王二 已提交
103

沉默王二's avatar
沉默王二 已提交
104
ConcurrentHashMap 在 JDK 1.7 和 JDK 1.8 中有一些区别。这里我们分开介绍一下。
沉默王二's avatar
沉默王二 已提交
105 106 107

**JDK 1.7**

沉默王二's avatar
沉默王二 已提交
108
ConcurrentHashMap 在 JDK 1.7 中,提供了一种粒度更细的加锁机制,这种机制叫分段锁「Lock Striping」。整个哈希表被分为多个段,每个段都独立锁定。读取操作不需要锁,写入操作仅锁定相关的段。这减小了锁冲突的几率,从而提高了并发性能。
沉默王二's avatar
沉默王二 已提交
109

沉默王二's avatar
沉默王二 已提交
110
这种机制的优点:在并发环境下将实现更高的吞吐量,而在单线程环境下只损失非常小的性能。
沉默王二's avatar
沉默王二 已提交
111 112 113

可以这样理解分段锁,就是**将数据分段,对每一段数据分配一把锁**。当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

沉默王二's avatar
沉默王二 已提交
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
有些方法需要跨段,比如 `size()``isEmpty()``containsValue()`,它们可能需要锁定整个表而不仅仅是某个段,这需要按顺序锁定所有段,操作完后,再按顺序释放所有段的锁。如下图:

![](https://cdn.tobebetterjavaer.com/stutymore/map-20230816155810.png)

ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组构成的。Segment 是一种可重入的锁 [ReentrantLock](https://javabetter.cn/thread/reentrantLock.html),HashEntry 则用于存储键值对数据。

一个 ConcurrentHashMap 里包含一个 Segment 数组,Segment 的结构和 HashMap 类似,是一种数组和链表结构, 一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得它对应的 Segment 锁。

单一的 Segment 结构如下:

![](https://cdn.tobebetterjavaer.com/stutymore/map-20230816160155.png)

像这样的 Segment 对象,在 ConcurrentHashMap 集合中有多少个呢?有 2 的 N 次方个,共同保存在一个名为 segments 的数组当中。 因此整个 ConcurrentHashMap 的结构如下:

![](https://cdn.tobebetterjavaer.com/stutymore/map-20230816160223.png)

可以说,ConcurrentHashMap 是一个二级哈希表。在一个总的哈希表下面,有若干个子哈希表。

Case1:不同 Segment 的并发写入(可以并发执行)

![](https://cdn.tobebetterjavaer.com/stutymore/map-20230816160301.png)

Case2:同一 Segment 的一写一读(可以并发执行)

![](https://cdn.tobebetterjavaer.com/stutymore/map-20230816160316.png)

Case3:同一 Segment 的并发写入
沉默王二's avatar
沉默王二 已提交
141

沉默王二's avatar
沉默王二 已提交
142
![](https://cdn.tobebetterjavaer.com/stutymore/map-20230816160331.png)
沉默王二's avatar
沉默王二 已提交
143

沉默王二's avatar
沉默王二 已提交
144
Segment 的写入是需要上锁的,因此对同一 Segment 的并发写入会被阻塞。
沉默王二's avatar
沉默王二 已提交
145

沉默王二's avatar
沉默王二 已提交
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
由此可见,ConcurrentHashMap 中每个 Segment 各自持有一把锁。在保证线程安全的同时降低了锁的粒度,让并发操作效率更高。

ConcurrentHashMap 读写过程如下:

get 方法

- 为输入的 Key 做 Hash 运算,得到 hash 值。
- 通过 hash 值,定位到对应的 Segment 对象
- 再次通过 hash 值,定位到 Segment 当中数组的具体位置。

put 方法

- 为输入的 Key 做 Hash 运算,得到 hash 值。
- 通过 hash 值,定位到对应的 Segment 对象
- 获取可重入锁
- 再次通过 hash 值,定位到 Segment 当中数组的具体位置。
- 插入或覆盖 HashEntry 对象。
- 释放锁。
沉默王二's avatar
沉默王二 已提交
164 165 166

**JDK 1.8**

沉默王二's avatar
沉默王二 已提交
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
而在 JDK 1.8 中,ConcurrentHashMap 主要做了两个优化:

-[HashMap](https://javabetter.cn/collection/hashmap.html) 一样,链表也会在长度达到 8 的时候转化为红黑树,这样可以提升大量冲突时候的查询效率;
- 以某个位置的头结点(链表的头结点或红黑树的 root 结点)为锁,配合自旋+ [CAS](https://javabetter.cn/thread/cas.html) 避免不必要的锁开销,进一步提升并发性能。

![](https://cdn.tobebetterjavaer.com/stutymore/map-20230816155924.png)

相比 JDK1.7 中的 ConcurrentHashMap,JDK1.8 中的 ConcurrentHashMap 取消了 Segment 分段锁,采用 CAS + synchronized 来保证并发安全性,整个容器只分为一个 Segment,即 table 数组。

JDK1.8 中的 ConcurrentHashMap 对节点 Node 类中的共享变量,和 JDK1.7 一样,使用 volatile 关键字,保证多线程操作时,变量的可见性!

```java
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
......
}
```

简单分析一下 JDK1.8 中的 ConcurrentHashMap 的 put 方法:

```java
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key和value都不能为null
    if (key == null || value == null) throw new NullPointerException();
    // 计算hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // 如果tab未初始化或者个数为0,则初始化node数组
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                    new Node<K,V>(hash, key, value, null)))
                // 如果使用CAS插入元素时,发现已经有元素了,则进入下一次循环,重新操作
                // 如果使用CAS插入元素成功,则break跳出循环,流程结束
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // 如果要插入的元素所在的tab的第一个元素的hash是MOVED,则当前线程帮忙一起迁移元素
            tab = helpTransfer(tab, f);
        else {
            // 如果这个tab不为空且不在迁移元素,则锁住这个tab(分段锁)
            // 并查找要插入的元素是否在这个tab中
            // 存在,则替换值(onlyIfAbsent=false)
            // 不存在,则插入到链表结尾或插入树中
            V oldVal = null;
            synchronized (f) {
                // 再次检测第一个元素是否有变化,如果有变化则进入下一次循环,从头来过
                if (tabAt(tab, i) == f) {
                    // 如果第一个元素的hash值大于等于0(说明不是在迁移,也不是树)
                    // 那就是tab中的元素使用的是链表方式存储
                    if (fh >= 0) {
                        // tab中元素个数赋值为1
                        binCount = 1;
                        // 遍历整个tab,每次结束binCount加1
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                            (ek != null && key.equals(ek)))) {
                                // 如果找到了这个元素,则赋值了新值(onlyIfAbsent=false)
                                // 并退出循环
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // 如果到链表尾部还没有找到元素
                                // 就把它插入到链表结尾并退出循环
                                pred.next = new Node<K,V>(hash, key,
                                        value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 如果第一个元素是树节点
                        Node<K,V> p;
                        // tab中元素个数赋值为2
                        binCount = 2;
                        // 调用红黑树的插入方法插入元素
                        // 如果成功插入则返回null
                        // 否则返回寻找到的节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                value)) != null) {
                            // 如果找到了这个元素,则赋值了新值(onlyIfAbsent=false)
                            // 并退出循环
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 如果binCount不为0,说明成功插入了元素或者寻找到了元素
            if (binCount != 0) {
                // 如果链表元素个数达到了8,则尝试树化
                // 因为上面把元素插入到树中时,binCount只赋值了2,并没有计算整个树中元素的个数
                // 所以不会重复树化
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // 如果要插入的元素已经存在,则返回旧值
                if (oldVal != null)
                    return oldVal;
                // 退出外层大循环,流程结束
                break;
            }
        }
    }
    // 成功插入元素,元素个数加1(是否要扩容在这个里面)
    addCount(1L, binCount);
    // 成功插入元素返回null
    return null;
}
```

当进行 put 操作时,流程大概可以分如下几个步骤:
沉默王二's avatar
沉默王二 已提交
302

沉默王二's avatar
沉默王二 已提交
303 304 305 306 307 308 309
- 首先会判断 key、value 是否为空,如果为空就抛异常;
- 接着会判断容器数组是否为空,如果为空就初始化数组;
- 进一步判断,要插入的元素 f,在当前数组下标是否第一次插入,如果是就通过 CAS 方式插入;
- 再接着判断`f.hash == -1`是否成立,如果成立,说明当前 f 是 ForwardingNode 节点,表示有其它线程正在扩容,则一起进行扩容操作;
- 其他的情况,就是把新的 Node 节点按链表或红黑树的方式插入到合适的位置;
- 节点插入完成之后,接着判断链表长度是否超过 8,如果超过 8 个,就将链表转化为红黑树结构;
- 最后,插入完成之后,进行扩容判断。
沉默王二's avatar
沉默王二 已提交
310

沉默王二's avatar
沉默王二 已提交
311
initTable 初始化数组
沉默王二's avatar
沉默王二 已提交
312

沉默王二's avatar
沉默王二 已提交
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
```java
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // 如果sizeCtl<0说明正在初始化或者扩容,让出CPU
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // 如果把sizeCtl原子更新为-1成功,则当前线程进入初始化
            // 如果原子更新失败则说明有其它线程先一步进入初始化了,则进入下一次循环
            // 如果下一次循环时还没初始化完毕,则sizeCtl<0进入上面if的逻辑让出CPU
            // 如果下一次循环更新完毕了,则table.length!=0,退出循环
            try {
                // 再次检查table是否为空,防止ABA问题
                if ((tab = table) == null || tab.length == 0) {
                    // 如果sc为0则使用默认值16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // 新建数组
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 把tab数组赋值给table
                    table = tab = nt;
                    // 设置sc为数组长度的0.75倍
                    // n - (n >>> 2) = n - n/4 = 0.75n
                    // 可见这里装载因子和扩容门槛都是写死了的
                    // 这也正是没有threshold和loadFactor属性的原因
                    sc = n - (n >>> 2);
                }
            } finally {
                // 把sc赋值给sizeCtl,这时存储的是扩容门槛
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
```
沉默王二's avatar
沉默王二 已提交
351

沉默王二's avatar
沉默王二 已提交
352 353 354
- 使用 CAS 锁控制只有一个线程初始化 tab 数组;
- sizeCtl 在初始化后存储的是扩容门槛;
- 扩容门槛写死的是 tab 数组大小的 0.75 倍,tab 数组大小即 map 的容量,也就是最多存储多少个元素。
沉默王二's avatar
沉默王二 已提交
355

沉默王二's avatar
沉默王二 已提交
356
helpTransfer 协助扩容
沉默王二's avatar
沉默王二 已提交
357

沉默王二's avatar
沉默王二 已提交
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
```java
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // 如果tab数组不为空,并且当前tab第一个元素为ForwardingNode类型,并且nextTab不为空
    // 说明当前tab已经迁移完毕了,才去帮忙迁移其它tab的元素
    // 扩容时会把旧tab的第一个元素置为ForwardingNode,并让其nextTab指向新tab数组
    if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        // sizeCtl<0,说明正在扩容
        while (nextTab == nextTable && table == tab &&
                (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 扩容线程数加1
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 当前线程帮忙迁移元素
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}
```
沉默王二's avatar
沉默王二 已提交
385

沉默王二's avatar
沉默王二 已提交
386
操作步骤如下:
沉默王二's avatar
沉默王二 已提交
387

沉默王二's avatar
沉默王二 已提交
388 389 390 391
- 第 1 步,对 table、node 节点、node 节点的 nextTable,进行数据校验;
- 第 2 步,根据数组的 length 得到一个标识符号;
- 第 3 步,进一步校验 nextTab、tab、sizeCtl 值,如果 nextTab 没有被并发修改并且 tab 也没有被并发修改,同时 `sizeCtl < 0`,说明还在扩容;
- 第 4 步,对 sizeCtl 参数值进行分析判断,如果不满足任何一个判断,将`sizeCtl + 1`, 增加了一个线程帮助其扩容。
沉默王二's avatar
沉默王二 已提交
392

沉默王二's avatar
沉默王二 已提交
393
addCount 扩容判断
沉默王二's avatar
沉默王二 已提交
394

沉默王二's avatar
沉默王二 已提交
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456
```java
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上
    if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                        U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // 计算元素个数
        s = sumCount();
    }
    //检查是否需要扩容,默认check=1,需要检查
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 如果元素个数达到了扩容门槛,则进行扩容
        // 注意,正常情况下sizeCtl存储的是扩容门槛,即容量的0.75倍
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            // rs是扩容时的一个标识
            int rs = resizeStamp(n);
            if (sc < 0) {
                // sc<0说明正在扩容中
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                    // 扩容已经完成了,退出循环
                    break;
                // 扩容未完成,则当前线程加入迁移元素中
                // 并把扩容线程数加1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                    (rs << RESIZE_STAMP_SHIFT) + 2))
                // 进入迁移元素
                transfer(tab, null);
            // 重新计算元素个数
            s = sumCount();
        }
    }
}
```

操作步骤如下:

- 第 1 步,利用 CAS 将方法更新 baseCount 的值
- 第 2 步,检查是否需要扩容,默认 check = 1,需要检查;
- 第 3 步,如果满足扩容条件,判断当前是否正在扩容,如果是正在扩容就一起扩容;
- 第 4 步,如果不在扩容,将 sizeCtl 更新为负数,并进行扩容处理。

以上就是整个 put 方法的流程,可以从中发现,里面大量的使用了 CAS 方法,CAS 表示比较与替换,里面有 3 个参数,分别是目标内存地址、旧值、新值,每次判断的时候,会将旧值与目标内存地址中的值进行比较,如果相等,就将新值更新到内存地址里,如果不相等,就继续循环,直到操作成功为止!

get 方法
沉默王二's avatar
沉默王二 已提交
457 458

```java
沉默王二's avatar
沉默王二 已提交
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算hash
    int h = spread(key.hashCode());
    // 判断数组是否为空,通过key定位到数组下标是否为空;
    if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
        // 如果第一个元素就是要找的元素,直接返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            // hash小于0,说明是树或者正在扩容
            // 使用find寻找元素,find的寻找方式依据Node的不同子类有不同的实现方式
            return (p = e.find(h, key)) != null ? p.val : null;
        // 遍历整个链表寻找元素
        while ((e = e.next) != null) {
            if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
沉默王二's avatar
沉默王二 已提交
484 485
```

沉默王二's avatar
沉默王二 已提交
486
步骤如下:
沉默王二's avatar
沉默王二 已提交
487

沉默王二's avatar
沉默王二 已提交
488 489 490 491
- 第 1 步,判断数组是否为空,通过 key 定位到数组下标是否为空;
- 第 2 步,判断 node 节点第一个元素是不是要找到,如果是直接返回;
- 第 3 步,如果是红黑树结构,就从红黑树里面查询;
- 第 4 步,如果是链表结构,循环遍历判断。
沉默王二's avatar
沉默王二 已提交
492

沉默王二's avatar
沉默王二 已提交
493
remove 方法
沉默王二's avatar
沉默王二 已提交
494

沉默王二's avatar
沉默王二 已提交
495 496 497 498 499
```java
public V remove(Object key) {
    // 调用替换节点方法
    return replaceNode(key, null, null);
}
沉默王二's avatar
沉默王二 已提交
500

沉默王二's avatar
沉默王二 已提交
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596
final V replaceNode(Object key, V value, Object cv) {
    // 计算hash
    int hash = spread(key.hashCode());
    // 循环遍历数组
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //校验参数
        if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        else if ((fh = f.hash) == MOVED)
            // 如果正在扩容中,协助扩容
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 标记是否处理过
            boolean validated = false;
            //用 synchronized 同步锁,保证并发时元素移除安全
            synchronized (f) {
                // 再次验证当前tab元素是否被修改过
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        // fh>=0表示是链表节点
                        validated = true;
                        // 遍历链表寻找目标节点
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                            (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            // 遍历到链表尾部还没找到元素,跳出循环
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 如果是树节点
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        // 遍历树找到了目标节点
                        if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            // 如果处理过,不管有没有找到元素都返回
            if (validated) {
                // 如果找到了元素,返回其旧值
                if (oldVal != null) {
                    // 如果要替换的值为空,元素个数减1
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    // 没找到元素返回空
    return null;
}
```

步骤如下:

- 第 1 步,循环遍历数组,接着校验参数;
- 第 2 步,判断是否有别的线程正在扩容,如果是一起扩容;
- 第 3 步,用 synchronized 同步锁,保证并发时元素移除安全;
- 第 4 步,因为 `check= -1`,所以不会进行扩容操作,利用 CAS 操作修改 baseCount 值。

后面我们还会单独再开一篇来详细介绍 [ConcurrentHashMap](https://javabetter.cn/thread/ConcurrentHashMap.html),这篇就先分析到这里。
沉默王二's avatar
沉默王二 已提交
597

沉默王二's avatar
沉默王二 已提交
598
#### ConcurrentSkipListMap
沉默王二's avatar
沉默王二 已提交
599

沉默王二's avatar
沉默王二 已提交
600
ConcurrentNavigableMap 接口继承了 NavigableMap 接口,这个接口提供了针对给定搜索目标返回最接近匹配项的导航方法。
沉默王二's avatar
沉默王二 已提交
601

沉默王二's avatar
沉默王二 已提交
602
ConcurrentNavigableMap 接口的主要实现类是 ConcurrentSkipListMap 类。从名字上来看,它的底层使用的是跳表(SkipList)。跳表是一种”空间换时间“的数据结构,可以使用 [CAS](https://javabetter.cn/thread/cas.html) 来保证并发安全性。
沉默王二's avatar
沉默王二 已提交
603

沉默王二's avatar
沉默王二 已提交
604
与 ConcurrentHashMap 的读密集操作相比,ConcurrentSkipListMap 的读和写操作的性能相对较低。这是由其数据结构导致的,因为跳表的插入和删除需要更复杂的指针操作。然而,ConcurrentSkipListMap 提供了有序性,这是 ConcurrentHashMap 所没有的。
沉默王二's avatar
沉默王二 已提交
605

沉默王二's avatar
沉默王二 已提交
606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642
ConcurrentSkipListMap 适用于需要线程安全的同时又需要元素有序的场合。如果不需要有序,ConcurrentHashMap 可能是更好的选择,因为它通常具有更高的性能。

[時光以北这篇 ConcurrentSkipListMap 讲的很不错](https://juejin.cn/post/6844903958499033095),可以学习。

### 并发 Queue

JDK 并没有提供线程安全的 List 类,因为对 List 来说,**很难去开发一个通用并且没有并发瓶颈的线程安全的 List**。因为即使简单的读操作,比如 `contains()`,也需要再搜索的时候锁住整个 list。

所以退一步,JDK 提供了队列和双端队列的线程安全类:ConcurrentLinkedQueue 和 ConcurrentLinkedDeque。因为队列相对于 List 来说,有更多的限制。这两个类是使用 CAS 来实现线程安全的。

我们会在后面单独开一篇来详细介绍[ConcurrentLinkedQueue](https://javabetter.cn/thread/ConcurrentLinkedQueue.html)

### 并发 Set

ConcurrentSkipListSet 是线程安全的有序集合。底层是使用 ConcurrentSkipListMap 来实现。

谷歌的 [Guava](https://javabetter.cn/common-tool/guava.html)实现了一个线程安全的 ConcurrentHashSet:

```java
Set<String> s = Sets.newConcurrentHashSet();
```

### 阻塞队列

我们假设一种场景,[生产者一直生产资源,消费者一直消费资源](https://javabetter.cn/thread/shengchanzhe-xiaofeizhe.html),资源存储在一个缓冲池中,生产者将生产的资源存进缓冲池中,消费者从缓冲池中拿到资源进行消费,这就是大名鼎鼎的**生产者-消费者模式**

该模式能够简化开发过程,一方面消除了生产者类与消费者类之间的代码依赖性,另一方面将生产数据的过程与使用数据的过程解耦简化负载。

我们自己 coding 实现这个模式的时候,因为需要让**多个线程操作共享变量**(即资源),所以很容易引发**线程安全问题**,造成**重复消费****死锁**,尤其是生产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者,这些个**等待-唤醒**逻辑都需要自己实现。

这么容易出错的事情,JDK 当然帮我们做啦,这就是阻塞队列(BlockingQueue),**你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。**

> BlockingQueue 是 Java util.concurrent 包下重要的数据结构,区别于普通的队列,BlockingQueue 提供了**线程安全的队列访问方式**,并发包下很多高级同步类的实现都是基于 BlockingQueue 实现的。

BlockingQueue 一般用于生产者-消费者模式,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。**BlockingQueue 就是存放元素的容器**

#### BlockingQueue 的操作方法
沉默王二's avatar
沉默王二 已提交
643 644 645 646 647 648 649 650 651

阻塞队列提供了四组不同的方法用于插入、移除、检查元素:

| 方法\处理方式 | 抛出异常  | 返回特殊值 |  一直阻塞  |      超时退出      |
| :-----------: | :-------: | :--------: | :--------: | :----------------: |
|   插入方法    |  add(e)   |  offer(e)  | **put(e)** | offer(e,time,unit) |
|   移除方法    | remove()  |   poll()   | **take()** |  poll(time,unit)   |
|   检查方法    | element() |   peek()   |     -      |         -          |

沉默王二's avatar
沉默王二 已提交
652 653 654 655 656 657
- 抛出异常:如果操作无法立即执行,会抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出 `IllegalStateException(“Queue full”)`异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常 。
- 返回特殊值:如果操作无法立即执行,会返回一个特殊值,通常是 true / false。
- 一直阻塞:如果操作无法立即执行,则一直阻塞或者响应中断。
- 超时退出:如果操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true / false。

**注意:**
沉默王二's avatar
沉默王二 已提交
658

沉默王二's avatar
沉默王二 已提交
659 660
- 不能往阻塞队列中插入 null,会抛出空指针异常。
- 可以访问阻塞队列中的任意元素,调用 `remove(o)`可以将队列之中的特定对象移除,但并不高效,尽量避免使用。
沉默王二's avatar
沉默王二 已提交
661

沉默王二's avatar
沉默王二 已提交
662
我们会在后面单独开一篇[BlockingQueue](https://javabetter.cn/thread/BlockingQueue.html)来细讲。
沉默王二's avatar
沉默王二 已提交
663

沉默王二's avatar
沉默王二 已提交
664
#### BlockingQueue 的实现类
沉默王二's avatar
沉默王二 已提交
665 666 667

**ArrayBlockingQueue**

沉默王二's avatar
沉默王二 已提交
668
**数组**结构组成的**有界**阻塞队列。内部结构是数组,具有数组的特性。
沉默王二's avatar
沉默王二 已提交
669

沉默王二's avatar
沉默王二 已提交
670
```java
沉默王二's avatar
沉默王二 已提交
671
public ArrayBlockingQueue(int capacity, boolean fair){
沉默王二's avatar
沉默王二 已提交
672
 //..省略代码
沉默王二's avatar
沉默王二 已提交
673
}
沉默王二's avatar
沉默王二 已提交
674
```
沉默王二's avatar
沉默王二 已提交
675

沉默王二's avatar
沉默王二 已提交
676
可以初始化队列大小,一旦初始化将不能改变。构造方法中的 fair 表示控制对象的内部锁是否采用公平锁,默认是**非公平锁**
沉默王二's avatar
沉默王二 已提交
677 678 679

**LinkedBlockingQueue**

沉默王二's avatar
沉默王二 已提交
680
**链表**结构组成的**有界**阻塞队列。内部结构是链表,具有链表的特性。默认队列的大小是`Integer.MAX_VALUE`,也可以指定大小。此队列按照**先进先出**的原则对元素进行排序。
沉默王二's avatar
沉默王二 已提交
681 682 683

**DelayQueue**

沉默王二's avatar
沉默王二 已提交
684 685 686
该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。注入其中的元素必须实现 `java.util.concurrent.Delayed` 接口。

DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
沉默王二's avatar
沉默王二 已提交
687

沉默王二's avatar
沉默王二 已提交
688
**PriorityBlockingQueue**
沉默王二's avatar
沉默王二 已提交
689

沉默王二's avatar
沉默王二 已提交
690
基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),内部控制线程同步的锁采用的是非公平锁。
沉默王二's avatar
沉默王二 已提交
691

沉默王二's avatar
沉默王二 已提交
692
> 网上大部分博客上**PriorityBlockingQueue**为公平锁,其实是不对的,查阅源码(感谢 github:**ambition0802**同学的指出):
沉默王二's avatar
沉默王二 已提交
693

沉默王二's avatar
沉默王二 已提交
694
```java
沉默王二's avatar
沉默王二 已提交
695
public PriorityBlockingQueue(int initialCapacity,
沉默王二's avatar
沉默王二 已提交
696 697 698 699
                                 Comparator<? super E> comparator) {
    this.lock = new ReentrantLock(); //默认构造方法-非公平锁
    ...//其余代码略
}
沉默王二's avatar
沉默王二 已提交
700 701 702 703
```

**SynchronousQueue**

沉默王二's avatar
沉默王二 已提交
704
这个队列比较特殊,**没有任何内部容量**,甚至连一个队列的容量都没有。并且每个 put 必须等待一个 take,反之亦然。
沉默王二's avatar
沉默王二 已提交
705

沉默王二's avatar
沉默王二 已提交
706
需要区别容量为 1 的 ArrayBlockingQueue、LinkedBlockingQueue。
沉默王二's avatar
沉默王二 已提交
707

沉默王二's avatar
沉默王二 已提交
708
以下方法的返回值,可以帮助理解这个队列:
沉默王二's avatar
沉默王二 已提交
709

沉默王二's avatar
沉默王二 已提交
710 711 712 713 714 715 716 717
- `iterator()` 永远返回空,因为里面没有东西
- `peek()` 永远返回 null
- `put()` 往 queue 放进去一个 element 以后就一直 wait 直到有其他 thread 进来把这个 element 取走。
- `offer()` 往 queue 里放一个 element 后立即返回,如果碰巧这个 element 被另一个 thread 取走了,offer 方法返回 true,认为 offer 成功;否则返回 false。
- `take()` 取出并且 remove 掉 queue 里的 element,取不到东西他会一直等。
- `poll()` 取出并且 remove 掉 queue 里的 element,只有到碰巧另外一个线程正在往 queue 里 offer 数据或者 put 数据的时候,该方法才会取到东西。否则立即返回 null。
- `isEmpty()` 永远返回 true
- `remove()&removeAll()` 永远返回 false
沉默王二's avatar
沉默王二 已提交
718 719 720

**注意**

沉默王二's avatar
沉默王二 已提交
721
**PriorityBlockingQueue**不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时阻塞数据的消费者。因此使用的时候要特别注意,**生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。**对于使用默认大小的**LinkedBlockingQueue**也是一样的。
沉默王二's avatar
沉默王二 已提交
722

沉默王二's avatar
沉默王二 已提交
723
#### 阻塞队列的原理
沉默王二's avatar
沉默王二 已提交
724

沉默王二's avatar
沉默王二 已提交
725
阻塞队列的原理很简单,利用了 Lock 锁的多条件([Condition](https://javabetter.cn/thread/condition.html))阻塞控制。接下来我们分析 ArrayBlockingQueue JDK 1.8 的源码。
沉默王二's avatar
沉默王二 已提交
726

沉默王二's avatar
沉默王二 已提交
727
首先是构造方法,除了初始化队列的大小和是否是公平锁之外,还对同一个锁(lock)初始化了两个监视器,分别是 notEmpty 和 notFull。这两个监视器的作用目前可以简单理解为标记分组,当该线程是 put 操作时,给他加上监视器 notFull,标记这个线程是一个生产者;当线程是 take 操作时,给他加上监视器 notEmpty,标记这个线程是消费者。
沉默王二's avatar
沉默王二 已提交
728 729 730 731 732 733 734 735 736 737 738 739 740 741 742

```java
//数据元素数组
final Object[] items;
//下一个待取出元素索引
int takeIndex;
//下一个待添加元素索引
int putIndex;
//元素个数
int count;
//内部锁
final ReentrantLock lock;
//消费者监视器
private final Condition notEmpty;
//生产者监视器
沉默王二's avatar
沉默王二 已提交
743
private final Condition notFull;
沉默王二's avatar
沉默王二 已提交
744 745 746 747 748 749 750 751 752

public ArrayBlockingQueue(int capacity, boolean fair) {
    //..省略其他代码
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
```

沉默王二's avatar
沉默王二 已提交
753
**put 操作的源码**
沉默王二's avatar
沉默王二 已提交
754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786

```java
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 1.自旋拿锁
    lock.lockInterruptibly();
    try {
        // 2.判断队列是否满了
        while (count == items.length)
            // 2.1如果满了,阻塞该线程,并标记为notFull线程,
            // 等待notFull的唤醒,唤醒之后继续执行while循环。
            notFull.await();
        // 3.如果没有满,则进入队列
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 4 唤醒一个等待的线程
    notEmpty.signal();
}

```

沉默王二's avatar
沉默王二 已提交
787
总结 put 的流程:
沉默王二's avatar
沉默王二 已提交
788

沉默王二's avatar
沉默王二 已提交
789 790 791 792
1. 所有执行 put 操作的线程竞争 lock 锁,拿到了 lock 锁的线程进入下一步,没有拿到 lock 锁的线程自旋竞争锁。
2. 判断阻塞队列是否满了,如果满了,则调用 await 方法阻塞这个线程,并标记为 notFull(生产者)线程,同时释放 lock 锁,等待被消费者线程唤醒。
3. 如果没有满,则调用 enqueue 方法将元素 put 进阻塞队列。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了 lock 锁的线程。
4. 唤醒一个标记为 notEmpty(消费者)的线程。
沉默王二's avatar
沉默王二 已提交
793

沉默王二's avatar
沉默王二 已提交
794
**take 操作的源码**
沉默王二's avatar
沉默王二 已提交
795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824

```java
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
```

沉默王二's avatar
沉默王二 已提交
825
take 操作和 put 操作类似,总结一下 take 操作的流程:
沉默王二's avatar
沉默王二 已提交
826

沉默王二's avatar
沉默王二 已提交
827 828 829 830
1. 所有执行 take 操作的线程竞争 lock 锁,拿到了 lock 锁的线程进入下一步,没有拿到 lock 锁的线程自旋竞争锁。
2. 判断阻塞队列是否为空,如果是空,则调用 await 方法阻塞这个线程,并标记为 notEmpty(消费者)线程,同时释放 lock 锁,等待被生产者线程唤醒。
3. 如果没有空,则调用 dequeue 方法。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了 lock 锁的线程。
4. 唤醒一个标记为 notFull(生产者)的线程。
沉默王二's avatar
沉默王二 已提交
831 832 833

**注意**

沉默王二's avatar
沉默王二 已提交
834 835 836
1. put 和 take 操作都需要**先获取锁**,没有获取到锁的线程会被挡在第一道大门之外自旋拿锁,直到获取到锁。
2. 就算拿到锁了之后,也**不一定**会顺利进行 put/take 操作,需要判断**队列是否可用**(是否满/空),如果不可用,则会被阻塞,**并释放锁**
3. 在第 2 点被阻塞的线程会被唤醒,但是在唤醒之后,**依然需要拿到锁**才能继续往下执行,否则,自旋拿锁,拿到锁了再 while 判断队列是否可用(这也是为什么不用 if 判断,而使用 while 判断的原因)。
沉默王二's avatar
沉默王二 已提交
837

沉默王二's avatar
沉默王二 已提交
838
#### 示例和使用场景
沉默王二's avatar
沉默王二 已提交
839

沉默王二's avatar
沉默王二 已提交
840
生产者-消费者模型:
沉默王二's avatar
沉默王二 已提交
841 842 843 844 845

```java
public class Test {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
沉默王二's avatar
沉默王二 已提交
846

沉默王二's avatar
沉默王二 已提交
847 848 849 850
    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
沉默王二's avatar
沉默王二 已提交
851

沉默王二's avatar
沉默王二 已提交
852 853 854
        producer.start();
        consumer.start();
    }
沉默王二's avatar
沉默王二 已提交
855

沉默王二's avatar
沉默王二 已提交
856
    class Consumer extends Thread{
沉默王二's avatar
沉默王二 已提交
857

沉默王二's avatar
沉默王二 已提交
858 859 860 861
        @Override
        public void run() {
            consume();
        }
沉默王二's avatar
沉默王二 已提交
862

沉默王二's avatar
沉默王二 已提交
863 864 865 866 867 868 869 870 871 872 873
        private void consume() {
            while(true){
                try {
                    queue.take();
                    System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
沉默王二's avatar
沉默王二 已提交
874

沉默王二's avatar
沉默王二 已提交
875
    class Producer extends Thread{
沉默王二's avatar
沉默王二 已提交
876

沉默王二's avatar
沉默王二 已提交
877 878 879 880
        @Override
        public void run() {
            produce();
        }
沉默王二's avatar
沉默王二 已提交
881

沉默王二's avatar
沉默王二 已提交
882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916
        private void produce() {
            while(true){
                try {
                    queue.put(1);
                    System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
```

下面是这个例子的输出片段:

```
从队列取走一个元素,队列剩余0个元素
从队列取走一个元素,队列剩余0个元素
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:8
向队列取中插入一个元素,队列剩余空间:7
向队列取中插入一个元素,队列剩余空间:6
向队列取中插入一个元素,队列剩余空间:5
向队列取中插入一个元素,队列剩余空间:4
向队列取中插入一个元素,队列剩余空间:3
向队列取中插入一个元素,队列剩余空间:2
向队列取中插入一个元素,队列剩余空间:1
向队列取中插入一个元素,队列剩余空间:0
从队列取走一个元素,队列剩余1个元素
从队列取走一个元素,队列剩余9个元素
```

沉默王二's avatar
沉默王二 已提交
917
注意,这个例子中的输出结果看起来可能有问题,比如有几行在插入一个元素之后,队列的剩余空间不变。这是由于**System.out.println 语句没有锁**。考虑到这样的情况:线程 1 在执行完 put/take 操作后立即失去 CPU 时间片,然后切换到线程 2 执行 put/take 操作,执行完毕后回到线程 1 的 System.out.println 语句并输出,发现这个时候阻塞队列的 size 已经被线程 2 改变了,所以这个时候输出的 size 并不是当时线程 1 执行完 put/take 操作之后阻塞队列的 size,但可以确保的是 size 不会超过 10 个。实际上使用阻塞队列是没有问题的。
沉默王二's avatar
沉默王二 已提交
918 919 920 921

#### 线程池中使用阻塞队列

```java
沉默王二's avatar
沉默王二 已提交
922
public ThreadPoolExecutor(int corePoolSize,
沉默王二's avatar
沉默王二 已提交
923 924 925 926
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue) {
沉默王二's avatar
沉默王二 已提交
927
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
沉默王二's avatar
沉默王二 已提交
928 929 930 931
             Executors.defaultThreadFactory(), defaultHandler);
}
```

沉默王二's avatar
沉默王二 已提交
932 933 934
Java 中的线程池就是使用阻塞队列实现的,我们在了解阻塞队列之后,无论是使用 Executors 类中已经提供的线程池,还是自己通过 ThreadPoolExecutor 实现线程池,都会更加得心应手。

> 注:上面提到的生产者-消费者模式,大家可以参考[生产者-消费者模型](https://javabetter.cn/thread/shengchanzhe-xiaofeizhe.html),可以更好的理解阻塞队列。
沉默王二's avatar
沉默王二 已提交
935

沉默王二's avatar
沉默王二 已提交
936
### CopyOnWrite 容器
沉默王二's avatar
沉默王二 已提交
937

沉默王二's avatar
沉默王二 已提交
938
在聊 CopyOnWrite 容器之前我们先来谈谈什么是 CopyOnWrite 机制,CopyOnWrite 是计算机设计领域的一种优化策略,也是一种在并发场景下常用的设计思想——写入时复制。
沉默王二's avatar
沉默王二 已提交
939

沉默王二's avatar
沉默王二 已提交
940
什么是写入时复制呢?
沉默王二's avatar
沉默王二 已提交
941

沉默王二's avatar
沉默王二 已提交
942
就是当有多个调用者同时去请求一个资源数据的时候,有一个调用者出于某些原因需要对当前的数据源进行修改,这个时候系统将会复制一个当前数据源的副本给调用者修改。
沉默王二's avatar
沉默王二 已提交
943

沉默王二's avatar
沉默王二 已提交
944
CopyOnWrite 容器即**写时复制的容器**,当我们往一个容器中添加元素的时候,不直接往容器中添加,而是将当前容器进行 copy,复制出来一个新的容器,然后向新容器中添加我们需要的元素,最后将原容器的引用指向新容器。
沉默王二's avatar
沉默王二 已提交
945

沉默王二's avatar
沉默王二 已提交
946 947 948
这样做的好处在于,我们可以在并发的场景下对容器进行"读操作"而不需要"加锁",从而达到读写分离的目的。从 JDK 1.5 开始 Java 并发包里提供了两个使用 CopyOnWrite 机制实现的并发容器,分别是 [CopyOnWriteArrayList](https://javabetter.cn/thread/CopyOnWriteArrayList.html) 和 CopyOnWriteArraySet。

#### CopyOnWriteArrayList
沉默王二's avatar
沉默王二 已提交
949 950

**优点**
沉默王二's avatar
沉默王二 已提交
951 952 953 954

CopyOnWriteArrayList 经常被用于“读多写少”的并发场景,是因为 CopyOnWriteArrayList 无需任何同步措施,大大增强了读的性能。在 Java 中遍历线程非安全的 List(如:[ArrayList 和 LinkedList](https://javabetter.cn/collection/list-war-2.html))的时候,若中途有别的线程对 List 容器进行了修改,那么会抛出 ConcurrentModificationException 异常。

CopyOnWriteArrayList 由于其"读写分离",遍历和修改操作分别作用在不同的 List 容器,所以在使用迭代器遍历的时候,不会抛出异常。
沉默王二's avatar
沉默王二 已提交
955 956 957

**缺点**

沉默王二's avatar
沉默王二 已提交
958
第一个缺点是 CopyOnWriteArrayList 每次执行写操作都会将原容器进行拷贝一份,数据量大的时候,内存会存在较大的压力,可能会引起频繁 Full GC。比如这些对象占用的内存 200M 左右,那么再写入 100M 数据进去,内存就会多占用 300M。
沉默王二's avatar
沉默王二 已提交
959

沉默王二's avatar
沉默王二 已提交
960
第二个缺点是 CopyOnWriteArrayList 由于实现的原因,写和读分别作用在不同新老容器上,在写操作执行过程中,读不会阻塞,但读取到的却是老容器的数据。
沉默王二's avatar
沉默王二 已提交
961

沉默王二's avatar
沉默王二 已提交
962 963 964
现在我们来看一下 CopyOnWriteArrayList 的 add 操作源码,它的逻辑很清晰,就是先把原容器进行 copy,然后在新的副本上进行“写操作”,最后再切换引用,在此过程中是加了锁的。

```java
沉默王二's avatar
沉默王二 已提交
965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983
public boolean add(E e) {
    // ReentrantLock加锁,保证线程安全
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // 拷贝原容器,长度为原容器长度加一
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        // 在新副本上执行添加操作
        newElements[len] = e;
        // 将原容器引用指向新副本
        setArray(newElements);
        return true;
    } finally {
        // 解锁
        lock.unlock();
    }
}
沉默王二's avatar
沉默王二 已提交
984 985 986 987 988
```

我们再来看一下 remove 操作的源码,remove 的逻辑是将要 remove 元素之外的其他元素拷贝到新的副本中,然后再将原容器的引用指向新的副本中,因为 remove 操作也是“写操作”所以也是要加锁的。

```java
沉默王二's avatar
沉默王二 已提交
989
public E remove(int index) {
沉默王二's avatar
沉默王二 已提交
990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        E oldValue = get(elements, index);
        int numMoved = len - index - 1;
        if (numMoved == 0)
            // 如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用
            setArray(Arrays.copyOf(elements, len - 1));
        else {
            // 否则,将要删除元素之外的其他元素拷贝到新副本中,并切换引用
            Object[] newElements = new Object[len - 1];
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index + 1, newElements, index,
                                numMoved);
            setArray(newElements);
沉默王二's avatar
沉默王二 已提交
1008
        }
沉默王二's avatar
沉默王二 已提交
1009 1010 1011 1012
        return oldValue;
    } finally {
        // 解锁
        lock.unlock();
沉默王二's avatar
沉默王二 已提交
1013
    }
沉默王二's avatar
沉默王二 已提交
1014 1015 1016 1017 1018 1019
}
```

我们再来看看 CopyOnWriteArrayList 效率最高的读操作的源码

```java
沉默王二's avatar
沉默王二 已提交
1020 1021 1022
public E get(int index) {
    return get(getArray(), index);
}
沉默王二's avatar
沉默王二 已提交
1023 1024 1025
```

```java
沉默王二's avatar
沉默王二 已提交
1026 1027 1028
 private E get(Object[] a, int index) {
     return (E) a[index];
 }
沉默王二's avatar
沉默王二 已提交
1029
```
沉默王二's avatar
沉默王二 已提交
1030

沉默王二's avatar
沉默王二 已提交
1031
由上可见“读操作”是没有加锁的,直接读取。
沉默王二's avatar
沉默王二 已提交
1032

沉默王二's avatar
沉默王二 已提交
1033
#### CopyOnWrite 使用
沉默王二's avatar
沉默王二 已提交
1034

沉默王二's avatar
沉默王二 已提交
1035
接下来,我们结合具体业务场景来实现一个 CopyOnWriteMap 的并发容器并且使用它。
沉默王二's avatar
沉默王二 已提交
1036

沉默王二's avatar
沉默王二 已提交
1037
```java
沉默王二's avatar
沉默王二 已提交
1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069
import java.util.Collection;
import java.util.Map;
import java.util.Set;

public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable {
    private volatile Map<K, V> internalMap;

    public CopyOnWriteMap() {
        internalMap = new HashMap<K, V>();
    }

    public V put(K key, V value) {
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            V val = newMap.put(key, value);
            internalMap = newMap;
            return val;
        }
    }

    public V get(Object key) {
        return internalMap.get(key);
    }

    public void putAll(Map<? extends K, ? extends V> newData) {
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            newMap.putAll(newData);
            internalMap = newMap;
        }
    }
}
沉默王二's avatar
沉默王二 已提交
1070 1071 1072
```

上面就是参考 CopyOnWriteArrayList 实现的 CopyOnWriteMap,我们可以用这个容器来做什么呢?结合我们之前说的 CopyOnWrite 的复制思想,它最适用于“读多写少”的并发场景。
沉默王二's avatar
沉默王二 已提交
1073

沉默王二's avatar
沉默王二 已提交
1074
**场景:** 假如我们有一个搜索的网站需要屏蔽一些“关键字”,“黑名单”每晚定时更新,每当用户搜索的时候,“黑名单”中的关键字不会出现在搜索结果当中,并且提示用户敏感字。
沉默王二's avatar
沉默王二 已提交
1075

沉默王二's avatar
沉默王二 已提交
1076
```java
沉默王二's avatar
沉默王二 已提交
1077 1078 1079
// 黑名单服务
public class BlackListServiceImpl {
    // 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。
沉默王二's avatar
沉默王二 已提交
1080
    private static CopyOnWriteMap<String, Boolean> blackListMap =
沉默王二's avatar
沉默王二 已提交
1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
        new CopyOnWriteMap<String, Boolean>(1000);

    public static boolean isBlackList(String id) {
        return blackListMap.get(id) == null ? false : true;
    }

    public static void addBlackList(String id) {
        blackListMap.put(id, Boolean.TRUE);
    }

    /**
     * 批量添加黑名单
     * (使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。
     * 如使用上面代码里的addBlackList方法)
     * @param ids
     */
    public static void addBlackList(Map<String,Boolean> ids) {
        blackListMap.putAll(ids);
    }

}
沉默王二's avatar
沉默王二 已提交
1102
```
沉默王二's avatar
沉默王二 已提交
1103

沉默王二's avatar
沉默王二 已提交
1104
这里需要各位小伙伴特别特别注意一个问题,此处的场景是每晚凌晨“黑名单”定时更新,原因是 CopyOnWrite 容器有**数据一致性**的问题,它只能保证**最终数据一致性**
沉默王二's avatar
沉默王二 已提交
1105

沉默王二's avatar
沉默王二 已提交
1106
所以如果我们希望写入的数据马上能准确地读取,请不要使用 CopyOnWrite 容器。
沉默王二's avatar
沉默王二 已提交
1107

沉默王二's avatar
沉默王二 已提交
1108
### 总结
沉默王二's avatar
jvm  
沉默王二 已提交
1109

沉默王二's avatar
沉默王二 已提交
1110
本文主要介绍了并发包中的三个重要的容器类,Map、阻塞队列和 CopyOnWrite 容器,Map 用于存储键值对,阻塞队列用于生产者-消费者模型,而 CopyOnWrite 容器用于“读多写少”的并发场景。
沉默王二's avatar
沉默王二 已提交
1111

沉默王二's avatar
沉默王二 已提交
1112 1113 1114
> 编辑:沉默王二,部分内容来源于朋友小七萤火虫开源的这个仓库:[深入浅出 Java 多线程](http://concurrent.redspider.group/),部分内容来自于这篇[初念初恋-ConcurrentHashMap](https://juejin.cn/post/7064061605185028110),图片画的特别漂亮。

---
沉默王二's avatar
沉默王二 已提交
1115

沉默王二's avatar
沉默王二 已提交
1116
GitHub 上标星 9000+ 的开源知识库《[二哥的 Java 进阶之路](https://github.com/itwanger/toBeBetterJavaer)》第一版 PDF 终于来了!包括 Java 基础语法、数组&字符串、OOP、集合框架、Java IO、异常处理、Java 新特性、网络编程、NIO、并发编程、JVM 等等,共计 32 万余字,可以说是通俗易懂、风趣幽默……详情戳:[太赞了,GitHub 上标星 9000+ 的 Java 教程](https://javabetter.cn/overview/)
沉默王二's avatar
沉默王二 已提交
1117 1118

微信搜 **沉默王二** 或扫描下方二维码关注二哥的原创公众号沉默王二,回复 **222** 即可免费领取。
沉默王二's avatar
沉默王二 已提交
1119

沉默王二's avatar
沉默王二 已提交
1120
![](https://cdn.tobebetterjavaer.com/tobebetterjavaer/images/gongzhonghao.png)