提交 73694c2a 编写于 作者: C chegar

8012647: Add Arrays.parallelPrefix (prefix sum, scan, cumulative sum)

Reviewed-by: chegar, alanb, psandoz
Contributed-by: NDoug Lea &lt;dl@cs.oswego.edu&gt;, Tristan Yan &lt;tristan.yan@oracle.com&gt;, Chris Hegarty <chris.hegarty@oracle.com>
上级 05a8fbbd
/*
* Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util;
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.CountedCompleter;
import java.util.function.BinaryOperator;
import java.util.function.IntBinaryOperator;
import java.util.function.LongBinaryOperator;
import java.util.function.DoubleBinaryOperator;
/**
* ForkJoin tasks to perform Arrays.parallelPrefix operations.
*
* @author Doug Lea
* @since 1.8
*/
class ArrayPrefixHelpers {
private ArrayPrefixHelpers() {}; // non-instantiable
/*
* Parallel prefix (aka cumulate, scan) task classes
* are based loosely on Guy Blelloch's original
* algorithm (http://www.cs.cmu.edu/~scandal/alg/scan.html):
* Keep dividing by two to threshold segment size, and then:
* Pass 1: Create tree of partial sums for each segment
* Pass 2: For each segment, cumulate with offset of left sibling
*
* This version improves performance within FJ framework mainly by
* allowing the second pass of ready left-hand sides to proceed
* even if some right-hand side first passes are still executing.
* It also combines first and second pass for leftmost segment,
* and skips the first pass for rightmost segment (whose result is
* not needed for second pass). It similarly manages to avoid
* requiring that users supply an identity basis for accumulations
* by tracking those segments/subtasks for which the first
* existing element is used as base.
*
* Managing this relies on ORing some bits in the pendingCount for
* phases/states: CUMULATE, SUMMED, and FINISHED. CUMULATE is the
* main phase bit. When false, segments compute only their sum.
* When true, they cumulate array elements. CUMULATE is set at
* root at beginning of second pass and then propagated down. But
* it may also be set earlier for subtrees with lo==0 (the left
* spine of tree). SUMMED is a one bit join count. For leafs, it
* is set when summed. For internal nodes, it becomes true when
* one child is summed. When the second child finishes summing,
* we then moves up tree to trigger the cumulate phase. FINISHED
* is also a one bit join count. For leafs, it is set when
* cumulated. For internal nodes, it becomes true when one child
* is cumulated. When the second child finishes cumulating, it
* then moves up tree, completing at the root.
*
* To better exploit locality and reduce overhead, the compute
* method loops starting with the current task, moving if possible
* to one of its subtasks rather than forking.
*
* As usual for this sort of utility, there are 4 versions, that
* are simple copy/paste/adapt variants of each other. (The
* double and int versions differ from long version soley by
* replacing "long" (with case-matching)).
*/
// see above
static final int CUMULATE = 1;
static final int SUMMED = 2;
static final int FINISHED = 4;
/** The smallest subtask array partition size to use as threshold */
static final int MIN_PARTITION = 16;
static final class CumulateTask<T> extends CountedCompleter<Void> {
final T[] array;
final BinaryOperator<T> function;
CumulateTask<T> left, right;
T in, out;
final int lo, hi, origin, fence, threshold;
/** Root task constructor */
public CumulateTask(CumulateTask<T> parent,
BinaryOperator<T> function,
T[] array, int lo, int hi) {
super(parent);
this.function = function; this.array = array;
this.lo = this.origin = lo; this.hi = this.fence = hi;
int p;
this.threshold =
(p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
<= MIN_PARTITION ? MIN_PARTITION : p;
}
/** Subtask constructor */
CumulateTask(CumulateTask<T> parent, BinaryOperator<T> function,
T[] array, int origin, int fence, int threshold,
int lo, int hi) {
super(parent);
this.function = function; this.array = array;
this.origin = origin; this.fence = fence;
this.threshold = threshold;
this.lo = lo; this.hi = hi;
}
public final void compute() {
final BinaryOperator<T> fn;
final T[] a;
if ((fn = this.function) == null || (a = this.array) == null)
throw new NullPointerException(); // hoist checks
int th = threshold, org = origin, fnc = fence, l, h;
CumulateTask<T> t = this;
outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
if (h - l > th) {
CumulateTask<T> lt = t.left, rt = t.right, f;
if (lt == null) { // first pass
int mid = (l + h) >>> 1;
f = rt = t.right =
new CumulateTask<T>(t, fn, a, org, fnc, th, mid, h);
t = lt = t.left =
new CumulateTask<T>(t, fn, a, org, fnc, th, l, mid);
}
else { // possibly refork
T pin = t.in;
lt.in = pin;
f = t = null;
if (rt != null) {
T lout = lt.out;
rt.in = (l == org ? lout :
fn.apply(pin, lout));
for (int c;;) {
if (((c = rt.getPendingCount()) & CUMULATE) != 0)
break;
if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
t = rt;
break;
}
}
}
for (int c;;) {
if (((c = lt.getPendingCount()) & CUMULATE) != 0)
break;
if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
if (t != null)
f = t;
t = lt;
break;
}
}
if (t == null)
break;
}
if (f != null)
f.fork();
}
else {
int state; // Transition to sum, cumulate, or both
for (int b;;) {
if (((b = t.getPendingCount()) & FINISHED) != 0)
break outer; // already done
state = ((b & CUMULATE) != 0? FINISHED :
(l > org) ? SUMMED : (SUMMED|FINISHED));
if (t.compareAndSetPendingCount(b, b|state))
break;
}
T sum;
if (state != SUMMED) {
int first;
if (l == org) { // leftmost; no in
sum = a[org];
first = org + 1;
}
else {
sum = t.in;
first = l;
}
for (int i = first; i < h; ++i) // cumulate
a[i] = sum = fn.apply(sum, a[i]);
}
else if (h < fnc) { // skip rightmost
sum = a[l];
for (int i = l + 1; i < h; ++i) // sum only
sum = fn.apply(sum, a[i]);
}
else
sum = t.in;
t.out = sum;
for (CumulateTask<T> par;;) { // propagate
if ((par = (CumulateTask<T>)t.getCompleter()) == null) {
if ((state & FINISHED) != 0) // enable join
t.quietlyComplete();
break outer;
}
int b = par.getPendingCount();
if ((b & state & FINISHED) != 0)
t = par; // both done
else if ((b & state & SUMMED) != 0) { // both summed
int nextState; CumulateTask<T> lt, rt;
if ((lt = par.left) != null &&
(rt = par.right) != null) {
T lout = lt.out;
par.out = (rt.hi == fnc ? lout :
fn.apply(lout, rt.out));
}
int refork = (((b & CUMULATE) == 0 &&
par.lo == org) ? CUMULATE : 0);
if ((nextState = b|state|refork) == b ||
par.compareAndSetPendingCount(b, nextState)) {
state = SUMMED; // drop finished
t = par;
if (refork != 0)
par.fork();
}
}
else if (par.compareAndSetPendingCount(b, b|state))
break outer; // sib not ready
}
}
}
}
}
static final class LongCumulateTask extends CountedCompleter<Void> {
final long[] array;
final LongBinaryOperator function;
LongCumulateTask left, right;
long in, out;
final int lo, hi, origin, fence, threshold;
/** Root task constructor */
public LongCumulateTask(LongCumulateTask parent,
LongBinaryOperator function,
long[] array, int lo, int hi) {
super(parent);
this.function = function; this.array = array;
this.lo = this.origin = lo; this.hi = this.fence = hi;
int p;
this.threshold =
(p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
<= MIN_PARTITION ? MIN_PARTITION : p;
}
/** Subtask constructor */
LongCumulateTask(LongCumulateTask parent, LongBinaryOperator function,
long[] array, int origin, int fence, int threshold,
int lo, int hi) {
super(parent);
this.function = function; this.array = array;
this.origin = origin; this.fence = fence;
this.threshold = threshold;
this.lo = lo; this.hi = hi;
}
public final void compute() {
final LongBinaryOperator fn;
final long[] a;
if ((fn = this.function) == null || (a = this.array) == null)
throw new NullPointerException(); // hoist checks
int th = threshold, org = origin, fnc = fence, l, h;
LongCumulateTask t = this;
outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
if (h - l > th) {
LongCumulateTask lt = t.left, rt = t.right, f;
if (lt == null) { // first pass
int mid = (l + h) >>> 1;
f = rt = t.right =
new LongCumulateTask(t, fn, a, org, fnc, th, mid, h);
t = lt = t.left =
new LongCumulateTask(t, fn, a, org, fnc, th, l, mid);
}
else { // possibly refork
long pin = t.in;
lt.in = pin;
f = t = null;
if (rt != null) {
long lout = lt.out;
rt.in = (l == org ? lout :
fn.applyAsLong(pin, lout));
for (int c;;) {
if (((c = rt.getPendingCount()) & CUMULATE) != 0)
break;
if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
t = rt;
break;
}
}
}
for (int c;;) {
if (((c = lt.getPendingCount()) & CUMULATE) != 0)
break;
if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
if (t != null)
f = t;
t = lt;
break;
}
}
if (t == null)
break;
}
if (f != null)
f.fork();
}
else {
int state; // Transition to sum, cumulate, or both
for (int b;;) {
if (((b = t.getPendingCount()) & FINISHED) != 0)
break outer; // already done
state = ((b & CUMULATE) != 0? FINISHED :
(l > org) ? SUMMED : (SUMMED|FINISHED));
if (t.compareAndSetPendingCount(b, b|state))
break;
}
long sum;
if (state != SUMMED) {
int first;
if (l == org) { // leftmost; no in
sum = a[org];
first = org + 1;
}
else {
sum = t.in;
first = l;
}
for (int i = first; i < h; ++i) // cumulate
a[i] = sum = fn.applyAsLong(sum, a[i]);
}
else if (h < fnc) { // skip rightmost
sum = a[l];
for (int i = l + 1; i < h; ++i) // sum only
sum = fn.applyAsLong(sum, a[i]);
}
else
sum = t.in;
t.out = sum;
for (LongCumulateTask par;;) { // propagate
if ((par = (LongCumulateTask)t.getCompleter()) == null) {
if ((state & FINISHED) != 0) // enable join
t.quietlyComplete();
break outer;
}
int b = par.getPendingCount();
if ((b & state & FINISHED) != 0)
t = par; // both done
else if ((b & state & SUMMED) != 0) { // both summed
int nextState; LongCumulateTask lt, rt;
if ((lt = par.left) != null &&
(rt = par.right) != null) {
long lout = lt.out;
par.out = (rt.hi == fnc ? lout :
fn.applyAsLong(lout, rt.out));
}
int refork = (((b & CUMULATE) == 0 &&
par.lo == org) ? CUMULATE : 0);
if ((nextState = b|state|refork) == b ||
par.compareAndSetPendingCount(b, nextState)) {
state = SUMMED; // drop finished
t = par;
if (refork != 0)
par.fork();
}
}
else if (par.compareAndSetPendingCount(b, b|state))
break outer; // sib not ready
}
}
}
}
}
static final class DoubleCumulateTask extends CountedCompleter<Void> {
final double[] array;
final DoubleBinaryOperator function;
DoubleCumulateTask left, right;
double in, out;
final int lo, hi, origin, fence, threshold;
/** Root task constructor */
public DoubleCumulateTask(DoubleCumulateTask parent,
DoubleBinaryOperator function,
double[] array, int lo, int hi) {
super(parent);
this.function = function; this.array = array;
this.lo = this.origin = lo; this.hi = this.fence = hi;
int p;
this.threshold =
(p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
<= MIN_PARTITION ? MIN_PARTITION : p;
}
/** Subtask constructor */
DoubleCumulateTask(DoubleCumulateTask parent, DoubleBinaryOperator function,
double[] array, int origin, int fence, int threshold,
int lo, int hi) {
super(parent);
this.function = function; this.array = array;
this.origin = origin; this.fence = fence;
this.threshold = threshold;
this.lo = lo; this.hi = hi;
}
public final void compute() {
final DoubleBinaryOperator fn;
final double[] a;
if ((fn = this.function) == null || (a = this.array) == null)
throw new NullPointerException(); // hoist checks
int th = threshold, org = origin, fnc = fence, l, h;
DoubleCumulateTask t = this;
outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
if (h - l > th) {
DoubleCumulateTask lt = t.left, rt = t.right, f;
if (lt == null) { // first pass
int mid = (l + h) >>> 1;
f = rt = t.right =
new DoubleCumulateTask(t, fn, a, org, fnc, th, mid, h);
t = lt = t.left =
new DoubleCumulateTask(t, fn, a, org, fnc, th, l, mid);
}
else { // possibly refork
double pin = t.in;
lt.in = pin;
f = t = null;
if (rt != null) {
double lout = lt.out;
rt.in = (l == org ? lout :
fn.applyAsDouble(pin, lout));
for (int c;;) {
if (((c = rt.getPendingCount()) & CUMULATE) != 0)
break;
if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
t = rt;
break;
}
}
}
for (int c;;) {
if (((c = lt.getPendingCount()) & CUMULATE) != 0)
break;
if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
if (t != null)
f = t;
t = lt;
break;
}
}
if (t == null)
break;
}
if (f != null)
f.fork();
}
else {
int state; // Transition to sum, cumulate, or both
for (int b;;) {
if (((b = t.getPendingCount()) & FINISHED) != 0)
break outer; // already done
state = ((b & CUMULATE) != 0? FINISHED :
(l > org) ? SUMMED : (SUMMED|FINISHED));
if (t.compareAndSetPendingCount(b, b|state))
break;
}
double sum;
if (state != SUMMED) {
int first;
if (l == org) { // leftmost; no in
sum = a[org];
first = org + 1;
}
else {
sum = t.in;
first = l;
}
for (int i = first; i < h; ++i) // cumulate
a[i] = sum = fn.applyAsDouble(sum, a[i]);
}
else if (h < fnc) { // skip rightmost
sum = a[l];
for (int i = l + 1; i < h; ++i) // sum only
sum = fn.applyAsDouble(sum, a[i]);
}
else
sum = t.in;
t.out = sum;
for (DoubleCumulateTask par;;) { // propagate
if ((par = (DoubleCumulateTask)t.getCompleter()) == null) {
if ((state & FINISHED) != 0) // enable join
t.quietlyComplete();
break outer;
}
int b = par.getPendingCount();
if ((b & state & FINISHED) != 0)
t = par; // both done
else if ((b & state & SUMMED) != 0) { // both summed
int nextState; DoubleCumulateTask lt, rt;
if ((lt = par.left) != null &&
(rt = par.right) != null) {
double lout = lt.out;
par.out = (rt.hi == fnc ? lout :
fn.applyAsDouble(lout, rt.out));
}
int refork = (((b & CUMULATE) == 0 &&
par.lo == org) ? CUMULATE : 0);
if ((nextState = b|state|refork) == b ||
par.compareAndSetPendingCount(b, nextState)) {
state = SUMMED; // drop finished
t = par;
if (refork != 0)
par.fork();
}
}
else if (par.compareAndSetPendingCount(b, b|state))
break outer; // sib not ready
}
}
}
}
}
static final class IntCumulateTask extends CountedCompleter<Void> {
final int[] array;
final IntBinaryOperator function;
IntCumulateTask left, right;
int in, out;
final int lo, hi, origin, fence, threshold;
/** Root task constructor */
public IntCumulateTask(IntCumulateTask parent,
IntBinaryOperator function,
int[] array, int lo, int hi) {
super(parent);
this.function = function; this.array = array;
this.lo = this.origin = lo; this.hi = this.fence = hi;
int p;
this.threshold =
(p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
<= MIN_PARTITION ? MIN_PARTITION : p;
}
/** Subtask constructor */
IntCumulateTask(IntCumulateTask parent, IntBinaryOperator function,
int[] array, int origin, int fence, int threshold,
int lo, int hi) {
super(parent);
this.function = function; this.array = array;
this.origin = origin; this.fence = fence;
this.threshold = threshold;
this.lo = lo; this.hi = hi;
}
public final void compute() {
final IntBinaryOperator fn;
final int[] a;
if ((fn = this.function) == null || (a = this.array) == null)
throw new NullPointerException(); // hoist checks
int th = threshold, org = origin, fnc = fence, l, h;
IntCumulateTask t = this;
outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
if (h - l > th) {
IntCumulateTask lt = t.left, rt = t.right, f;
if (lt == null) { // first pass
int mid = (l + h) >>> 1;
f = rt = t.right =
new IntCumulateTask(t, fn, a, org, fnc, th, mid, h);
t = lt = t.left =
new IntCumulateTask(t, fn, a, org, fnc, th, l, mid);
}
else { // possibly refork
int pin = t.in;
lt.in = pin;
f = t = null;
if (rt != null) {
int lout = lt.out;
rt.in = (l == org ? lout :
fn.applyAsInt(pin, lout));
for (int c;;) {
if (((c = rt.getPendingCount()) & CUMULATE) != 0)
break;
if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
t = rt;
break;
}
}
}
for (int c;;) {
if (((c = lt.getPendingCount()) & CUMULATE) != 0)
break;
if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
if (t != null)
f = t;
t = lt;
break;
}
}
if (t == null)
break;
}
if (f != null)
f.fork();
}
else {
int state; // Transition to sum, cumulate, or both
for (int b;;) {
if (((b = t.getPendingCount()) & FINISHED) != 0)
break outer; // already done
state = ((b & CUMULATE) != 0? FINISHED :
(l > org) ? SUMMED : (SUMMED|FINISHED));
if (t.compareAndSetPendingCount(b, b|state))
break;
}
int sum;
if (state != SUMMED) {
int first;
if (l == org) { // leftmost; no in
sum = a[org];
first = org + 1;
}
else {
sum = t.in;
first = l;
}
for (int i = first; i < h; ++i) // cumulate
a[i] = sum = fn.applyAsInt(sum, a[i]);
}
else if (h < fnc) { // skip rightmost
sum = a[l];
for (int i = l + 1; i < h; ++i) // sum only
sum = fn.applyAsInt(sum, a[i]);
}
else
sum = t.in;
t.out = sum;
for (IntCumulateTask par;;) { // propagate
if ((par = (IntCumulateTask)t.getCompleter()) == null) {
if ((state & FINISHED) != 0) // enable join
t.quietlyComplete();
break outer;
}
int b = par.getPendingCount();
if ((b & state & FINISHED) != 0)
t = par; // both done
else if ((b & state & SUMMED) != 0) { // both summed
int nextState; IntCumulateTask lt, rt;
if ((lt = par.left) != null &&
(rt = par.right) != null) {
int lout = lt.out;
par.out = (rt.hi == fnc ? lout :
fn.applyAsInt(lout, rt.out));
}
int refork = (((b & CUMULATE) == 0 &&
par.lo == org) ? CUMULATE : 0);
if ((nextState = b|state|refork) == b ||
par.compareAndSetPendingCount(b, nextState)) {
state = SUMMED; // drop finished
t = par;
if (refork != 0)
par.fork();
}
}
else if (par.compareAndSetPendingCount(b, b|state))
break outer; // sib not ready
}
}
}
}
}
}
\ No newline at end of file
......@@ -1559,6 +1559,183 @@ public class Arrays {
}
}
// Parallel prefix
/**
* Cumulates, in parallel, each element of the given array in place,
* using the supplied function. For example if the array initially
* holds {@code [2, 1, 0, 3]} and the operation performs addition,
* then upon return the array holds {@code [2, 3, 3, 6]}.
* Parallel prefix computation is usually more efficient than
* sequential loops for large arrays.
*
* @param array the array, which is modified in-place by this method
* @param op a side-effect-free, associative function to perform the
* cumulation
* @throws NullPointerException if the specified array or function is null
* @since 1.8
*/
public static <T> void parallelPrefix(T[] array, BinaryOperator<T> op) {
if (array.length > 0)
new ArrayPrefixHelpers.CumulateTask<>
(null, op, array, 0, array.length).invoke();
}
/**
* Performs {@link #parallelPrefix(Object[], BinaryOperator)}
* for the given subrange of the array.
*
* @param array the array
* @param fromIndex the index of the first element, inclusive
* @param toIndex the index of the last element, exclusive
* @param op a side-effect-free, associative function to perform the
* cumulation
* @throws IllegalArgumentException if {@code fromIndex > toIndex}
* @throws ArrayIndexOutOfBoundsException
* if {@code fromIndex < 0} or {@code toIndex > array.length}
* @throws NullPointerException if the specified array or function is null
* @since 1.8
*/
public static <T> void parallelPrefix(T[] array, int fromIndex,
int toIndex, BinaryOperator<T> op) {
rangeCheck(array.length, fromIndex, toIndex);
if (fromIndex < toIndex)
new ArrayPrefixHelpers.CumulateTask<>
(null, op, array, fromIndex, toIndex).invoke();
}
/**
* Cumulates, in parallel, each element of the given array in place,
* using the supplied function. For example if the array initially
* holds {@code [2, 1, 0, 3]} and the operation performs addition,
* then upon return the array holds {@code [2, 3, 3, 6]}.
* Parallel prefix computation is usually more efficient than
* sequential loops for large arrays.
*
* @param array the array, which is modified in-place by this method
* @param op a side-effect-free, associative function to perform the
* cumulation
* @throws NullPointerException if the specified array or function is null
* @since 1.8
*/
public static void parallelPrefix(long[] array, LongBinaryOperator op) {
if (array.length > 0)
new ArrayPrefixHelpers.LongCumulateTask
(null, op, array, 0, array.length).invoke();
}
/**
* Performs {@link #parallelPrefix(long[], LongBinaryOperator)}
* for the given subrange of the array.
*
* @param array the array
* @param fromIndex the index of the first element, inclusive
* @param toIndex the index of the last element, exclusive
* @param op a side-effect-free, associative function to perform the
* cumulation
* @throws IllegalArgumentException if {@code fromIndex > toIndex}
* @throws ArrayIndexOutOfBoundsException
* if {@code fromIndex < 0} or {@code toIndex > array.length}
* @throws NullPointerException if the specified array or function is null
* @since 1.8
*/
public static void parallelPrefix(long[] array, int fromIndex,
int toIndex, LongBinaryOperator op) {
rangeCheck(array.length, fromIndex, toIndex);
if (fromIndex < toIndex)
new ArrayPrefixHelpers.LongCumulateTask
(null, op, array, fromIndex, toIndex).invoke();
}
/**
* Cumulates, in parallel, each element of the given array in place,
* using the supplied function. For example if the array initially
* holds {@code [2.0, 1.0, 0.0, 3.0]} and the operation performs addition,
* then upon return the array holds {@code [2.0, 3.0, 3.0, 6.0]}.
* Parallel prefix computation is usually more efficient than
* sequential loops for large arrays.
*
* <p> Because floating-point operations may not be strictly associative,
* the returned result may not be identical to the value that would be
* obtained if the operation was performed sequentially.
*
* @param array the array, which is modified in-place by this method
* @param op a side-effect-free function to perform the cumulation
* @throws NullPointerException if the specified array or function is null
* @since 1.8
*/
public static void parallelPrefix(double[] array, DoubleBinaryOperator op) {
if (array.length > 0)
new ArrayPrefixHelpers.DoubleCumulateTask
(null, op, array, 0, array.length).invoke();
}
/**
* Performs {@link #parallelPrefix(double[], DoubleBinaryOperator)}
* for the given subrange of the array.
*
* @param array the array
* @param fromIndex the index of the first element, inclusive
* @param toIndex the index of the last element, exclusive
* @param op a side-effect-free, associative function to perform the
* cumulation
* @throws IllegalArgumentException if {@code fromIndex > toIndex}
* @throws ArrayIndexOutOfBoundsException
* if {@code fromIndex < 0} or {@code toIndex > array.length}
* @throws NullPointerException if the specified array or function is null
* @since 1.8
*/
public static void parallelPrefix(double[] array, int fromIndex,
int toIndex, DoubleBinaryOperator op) {
rangeCheck(array.length, fromIndex, toIndex);
if (fromIndex < toIndex)
new ArrayPrefixHelpers.DoubleCumulateTask
(null, op, array, fromIndex, toIndex).invoke();
}
/**
* Cumulates, in parallel, each element of the given array in place,
* using the supplied function. For example if the array initially
* holds {@code [2, 1, 0, 3]} and the operation performs addition,
* then upon return the array holds {@code [2, 3, 3, 6]}.
* Parallel prefix computation is usually more efficient than
* sequential loops for large arrays.
*
* @param array the array, which is modified in-place by this method
* @param op a side-effect-free, associative function to perform the
* cumulation
* @throws NullPointerException if the specified array or function is null
* @since 1.8
*/
public static void parallelPrefix(int[] array, IntBinaryOperator op) {
if (array.length > 0)
new ArrayPrefixHelpers.IntCumulateTask
(null, op, array, 0, array.length).invoke();
}
/**
* Performs {@link #parallelPrefix(int[], IntBinaryOperator)}
* for the given subrange of the array.
*
* @param array the array
* @param fromIndex the index of the first element, inclusive
* @param toIndex the index of the last element, exclusive
* @param op a side-effect-free, associative function to perform the
* cumulation
* @throws IllegalArgumentException if {@code fromIndex > toIndex}
* @throws ArrayIndexOutOfBoundsException
* if {@code fromIndex < 0} or {@code toIndex > array.length}
* @throws NullPointerException if the specified array or function is null
* @since 1.8
*/
public static void parallelPrefix(int[] array, int fromIndex,
int toIndex, IntBinaryOperator op) {
rangeCheck(array.length, fromIndex, toIndex);
if (fromIndex < toIndex)
new ArrayPrefixHelpers.IntCumulateTask
(null, op, array, fromIndex, toIndex).invoke();
}
// Searching
/**
......
/*
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/**
* @test
* @summary unit test for Arrays.ParallelPrefix().
* @author Tristan Yan
* @run testng ParallelPrefix
*/
import java.util.Arrays;
import java.util.function.BinaryOperator;
import java.util.function.DoubleBinaryOperator;
import java.util.function.Function;
import java.util.function.IntBinaryOperator;
import java.util.function.LongBinaryOperator;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import static org.testng.Assert.*;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class ParallelPrefix {
//Array size less than MIN_PARTITION
private final static int SMALL_ARRAY_SIZE = 1 << 3;
//Array size equals MIN_PARTITION
private final static int THRESHOLD_ARRAY_SIZE = 1 << 4;
//Array size greater than MIN_PARTITION
private final static int MEDIUM_ARRAY_SIZE = 1 << 8;
//Array size much greater than MIN_PARTITION
private final static int LARGE_ARRAY_SIZE = 1 << 12;
private final static int[] ARRAY_SIZE_COLLECTION = new int[]{
SMALL_ARRAY_SIZE, THRESHOLD_ARRAY_SIZE,MEDIUM_ARRAY_SIZE, LARGE_ARRAY_SIZE};
@DataProvider
public static Object[][] intSet(){
return genericData(size -> IntStream.range(0, size).toArray(), new IntBinaryOperator[]{Integer::sum, Integer::min});
}
@DataProvider
public static Object[][] longSet(){
return genericData(size -> LongStream.range(0, size).toArray(), new LongBinaryOperator[]{Long::sum, Long::min});
}
@DataProvider
public static Object[][] doubleSet(){
return genericData(size -> IntStream.range(0, size).mapToDouble(i -> (double)i).toArray(),
new DoubleBinaryOperator[]{Double::sum, Double::min});
}
@DataProvider
public static Object[][] stringSet(){
Function<Integer, String[]> stringsFunc = size ->
IntStream.range(0, size).mapToObj(Integer::toString).toArray(String[]::new);
BinaryOperator<String> cancatBop = String::concat;
return genericData(stringsFunc, new BinaryOperator[]{cancatBop});
}
private static <T, OPS> Object[][] genericData(Function<Integer, T> generateFunc, OPS[] ops) {
//test arrays which size is equals n-1, n, n+1, test random data
Object[][] data = new Object[ARRAY_SIZE_COLLECTION.length * 3 * ops.length][4];
for(int n = 0; n < ARRAY_SIZE_COLLECTION.length; n++ ) {
for(int testValue = -1 ; testValue <= 1; testValue++) {
int array_size = ARRAY_SIZE_COLLECTION[n] + testValue;
for(int opsN = 0; opsN < ops.length; opsN++) {
int index = n * 3 * ops.length + (testValue + 1) * ops.length + opsN;
data[index][0] = generateFunc.apply(array_size);
data[index][1] = array_size / 3;
data[index][2] = 2 * array_size / 3;
data[index][3] = ops[opsN];
}
}
}
return data;
}
@Test(dataProvider="intSet")
public void testParallelPrefixForInt(int[] data, int fromIndex, int toIndex, IntBinaryOperator op) {
int[] sequentialResult = data.clone();
for (int index = fromIndex + 1; index < toIndex; index++) {
sequentialResult[index ] = op.applyAsInt(sequentialResult[index - 1], sequentialResult[index]);
}
int[] parallelResult = data.clone();
Arrays.parallelPrefix(parallelResult, fromIndex, toIndex, op);
assertEquals(parallelResult, sequentialResult);
int[] parallelRangeResult = Arrays.copyOfRange(data, fromIndex, toIndex);
Arrays.parallelPrefix(parallelRangeResult, op);
assertEquals(parallelRangeResult, Arrays.copyOfRange(sequentialResult, fromIndex, toIndex));
}
@Test(dataProvider="longSet")
public void testParallelPrefixForLong(long[] data, int fromIndex, int toIndex, LongBinaryOperator op) {
long[] sequentialResult = data.clone();
for (int index = fromIndex + 1; index < toIndex; index++) {
sequentialResult[index ] = op.applyAsLong(sequentialResult[index - 1], sequentialResult[index]);
}
long[] parallelResult = data.clone();
Arrays.parallelPrefix(parallelResult, fromIndex, toIndex, op);
assertEquals(parallelResult, sequentialResult);
long[] parallelRangeResult = Arrays.copyOfRange(data, fromIndex, toIndex);
Arrays.parallelPrefix(parallelRangeResult, op);
assertEquals(parallelRangeResult, Arrays.copyOfRange(sequentialResult, fromIndex, toIndex));
}
@Test(dataProvider="doubleSet")
public void testParallelPrefixForDouble(double[] data, int fromIndex, int toIndex, DoubleBinaryOperator op) {
double[] sequentialResult = data.clone();
for (int index = fromIndex + 1; index < toIndex; index++) {
sequentialResult[index ] = op.applyAsDouble(sequentialResult[index - 1], sequentialResult[index]);
}
double[] parallelResult = data.clone();
Arrays.parallelPrefix(parallelResult, fromIndex, toIndex, op);
assertEquals(parallelResult, sequentialResult);
double[] parallelRangeResult = Arrays.copyOfRange(data, fromIndex, toIndex);
Arrays.parallelPrefix(parallelRangeResult, op);
assertEquals(parallelRangeResult, Arrays.copyOfRange(sequentialResult, fromIndex, toIndex));
}
@Test(dataProvider="stringSet")
public void testParallelPrefixForStringr(String[] data , int fromIndex, int toIndex, BinaryOperator<String> op) {
String[] sequentialResult = data.clone();
for (int index = fromIndex + 1; index < toIndex; index++) {
sequentialResult[index ] = op.apply(sequentialResult[index - 1], sequentialResult[index]);
}
String[] parallelResult = data.clone();
Arrays.parallelPrefix(parallelResult, fromIndex, toIndex, op);
assertEquals(parallelResult, sequentialResult);
String[] parallelRangeResult = Arrays.copyOfRange(data, fromIndex, toIndex);
Arrays.parallelPrefix(parallelRangeResult, op);
assertEquals(parallelRangeResult, Arrays.copyOfRange(sequentialResult, fromIndex, toIndex));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册