提交 ffb64db0 编写于 作者: P psandoz

8016324: filter/flatMap pipeline sinks should pass size information to downstream sink

Reviewed-by: chegar, mduigou
Contributed-by: NBrian Goetz <brian.goetz@oracle.com>
上级 e25108e4
...@@ -258,6 +258,12 @@ abstract class DoublePipeline<E_IN> ...@@ -258,6 +258,12 @@ abstract class DoublePipeline<E_IN>
@Override @Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedDouble(sink) { return new Sink.ChainedDouble(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(double t) { public void accept(double t) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
DoubleStream result = mapper.apply(t); DoubleStream result = mapper.apply(t);
...@@ -289,6 +295,11 @@ abstract class DoublePipeline<E_IN> ...@@ -289,6 +295,11 @@ abstract class DoublePipeline<E_IN>
@Override @Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedDouble(sink) { return new Sink.ChainedDouble(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override @Override
public void accept(double t) { public void accept(double t) {
if (predicate.test(t)) if (predicate.test(t))
......
...@@ -294,6 +294,12 @@ abstract class IntPipeline<E_IN> ...@@ -294,6 +294,12 @@ abstract class IntPipeline<E_IN>
@Override @Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedInt(sink) { return new Sink.ChainedInt(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(int t) { public void accept(int t) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
IntStream result = mapper.apply(t); IntStream result = mapper.apply(t);
...@@ -325,6 +331,11 @@ abstract class IntPipeline<E_IN> ...@@ -325,6 +331,11 @@ abstract class IntPipeline<E_IN>
@Override @Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedInt(sink) { return new Sink.ChainedInt(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override @Override
public void accept(int t) { public void accept(int t) {
if (predicate.test(t)) if (predicate.test(t))
......
...@@ -275,6 +275,12 @@ abstract class LongPipeline<E_IN> ...@@ -275,6 +275,12 @@ abstract class LongPipeline<E_IN>
@Override @Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedLong(sink) { return new Sink.ChainedLong(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(long t) { public void accept(long t) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
LongStream result = mapper.apply(t); LongStream result = mapper.apply(t);
...@@ -306,6 +312,11 @@ abstract class LongPipeline<E_IN> ...@@ -306,6 +312,11 @@ abstract class LongPipeline<E_IN>
@Override @Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedLong(sink) { return new Sink.ChainedLong(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override @Override
public void accept(long t) { public void accept(long t) {
if (predicate.test(t)) if (predicate.test(t))
......
...@@ -165,6 +165,11 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -165,6 +165,11 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override @Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override @Override
public void accept(P_OUT u) { public void accept(P_OUT u) {
if (predicate.test(u)) if (predicate.test(u))
...@@ -252,6 +257,12 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -252,6 +257,12 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override @Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) { public void accept(P_OUT u) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
Stream<? extends R> result = mapper.apply(u); Stream<? extends R> result = mapper.apply(u);
...@@ -273,6 +284,12 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -273,6 +284,12 @@ abstract class ReferencePipeline<P_IN, P_OUT>
Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT>(sink) {
IntConsumer downstreamAsInt = downstream::accept; IntConsumer downstreamAsInt = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) { public void accept(P_OUT u) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
IntStream result = mapper.apply(u); IntStream result = mapper.apply(u);
...@@ -294,6 +311,12 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -294,6 +311,12 @@ abstract class ReferencePipeline<P_IN, P_OUT>
Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT>(sink) {
DoubleConsumer downstreamAsDouble = downstream::accept; DoubleConsumer downstreamAsDouble = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) { public void accept(P_OUT u) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
DoubleStream result = mapper.apply(u); DoubleStream result = mapper.apply(u);
...@@ -315,6 +338,12 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -315,6 +338,12 @@ abstract class ReferencePipeline<P_IN, P_OUT>
Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT>(sink) {
LongConsumer downstreamAsLong = downstream::accept; LongConsumer downstreamAsLong = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) { public void accept(P_OUT u) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
LongStream result = mapper.apply(u); LongStream result = mapper.apply(u);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册