提交 169ce6c7 编写于 作者: L lana

Merge

/* /*
* Copyright (c) 2005, 2006, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2005, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
...@@ -59,7 +59,7 @@ import java.io.IOException; ...@@ -59,7 +59,7 @@ import java.io.IOException;
* {@link java.lang.instrument} for a detailed description on how these agents * {@link java.lang.instrument} for a detailed description on how these agents
* are loaded and started). The {@link #loadAgentLibrary loadAgentLibrary} and * are loaded and started). The {@link #loadAgentLibrary loadAgentLibrary} and
* {@link #loadAgentPath loadAgentPath} methods are used to load agents that * {@link #loadAgentPath loadAgentPath} methods are used to load agents that
* are deployed in a dynamic library and make use of the <a * are deployed either in a dynamic library or statically linked into the VM and make use of the <a
* href="../../../../../../../../technotes/guides/jvmti/index.html">JVM Tools * href="../../../../../../../../technotes/guides/jvmti/index.html">JVM Tools
* Interface</a>. </p> * Interface</a>. </p>
* *
...@@ -298,25 +298,29 @@ public abstract class VirtualMachine { ...@@ -298,25 +298,29 @@ public abstract class VirtualMachine {
* <p> A <a href="../../../../../../../../technotes/guides/jvmti/index.html">JVM * <p> A <a href="../../../../../../../../technotes/guides/jvmti/index.html">JVM
* TI</a> client is called an <i>agent</i>. It is developed in a native language. * TI</a> client is called an <i>agent</i>. It is developed in a native language.
* A JVM TI agent is deployed in a platform specific manner but it is typically the * A JVM TI agent is deployed in a platform specific manner but it is typically the
* platform equivalent of a dynamic library. This method causes the given agent * platform equivalent of a dynamic library. Alternatively, it may be statically linked into the VM.
* library to be loaded into the target VM (if not already loaded). * This method causes the given agent library to be loaded into the target
* VM (if not already loaded or if not statically linked into the VM).
* It then causes the target VM to invoke the <code>Agent_OnAttach</code> function * It then causes the target VM to invoke the <code>Agent_OnAttach</code> function
* or, for a statically linked agent named 'L', the <code>Agent_OnAttach_L</code> function
* as specified in the * as specified in the
* <a href="../../../../../../../../technotes/guides/jvmti/index.html"> JVM Tools * <a href="../../../../../../../../technotes/guides/jvmti/index.html"> JVM Tools
* Interface</a> specification. Note that the <code>Agent_OnAttach</code> * Interface</a> specification. Note that the <code>Agent_OnAttach[_L]</code>
* function is invoked even if the agent library was loaded prior to invoking * function is invoked even if the agent library was loaded prior to invoking
* this method. * this method.
* *
* <p> The agent library provided is the name of the agent library. It is interpreted * <p> The agent library provided is the name of the agent library. It is interpreted
* in the target virtual machine in an implementation-dependent manner. Typically an * in the target virtual machine in an implementation-dependent manner. Typically an
* implementation will expand the library name into an operating system specific file * implementation will expand the library name into an operating system specific file
* name. For example, on UNIX systems, the name <tt>foo</tt> might be expanded to * name. For example, on UNIX systems, the name <tt>L</tt> might be expanded to
* <tt>libfoo.so</tt>, and located using the search path specified by the * <tt>libL.so</tt>, and located using the search path specified by the
* <tt>LD_LIBRARY_PATH</tt> environment variable.</p> * <tt>LD_LIBRARY_PATH</tt> environment variable. If the agent named 'L' is
* statically linked into the VM then the VM must export a function named
* <code>Agent_OnAttach_L</code>.</p>
* *
* <p> If the <code>Agent_OnAttach</code> function in the agent library returns * <p> If the <code>Agent_OnAttach[_L]</code> function in the agent library returns
* an error then an {@link com.sun.tools.attach.AgentInitializationException} is * an error then an {@link com.sun.tools.attach.AgentInitializationException} is
* thrown. The return value from the <code>Agent_OnAttach</code> can then be * thrown. The return value from the <code>Agent_OnAttach[_L]</code> can then be
* obtained by invoking the {@link * obtained by invoking the {@link
* com.sun.tools.attach.AgentInitializationException#returnValue() returnValue} * com.sun.tools.attach.AgentInitializationException#returnValue() returnValue}
* method on the exception. </p> * method on the exception. </p>
...@@ -325,15 +329,16 @@ public abstract class VirtualMachine { ...@@ -325,15 +329,16 @@ public abstract class VirtualMachine {
* The name of the agent library. * The name of the agent library.
* *
* @param options * @param options
* The options to provide to the <code>Agent_OnAttach</code> * The options to provide to the <code>Agent_OnAttach[_L]</code>
* function (can be <code>null</code>). * function (can be <code>null</code>).
* *
* @throws AgentLoadException * @throws AgentLoadException
* If the agent library does not exist, or cannot be loaded for * If the agent library does not exist, the agent library is not
* another reason. * statically linked with the VM, or the agent library cannot be
* loaded for another reason.
* *
* @throws AgentInitializationException * @throws AgentInitializationException
* If the <code>Agent_OnAttach</code> function returns an error * If the <code>Agent_OnAttach[_L]</code> function returns an error.
* *
* @throws IOException * @throws IOException
* If an I/O error occurs * If an I/O error occurs
...@@ -359,11 +364,12 @@ public abstract class VirtualMachine { ...@@ -359,11 +364,12 @@ public abstract class VirtualMachine {
* The name of the agent library. * The name of the agent library.
* *
* @throws AgentLoadException * @throws AgentLoadException
* If the agent library does not exist, or cannot be loaded for * If the agent library does not exist, the agent library is not
* another reason. * statically linked with the VM, or the agent library cannot be
* loaded for another reason.
* *
* @throws AgentInitializationException * @throws AgentInitializationException
* If the <code>Agent_OnAttach</code> function returns an error * If the <code>Agent_OnAttach[_L]</code> function returns an error.
* *
* @throws IOException * @throws IOException
* If an I/O error occurs * If an I/O error occurs
...@@ -383,12 +389,23 @@ public abstract class VirtualMachine { ...@@ -383,12 +389,23 @@ public abstract class VirtualMachine {
* <p> A <a href="../../../../../../../../technotes/guides/jvmti/index.html">JVM * <p> A <a href="../../../../../../../../technotes/guides/jvmti/index.html">JVM
* TI</a> client is called an <i>agent</i>. It is developed in a native language. * TI</a> client is called an <i>agent</i>. It is developed in a native language.
* A JVM TI agent is deployed in a platform specific manner but it is typically the * A JVM TI agent is deployed in a platform specific manner but it is typically the
* platform equivalent of a dynamic library. This method causes the given agent * platform equivalent of a dynamic library. Alternatively, the native
* library to be loaded into the target VM (if not already loaded). * library specified by the agentPath parameter may be statically
* It then causes the target VM to invoke the <code>Agent_OnAttach</code> function * linked with the VM. The parsing of the agentPath parameter into
* as specified in the * a statically linked library name is done in a platform
* specific manner in the VM. For example, in UNIX, an agentPath parameter
* of <code>/a/b/libL.so</code> would name a library 'L'.
*
* See the JVM TI Specification for more details.
*
* This method causes the given agent library to be loaded into the target
* VM (if not already loaded or if not statically linked into the VM).
* It then causes the target VM to invoke the <code>Agent_OnAttach</code>
* function or, for a statically linked agent named 'L', the
* <code>Agent_OnAttach_L</code> function as specified in the
* <a href="../../../../../../../../technotes/guides/jvmti/index.html"> JVM Tools * <a href="../../../../../../../../technotes/guides/jvmti/index.html"> JVM Tools
* Interface</a> specification. Note that the <code>Agent_OnAttach</code> * Interface</a> specification.
* Note that the <code>Agent_OnAttach[_L]</code>
* function is invoked even if the agent library was loaded prior to invoking * function is invoked even if the agent library was loaded prior to invoking
* this method. * this method.
* *
...@@ -396,9 +413,9 @@ public abstract class VirtualMachine { ...@@ -396,9 +413,9 @@ public abstract class VirtualMachine {
* agent library. Unlike {@link #loadAgentLibrary loadAgentLibrary}, the library name * agent library. Unlike {@link #loadAgentLibrary loadAgentLibrary}, the library name
* is not expanded in the target virtual machine. </p> * is not expanded in the target virtual machine. </p>
* *
* <p> If the <code>Agent_OnAttach</code> function in the agent library returns * <p> If the <code>Agent_OnAttach[_L]</code> function in the agent library returns
* an error then an {@link com.sun.tools.attach.AgentInitializationException} is * an error then an {@link com.sun.tools.attach.AgentInitializationException} is
* thrown. The return value from the <code>Agent_OnAttach</code> can then be * thrown. The return value from the <code>Agent_OnAttach[_L]</code> can then be
* obtained by invoking the {@link * obtained by invoking the {@link
* com.sun.tools.attach.AgentInitializationException#returnValue() returnValue} * com.sun.tools.attach.AgentInitializationException#returnValue() returnValue}
* method on the exception. </p> * method on the exception. </p>
...@@ -407,15 +424,16 @@ public abstract class VirtualMachine { ...@@ -407,15 +424,16 @@ public abstract class VirtualMachine {
* The full path of the agent library. * The full path of the agent library.
* *
* @param options * @param options
* The options to provide to the <code>Agent_OnAttach</code> * The options to provide to the <code>Agent_OnAttach[_L]</code>
* function (can be <code>null</code>). * function (can be <code>null</code>).
* *
* @throws AgentLoadException * @throws AgentLoadException
* If the agent library does not exist, or cannot be loaded for * If the agent library does not exist, the agent library is not
* another reason. * statically linked with the VM, or the agent library cannot be
* loaded for another reason.
* *
* @throws AgentInitializationException * @throws AgentInitializationException
* If the <code>Agent_OnAttach</code> function returns an error * If the <code>Agent_OnAttach[_L]</code> function returns an error.
* *
* @throws IOException * @throws IOException
* If an I/O error occurs * If an I/O error occurs
...@@ -441,11 +459,12 @@ public abstract class VirtualMachine { ...@@ -441,11 +459,12 @@ public abstract class VirtualMachine {
* The full path to the agent library. * The full path to the agent library.
* *
* @throws AgentLoadException * @throws AgentLoadException
* If the agent library does not exist, or cannot be loaded for * If the agent library does not exist, the agent library is not
* another reason. * statically linked with the VM, or the agent library cannot be
* loaded for another reason.
* *
* @throws AgentInitializationException * @throws AgentInitializationException
* If the <code>Agent_OnAttach</code> function returns an error * If the <code>Agent_OnAttach[_L]</code> function returns an error.
* *
* @throws IOException * @throws IOException
* If an I/O error occurs * If an I/O error occurs
......
...@@ -137,6 +137,11 @@ public final class Collectors { ...@@ -137,6 +137,11 @@ public final class Collectors {
return (u,v) -> { throw new IllegalStateException(String.format("Duplicate key %s", u)); }; return (u,v) -> { throw new IllegalStateException(String.format("Duplicate key %s", u)); };
} }
@SuppressWarnings("unchecked")
private static <I, R> Function<I, R> castingIdentity() {
return i -> (R) i;
}
/** /**
* Simple implementation class for {@code Collector}. * Simple implementation class for {@code Collector}.
* *
...@@ -166,7 +171,7 @@ public final class Collectors { ...@@ -166,7 +171,7 @@ public final class Collectors {
BiConsumer<A, T> accumulator, BiConsumer<A, T> accumulator,
BinaryOperator<A> combiner, BinaryOperator<A> combiner,
Set<Characteristics> characteristics) { Set<Characteristics> characteristics) {
this(supplier, accumulator, combiner, i -> (R) i, characteristics); this(supplier, accumulator, combiner, castingIdentity(), characteristics);
} }
@Override @Override
...@@ -209,7 +214,7 @@ public final class Collectors { ...@@ -209,7 +214,7 @@ public final class Collectors {
*/ */
public static <T, C extends Collection<T>> public static <T, C extends Collection<T>>
Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) { Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) {
return new CollectorImpl<>(collectionFactory, Collection::add, return new CollectorImpl<>(collectionFactory, Collection<T>::add,
(r1, r2) -> { r1.addAll(r2); return r1; }, (r1, r2) -> { r1.addAll(r2); return r1; },
CH_ID); CH_ID);
} }
...@@ -1046,30 +1051,23 @@ public final class Collectors { ...@@ -1046,30 +1051,23 @@ public final class Collectors {
public static <T, D, A> public static <T, D, A>
Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate, Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
Collector<? super T, A, D> downstream) { Collector<? super T, A, D> downstream) {
@SuppressWarnings("unchecked") BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
BiConsumer<D, ? super T> downstreamAccumulator = (BiConsumer<D, ? super T>) downstream.accumulator(); BiConsumer<Partition<A>, T> accumulator = (result, t) ->
BiConsumer<Map<Boolean, A>, T> accumulator = (result, t) -> { downstreamAccumulator.accept(predicate.test(t) ? result.forTrue : result.forFalse, t);
Partition<D> asPartition = ((Partition<D>) result);
downstreamAccumulator.accept(predicate.test(t) ? asPartition.forTrue : asPartition.forFalse, t);
};
BinaryOperator<A> op = downstream.combiner(); BinaryOperator<A> op = downstream.combiner();
BinaryOperator<Map<Boolean, A>> merger = (m1, m2) -> { BinaryOperator<Partition<A>> merger = (left, right) ->
Partition<A> left = (Partition<A>) m1; new Partition<>(op.apply(left.forTrue, right.forTrue),
Partition<A> right = (Partition<A>) m2; op.apply(left.forFalse, right.forFalse));
return new Partition<>(op.apply(left.forTrue, right.forTrue), Supplier<Partition<A>> supplier = () ->
op.apply(left.forFalse, right.forFalse)); new Partition<>(downstream.supplier().get(),
}; downstream.supplier().get());
Supplier<Map<Boolean, A>> supplier = () -> new Partition<>(downstream.supplier().get(),
downstream.supplier().get());
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) { if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
return new CollectorImpl<>(supplier, accumulator, merger, CH_ID); return new CollectorImpl<>(supplier, accumulator, merger, CH_ID);
} }
else { else {
Function<Map<Boolean, A>, Map<Boolean, D>> finisher = (Map<Boolean, A> par) -> { Function<Partition<A>, Map<Boolean, D>> finisher = par ->
Partition<A> asAPartition = (Partition<A>) par; new Partition<>(downstream.finisher().apply(par.forTrue),
return new Partition<>(downstream.finisher().apply(asAPartition.forTrue), downstream.finisher().apply(par.forFalse));
downstream.finisher().apply(asAPartition.forFalse));
};
return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID); return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID);
} }
} }
......
...@@ -101,7 +101,7 @@ final class DistinctOps { ...@@ -101,7 +101,7 @@ final class DistinctOps {
if (StreamOpFlag.DISTINCT.isKnown(flags)) { if (StreamOpFlag.DISTINCT.isKnown(flags)) {
return sink; return sink;
} else if (StreamOpFlag.SORTED.isKnown(flags)) { } else if (StreamOpFlag.SORTED.isKnown(flags)) {
return new Sink.ChainedReference<T>(sink) { return new Sink.ChainedReference<T, T>(sink) {
boolean seenNull; boolean seenNull;
T lastSeen; T lastSeen;
...@@ -132,7 +132,7 @@ final class DistinctOps { ...@@ -132,7 +132,7 @@ final class DistinctOps {
} }
}; };
} else { } else {
return new Sink.ChainedReference<T>(sink) { return new Sink.ChainedReference<T, T>(sink) {
Set<T> seen; Set<T> seen;
@Override @Override
......
...@@ -191,7 +191,7 @@ abstract class DoublePipeline<E_IN> ...@@ -191,7 +191,7 @@ abstract class DoublePipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedDouble(sink) { return new Sink.ChainedDouble<Double>(sink) {
@Override @Override
public void accept(double t) { public void accept(double t) {
downstream.accept(mapper.applyAsDouble(t)); downstream.accept(mapper.applyAsDouble(t));
...@@ -208,9 +208,8 @@ abstract class DoublePipeline<E_IN> ...@@ -208,9 +208,8 @@ abstract class DoublePipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Double> opWrapSink(int flags, Sink<U> sink) { Sink<Double> opWrapSink(int flags, Sink<U> sink) {
return new Sink.ChainedDouble(sink) { return new Sink.ChainedDouble<U>(sink) {
@Override @Override
@SuppressWarnings("unchecked")
public void accept(double t) { public void accept(double t) {
downstream.accept(mapper.apply(t)); downstream.accept(mapper.apply(t));
} }
...@@ -226,7 +225,7 @@ abstract class DoublePipeline<E_IN> ...@@ -226,7 +225,7 @@ abstract class DoublePipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Double> opWrapSink(int flags, Sink<Integer> sink) { Sink<Double> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedDouble(sink) { return new Sink.ChainedDouble<Integer>(sink) {
@Override @Override
public void accept(double t) { public void accept(double t) {
downstream.accept(mapper.applyAsInt(t)); downstream.accept(mapper.applyAsInt(t));
...@@ -243,7 +242,7 @@ abstract class DoublePipeline<E_IN> ...@@ -243,7 +242,7 @@ abstract class DoublePipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Double> opWrapSink(int flags, Sink<Long> sink) { Sink<Double> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedDouble(sink) { return new Sink.ChainedDouble<Long>(sink) {
@Override @Override
public void accept(double t) { public void accept(double t) {
downstream.accept(mapper.applyAsLong(t)); downstream.accept(mapper.applyAsLong(t));
...@@ -259,7 +258,7 @@ abstract class DoublePipeline<E_IN> ...@@ -259,7 +258,7 @@ abstract class DoublePipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override @Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedDouble(sink) { return new Sink.ChainedDouble<Double>(sink) {
@Override @Override
public void begin(long size) { public void begin(long size) {
downstream.begin(-1); downstream.begin(-1);
...@@ -296,7 +295,7 @@ abstract class DoublePipeline<E_IN> ...@@ -296,7 +295,7 @@ abstract class DoublePipeline<E_IN>
StreamOpFlag.NOT_SIZED) { StreamOpFlag.NOT_SIZED) {
@Override @Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedDouble(sink) { return new Sink.ChainedDouble<Double>(sink) {
@Override @Override
public void begin(long size) { public void begin(long size) {
downstream.begin(-1); downstream.begin(-1);
...@@ -319,7 +318,7 @@ abstract class DoublePipeline<E_IN> ...@@ -319,7 +318,7 @@ abstract class DoublePipeline<E_IN>
0) { 0) {
@Override @Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedDouble(sink) { return new Sink.ChainedDouble<Double>(sink) {
@Override @Override
public void accept(double t) { public void accept(double t) {
consumer.accept(t); consumer.accept(t);
......
...@@ -189,9 +189,8 @@ abstract class IntPipeline<E_IN> ...@@ -189,9 +189,8 @@ abstract class IntPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedInt(sink) { return new Sink.ChainedInt<Long>(sink) {
@Override @Override
@SuppressWarnings("unchecked")
public void accept(int t) { public void accept(int t) {
downstream.accept((long) t); downstream.accept((long) t);
} }
...@@ -206,9 +205,8 @@ abstract class IntPipeline<E_IN> ...@@ -206,9 +205,8 @@ abstract class IntPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedInt(sink) { return new Sink.ChainedInt<Double>(sink) {
@Override @Override
@SuppressWarnings("unchecked")
public void accept(int t) { public void accept(int t) {
downstream.accept((double) t); downstream.accept((double) t);
} }
...@@ -229,7 +227,7 @@ abstract class IntPipeline<E_IN> ...@@ -229,7 +227,7 @@ abstract class IntPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedInt(sink) { return new Sink.ChainedInt<Integer>(sink) {
@Override @Override
public void accept(int t) { public void accept(int t) {
downstream.accept(mapper.applyAsInt(t)); downstream.accept(mapper.applyAsInt(t));
...@@ -246,9 +244,8 @@ abstract class IntPipeline<E_IN> ...@@ -246,9 +244,8 @@ abstract class IntPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Integer> opWrapSink(int flags, Sink<U> sink) { Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
return new Sink.ChainedInt(sink) { return new Sink.ChainedInt<U>(sink) {
@Override @Override
@SuppressWarnings("unchecked")
public void accept(int t) { public void accept(int t) {
downstream.accept(mapper.apply(t)); downstream.accept(mapper.apply(t));
} }
...@@ -264,7 +261,7 @@ abstract class IntPipeline<E_IN> ...@@ -264,7 +261,7 @@ abstract class IntPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedInt(sink) { return new Sink.ChainedInt<Long>(sink) {
@Override @Override
public void accept(int t) { public void accept(int t) {
downstream.accept(mapper.applyAsLong(t)); downstream.accept(mapper.applyAsLong(t));
...@@ -281,7 +278,7 @@ abstract class IntPipeline<E_IN> ...@@ -281,7 +278,7 @@ abstract class IntPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedInt(sink) { return new Sink.ChainedInt<Double>(sink) {
@Override @Override
public void accept(int t) { public void accept(int t) {
downstream.accept(mapper.applyAsDouble(t)); downstream.accept(mapper.applyAsDouble(t));
...@@ -297,7 +294,7 @@ abstract class IntPipeline<E_IN> ...@@ -297,7 +294,7 @@ abstract class IntPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override @Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedInt(sink) { return new Sink.ChainedInt<Integer>(sink) {
@Override @Override
public void begin(long size) { public void begin(long size) {
downstream.begin(-1); downstream.begin(-1);
...@@ -334,7 +331,7 @@ abstract class IntPipeline<E_IN> ...@@ -334,7 +331,7 @@ abstract class IntPipeline<E_IN>
StreamOpFlag.NOT_SIZED) { StreamOpFlag.NOT_SIZED) {
@Override @Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedInt(sink) { return new Sink.ChainedInt<Integer>(sink) {
@Override @Override
public void begin(long size) { public void begin(long size) {
downstream.begin(-1); downstream.begin(-1);
...@@ -357,7 +354,7 @@ abstract class IntPipeline<E_IN> ...@@ -357,7 +354,7 @@ abstract class IntPipeline<E_IN>
0) { 0) {
@Override @Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedInt(sink) { return new Sink.ChainedInt<Integer>(sink) {
@Override @Override
public void accept(int t) { public void accept(int t) {
consumer.accept(t); consumer.accept(t);
......
...@@ -186,7 +186,7 @@ abstract class LongPipeline<E_IN> ...@@ -186,7 +186,7 @@ abstract class LongPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Long> opWrapSink(int flags, Sink<Double> sink) { Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedLong(sink) { return new Sink.ChainedLong<Double>(sink) {
@Override @Override
public void accept(long t) { public void accept(long t) {
downstream.accept((double) t); downstream.accept((double) t);
...@@ -208,9 +208,8 @@ abstract class LongPipeline<E_IN> ...@@ -208,9 +208,8 @@ abstract class LongPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedLong(sink) { return new Sink.ChainedLong<Long>(sink) {
@Override @Override
@SuppressWarnings("unchecked")
public void accept(long t) { public void accept(long t) {
downstream.accept(mapper.applyAsLong(t)); downstream.accept(mapper.applyAsLong(t));
} }
...@@ -226,9 +225,8 @@ abstract class LongPipeline<E_IN> ...@@ -226,9 +225,8 @@ abstract class LongPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Long> opWrapSink(int flags, Sink<U> sink) { Sink<Long> opWrapSink(int flags, Sink<U> sink) {
return new Sink.ChainedLong(sink) { return new Sink.ChainedLong<U>(sink) {
@Override @Override
@SuppressWarnings("unchecked")
public void accept(long t) { public void accept(long t) {
downstream.accept(mapper.apply(t)); downstream.accept(mapper.apply(t));
} }
...@@ -244,9 +242,8 @@ abstract class LongPipeline<E_IN> ...@@ -244,9 +242,8 @@ abstract class LongPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Long> opWrapSink(int flags, Sink<Integer> sink) { Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedLong(sink) { return new Sink.ChainedLong<Integer>(sink) {
@Override @Override
@SuppressWarnings("unchecked")
public void accept(long t) { public void accept(long t) {
downstream.accept(mapper.applyAsInt(t)); downstream.accept(mapper.applyAsInt(t));
} }
...@@ -262,7 +259,7 @@ abstract class LongPipeline<E_IN> ...@@ -262,7 +259,7 @@ abstract class LongPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<Long> opWrapSink(int flags, Sink<Double> sink) { Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedLong(sink) { return new Sink.ChainedLong<Double>(sink) {
@Override @Override
public void accept(long t) { public void accept(long t) {
downstream.accept(mapper.applyAsDouble(t)); downstream.accept(mapper.applyAsDouble(t));
...@@ -278,7 +275,7 @@ abstract class LongPipeline<E_IN> ...@@ -278,7 +275,7 @@ abstract class LongPipeline<E_IN>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override @Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedLong(sink) { return new Sink.ChainedLong<Long>(sink) {
@Override @Override
public void begin(long size) { public void begin(long size) {
downstream.begin(-1); downstream.begin(-1);
...@@ -315,7 +312,7 @@ abstract class LongPipeline<E_IN> ...@@ -315,7 +312,7 @@ abstract class LongPipeline<E_IN>
StreamOpFlag.NOT_SIZED) { StreamOpFlag.NOT_SIZED) {
@Override @Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedLong(sink) { return new Sink.ChainedLong<Long>(sink) {
@Override @Override
public void begin(long size) { public void begin(long size) {
downstream.begin(-1); downstream.begin(-1);
...@@ -338,7 +335,7 @@ abstract class LongPipeline<E_IN> ...@@ -338,7 +335,7 @@ abstract class LongPipeline<E_IN>
0) { 0) {
@Override @Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedLong(sink) { return new Sink.ChainedLong<Long>(sink) {
@Override @Override
public void accept(long t) { public void accept(long t) {
consumer.accept(t); consumer.accept(t);
......
...@@ -163,17 +163,16 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -163,17 +163,16 @@ abstract class ReferencePipeline<P_IN, P_OUT>
StreamOpFlag.NOT_SIZED) { StreamOpFlag.NOT_SIZED) {
@Override @Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override @Override
public void begin(long size) { public void begin(long size) {
downstream.begin(-1); downstream.begin(-1);
} }
@Override @Override
@SuppressWarnings("unchecked")
public void accept(P_OUT u) { public void accept(P_OUT u) {
if (predicate.test(u)) if (predicate.test(u))
downstream.accept((Object) u); downstream.accept(u);
} }
}; };
} }
...@@ -188,7 +187,7 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -188,7 +187,7 @@ abstract class ReferencePipeline<P_IN, P_OUT>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override @Override
public void accept(P_OUT u) { public void accept(P_OUT u) {
downstream.accept(mapper.apply(u)); downstream.accept(mapper.apply(u));
...@@ -205,7 +204,7 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -205,7 +204,7 @@ abstract class ReferencePipeline<P_IN, P_OUT>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT, Integer>(sink) {
@Override @Override
public void accept(P_OUT u) { public void accept(P_OUT u) {
downstream.accept(mapper.applyAsInt(u)); downstream.accept(mapper.applyAsInt(u));
...@@ -222,7 +221,7 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -222,7 +221,7 @@ abstract class ReferencePipeline<P_IN, P_OUT>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT, Long>(sink) {
@Override @Override
public void accept(P_OUT u) { public void accept(P_OUT u) {
downstream.accept(mapper.applyAsLong(u)); downstream.accept(mapper.applyAsLong(u));
...@@ -239,7 +238,7 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -239,7 +238,7 @@ abstract class ReferencePipeline<P_IN, P_OUT>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override @Override
Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT, Double>(sink) {
@Override @Override
public void accept(P_OUT u) { public void accept(P_OUT u) {
downstream.accept(mapper.applyAsDouble(u)); downstream.accept(mapper.applyAsDouble(u));
...@@ -257,14 +256,13 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -257,14 +256,13 @@ abstract class ReferencePipeline<P_IN, P_OUT>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override @Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override @Override
public void begin(long size) { public void begin(long size) {
downstream.begin(-1); downstream.begin(-1);
} }
@Override @Override
@SuppressWarnings("unchecked")
public void accept(P_OUT u) { public void accept(P_OUT u) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
Stream<? extends R> result = mapper.apply(u); Stream<? extends R> result = mapper.apply(u);
...@@ -284,7 +282,7 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -284,7 +282,7 @@ abstract class ReferencePipeline<P_IN, P_OUT>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override @Override
Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT, Integer>(sink) {
IntConsumer downstreamAsInt = downstream::accept; IntConsumer downstreamAsInt = downstream::accept;
@Override @Override
public void begin(long size) { public void begin(long size) {
...@@ -311,7 +309,7 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -311,7 +309,7 @@ abstract class ReferencePipeline<P_IN, P_OUT>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override @Override
Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT, Double>(sink) {
DoubleConsumer downstreamAsDouble = downstream::accept; DoubleConsumer downstreamAsDouble = downstream::accept;
@Override @Override
public void begin(long size) { public void begin(long size) {
...@@ -338,7 +336,7 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -338,7 +336,7 @@ abstract class ReferencePipeline<P_IN, P_OUT>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override @Override
Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT, Long>(sink) {
LongConsumer downstreamAsLong = downstream::accept; LongConsumer downstreamAsLong = downstream::accept;
@Override @Override
public void begin(long size) { public void begin(long size) {
...@@ -364,9 +362,8 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -364,9 +362,8 @@ abstract class ReferencePipeline<P_IN, P_OUT>
0) { 0) {
@Override @Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT>(sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override @Override
@SuppressWarnings("unchecked")
public void accept(P_OUT u) { public void accept(P_OUT u) {
tee.accept(u); tee.accept(u);
downstream.accept(u); downstream.accept(u);
...@@ -495,6 +492,7 @@ abstract class ReferencePipeline<P_IN, P_OUT> ...@@ -495,6 +492,7 @@ abstract class ReferencePipeline<P_IN, P_OUT>
} }
@Override @Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, ? extends R> collector) { public final <R, A> R collect(Collector<? super P_OUT, A, ? extends R> collector) {
A container; A container;
if (isParallel() if (isParallel()
......
...@@ -241,11 +241,10 @@ interface Sink<T> extends Consumer<T> { ...@@ -241,11 +241,10 @@ interface Sink<T> extends Consumer<T> {
* implementation of the {@code accept()} method must call the correct * implementation of the {@code accept()} method must call the correct
* {@code accept()} method on the downstream {@code Sink}. * {@code accept()} method on the downstream {@code Sink}.
*/ */
static abstract class ChainedReference<T> implements Sink<T> { static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
@SuppressWarnings("rawtypes") protected final Sink<? super E_OUT> downstream;
protected final Sink downstream;
public ChainedReference(Sink downstream) { public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream); this.downstream = Objects.requireNonNull(downstream);
} }
...@@ -274,11 +273,10 @@ interface Sink<T> extends Consumer<T> { ...@@ -274,11 +273,10 @@ interface Sink<T> extends Consumer<T> {
* The implementation of the {@code accept()} method must call the correct * The implementation of the {@code accept()} method must call the correct
* {@code accept()} method on the downstream {@code Sink}. * {@code accept()} method on the downstream {@code Sink}.
*/ */
static abstract class ChainedInt implements Sink.OfInt { static abstract class ChainedInt<E_OUT> implements Sink.OfInt {
@SuppressWarnings("rawtypes") protected final Sink<? super E_OUT> downstream;
protected final Sink downstream;
public ChainedInt(Sink downstream) { public ChainedInt(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream); this.downstream = Objects.requireNonNull(downstream);
} }
...@@ -307,11 +305,10 @@ interface Sink<T> extends Consumer<T> { ...@@ -307,11 +305,10 @@ interface Sink<T> extends Consumer<T> {
* The implementation of the {@code accept()} method must call the correct * The implementation of the {@code accept()} method must call the correct
* {@code accept()} method on the downstream {@code Sink}. * {@code accept()} method on the downstream {@code Sink}.
*/ */
static abstract class ChainedLong implements Sink.OfLong { static abstract class ChainedLong<E_OUT> implements Sink.OfLong {
@SuppressWarnings("rawtypes") protected final Sink<? super E_OUT> downstream;
protected final Sink downstream;
public ChainedLong(Sink downstream) { public ChainedLong(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream); this.downstream = Objects.requireNonNull(downstream);
} }
...@@ -340,11 +337,10 @@ interface Sink<T> extends Consumer<T> { ...@@ -340,11 +337,10 @@ interface Sink<T> extends Consumer<T> {
* The implementation of the {@code accept()} method must call the correct * The implementation of the {@code accept()} method must call the correct
* {@code accept()} method on the downstream {@code Sink}. * {@code accept()} method on the downstream {@code Sink}.
*/ */
static abstract class ChainedDouble implements Sink.OfDouble { static abstract class ChainedDouble<E_OUT> implements Sink.OfDouble {
@SuppressWarnings("rawtypes") protected final Sink<? super E_OUT> downstream;
protected final Sink downstream;
public ChainedDouble(Sink downstream) { public ChainedDouble(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream); this.downstream = Objects.requireNonNull(downstream);
} }
......
...@@ -96,6 +96,11 @@ final class SliceOps { ...@@ -96,6 +96,11 @@ final class SliceOps {
} }
} }
@SuppressWarnings("unchecked")
private static <T> IntFunction<T[]> castingArray() {
return size -> (T[]) new Object[size];
}
/** /**
* Appends a "slice" operation to the provided stream. The slice operation * Appends a "slice" operation to the provided stream. The slice operation
* may be may be skip-only, limit-only, or skip-and-limit. * may be may be skip-only, limit-only, or skip-and-limit.
...@@ -107,12 +112,12 @@ final class SliceOps { ...@@ -107,12 +112,12 @@ final class SliceOps {
* is to be imposed * is to be imposed
*/ */
public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream, public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
long skip, long limit) { long skip, long limit) {
if (skip < 0) if (skip < 0)
throw new IllegalArgumentException("Skip must be non-negative: " + skip); throw new IllegalArgumentException("Skip must be non-negative: " + skip);
return new ReferencePipeline.StatefulOp<T,T>(upstream, StreamShape.REFERENCE, return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
flags(limit)) { flags(limit)) {
Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s, Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
long skip, long limit, long sizeIfKnown) { long skip, long limit, long sizeIfKnown) {
if (skip <= sizeIfKnown) { if (skip <= sizeIfKnown) {
...@@ -146,7 +151,7 @@ final class SliceOps { ...@@ -146,7 +151,7 @@ final class SliceOps {
// cancellation will be more aggressive cancelling later tasks // cancellation will be more aggressive cancelling later tasks
// if the target slice size has been reached from a given task, // if the target slice size has been reached from a given task,
// cancellation should also clear local results if any // cancellation should also clear local results if any
return new SliceTask<>(this, helper, spliterator, i -> (T[]) new Object[i], skip, limit). return new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit).
invoke().spliterator(); invoke().spliterator();
} }
} }
...@@ -182,7 +187,7 @@ final class SliceOps { ...@@ -182,7 +187,7 @@ final class SliceOps {
@Override @Override
Sink<T> opWrapSink(int flags, Sink<T> sink) { Sink<T> opWrapSink(int flags, Sink<T> sink) {
return new Sink.ChainedReference<T>(sink) { return new Sink.ChainedReference<T, T>(sink) {
long n = skip; long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE; long m = limit >= 0 ? limit : Long.MAX_VALUE;
...@@ -291,7 +296,7 @@ final class SliceOps { ...@@ -291,7 +296,7 @@ final class SliceOps {
@Override @Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedInt(sink) { return new Sink.ChainedInt<Integer>(sink) {
long n = skip; long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE; long m = limit >= 0 ? limit : Long.MAX_VALUE;
...@@ -400,7 +405,7 @@ final class SliceOps { ...@@ -400,7 +405,7 @@ final class SliceOps {
@Override @Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedLong(sink) { return new Sink.ChainedLong<Long>(sink) {
long n = skip; long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE; long m = limit >= 0 ? limit : Long.MAX_VALUE;
...@@ -509,7 +514,7 @@ final class SliceOps { ...@@ -509,7 +514,7 @@ final class SliceOps {
@Override @Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedDouble(sink) { return new Sink.ChainedDouble<Double>(sink) {
long n = skip; long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE; long m = limit >= 0 ? limit : Long.MAX_VALUE;
...@@ -560,13 +565,13 @@ final class SliceOps { ...@@ -560,13 +565,13 @@ final class SliceOps {
private volatile boolean completed; private volatile boolean completed;
SliceTask(AbstractPipeline<?, P_OUT, ?> op, SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
PipelineHelper<P_OUT> helper, PipelineHelper<P_OUT> helper,
Spliterator<P_IN> spliterator, Spliterator<P_IN> spliterator,
IntFunction<P_OUT[]> generator, IntFunction<P_OUT[]> generator,
long offset, long size) { long offset, long size) {
super(helper, spliterator); super(helper, spliterator);
this.op = (AbstractPipeline<P_OUT, P_OUT, ?>) op; this.op = op;
this.generator = generator; this.generator = generator;
this.targetOffset = offset; this.targetOffset = offset;
this.targetSize = size; this.targetSize = size;
......
...@@ -129,7 +129,7 @@ final class SortedOps { ...@@ -129,7 +129,7 @@ final class SortedOps {
} }
@Override @Override
public Sink<T> opWrapSink(int flags, Sink sink) { public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink); Objects.requireNonNull(sink);
// If the input is already naturally sorted and this operation // If the input is already naturally sorted and this operation
...@@ -280,12 +280,12 @@ final class SortedOps { ...@@ -280,12 +280,12 @@ final class SortedOps {
/** /**
* {@link ForkJoinTask} for implementing sort on SIZED reference streams. * {@link ForkJoinTask} for implementing sort on SIZED reference streams.
*/ */
private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T> { private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T, T> {
private final Comparator<? super T> comparator; private final Comparator<? super T> comparator;
private T[] array; private T[] array;
private int offset; private int offset;
SizedRefSortingSink(Sink<T> sink, Comparator<? super T> comparator) { SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
super(sink); super(sink);
this.comparator = comparator; this.comparator = comparator;
} }
...@@ -320,11 +320,11 @@ final class SortedOps { ...@@ -320,11 +320,11 @@ final class SortedOps {
/** /**
* {@link Sink} for implementing sort on reference streams. * {@link Sink} for implementing sort on reference streams.
*/ */
private static final class RefSortingSink<T> extends Sink.ChainedReference<T> { private static final class RefSortingSink<T> extends Sink.ChainedReference<T, T> {
private final Comparator<? super T> comparator; private final Comparator<? super T> comparator;
private ArrayList<T> list; private ArrayList<T> list;
RefSortingSink(Sink<T> sink, Comparator<? super T> comparator) { RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
super(sink); super(sink);
this.comparator = comparator; this.comparator = comparator;
} }
...@@ -352,11 +352,11 @@ final class SortedOps { ...@@ -352,11 +352,11 @@ final class SortedOps {
/** /**
* {@link Sink} for implementing sort on SIZED int streams. * {@link Sink} for implementing sort on SIZED int streams.
*/ */
private static final class SizedIntSortingSink extends Sink.ChainedInt { private static final class SizedIntSortingSink extends Sink.ChainedInt<Integer> {
private int[] array; private int[] array;
private int offset; private int offset;
SizedIntSortingSink(Sink downstream) { SizedIntSortingSink(Sink<? super Integer> downstream) {
super(downstream); super(downstream);
} }
...@@ -386,10 +386,10 @@ final class SortedOps { ...@@ -386,10 +386,10 @@ final class SortedOps {
/** /**
* {@link Sink} for implementing sort on int streams. * {@link Sink} for implementing sort on int streams.
*/ */
private static final class IntSortingSink extends Sink.ChainedInt { private static final class IntSortingSink extends Sink.ChainedInt<Integer> {
private SpinedBuffer.OfInt b; private SpinedBuffer.OfInt b;
IntSortingSink(Sink sink) { IntSortingSink(Sink<? super Integer> sink) {
super(sink); super(sink);
} }
...@@ -417,11 +417,11 @@ final class SortedOps { ...@@ -417,11 +417,11 @@ final class SortedOps {
/** /**
* {@link Sink} for implementing sort on SIZED long streams. * {@link Sink} for implementing sort on SIZED long streams.
*/ */
private static final class SizedLongSortingSink extends Sink.ChainedLong { private static final class SizedLongSortingSink extends Sink.ChainedLong<Long> {
private long[] array; private long[] array;
private int offset; private int offset;
SizedLongSortingSink(Sink downstream) { SizedLongSortingSink(Sink<? super Long> downstream) {
super(downstream); super(downstream);
} }
...@@ -451,10 +451,10 @@ final class SortedOps { ...@@ -451,10 +451,10 @@ final class SortedOps {
/** /**
* {@link Sink} for implementing sort on long streams. * {@link Sink} for implementing sort on long streams.
*/ */
private static final class LongSortingSink extends Sink.ChainedLong { private static final class LongSortingSink extends Sink.ChainedLong<Long> {
private SpinedBuffer.OfLong b; private SpinedBuffer.OfLong b;
LongSortingSink(Sink sink) { LongSortingSink(Sink<? super Long> sink) {
super(sink); super(sink);
} }
...@@ -482,11 +482,11 @@ final class SortedOps { ...@@ -482,11 +482,11 @@ final class SortedOps {
/** /**
* {@link Sink} for implementing sort on SIZED double streams. * {@link Sink} for implementing sort on SIZED double streams.
*/ */
private static final class SizedDoubleSortingSink extends Sink.ChainedDouble { private static final class SizedDoubleSortingSink extends Sink.ChainedDouble<Double> {
private double[] array; private double[] array;
private int offset; private int offset;
SizedDoubleSortingSink(Sink downstream) { SizedDoubleSortingSink(Sink<? super Double> downstream) {
super(downstream); super(downstream);
} }
...@@ -516,10 +516,10 @@ final class SortedOps { ...@@ -516,10 +516,10 @@ final class SortedOps {
/** /**
* {@link Sink} for implementing sort on double streams. * {@link Sink} for implementing sort on double streams.
*/ */
private static final class DoubleSortingSink extends Sink.ChainedDouble { private static final class DoubleSortingSink extends Sink.ChainedDouble<Double> {
private SpinedBuffer.OfDouble b; private SpinedBuffer.OfDouble b;
DoubleSortingSink(Sink sink) { DoubleSortingSink(Sink<? super Double> sink) {
super(sink); super(sink);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册