--- title: 聊聊Java的并发集合容器ConcurrentHashMap、阻塞队列和 CopyOnWrite 容器 shortTitle: Java的并发容器 description: Java 的并发集合容器提供了在多线程环境中高效访问和操作的数据结构。这些容器通过内部的同步机制实现了线程安全,使得开发者无需显式同步代码就能在并发环境下安全使用。 category: - Java核心 tag: - Java并发编程 head: - - meta - name: keywords content: Java,并发编程,多线程,Thread,并发集合容器 --- # 14.19 Java的并发容器 java.util 包下提供了一些容器类([集合框架](https://javabetter.cn/collection/gailan.html)),其中 Vector 和 Hashtable 是线程安全的,实现方式比较粗暴,通过在方法上加「[sychronized](https://javabetter.cn/thread/synchronized-1.html)」关键字实现。 但即便是 Vector 这样线程安全的类,在应对多线程的复合操作时也需要在客户端继续加锁以保证原子性。来看下面的例子: ```java public class TestVector { private Vector vector; //方法一 public Object getLast(Vector vector) { int lastIndex = vector.size() - 1; return vector.get(lastIndex); } //方法二 public void deleteLast(Vector vector) { int lastIndex = vector.size() - 1; vector.remove(lastIndex); } //方法三 public Object getLastSysnchronized(Vector vector) { synchronized(vector){ int lastIndex = vector.size() - 1; return vector.get(lastIndex); } } //方法四 public void deleteLastSysnchronized(Vector vector) { synchronized (vector){ int lastIndex = vector.size() - 1; vector.remove(lastIndex); } } } ``` 如果方法一和方法二是一个组合的话,那么当方法一获取到了`vector`的 size 之后,方法二已经执行完毕,这样就会导致程序出现错误。 如果方法三与方法四组合的话,就还需在内部加锁来保证 `vector` 上的原子性操作。 于是并发容器就应用而生了,它们是线程安全的,可以在多线程环境下高效地访问和操作数据,而不需要额外的同步措施。 ### 并发容器类 整体架构如下图所示: ![](https://cdn.tobebetterjavaer.com/tobebetterjavaer/images/thread/map-a6a020a3-4573-4cf8-b5ae-1541ae45801c.png) ### 并发 Map #### ConcurrentMap 接口 ConcurrentMap 接口继承了 Map 接口,在 Map 接口的基础上又定义了四个方法: ```java public interface ConcurrentMap extends Map { //插入元素 V putIfAbsent(K key, V value); //移除元素 boolean remove(Object key, Object value); //替换元素 boolean replace(K key, V oldValue, V newValue); //替换元素 V replace(K key, V value); } ``` **putIfAbsent:** 与原有 put 方法不同的是,putIfAbsent 如果插入的 key 相同,则不替换原有的 value 值; **remove:** 与原有 remove 方法不同的是,新 remove 方法中增加了对 value 的判断,如果要删除的 key-value 不能与 Map 中原有的 key-value 对应上,则不会删除该元素; **replace(K,V,V):** 增加了对 value 值的判断,如果 key-oldValue 能与 Map 中原有的 key-value 对应上,才进行替换操作; **replace(K,V):** 与上面的 replace 不同的是,此 replace 不会对 Map 中原有的 key-value 进行比较,如果 key 存在则直接替换; #### ConcurrentHashMap ConcurrentHashMap 同 [HashMap](https://javabetter.cn/collection/hashmap.html) 一样,也是基于散列表的 map,但是它提供了一种与 Hashtable 完全不同的加锁策略,提供了更高效的并发性和伸缩性。 简单分析一下 JDK1.8 中的 ConcurrentHashMap 的 put 方法: ```java public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { // key和value都不能为null if (key == null || value == null) throw new NullPointerException(); // 计算hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node[] tab = table;;) { Node f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // 如果tab未初始化或者个数为0,则初始化node数组 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node(hash, key, value, null))) // 如果使用CAS插入元素时,发现已经有元素了,则进入下一次循环,重新操作 // 如果使用CAS插入元素成功,则break跳出循环,流程结束 break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) // 如果要插入的元素所在的tab的第一个元素的hash是MOVED,则当前线程帮忙一起迁移元素 tab = helpTransfer(tab, f); else { // 如果这个tab不为空且不在迁移元素,则锁住这个tab(分段锁) // 并查找要插入的元素是否在这个tab中 // 存在,则替换值(onlyIfAbsent=false) // 不存在,则插入到链表结尾或插入树中 V oldVal = null; synchronized (f) { // 再次检测第一个元素是否有变化,如果有变化则进入下一次循环,从头来过 if (tabAt(tab, i) == f) { // 如果第一个元素的hash值大于等于0(说明不是在迁移,也不是树) // 那就是tab中的元素使用的是链表方式存储 if (fh >= 0) { // tab中元素个数赋值为1 binCount = 1; // 遍历整个tab,每次结束binCount加1 for (Node e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 如果找到了这个元素,则赋值了新值(onlyIfAbsent=false) // 并退出循环 oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; if ((e = e.next) == null) { // 如果到链表尾部还没有找到元素 // 就把它插入到链表结尾并退出循环 pred.next = new Node(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 如果第一个元素是树节点 Node p; // tab中元素个数赋值为2 binCount = 2; // 调用红黑树的插入方法插入元素 // 如果成功插入则返回null // 否则返回寻找到的节点 if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { // 如果找到了这个元素,则赋值了新值(onlyIfAbsent=false) // 并退出循环 oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 如果binCount不为0,说明成功插入了元素或者寻找到了元素 if (binCount != 0) { // 如果链表元素个数达到了8,则尝试树化 // 因为上面把元素插入到树中时,binCount只赋值了2,并没有计算整个树中元素的个数 // 所以不会重复树化 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 如果要插入的元素已经存在,则返回旧值 if (oldVal != null) return oldVal; // 退出外层大循环,流程结束 break; } } } // 成功插入元素,元素个数加1(是否要扩容在这个里面) addCount(1L, binCount); // 成功插入元素返回null return null; } ``` 当进行 put 操作时,流程大概可以分如下几个步骤: - 首先会判断 key、value 是否为空,如果为空就抛异常; - 接着会判断容器数组是否为空,如果为空就初始化数组; - 进一步判断,要插入的元素 f,在当前数组下标是否第一次插入,如果是就通过 CAS 方式插入; - 再接着判断`f.hash == -1`是否成立,如果成立,说明当前 f 是 ForwardingNode 节点,表示有其它线程正在扩容,则一起进行扩容操作; - 其他的情况,就是把新的 Node 节点按链表或红黑树的方式插入到合适的位置; - 节点插入完成之后,接着判断链表长度是否超过 8,如果超过 8 个,就将链表转化为红黑树结构; - 最后,插入完成之后,进行扩容判断。 initTable 初始化数组 ```java private final Node[] initTable() { Node[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) // 如果sizeCtl<0说明正在初始化或者扩容,让出CPU Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 如果把sizeCtl原子更新为-1成功,则当前线程进入初始化 // 如果原子更新失败则说明有其它线程先一步进入初始化了,则进入下一次循环 // 如果下一次循环时还没初始化完毕,则sizeCtl<0进入上面if的逻辑让出CPU // 如果下一次循环更新完毕了,则table.length!=0,退出循环 try { // 再次检查table是否为空,防止ABA问题 if ((tab = table) == null || tab.length == 0) { // 如果sc为0则使用默认值16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 新建数组 @SuppressWarnings("unchecked") Node[] nt = (Node[])new Node[n]; // 把tab数组赋值给table table = tab = nt; // 设置sc为数组长度的0.75倍 // n - (n >>> 2) = n - n/4 = 0.75n // 可见这里装载因子和扩容门槛都是写死了的 // 这也正是没有threshold和loadFactor属性的原因 sc = n - (n >>> 2); } } finally { // 把sc赋值给sizeCtl,这时存储的是扩容门槛 sizeCtl = sc; } break; } } return tab; } ``` - 使用 CAS 锁控制只有一个线程初始化 tab 数组; - sizeCtl 在初始化后存储的是扩容门槛; - 扩容门槛写死的是 tab 数组大小的 0.75 倍,tab 数组大小即 map 的容量,也就是最多存储多少个元素。 helpTransfer 协助扩容 ```java final Node[] helpTransfer(Node[] tab, Node f) { Node[] nextTab; int sc; // 如果tab数组不为空,并且当前tab第一个元素为ForwardingNode类型,并且nextTab不为空 // 说明当前tab已经迁移完毕了,才去帮忙迁移其它tab的元素 // 扩容时会把旧tab的第一个元素置为ForwardingNode,并让其nextTab指向新tab数组 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode)f).nextTable) != null) { int rs = resizeStamp(tab.length); // sizeCtl<0,说明正在扩容 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 扩容线程数加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 当前线程帮忙迁移元素 transfer(tab, nextTab); break; } } return nextTab; } return table; } ``` 操作步骤如下: - 第 1 步,对 table、node 节点、node 节点的 nextTable,进行数据校验; - 第 2 步,根据数组的 length 得到一个标识符号; - 第 3 步,进一步校验 nextTab、tab、sizeCtl 值,如果 nextTab 没有被并发修改并且 tab 也没有被并发修改,同时 `sizeCtl < 0`,说明还在扩容; - 第 4 步,对 sizeCtl 参数值进行分析判断,如果不满足任何一个判断,将`sizeCtl + 1`, 增加了一个线程帮助其扩容。 addCount 扩容判断 ```java private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; // 计算元素个数 s = sumCount(); } //检查是否需要扩容,默认check=1,需要检查 if (check >= 0) { Node[] tab, nt; int n, sc; // 如果元素个数达到了扩容门槛,则进行扩容 // 注意,正常情况下sizeCtl存储的是扩容门槛,即容量的0.75倍 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // rs是扩容时的一个标识 int rs = resizeStamp(n); if (sc < 0) { // sc<0说明正在扩容中 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) // 扩容已经完成了,退出循环 break; // 扩容未完成,则当前线程加入迁移元素中 // 并把扩容线程数加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 进入迁移元素 transfer(tab, null); // 重新计算元素个数 s = sumCount(); } } } ``` 操作步骤如下: - 第 1 步,利用 CAS 将方法更新 baseCount 的值 - 第 2 步,检查是否需要扩容,默认 check = 1,需要检查; - 第 3 步,如果满足扩容条件,判断当前是否正在扩容,如果是正在扩容就一起扩容; - 第 4 步,如果不在扩容,将 sizeCtl 更新为负数,并进行扩容处理。 以上就是整个 put 方法的流程,可以从中发现,里面大量的使用了 CAS 方法,CAS 表示比较与替换,里面有 3 个参数,分别是目标内存地址、旧值、新值,每次判断的时候,会将旧值与目标内存地址中的值进行比较,如果相等,就将新值更新到内存地址里,如果不相等,就继续循环,直到操作成功为止! get 方法 ```java public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; // 计算hash int h = spread(key.hashCode()); // 判断数组是否为空,通过key定位到数组下标是否为空; if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 如果第一个元素就是要找的元素,直接返回 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) // hash小于0,说明是树或者正在扩容 // 使用find寻找元素,find的寻找方式依据Node的不同子类有不同的实现方式 return (p = e.find(h, key)) != null ? p.val : null; // 遍历整个链表寻找元素 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; } ``` 步骤如下: - 第 1 步,判断数组是否为空,通过 key 定位到数组下标是否为空; - 第 2 步,判断 node 节点第一个元素是不是要找到,如果是直接返回; - 第 3 步,如果是红黑树结构,就从红黑树里面查询; - 第 4 步,如果是链表结构,循环遍历判断。 remove 方法 ```java public V remove(Object key) { // 调用替换节点方法 return replaceNode(key, null, null); } final V replaceNode(Object key, V value, Object cv) { // 计算hash int hash = spread(key.hashCode()); // 循环遍历数组 for (Node[] tab = table;;) { Node f; int n, i, fh; //校验参数 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; else if ((fh = f.hash) == MOVED) // 如果正在扩容中,协助扩容 tab = helpTransfer(tab, f); else { V oldVal = null; // 标记是否处理过 boolean validated = false; //用 synchronized 同步锁,保证并发时元素移除安全 synchronized (f) { // 再次验证当前tab元素是否被修改过 if (tabAt(tab, i) == f) { if (fh >= 0) { // fh>=0表示是链表节点 validated = true; // 遍历链表寻找目标节点 for (Node e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; // 遍历到链表尾部还没找到元素,跳出循环 if ((e = e.next) == null) break; } } else if (f instanceof TreeBin) { // 如果是树节点 validated = true; TreeBin t = (TreeBin)f; TreeNode r, p; // 遍历树找到了目标节点 if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } // 如果处理过,不管有没有找到元素都返回 if (validated) { // 如果找到了元素,返回其旧值 if (oldVal != null) { // 如果要替换的值为空,元素个数减1 if (value == null) addCount(-1L, -1); return oldVal; } break; } } } // 没找到元素返回空 return null; } ``` 步骤如下: - 第 1 步,循环遍历数组,接着校验参数; - 第 2 步,判断是否有别的线程正在扩容,如果是一起扩容; - 第 3 步,用 synchronized 同步锁,保证并发时元素移除安全; - 第 4 步,因为 `check= -1`,所以不会进行扩容操作,利用 CAS 操作修改 baseCount 值。 后面我们还会单独再开一篇来详细介绍 [ConcurrentHashMap](https://javabetter.cn/thread/ConcurrentHashMap.html),这篇就先分析到这里。 #### ConcurrentSkipListMap ConcurrentNavigableMap 接口继承了 NavigableMap 接口,这个接口提供了针对给定搜索目标返回最接近匹配项的导航方法。 ConcurrentNavigableMap 接口的主要实现类是 ConcurrentSkipListMap 类。从名字上来看,它的底层使用的是跳表(SkipList)。跳表是一种”空间换时间“的数据结构,可以使用 [CAS](https://javabetter.cn/thread/cas.html) 来保证并发安全性。 与 ConcurrentHashMap 的读密集操作相比,ConcurrentSkipListMap 的读和写操作的性能相对较低。这是由其数据结构导致的,因为跳表的插入和删除需要更复杂的指针操作。然而,ConcurrentSkipListMap 提供了有序性,这是 ConcurrentHashMap 所没有的。 ConcurrentSkipListMap 适用于需要线程安全的同时又需要元素有序的场合。如果不需要有序,ConcurrentHashMap 可能是更好的选择,因为它通常具有更高的性能。 [時光以北这篇 ConcurrentSkipListMap 讲的很不错](https://juejin.cn/post/6844903958499033095),可以学习。 ### 并发 Queue JDK 并没有提供线程安全的 List 类,因为对 List 来说,**很难去开发一个通用并且没有并发瓶颈的线程安全的 List**。因为即使简单的读操作,比如 `contains()`,也需要再搜索的时候锁住整个 list。 所以退一步,JDK 提供了队列和双端队列的线程安全类:ConcurrentLinkedQueue 和 ConcurrentLinkedDeque。因为队列相对于 List 来说,有更多的限制。这两个类是使用 CAS 来实现线程安全的。 我们会在后面单独开一篇来详细介绍[ConcurrentLinkedQueue](https://javabetter.cn/thread/ConcurrentLinkedQueue.html)。 ### 并发 Set ConcurrentSkipListSet 是线程安全的有序集合。底层是使用 ConcurrentSkipListMap 来实现。 谷歌的 [Guava](https://javabetter.cn/common-tool/guava.html)实现了一个线程安全的 ConcurrentHashSet: ```java Set s = Sets.newConcurrentHashSet(); ``` ### 阻塞队列 我们假设一种场景,[生产者一直生产资源,消费者一直消费资源](https://javabetter.cn/thread/shengchanzhe-xiaofeizhe.html),资源存储在一个缓冲池中,生产者将生产的资源存进缓冲池中,消费者从缓冲池中拿到资源进行消费,这就是大名鼎鼎的**生产者-消费者模式**。 该模式能够简化开发过程,一方面消除了生产者类与消费者类之间的代码依赖性,另一方面将生产数据的过程与使用数据的过程解耦简化负载。 我们自己 coding 实现这个模式的时候,因为需要让**多个线程操作共享变量**(即资源),所以很容易引发**线程安全问题**,造成**重复消费**和**死锁**,尤其是生产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者,这些个**等待-唤醒**逻辑都需要自己实现。 这么容易出错的事情,JDK 当然帮我们做啦,这就是阻塞队列(BlockingQueue),**你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。** > BlockingQueue 是 Java util.concurrent 包下重要的数据结构,区别于普通的队列,BlockingQueue 提供了**线程安全的队列访问方式**,并发包下很多高级同步类的实现都是基于 BlockingQueue 实现的。 BlockingQueue 一般用于生产者-消费者模式,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。**BlockingQueue 就是存放元素的容器**。 #### BlockingQueue 的操作方法 阻塞队列提供了四组不同的方法用于插入、移除、检查元素: | 方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 | | :-----------: | :-------: | :--------: | :--------: | :----------------: | | 插入方法 | add(e) | offer(e) | **put(e)** | offer(e,time,unit) | | 移除方法 | remove() | poll() | **take()** | poll(time,unit) | | 检查方法 | element() | peek() | - | - | - 抛出异常:如果操作无法立即执行,会抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出 `IllegalStateException(“Queue full”)`异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常 。 - 返回特殊值:如果操作无法立即执行,会返回一个特殊值,通常是 true / false。 - 一直阻塞:如果操作无法立即执行,则一直阻塞或者响应中断。 - 超时退出:如果操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true / false。 **注意:** - 不能往阻塞队列中插入 null,会抛出空指针异常。 - 可以访问阻塞队列中的任意元素,调用 `remove(o)`可以将队列之中的特定对象移除,但并不高效,尽量避免使用。 我们会在后面单独开一篇[BlockingQueue](https://javabetter.cn/thread/BlockingQueue.html)来细讲。 #### BlockingQueue 的实现类 **ArrayBlockingQueue** 由**数组**结构组成的**有界**阻塞队列。内部结构是数组,具有数组的特性。 ```java public ArrayBlockingQueue(int capacity, boolean fair){ //..省略代码 } ``` 可以初始化队列大小,一旦初始化将不能改变。构造方法中的 fair 表示控制对象的内部锁是否采用公平锁,默认是**非公平锁**。 **LinkedBlockingQueue** 由**链表**结构组成的**有界**阻塞队列。内部结构是链表,具有链表的特性。默认队列的大小是`Integer.MAX_VALUE`,也可以指定大小。此队列按照**先进先出**的原则对元素进行排序。 **DelayQueue** 该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。注入其中的元素必须实现 `java.util.concurrent.Delayed` 接口。 DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。 **PriorityBlockingQueue** 基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),内部控制线程同步的锁采用的是非公平锁。 > 网上大部分博客上**PriorityBlockingQueue**为公平锁,其实是不对的,查阅源码(感谢 github:**ambition0802**同学的指出): ```java public PriorityBlockingQueue(int initialCapacity, Comparator comparator) { this.lock = new ReentrantLock(); //默认构造方法-非公平锁 ...//其余代码略 } ``` **SynchronousQueue** 这个队列比较特殊,**没有任何内部容量**,甚至连一个队列的容量都没有。并且每个 put 必须等待一个 take,反之亦然。 需要区别容量为 1 的 ArrayBlockingQueue、LinkedBlockingQueue。 以下方法的返回值,可以帮助理解这个队列: - `iterator()` 永远返回空,因为里面没有东西 - `peek()` 永远返回 null - `put()` 往 queue 放进去一个 element 以后就一直 wait 直到有其他 thread 进来把这个 element 取走。 - `offer()` 往 queue 里放一个 element 后立即返回,如果碰巧这个 element 被另一个 thread 取走了,offer 方法返回 true,认为 offer 成功;否则返回 false。 - `take()` 取出并且 remove 掉 queue 里的 element,取不到东西他会一直等。 - `poll()` 取出并且 remove 掉 queue 里的 element,只有到碰巧另外一个线程正在往 queue 里 offer 数据或者 put 数据的时候,该方法才会取到东西。否则立即返回 null。 - `isEmpty()` 永远返回 true - `remove()&removeAll()` 永远返回 false **注意** **PriorityBlockingQueue**不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时阻塞数据的消费者。因此使用的时候要特别注意,**生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。**对于使用默认大小的**LinkedBlockingQueue**也是一样的。 #### 阻塞队列的原理 阻塞队列的原理很简单,利用了 Lock 锁的多条件([Condition](https://javabetter.cn/thread/condition.html))阻塞控制。接下来我们分析 ArrayBlockingQueue JDK 1.8 的源码。 首先是构造方法,除了初始化队列的大小和是否是公平锁之外,还对同一个锁(lock)初始化了两个监视器,分别是 notEmpty 和 notFull。这两个监视器的作用目前可以简单理解为标记分组,当该线程是 put 操作时,给他加上监视器 notFull,标记这个线程是一个生产者;当线程是 take 操作时,给他加上监视器 notEmpty,标记这个线程是消费者。 ```java //数据元素数组 final Object[] items; //下一个待取出元素索引 int takeIndex; //下一个待添加元素索引 int putIndex; //元素个数 int count; //内部锁 final ReentrantLock lock; //消费者监视器 private final Condition notEmpty; //生产者监视器 private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { //..省略其他代码 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } ``` **put 操作的源码** ```java public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // 1.自旋拿锁 lock.lockInterruptibly(); try { // 2.判断队列是否满了 while (count == items.length) // 2.1如果满了,阻塞该线程,并标记为notFull线程, // 等待notFull的唤醒,唤醒之后继续执行while循环。 notFull.await(); // 3.如果没有满,则进入队列 enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 4 唤醒一个等待的线程 notEmpty.signal(); } ``` 总结 put 的流程: 1. 所有执行 put 操作的线程竞争 lock 锁,拿到了 lock 锁的线程进入下一步,没有拿到 lock 锁的线程自旋竞争锁。 2. 判断阻塞队列是否满了,如果满了,则调用 await 方法阻塞这个线程,并标记为 notFull(生产者)线程,同时释放 lock 锁,等待被消费者线程唤醒。 3. 如果没有满,则调用 enqueue 方法将元素 put 进阻塞队列。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了 lock 锁的线程。 4. 唤醒一个标记为 notEmpty(消费者)的线程。 **take 操作的源码** ```java public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; } ``` take 操作和 put 操作类似,总结一下 take 操作的流程: 1. 所有执行 take 操作的线程竞争 lock 锁,拿到了 lock 锁的线程进入下一步,没有拿到 lock 锁的线程自旋竞争锁。 2. 判断阻塞队列是否为空,如果是空,则调用 await 方法阻塞这个线程,并标记为 notEmpty(消费者)线程,同时释放 lock 锁,等待被生产者线程唤醒。 3. 如果没有空,则调用 dequeue 方法。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了 lock 锁的线程。 4. 唤醒一个标记为 notFull(生产者)的线程。 **注意** 1. put 和 take 操作都需要**先获取锁**,没有获取到锁的线程会被挡在第一道大门之外自旋拿锁,直到获取到锁。 2. 就算拿到锁了之后,也**不一定**会顺利进行 put/take 操作,需要判断**队列是否可用**(是否满/空),如果不可用,则会被阻塞,**并释放锁**。 3. 在第 2 点被阻塞的线程会被唤醒,但是在唤醒之后,**依然需要拿到锁**才能继续往下执行,否则,自旋拿锁,拿到锁了再 while 判断队列是否可用(这也是为什么不用 if 判断,而使用 while 判断的原因)。 #### 示例和使用场景 生产者-消费者模型: ```java public class Test { private int queueSize = 10; private ArrayBlockingQueue queue = new ArrayBlockingQueue(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{ @Override public void run() { consume(); } private void consume() { while(true){ try { queue.take(); System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread{ @Override public void run() { produce(); } private void produce() { while(true){ try { queue.put(1); System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } } } ``` 下面是这个例子的输出片段: ``` 从队列取走一个元素,队列剩余0个元素 从队列取走一个元素,队列剩余0个元素 向队列取中插入一个元素,队列剩余空间:9 向队列取中插入一个元素,队列剩余空间:9 向队列取中插入一个元素,队列剩余空间:9 向队列取中插入一个元素,队列剩余空间:8 向队列取中插入一个元素,队列剩余空间:7 向队列取中插入一个元素,队列剩余空间:6 向队列取中插入一个元素,队列剩余空间:5 向队列取中插入一个元素,队列剩余空间:4 向队列取中插入一个元素,队列剩余空间:3 向队列取中插入一个元素,队列剩余空间:2 向队列取中插入一个元素,队列剩余空间:1 向队列取中插入一个元素,队列剩余空间:0 从队列取走一个元素,队列剩余1个元素 从队列取走一个元素,队列剩余9个元素 ``` 注意,这个例子中的输出结果看起来可能有问题,比如有几行在插入一个元素之后,队列的剩余空间不变。这是由于**System.out.println 语句没有锁**。考虑到这样的情况:线程 1 在执行完 put/take 操作后立即失去 CPU 时间片,然后切换到线程 2 执行 put/take 操作,执行完毕后回到线程 1 的 System.out.println 语句并输出,发现这个时候阻塞队列的 size 已经被线程 2 改变了,所以这个时候输出的 size 并不是当时线程 1 执行完 put/take 操作之后阻塞队列的 size,但可以确保的是 size 不会超过 10 个。实际上使用阻塞队列是没有问题的。 #### 线程池中使用阻塞队列 ```java public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } ``` Java 中的线程池就是使用阻塞队列实现的,我们在了解阻塞队列之后,无论是使用 Executors 类中已经提供的线程池,还是自己通过 ThreadPoolExecutor 实现线程池,都会更加得心应手。 > 注:上面提到的生产者-消费者模式,大家可以参考[生产者-消费者模型](https://javabetter.cn/thread/shengchanzhe-xiaofeizhe.html),可以更好的理解阻塞队列。 ### CopyOnWrite 容器 在聊 CopyOnWrite 容器之前我们先来谈谈什么是 CopyOnWrite 机制,CopyOnWrite 是计算机设计领域的一种优化策略,也是一种在并发场景下常用的设计思想——写入时复制。 什么是写入时复制呢? 就是当有多个调用者同时去请求一个资源数据的时候,有一个调用者出于某些原因需要对当前的数据源进行修改,这个时候系统将会复制一个当前数据源的副本给调用者修改。 CopyOnWrite 容器即**写时复制的容器**,当我们往一个容器中添加元素的时候,不直接往容器中添加,而是将当前容器进行 copy,复制出来一个新的容器,然后向新容器中添加我们需要的元素,最后将原容器的引用指向新容器。 这样做的好处在于,我们可以在并发的场景下对容器进行"读操作"而不需要"加锁",从而达到读写分离的目的。从 JDK 1.5 开始 Java 并发包里提供了两个使用 CopyOnWrite 机制实现的并发容器,分别是 [CopyOnWriteArrayList](https://javabetter.cn/thread/CopyOnWriteArrayList.html) 和 CopyOnWriteArraySet。 #### CopyOnWriteArrayList **优点**: CopyOnWriteArrayList 经常被用于“读多写少”的并发场景,是因为 CopyOnWriteArrayList 无需任何同步措施,大大增强了读的性能。在 Java 中遍历线程非安全的 List(如:[ArrayList 和 LinkedList](https://javabetter.cn/collection/list-war-2.html))的时候,若中途有别的线程对 List 容器进行了修改,那么会抛出 ConcurrentModificationException 异常。 CopyOnWriteArrayList 由于其"读写分离",遍历和修改操作分别作用在不同的 List 容器,所以在使用迭代器遍历的时候,不会抛出异常。 **缺点**: 第一个缺点是 CopyOnWriteArrayList 每次执行写操作都会将原容器进行拷贝一份,数据量大的时候,内存会存在较大的压力,可能会引起频繁 Full GC。比如这些对象占用的内存 200M 左右,那么再写入 100M 数据进去,内存就会多占用 300M。 第二个缺点是 CopyOnWriteArrayList 由于实现的原因,写和读分别作用在不同新老容器上,在写操作执行过程中,读不会阻塞,但读取到的却是老容器的数据。 现在我们来看一下 CopyOnWriteArrayList 的 add 操作源码,它的逻辑很清晰,就是先把原容器进行 copy,然后在新的副本上进行“写操作”,最后再切换引用,在此过程中是加了锁的。 ```java public boolean add(E e) { // ReentrantLock加锁,保证线程安全 final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; // 拷贝原容器,长度为原容器长度加一 Object[] newElements = Arrays.copyOf(elements, len + 1); // 在新副本上执行添加操作 newElements[len] = e; // 将原容器引用指向新副本 setArray(newElements); return true; } finally { // 解锁 lock.unlock(); } } ``` 我们再来看一下 remove 操作的源码,remove 的逻辑是将要 remove 元素之外的其他元素拷贝到新的副本中,然后再将原容器的引用指向新的副本中,因为 remove 操作也是“写操作”所以也是要加锁的。 ```java public E remove(int index) { // 加锁 final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; E oldValue = get(elements, index); int numMoved = len - index - 1; if (numMoved == 0) // 如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用 setArray(Arrays.copyOf(elements, len - 1)); else { // 否则,将要删除元素之外的其他元素拷贝到新副本中,并切换引用 Object[] newElements = new Object[len - 1]; System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index + 1, newElements, index, numMoved); setArray(newElements); } return oldValue; } finally { // 解锁 lock.unlock(); } } ``` 我们再来看看 CopyOnWriteArrayList 效率最高的读操作的源码 ```java public E get(int index) { return get(getArray(), index); } ``` ```java private E get(Object[] a, int index) { return (E) a[index]; } ``` 由上可见“读操作”是没有加锁的,直接读取。 #### CopyOnWrite 使用 接下来,我们结合具体业务场景来实现一个 CopyOnWriteMap 的并发容器并且使用它。 ```java import java.util.Collection; import java.util.Map; import java.util.Set; public class CopyOnWriteMap implements Map, Cloneable { private volatile Map internalMap; public CopyOnWriteMap() { internalMap = new HashMap(); } public V put(K key, V value) { synchronized (this) { Map newMap = new HashMap(internalMap); V val = newMap.put(key, value); internalMap = newMap; return val; } } public V get(Object key) { return internalMap.get(key); } public void putAll(Map newData) { synchronized (this) { Map newMap = new HashMap(internalMap); newMap.putAll(newData); internalMap = newMap; } } } ``` 上面就是参考 CopyOnWriteArrayList 实现的 CopyOnWriteMap,我们可以用这个容器来做什么呢?结合我们之前说的 CopyOnWrite 的复制思想,它最适用于“读多写少”的并发场景。 **场景:** 假如我们有一个搜索的网站需要屏蔽一些“关键字”,“黑名单”每晚定时更新,每当用户搜索的时候,“黑名单”中的关键字不会出现在搜索结果当中,并且提示用户敏感字。 ```java // 黑名单服务 public class BlackListServiceImpl { // 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。 private static CopyOnWriteMap blackListMap = new CopyOnWriteMap(1000); public static boolean isBlackList(String id) { return blackListMap.get(id) == null ? false : true; } public static void addBlackList(String id) { blackListMap.put(id, Boolean.TRUE); } /** * 批量添加黑名单 * (使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。 * 如使用上面代码里的addBlackList方法) * @param ids */ public static void addBlackList(Map ids) { blackListMap.putAll(ids); } } ``` 这里需要各位小伙伴特别特别注意一个问题,此处的场景是每晚凌晨“黑名单”定时更新,原因是 CopyOnWrite 容器有**数据一致性**的问题,它只能保证**最终数据一致性**。 所以如果我们希望写入的数据马上能准确地读取,请不要使用 CopyOnWrite 容器。 ### 总结 本文主要介绍了并发包中的三个重要的容器类,Map、阻塞队列和 CopyOnWrite 容器,Map 用于存储键值对,阻塞队列用于生产者-消费者模型,而 CopyOnWrite 容器用于“读多写少”的并发场景。 > 编辑:沉默王二,部分内容来源于朋友小七萤火虫开源的这个仓库:[深入浅出 Java 多线程](http://concurrent.redspider.group/),部分内容来自于这篇[初念初恋-ConcurrentHashMap](https://juejin.cn/post/7064061605185028110)。 --- GitHub 上标星 9000+ 的开源知识库《[二哥的 Java 进阶之路](https://github.com/itwanger/toBeBetterJavaer)》第一版 PDF 终于来了!包括 Java 基础语法、数组&字符串、OOP、集合框架、Java IO、异常处理、Java 新特性、网络编程、NIO、并发编程、JVM 等等,共计 32 万余字,可以说是通俗易懂、风趣幽默……详情戳:[太赞了,GitHub 上标星 9000+ 的 Java 教程](https://javabetter.cn/overview/) 微信搜 **沉默王二** 或扫描下方二维码关注二哥的原创公众号沉默王二,回复 **222** 即可免费领取。 ![](https://cdn.tobebetterjavaer.com/tobebetterjavaer/images/gongzhonghao.png)