提交 ddc002ae 编写于 作者: T tvaleev

8154387: Parallel unordered Stream.limit() tries to collect 128 elements even if limit is less

Reviewed-by: psandoz
上级 ab8cdddb
...@@ -28,6 +28,7 @@ import java.util.Comparator; ...@@ -28,6 +28,7 @@ import java.util.Comparator;
import java.util.Objects; import java.util.Objects;
import java.util.Spliterator; import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier; import java.util.function.BooleanSupplier;
import java.util.function.Consumer; import java.util.function.Consumer;
...@@ -905,6 +906,7 @@ class StreamSpliterators { ...@@ -905,6 +906,7 @@ class StreamSpliterators {
// The spliterator to slice // The spliterator to slice
protected final T_SPLITR s; protected final T_SPLITR s;
protected final boolean unlimited; protected final boolean unlimited;
protected final int chunkSize;
private final long skipThreshold; private final long skipThreshold;
private final AtomicLong permits; private final AtomicLong permits;
...@@ -912,6 +914,8 @@ class StreamSpliterators { ...@@ -912,6 +914,8 @@ class StreamSpliterators {
this.s = s; this.s = s;
this.unlimited = limit < 0; this.unlimited = limit < 0;
this.skipThreshold = limit >= 0 ? limit : 0; this.skipThreshold = limit >= 0 ? limit : 0;
this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
((skip + limit) / AbstractTask.LEAF_TARGET) + 1) : CHUNK_SIZE;
this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip); this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
} }
...@@ -921,6 +925,7 @@ class StreamSpliterators { ...@@ -921,6 +925,7 @@ class StreamSpliterators {
this.unlimited = parent.unlimited; this.unlimited = parent.unlimited;
this.permits = parent.permits; this.permits = parent.permits;
this.skipThreshold = parent.skipThreshold; this.skipThreshold = parent.skipThreshold;
this.chunkSize = parent.chunkSize;
} }
/** /**
...@@ -1029,13 +1034,13 @@ class StreamSpliterators { ...@@ -1029,13 +1034,13 @@ class StreamSpliterators {
PermitStatus permitStatus; PermitStatus permitStatus;
while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
if (permitStatus == PermitStatus.MAYBE_MORE) { if (permitStatus == PermitStatus.MAYBE_MORE) {
// Optimistically traverse elements up to a threshold of CHUNK_SIZE // Optimistically traverse elements up to a threshold of chunkSize
if (sb == null) if (sb == null)
sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE); sb = new ArrayBuffer.OfRef<>(chunkSize);
else else
sb.reset(); sb.reset();
long permitsRequested = 0; long permitsRequested = 0;
do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE); do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
if (permitsRequested == 0) if (permitsRequested == 0)
return; return;
sb.forEach(action, acquirePermits(permitsRequested)); sb.forEach(action, acquirePermits(permitsRequested));
...@@ -1102,15 +1107,15 @@ class StreamSpliterators { ...@@ -1102,15 +1107,15 @@ class StreamSpliterators {
PermitStatus permitStatus; PermitStatus permitStatus;
while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
if (permitStatus == PermitStatus.MAYBE_MORE) { if (permitStatus == PermitStatus.MAYBE_MORE) {
// Optimistically traverse elements up to a threshold of CHUNK_SIZE // Optimistically traverse elements up to a threshold of chunkSize
if (sb == null) if (sb == null)
sb = bufferCreate(CHUNK_SIZE); sb = bufferCreate(chunkSize);
else else
sb.reset(); sb.reset();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
T_CONS sbc = (T_CONS) sb; T_CONS sbc = (T_CONS) sb;
long permitsRequested = 0; long permitsRequested = 0;
do { } while (s.tryAdvance(sbc) && ++permitsRequested < CHUNK_SIZE); do { } while (s.tryAdvance(sbc) && ++permitsRequested < chunkSize);
if (permitsRequested == 0) if (permitsRequested == 0)
return; return;
sb.forEach(action, acquirePermits(permitsRequested)); sb.forEach(action, acquirePermits(permitsRequested));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册